问题 如何在Scala中使用actor时限制并发?


我来自Java,我提交的地方 Runnable到了 ExecutorService 由线程池支持。在Java中非常清楚如何设置线程池大小的限制。

我对使用Scala actor感兴趣,但我不清楚如何限制并发性。

让我们假设,我正在创建一个接受“工作”的网络服务。提交作业 POST 请求,我希望我的服务入队,然后立即返回 202 Accepted  - 即异步处理作业。

如果我使用actor来处理队列中的作业,我如何限制处理的同时作业的数量?

我可以想到几种不同的方法来解决这个问题;我想知道是否有社区最佳实践,或者至少是一些明确建立的方法,这些方法在Scala世界中有些标准。

我想到的一种方法是拥有一个协调员角色来管理工作队列和工作处理角色;我想它可以使用一个简单的int字段来跟踪当前正在处理的作业数量。我确信这种方法会有一些问题,例如确保跟踪何时发生错误以减少数量。这就是为什么我想知道Scala是否已经提供了更简单或更封装的方法。

顺便说一下,我试着问这个问题 不久以前 但我问得很厉害。

谢谢!


932
2018-02-22 16:09


起源



答案:


您可以覆盖系统属性 actors.maxPoolSize 和 actors.corePoolSize 它限制了actor线程池的大小,然后在你的actor可以处理的池中抛出尽可能多的作业。为什么你认为你需要 风门 你的反应?


5
2018-02-22 18:23



非常有用,谢谢!我不确定我会用这个词 风门但无论如何,有时需要限制同时“进程”的数量,因为他们所做的工作是资源密集型的。 - Avi Flax
这种方法可能无法产生预期的结果。它将允许作业排队,直到JVM内存不足。限制actor可以使用的线程数只会限制实际并发执行的作业数。我通过比演员之前更快地生成工作来产生OOM错误,所以你必须要小心。 - Erik Engbrecht
我认为这种方法的一个缺点是它是全球性的。有时我需要运行不同类型的进程,这些进程具有不同的资源利用率 - 使用Java线程池,我可以轻松地使用具有不同设置的不同池。同 actors.maxPoolSize,我只能为所有演员使用一个数字,因为它们都是由相同的线程池驱动的,对吧? - Avi Flax


答案:


您可以覆盖系统属性 actors.maxPoolSize 和 actors.corePoolSize 它限制了actor线程池的大小,然后在你的actor可以处理的池中抛出尽可能多的作业。为什么你认为你需要 风门 你的反应?


5
2018-02-22 18:23



非常有用,谢谢!我不确定我会用这个词 风门但无论如何,有时需要限制同时“进程”的数量,因为他们所做的工作是资源密集型的。 - Avi Flax
这种方法可能无法产生预期的结果。它将允许作业排队,直到JVM内存不足。限制actor可以使用的线程数只会限制实际并发执行的作业数。我通过比演员之前更快地生成工作来产生OOM错误,所以你必须要小心。 - Erik Engbrecht
我认为这种方法的一个缺点是它是全球性的。有时我需要运行不同类型的进程,这些进程具有不同的资源利用率 - 使用Java线程池,我可以轻松地使用具有不同设置的不同池。同 actors.maxPoolSize,我只能为所有演员使用一个数字,因为它们都是由相同的线程池驱动的,对吧? - Avi Flax


我真的很鼓励你看看Akka,Scala的替代Actor实现。

http://www.akkasource.org

Akka已经有了JAX-RS [1]集成,您可以将它与LoadBalancer [2]配合使用,以限制在并行中可以执行的操作数量:

[1] http://doc.akkasource.org/rest [2] http://github.com/jboner/akka/blob/master/akka-patterns/src/main/scala/Patterns.scala


6
2018-02-22 20:24



有意思,我会检查出来,谢谢! - Avi Flax


你真的有两个问题。

第一个是控制actor使用的线程池。这可以通过设置系统属性actors.maxPoolSize来完成。

第二个是提交到池中的任务数量的失控增长。您可能会或可能不会关注此问题,但是完全有可能通过过快地生成太多任务来触发故障情况,例如内存不足错误以及在某些情况下可能会出现更微妙的问题。

每个工作线程都维护一个任务队列。 dequeue实现为一个数组,工作线程将动态放大到某个最大大小。在2.7.x中,队列可以自己增长很大,并且我已经看到当与大量并发线程结合时触发内存不足错误。最大出队人数小于2.8。出队也可以填补。

解决此问题需要您控制生成的任务数,这可能意味着您已经概述了某种协调器。当启动一种数据处理管道的actor比管道中的后者快得多时,我遇到了这个问题。为了控制进程,我通常让链中的actor稍后在每个X消息的链中更早地ping回演员,并且在X消息之后使链中的那些更早停止并等待ping回来。您也可以使用更集中的协调员来完成。


3
2018-02-23 01:34