Blocking calls in Akka Actors


It really depends on the use-case. If the queries do not need to be serialized, then you can execute the query in a future and send the results back to the sender as follows:

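    import scala.concurrent.{ Future, blocking }
    import akka.pattern.pipe

    // Inside an actor; an implicit ExecutionContext must be in scope,
    // e.g. import context.dispatcher
    val resFut = Future {
      blocking {
        executeQuery()
      }
    }
    resFut pipeTo sender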

You could also create a dedicated dispatcher exclusively for the DB calls and use a router for actor creation. This way you can also easily limit the number of concurrent DB requests.
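The dispatcher-plus-router idea could look roughly like the sketch below. It is only an illustration under several assumptions: classic actors on Akka 2.4 or later, a made-up dispatcher name `db-dispatcher`, a pool of 4 routees, and a hypothetical `executeQuery` that stands in for the real blocking DB call.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.routing.RoundRobinPool
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DbRouterSketch {
  // Hypothetical stand-in for a blocking DB call.
  def executeQuery(sql: String): String = {
    Thread.sleep(200)
    s"result of: $sql"
  }

  // Each routee blocks one thread of the dedicated dispatcher while it works.
  class DbActor extends Actor {
    def receive = {
      case sql: String => sender() ! executeQuery(sql)
    }
  }

  // "db-dispatcher" is an assumed name; min = max = 4 pins the pool to 4 threads.
  val config = ConfigFactory.parseString(
    """
      |db-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor {
      |    core-pool-size-min = 4
      |    core-pool-size-max = 4
      |  }
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("db-sketch", config)
    import system.dispatcher // ExecutionContext for Future.sequence only
    implicit val timeout: Timeout = Timeout(5.seconds)
    try {
      // 4 routees behind one ActorRef: at most 4 queries run at the same time.
      val dbRouter = system.actorOf(
        RoundRobinPool(4).props(Props[DbActor].withDispatcher("db-dispatcher")),
        "db-router")
      val replies = Future.sequence((1 to 8).map(i => dbRouter ? s"SELECT $i"))
      Await.result(replies, 10.seconds).foreach(println)
    } finally {
      system.terminate()
    }
  }
}
```

Because the routees are the only actors on `db-dispatcher`, the number of routees (and the size of that pool) caps the number of concurrent DB requests, while the rest of the application keeps running on the default dispatcher.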


A really good introduction is "The Neophyte's Guide to Scala Part 14: The Actor Approach to Concurrency": http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html

The actor receives a message, wraps the blocking code in a future, and sends out the result from the future's onSuccess callback as another asynchronous message. But beware that the sender variable can change in the meantime, so close over it (make a local reference to it that the future captures).

P.S. The Neophyte's Guide to Scala is a really great book.

Updated: (added sample code)

We have a worker and a manager. The manager hands out the work to be done; the worker reports "got it" and starts a long-running process (sleep 1000). Meanwhile the system pings the manager with "alive?" messages, and the manager pings the worker with them in turn. When the work is done, the worker notifies the manager.

NB: The sleep 1000 runs on the imported default (global) thread pool executor, so you can get thread starvation.

NB: val commander = sender is needed to close over a reference to the original sender, because by the time onSuccess executes, the current sender within the actor could already be set to some other sender.

Log:

    01:35:12:632 Humming ...
    01:35:12:633 manager: flush sent
    01:35:12:633 worker: got command
    01:35:12:633 manager alive
    01:35:12:633 manager alive
    01:35:12:633 manager alive
    01:35:12:660 worker: started
    01:35:12:662 worker: alive
    01:35:12:662 manager: resource allocated
    01:35:12:662 worker: alive
    01:35:12:662 worker: alive
    01:35:13:661 worker: done
    01:35:13:663 manager: work is done
    01:35:17:633 Shutdown!

Code:

    import akka.actor.{Props, ActorSystem, ActorRef, Actor}
    import com.typesafe.config.ConfigFactory
    import java.text.SimpleDateFormat
    import java.util.Date
    import scala.concurrent._
    import ExecutionContext.Implicits.global

    object Sample {

      private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")

      def printWithTime(msg: String) = {
        println(fmt.format(new Date()) + " " + msg)
      }

      class WorkerActor extends Actor {
        def receive = {
          case "now" =>
            // capture the sender now; the callback below runs on another thread
            val commander = sender
            printWithTime("worker: got command")
            Future {
              printWithTime("worker: started")
              Thread.sleep(1000) // the long-running, blocking part
              printWithTime("worker: done")
            }(ExecutionContext.Implicits.global) onSuccess {
              // here commander = original sender who requested the start of the future
              case _ => commander ! "done"
            }
            // sent right away, while the future is still running
            commander ! "working"
          case "alive?" =>
            printWithTime("worker: alive")
        }
      }

      class ManagerActor(worker: ActorRef) extends Actor {
        def receive = {
          case "do" =>
            worker ! "now"
            printWithTime("manager: flush sent")
          case "working" =>
            printWithTime("manager: resource allocated")
          case "done" =>
            printWithTime("manager: work is done")
          case "alive?" =>
            printWithTime("manager alive")
            worker ! "alive?"
        }
      }

      def main(args: Array[String]): Unit = {
        val config = ConfigFactory.parseString("" +
          "akka.loglevel=DEBUG\n" +
          "akka.debug.lifecycle=on\n" +
          "akka.debug.receive=on\n" +
          "akka.debug.event-stream=on\n" +
          "akka.debug.unhandled=on\n" +
          ""
        )
        val system = ActorSystem("mine", config)
        val actor1 = system.actorOf(Props[WorkerActor], "worker")
        val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")
        actor2 ! "do"
        actor2 ! "alive?"
        actor2 ! "alive?"
        actor2 ! "alive?"
        printWithTime("Humming ...")
        Thread.sleep(5000)
        printWithTime("Shutdown!")
        system.shutdown()
      }
    }
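To avoid the thread starvation mentioned in the first NB, the blocking part can run on its own pool instead of the global one. The following is a minimal sketch using only the standard library; the object name, the pool size of 4 and the 100 ms sleep are assumptions chosen for illustration, not values from the sample above.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingPoolSketch {
  // Assumed pool size; tune it to the number of concurrent blocking calls you expect.
  val blockingThreads = 4

  def runSlowTasks(n: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(blockingThreads)
    // A separate ExecutionContext, so blocking work cannot starve the global pool.
    implicit val blockingEc: ExecutionContext =
      ExecutionContext.fromExecutorService(pool)
    try {
      val tasks = (1 to n).map { i =>
        Future {
          Thread.sleep(100) // stands in for a blocking call
          s"task $i ran on ${Thread.currentThread.getName}"
        }
      }
      // Blocking here is only for the demo; an actor would pipe the result instead.
      Await.result(Future.sequence(tasks), 10.seconds)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    runSlowTasks(8).foreach(println)
}
```

In the worker above, such a context would be passed to the future in place of ExecutionContext.Implicits.global, so that only the dedicated pool is tied up while the sleep runs.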


You are right to think about the thread pool if you are considering blocking calls in Akka. The more blocking you do, the larger the thread pool you will need. A completely non-blocking system only really needs a pool with as many threads as the machine has CPU cores. The reference configuration uses a pool of 3 times the number of CPU cores on the machine to allow for some blocking:

    # The core pool size factor is used to determine thread pool core size
    # using the following formula: ceil(available processors * factor).
    # Resulting size is then bounded by the core-pool-size-min and
    # core-pool-size-max values.
    core-pool-size-factor = 3.0
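As a worked example of the formula in that comment, here is a small sketch of the arithmetic. The function name is made up, and the bounds of 8 and 64 are assumed values used only for illustration; check the reference configuration of your Akka version for the actual core-pool-size-min and core-pool-size-max.

```scala
object PoolSize {
  // ceil(available processors * factor), then bounded by min and max
  def corePoolSize(processors: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(processors * factor).toInt, min), max)

  def main(args: Array[String]): Unit = {
    println(corePoolSize(4, 3.0, 8, 64))  // 12
    println(corePoolSize(2, 3.0, 8, 64))  // 8, raised to the minimum
    println(corePoolSize(32, 3.0, 8, 64)) // 64, capped at the maximum
  }
}
```

So on a 4-core machine a factor of 3.0 gives 12 threads, and raising the factor only helps until the maximum bound is reached.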


If you do more blocking, you might want to raise akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-factor (or fork-join-executor.parallelism-factor, if the dispatcher runs on a fork-join-executor) to a higher number, or create a dispatcher other than the default one specifically for blocking calls and give it a higher factor.

As for the best way to do blocking calls in Akka: I would recommend scaling out by creating multiple instances of the actors that do the blocking calls and putting a router in front of them, so that they look like a single actor to the rest of your application.