问题 是否有“慢”的Future.traverse版本?


我发现为一个用户请求构建大量期货通常是一种不好的做法。这些期货可以填补执行上下文,这将影响其他请求。这不太可能是你真正想要的。保持期货数量很小很简单 - 仅在for-comprehensions中创建新期货,使用flatMap等。但有时可能需要为每个Seq项目创建Future。使用Future.sequence或Future.traverse导致上述问题。所以我最终得到了这个解决方案,它不会同时为每个收集项创建Futures:

  def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    if(xs.isEmpty) Future successful Seq.empty[B]
    else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) }
  }

我想知道,也许我正在发明一个轮子,实际上这个功能已经存在于Scala的标准库中?另外我想知道,你遇到过描述的问题,你是如何解决的?也许,如果这是Futures的一个众所周知的问题,我应该在Future.scala中创建一个pull请求,这样这个函数(或者它的更通用版本)会包含在标准库中吗?

UPD:更通用的版本,有限的并行性:

  def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    val xss = xs.grouped(chunkSize).toList
    val chunks = xss.take(maxChunks-1) :+ xss.drop(maxChunks-1).flatten
    Future.sequence{ chunks.map(chunk => ftraverse(chunk)(f) ) } map { _.flatten }
  } 

9850
2018-02-14 10:23


起源

看看scalaz中的'Task'。它实现了 traverse 在Haskell Traversable类型类中定义,它以这种方式运行(一次一个操作)。 - James Davies


答案:


不,标准库中没有这样的东西。是否有 应该 是不是,我不能说。我不认为想要执行是很常见的 Futures严格按顺序排列。但是当你想要的时候,你可以很容易地实现自己的方法。我个人只是为了这个目的在我自己的库中保留一个方法。但是,使用标准库有一种方法可以很方便。如果有 ,它应该更通用。

修改电流实际上非常简单 traverse 处理 Future顺序而不是并行。这里是 当前版本,使用 foldLeft 而不是递归:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      val fb = fn(a)
      for (r <- fr; b <- fb) yield (r += b)
    }.map(_.result())

Futures是在创建之前创建的 flatMap 通过分配 val fb = fn(a) (并因此执行)。所有人都需要做的就是搬家 fn(a) 在 - 的里面 flatMap 延迟后续的创作 Future在集合中。

def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      for (r <- fr; b <- fn(a)) yield (r += b)
    }.map(_.result())

另一种方法可以限制执行大量的影响 Futures是通过使用不同的 ExecutionContext 对他们来说例如,在Web应用程序中,我可能会保留一个 ExecutionContext 用于数据库调用,一个用于调用Amazon S3,另一个用于缓慢的数据库调用。

一个非常简单的实现可以使用固定的线程池:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

大批 Future在这里执行将填补 ExecutionContext,但它会阻止他们填补其他情况。

如果你正在使用Akka,你可以轻松创建 ExecutionContext来自配置使用 调度员 在一个 ActorSystem

my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 100
}

如果你有 ActorSystem 叫 system 你可以通过以下方式访问它:

implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

所有这些都取决于您的使用案例。虽然我将异步计算分成不同的上下文,但有时候我仍然想要 traverse 顺序地平滑这些上下文的使用。


11
2018-02-14 15:41





您的问题似乎与您创建的期货数量无关,但与其执行的公平性无关。考虑如何回调期货(mapflatMaponCompletefold等等)处理:它们被放置在执行者的队列中,并在父母期货的结果完成时执行。

如果你所有的期货共享同一个执行者(即队列),他们确实会像你说的那样互相辱骂。解决这个公平问题的常用方法是使用Akka演员。对于每个请求,启动一个新的actor(具有自己的队列)并拥有所有actor 那种类型 分享一个 ExecutionContext。您可以限制演员在分享该演员之前执行的另一个演员之前将执行的最大消息数量 ExecutionContext 使用 throughput 配置属性。


4
2018-02-14 18:57





假设期货的创建不是那么精细,以至于开销将是禁止的(在这种情况下,建议使用并行集合的答案可能是最有用的),您可以为此创建一个不同的,隐式定义的执行上下文。在其下运行的期货由不同的执行者以其自己的线程支持。

你可以打电话 ExecutionContext.fromExecutorService 要么 ExecutionContext.fromExecutor 这样做。


0
2018-02-14 22:46





这不是并行集合的用途吗?

val parArray = (1 to 1000000).toArray.par
sum = parArray.map(_ + _)
res0: Int = 1784293664

看起来像普通的同步方法调用,但并行集合将使用线程池并行计算地图(竞争条件!)。你会在这里找到更多细节: http://docs.scala-lang.org/overviews/parallel-collections/overview.html


0
2018-02-14 20:18



感谢您的评论,我试图清理我的答案并将最重要的信息放在那里 - Alexej Haak
请注意,问题不是关于如何并行做事,而是相反。 - Aleksander Alekseev