问题 Java中的线程安全队列和“master / worker”程序的模式/原则


我有一个问题,我认为是经典的主/工模式,我正在寻求实施方面的建议。以下是我目前正在考虑的问题:

有某种全球性的“队列”,它是一个“保持工作”的中心位置。据推测,这个队列将由一种“主”对象管理。线程将被生成以找到要做的工作,当他们找到要做的工作时,他们会告诉主要事物(无论是什么)将“添加到要完成的工作队列”。

主人,也许在一个间隔,将产生实际执行工作的其他线程。一旦线程完成其工作,我希望它通知主服务器工作已完成。然后,主服务器可以从队列中删除此工作。

我以前在Java中完成了大量的线程编程,但它们都先于JDK 1.5,因此我不熟悉处理这种情况的相应新API。我知道JDK7会有fork-join,这对我来说可能是一个解决方案,但我无法在这个项目中使用早期访问产品。

我认为问题是:

1)如何让“线程完成工作”与主人沟通,告诉他们他们的工作已经完成,主人现在可以从队列中删除工作

2)如何有效地保证工作只安排一次。例如,假设这个队列有一百万个项目,它想告诉一个工人“去做这100件事”。什么是最有效的方法来保证当它为下一个工人安排工作时,它会得到“接下来的100件事”,而不是“我已经安排的100件事”?

3)为队列选择适当的数据结构。我在这里的想法是,“寻找工作要做的线程”可能会发现不止一次做同样的工作,并且他们会向主人发送一条消息说“这里的工作”,并且主人会意识到工作已经已经安排好,因此应该忽略该消息。我想确保选择正确的数据结构,以便这种计算尽可能便宜。

传统上,我会在数据库中以有限状态机方式完成此任务,从开始到完成工作“任务”。但是,在这个问题中,我不想使用数据库,因为队列的容量和波动性很大。另外,我想尽量保持它的重量轻。如果可以避免,我不想使用任何应用服务器。

很有可能我所描述的这个问题是一个众所周知的名称和一套公认的解决方案的常见问题,但我,我的低级非CS学位,不知道这叫什么(即请温柔)。

感谢任何和所有指针。


7698
2017-07-22 11:38


起源

你可能也想看看 lambda-the-ultimate.org/node/3521 “Java Fork / Join框架” - Pete Kirkham


答案:


据我了解您的要求,您需要 ExecutorService的。 ExecutorService有

submit(Callable task)

返回值的方法是 未来。未来是一种阻碍从工人到主人沟通的方式。您可以轻松地扩展此机制以异步方式工作。是的,ExecutorService还像ThreadPoolExecutor一样维护工作队列。因此,在大多数情况下,您不需要为调度而烦恼。 java.util.concurrent包已经有了线程安全队列的有效实现(ConcurrentLinked queue - nonblocking和LinkedBlockedQueue - blocking)。


7
2017-07-22 12:29



为了补充@dotsid所建议的内容,我想指出这个标准库会做很多(如果不是全部)OP要求并且使用起来很简单并且有效。您可以毫不费力地扩展到100个或数千个任务。 - Peter Lawrey
感谢大家的深思熟虑的回应。我不确定这是否是“规范”的答案,但最后,在阅读了Goetz的书之后,我最终得到的结果看起来很像这个答案。 - marc esher


查看 java.util.concurrent中 在Java库中。

根据您的应用程序,它可能就像将一些阻塞队列和ThreadPoolExecutor拼凑在一起一样简单。

还有,这本书 Java并发实践 作者:Brian Goetz可能对您有所帮助。


4
2017-07-22 11:42





首先,为什么你想在工人开始做之后拿着物品?通常,您将拥有一个工作队列,并且工作人员将项目从此队列中取出。这也将解决“如何防止工人获得相同的项目”问题。

对你的问题:

1)如何让“线程做”   工作“与主人沟通   告诉他们他们的工作是   完成,主人现在可以   从队列中删除工作

主人可以听取工人的意见 监听器/观察者模式

2)如何有效地拥有主人   保证工作永远   预定一次。例如,让我们说   这个队列有一百万个项目   想告诉工人“去做这些   100件事。“什么是最有效的   保证当它的方式   它安排工作到下一个工人   获得“接下来的100件事”而不是   “我已经做了100件事   计划“?

往上看。我会让工人把物品拉出队列。

3)选择适当的数据   队列的结构。我的想法   这是“寻找工作的线索   做“可能会找到相同的   他们不止一次地工作   发消息给大师说   “这是工作”,主人会   意识到工作已经存在   预定,因此应该   忽略这条消息。我想确保   我选择了正确的数据结构   这样计算便宜   尽可能。

有一个实现 阻止队列 从Java 5开始


4
2017-07-22 11:52



谢谢大家的回复。蒂姆,对你的第一个问题,这是一个很好的问题:我认为我需要将项目保留在队列中,因为“工人线程外出并找到要做的工作”需要知道已经安排了哪些工作。举一个具体的例子,想象一个程序必须走出去找到“旧文件移动”。线程找到它们,将它们添加到队列中。但是在后续运行中,如果这些文件尚未移动,“finder”线程将找到相同的文件。合理?处理这个问题的更合适的方法是什么?再次感谢。 - marc esher
也许你不需要为此烦恼。异步系统有一个很好的品质 - 幂等性。应该保护系统免受双重消息处理(在数学f(x)中说话应该等于f(f(x)),因此如果一个消息处理两次,系统状态不会改变)。你的例子是系统中幂等性的好例子。我们可以将关于一个特定文件的消息两次传递给工作人员,并且没有任如果文件已经移动,我们只是跳过任务。 - Denis Bazhenov
您可以定义工作队列,并在此旁边的工作列表。当工作线程从队列中获取项目时,将其添加到工作列表中。完成工作后,您可以将其从工作列表中删除。如果项目作为新项目提交,您可以检查它是否已在队列中或列表中忽略它。 - Tim Büthe


不要忘记Jini和Javaspaces。您所描述的内容听起来非常像天基架构所擅长的经典制作人/消费者模式。

制作人将把作业写入空间。 1个或多个消费者将取出工作(在交易下)并同时处理该工作,然后将结果写回来。由于它处于交易之下,如果出现问题,则该作业再次可供另一个消费者使用。

您可以通过添加更多消费者来轻松扩展。当消费者是独立的虚拟机并且您可以跨网络扩展时,这尤其有效。


1
2017-07-22 12:42





如果您对Spring的想法持开放态度,那么请查看他们的Spring Integration项目。它为您提供了开箱即用的所有队列/线程池样板,让您专注于业务逻辑。使用@annotations将配置保持在最低限度。

顺便说一句,Goetz非常好。


0
2017-07-22 12:17





这听起来不像是一个主工作者问题,而是一个线程池上方的专门客户端。鉴于你有很多清理线程而不是很多处理单元,所以简单地做一个scavaging pass然后计算传递可能是值得的。通过将工作项存储在Set中,唯一性约束将删除重复项。第二遍可以将所有工作提交给ExecutorService以并行执行该过程。

主工作者模型通常假定数据提供者具有所有工作并将其提供给主管理。主控制工作执行并处理分布式计算,超时,失败,重试等。分叉连接抽象是一种递归而非迭代的数据提供者。 map-reduce抽象是一个多步骤的master-worker,在某些场景中很有用。

主工作者的一个很好的例子是平凡的并行问题,例如寻找素数。另一个是数据加载,其中每个条目都是独立的(验证,转换,阶段)。处理已知工作集,处理故障等的需要使得master-worker模型与线程池不同。这就是为什么master必须在控制中并将工作单元推出,而threadpool允许worker从共享队列中提取工作。


0
2017-08-01 04:23