Happy New Year! Over the last three posts, I've described an Akka based Content Ingestion Pipeline modelled after the NutchGORA pipeline. This pipeline was my vehicle for learning Akka, something I've been planning to do for a while. In this post (the final installment on this series, at least so far), I explore how to distribute this application horizontally across multiple machines (scale out).
Akka is distributed by design. The code I've built so far can be regarded as a single server version of a distributed system. According to the Akka remoting documentation (emphasis mine):
Everything in Akka is designed to work in a distributed setting: all interactions of actors use purely message passing and everything is asynchronous. This effort has been undertaken to ensure that all functions are available equally when running within a single JVM or on a cluster of hundreds of machines. The key for enabling this is to go from remote to local by way of optimization instead of trying to go from local to remote by way of generalization.
Akka can work with remote Actors in two ways, either by looking them up in a remote ActorSystem, or by creating them in the remote ActorSystem. I use the latter approach. The components that do the heavy lifting in the pipeline are the workers, and scaling out to handle more incoming requests would imply increasing the number of workers or making them faster, both of which can be done by giving them their own dedicated hardware.
The architecture diagram has been updated with the distribution boundaries, they are indicated by the gray boxes below. The master node is the large gray box on the top, and contains the REST Interface, the Controller and the Router actors. The worker nodes (can be an array of nodes for each worker class) are the ones that wrap the Fetch, Parse and Index worker arrays.
Each of these nodes are wrapped in an Akka ActorSystem, which can be accessed by an URI from other ActorSystems. So in addition to the HTTP interface that the master node exposes to the outside world, it also exposes a host:port and has a name that other Akka ActorSystems can use to communicate with it.
For testing, I configured the pipeline with just 2 ActorSystems - the master node listening on localhost:2552 and identified by URI akka.tcp://DelSym@localhost:2552, and one remote node listening on localhost:2553 and identified by URI akka.tcp://remote@localhost:2553. Here is some code to create a named (name supplied from command line) remote Akka ActorSystem using configuration parameters in the remote.conf file.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | // Source: src/main/scala/com/mycompany/delsym/remote/RemoteAkka.scala
package com.mycompany.delsym.remote
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
object RemoteAkka extends App {
val name = if (args.isEmpty) "remote" else args(0)
val conf = ConfigFactory.load("remote")
val host = conf.getString("akka.remote.netty.tcp.hostname")
val port = conf.getInt("akka.remote.netty.tcp.port")
val system = ActorSystem(name, conf)
Console.println("Remote system [%s] listening on %s:%d"
.format(name, host, port))
sys.addShutdownHook {
Console.println("Shutting down Remote Akka")
system.shutdown
}
}
|
The remote.conf file looks like this. This is meant to be used in order to start up ActorSystems on multiple nodes in a network.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | // Source: src/main/resources/remote.conf
akka {
log-dead-letters-during-shutdown = off
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2553
}
}
}
|
I then added a property in application.conf to specify a list of ActorSystem URIs for the routers. The routers are Round Robin routers, so giving them a list of ActorSystem URIs will cause them to cycle through the URIs, creating remote Actors and distributing evenly across multiple remote ActorSystems. The Controller Actor code (which instantiates the routers) has been modified to create local workers if the node list is empty and remote workers otherwise. The updated code for the Controller is shown below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | // Source: src/main/scala/com/mycompany/delsym/actors/Controller.scala
package com.mycompany.delsym.actors
import scala.collection.JavaConversions.asScalaBuffer
import scala.concurrent.duration.DurationInt
import com.mycompany.delsym.daos.HtmlOutlinkFinder
import com.mycompany.delsym.daos.MockOutlinkFinder
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigList
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.AddressFromURIString
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.actorRef2Scala
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RoundRobinRouter
import akka.routing.RouterConfig
class Controller extends Actor with ActorLogging {
override val supervisorStrategy = OneForOneStrategy(
maxNrOfRetries = 10,
withinTimeRange = 1.minute) {
case _: Exception => SupervisorStrategy.Restart
}
val reaper = context.actorOf(Props[Reaper], name="reaper")
val conf = ConfigFactory.load()
val numFetchers = conf.getInt("delsym.fetchers.numworkers")
val fetchNodes = conf.getList("delsym.fetchers.nodes")
val numParsers = conf.getInt("delsym.parsers.numworkers")
val parseNodes = conf.getList("delsym.parsers.nodes")
val numIndexers = conf.getInt("delsym.indexers.numworkers")
val indexNodes = conf.getList("delsym.indexers.nodes")
val testUser = conf.getBoolean("delsym.testuser")
val outlinkFinder = if (testUser) new MockOutlinkFinder()
else new HtmlOutlinkFinder()
val queueSizes = scala.collection.mutable.Map[String,Long]()
val fetchers = context.actorOf(Props[FetchWorker]
.withRouter(buildRouter(numFetchers, fetchNodes)),
name="fetchers")
reaper ! Register(fetchers)
queueSizes += (("fetchers", 0L))
val parsers = context.actorOf(Props[ParseWorker]
.withRouter(buildRouter(numParsers, parseNodes)),
name="parsers")
reaper ! Register(parsers)
queueSizes += (("parsers", 0L))
val indexers = context.actorOf(Props[IndexWorker]
.withRouter(buildRouter(numIndexers, indexNodes)),
name="indexers")
reaper ! Register(indexers)
queueSizes += (("indexers", 0L))
def receive = {
case m: Fetch => {
increment("fetchers")
fetchers ! m
}
case m: FetchComplete => {
decrement("fetchers")
if (m.fwd) parsers ! Parse(m.url)
}
case m: Parse => {
increment("parsers")
parsers ! m
}
case m: ParseComplete => {
decrement("parsers")
outlinks(m.url).map(outlink =>
fetchers ! Fetch(outlink._1, outlink._2, outlink._3))
if (m.fwd) indexers ! Index(m.url)
}
case m: Index => {
increment("indexers")
indexers ! m
}
case m: IndexComplete => {
decrement("indexers")
}
case m: Stats => {
sender ! queueSize()
}
case m: Stop => {
reaper ! Stop(0)
}
case _ => log.info("Unknown message received.")
}
def buildRouter(n: Int, nodes: ConfigList): RouterConfig = {
if (nodes.isEmpty) RoundRobinRouter(n)
else {
val addrs = nodes.unwrapped()
.map(node => node.asInstanceOf[String])
.map(node => AddressFromURIString(node))
.toSeq
RemoteRouterConfig(RoundRobinRouter(n), addrs)
}
}
def queueSize(): Stats = Stats(queueSizes.toMap)
def outlinks(url: String):
List[(String,Int,Map[String,String])] = {
outlinkFinder.findOutlinks(url) match {
case Right(triples) => triples
case Left(f) => List.empty
}
}
def increment(key: String): Unit = {
queueSizes += ((key, queueSizes(key) + 1))
}
def decrement(key: String): Unit = {
queueSizes += ((key, queueSizes(key) - 1))
}
}
|
The documentation indicates that a better approach would be to declare the routers in configuration, so the local configuration would be different from the distributed configuration. I did not do this because my test case refers to the routers as /controller/* but the actual code refers to it as /api/controller/* (I should probably change the test code but I was too lazy). But in any case, changing from a local to a remote router configuration is simply a matter of wrapping the Router Configuration with a RemoteRouterConfig (buildRouter function in the code above), so this approach works fine also.
Going from local to remote also requires you to think about serialization. I have chosen to use Java serialization, and I have configured Akka (via the application.conf file) to automatically use Java serialization for my messages. In addition, the distributed version of the master ActorSystem also exposes its own address in the configuration and sets the provider to a remote ActorRef provider. The other important difference is the non empty nodes list value under the delsym namespace for each of the fetcher, parser and indexer. The remote configuration is shown below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | // Source: src/main/resources/application.conf.remote
akka {
loglevel = INFO
stdout-loglevel = INFO
akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
log-dead-letters-during-shutdown = off
actor {
provider = "akka.remote.RemoteActorRefProvider"
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
serializers {
java = "akka.serialization.JavaSerializer"
}
serialization-bindings {
"com.mycompany.delsym.actors.DelsymMessage" = java
}
}
}
spray {
can {
server {
server-header = "DelSym REST API"
}
}
}
delsym {
testuser = true
fetchers {
numworkers = 5
refreshIntervalDays = 30
numRetries = 3
nodes = ["akka.tcp://remote@127.0.0.1:2553"]
}
parsers {
numworkers = 5
nodes = ["akka.tcp://remote@127.0.0.1:2553"]
}
indexers {
numworkers = 5
nodes = ["akka.tcp://remote@127.0.0.1:2553"]
}
mongodb {
host = "127.0.0.1"
port = 27017
dbname = "delsymdb"
collname = "documents"
}
solr {
server = "http://127.0.0.1:8983/solr/collection1/"
dbfieldnames = "_id,url,p_title,p_author,textContent"
solrfieldnames = "id,url,title,author,text"
commitInterval = 10
}
rest {
host = "127.0.0.1"
port = 8080
timeout = 1
}
}
|
There are now 3 versions of application.conf in the Delsym repo on GitHub. You will have to link to the correct one depending on whether you want to run the mock tests, run in local (all actors on single ActorSystem) or remote (multiple ActorSystems) mode.
The effort to build the code for this part of the pipeline was mostly conceptual, ie, understanding how the different components fit together. I found the following Akka reference pages very useful. The pages are all for Akka version 2.2.3 (latest stable version) that I used for this work - the default pages that show up (in response to a Google search for example) are for version 2.3 which is still in development. The 2.3 code is different enough for this detail to be annoying, so mentioning it here.
In addition, I also found the akka-sample-remote-scala useful, although the pattern shown there is slightly different from what I used. Another useful source was the Remoting chapter from the Akka Essentials book.
I was able to run the ActorFlowTest unit test with Mock workers (minus the asserts, since the workers update counters on the remote ActorSystem which I no longer have control over) and verify from the logs that the fetching, parsing and indexing happen on my remote ActorSystem at localhost:2553. The code also exits cleanly which means Deathwatch works fine with remote workers. However, I saw lots of messages sent to the dead-letter mailbox which I haven't been able to figure out yet (they are not application messages) - I will post an update here (and bugfix to the DelSym GitHub repo once I do.
Be the first to comment. Comments are moderated to prevent spam.
Post a Comment