Scala Future/Promise fast-fail pipeline Scala Future/Promise fast-fail pipeline multithreading multithreading

Scala Future/Promise fast-fail pipeline


If I understand the question correctly, what we are looking for is a fail-fast sequence implementation, which is akin to a failure-biased version of firstCompletedOf

Here, we eagerly register a failure callback in case one of the futures fails early on, ensuring that we fail as soon as any of the futures fail.

import scala.concurrent.{Future, Promise}import scala.util.{Success, Failure}import scala.concurrent.ExecutionContext.Implicits.globaldef failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {  val promise = Promise[Seq[T]]  futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}  val res = Future.sequence(futures)  promise.completeWith(res).future}

In contrast to Future.sequence, this implementation will fail as soon as any of the futures fail, regardless of ordering.Let's show that with an example:

import scala.util.Try// help method to measure timedef resilientTime[T](t: =>T):(Try[T], Long) = {  val t0 = System.currentTimeMillis  val res = Try(t)  (res, System.currentTimeMillis-t0)}import scala.concurrent.duration._import scala.concurrent.Await

First future will fail (failure in 2 seconds)

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}val f2 = Future[Int]{Thread.sleep(5000); 42}val f3 = Future[Int]{Thread.sleep(10000); 101}val res = failFast(Seq(f1,f2,f3))resilientTime(Await.result(res, 10.seconds))// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

Last future will fail. Failure also in 2 seconds. (note the order in the sequence construction)

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}val f2 = Future[Int]{Thread.sleep(5000); 42}val f3 = Future[Int]{Thread.sleep(10000); 101}val res = failFast(Seq(f3,f2,f1))resilientTime(Await.result(res, 10.seconds))// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

Comparing with Future.sequence where failure depends on the ordering (failure in 10 seconds):

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}val f2 = Future[Int]{Thread.sleep(5000); 42}val f3 = Future[Int]{Thread.sleep(10000); 101}val seq = Seq(f3,f2,f1)resilientTime(Await.result(Future.sequence(seq), 10.seconds))//res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000)


Use Future.sequence:

val both = Future.sequence(Seq(  firstFuture,  secondFuture));

This is the correct way to aggregate two or more futures where the failure of one fails the aggregated future and the aggregated future completes when all inner futures complete. An older version of this answer suggested a for-comprehension which while very common would not reject immediately of one of the futures rejects but rather wait for it.


Zip the futures

val f1 = Future { doSomething() }val f2 = Future { doSomething() }val resultF = f1 zip f2

resultF future fails if any one of f1 or f2 is failed

Time taken to resolve is min(f1time, f2time)

scala> import scala.util._import scala.util._scala> import scala.concurrent._import scala.concurrent._scala> import scala.concurrent.ExecutionContext.Implicits.globalimport scala.concurrent.ExecutionContext.Implicits.globalscala> val f = Future { Thread.sleep(10000); throw new Exception("f") }f: scala.concurrent.Future[Nothing] = scala.concurrent.impl.Promise$DefaultPromise@da1f03escala> val g = Future { Thread.sleep(20000); throw new Exception("g") }g: scala.concurrent.Future[Nothing] = scala.concurrent.impl.Promise$DefaultPromise@634a98e3scala> val x = f zip gx: scala.concurrent.Future[(Nothing, Nothing)] = scala.concurrent.impl.Promise$DefaultPromise@3447e854scala> x onComplete { case Success(x) => println(x) case Failure(th) => println(th)}result: java.lang.Exception: f