我发现为一个用户请求构建大量期货通常是一种不好的做法。这些期货可以填补执行上下文,这将影响其他请求。这不太可能是你真正想要的。保持期货数量很小很简单 - 仅在for-comprehensions中创建新期货,使用flatMap等。但有时可能需要为每个Seq项目创建Future。使用Future.sequence或Future.traverse导致上述问题。所以我最终得到了这个解决方案,它不会同时为每个收集项创建Futures:
def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
if(xs.isEmpty) Future successful Seq.empty[B]
else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) }
}
我想知道,也许我正在发明一个轮子,实际上这个功能已经存在于Scala的标准库中?另外我想知道,你遇到过描述的问题,你是如何解决的?也许,如果这是Futures的一个众所周知的问题,我应该在Future.scala中创建一个pull请求,这样这个函数(或者它的更通用版本)会包含在标准库中吗?
UPD:更通用的版本,有限的并行性:
def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
val xss = xs.grouped(chunkSize).toList
val chunks = xss.take(maxChunks-1) :+ xss.drop(maxChunks-1).flatten
Future.sequence{ chunks.map(chunk => ftraverse(chunk)(f) ) } map { _.flatten }
}
不,标准库中没有这样的东西。是否有 应该 是不是,我不能说。我不认为想要执行是很常见的 Future
s严格按顺序排列。但是当你想要的时候,你可以很容易地实现自己的方法。我个人只是为了这个目的在我自己的库中保留一个方法。但是,使用标准库有一种方法可以很方便。如果有 是,它应该更通用。
修改电流实际上非常简单 traverse
处理 Future
顺序而不是并行。这里是 当前版本,使用 foldLeft
而不是递归:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
val fb = fn(a)
for (r <- fr; b <- fb) yield (r += b)
}.map(_.result())
该 Future
s是在创建之前创建的 flatMap
通过分配 val fb = fn(a)
(并因此执行)。所有人都需要做的就是搬家 fn(a)
在 - 的里面 flatMap
延迟后续的创作 Future
在集合中。
def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
for (r <- fr; b <- fn(a)) yield (r += b)
}.map(_.result())
另一种方法可以限制执行大量的影响 Future
s是通过使用不同的 ExecutionContext
对他们来说例如,在Web应用程序中,我可能会保留一个 ExecutionContext
用于数据库调用,一个用于调用Amazon S3,另一个用于缓慢的数据库调用。
一个非常简单的实现可以使用固定的线程池:
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)
大批 Future
在这里执行将填补 ExecutionContext
,但它会阻止他们填补其他情况。
如果你正在使用Akka,你可以轻松创建 ExecutionContext
来自配置使用 调度员 在一个 ActorSystem
:
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
}
如果你有 ActorSystem
叫 system
你可以通过以下方式访问它:
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
所有这些都取决于您的使用案例。虽然我将异步计算分成不同的上下文,但有时候我仍然想要 traverse
顺序地平滑这些上下文的使用。
您的问题似乎与您创建的期货数量无关,但与其执行的公平性无关。考虑如何回调期货(map
, flatMap
, onComplete
, fold
等等)处理:它们被放置在执行者的队列中,并在父母期货的结果完成时执行。
如果你所有的期货共享同一个执行者(即队列),他们确实会像你说的那样互相辱骂。解决这个公平问题的常用方法是使用Akka演员。对于每个请求,启动一个新的actor(具有自己的队列)并拥有所有actor 那种类型 分享一个 ExecutionContext
。您可以限制演员在分享该演员之前执行的另一个演员之前将执行的最大消息数量 ExecutionContext
使用 throughput
配置属性。