问题 创建动态(增长/缩小)线程池


我需要在Java中实现一个线程池(java.util.concurrent),其空闲时线程数达到某个最小值,当作业提交到它上面的速度超过完成执行时,它会增长到上限(但绝不会更远) ,当完成所有作业并且不再提交作业时,缩小回到下限。

你会如何实现这样的东西?我想这将是一个相当常见的使用场景,但显然是 java.util.concurrent.Executors 工厂方法只能创建在提交许多作业时无限增长的固定大小的池和池。该 ThreadPoolExecutor 班级提供 corePoolSize 和 maximumPoolSize 参数,但它的文档似乎暗示,有史以来唯一的方法 corePoolSize 线程同时使用有界作业队列,在这种情况下,如果你已到达 maximumPoolSize 线程,你会得到工作拒绝,你必须自己处理?我想出了这个:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

我忽略了什么吗?有一个更好的方法吗?此外,根据一个人的要求,上述代码至少(我认为)之前不会完成可能会有问题。 (total number of jobs) - maxSize 工作已经完成。因此,如果您希望能够将任意数量的作业提交到池中并立即进行而无需等待其中任何一个完成,我看不出如何在没有专门的“作业总结”线程的情况下执行此操作保存所有提交的作业所需的无限队列。 AFAICS,如果你为ThreadPoolExecutor本身使用一个无界的队列,它的线程数将永远不会超过corePoolSize。


11532
2018-06-28 16:49


起源

我不得不承认,我没有看到动态大小的线程池的有用性。您的电路板上的处理器数量是否会在应用程序正常运行期间发生变化? - corsiKa
为什么是 newCachedThreadPool 不适合你的情况?它会自动杀死不再使用的线程。 - Tudor
如果您的空闲线程没有死亡会有什么影响?假设您有一个最大尺寸的固定大小的游泳池?会发生什么? - Peter Lawrey
AFAICS,newCachedThreadPool创建一个池,如果您向其中提交许多长时间运行的作业,其线程数可能会无限增长。 - Olaf Klischat
不,处理器数量不会在运行时更改,但如果作业主要受I / O限制而不是CPU限制,我无法看到这是如何相关的。在这种情况下,即使在单处理器系统上使用多个线程,您也可以实现更高的作业吞吐量。 - Olaf Klischat


答案:


可能对您有帮助的一个技巧是分配一个 RejectedExecutionHandler 使用相同的线程将作业提交到阻塞队列。这将阻止当前线程并消除对某种循环的需要。

在这里看到我的答案:

如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

这是从该答案中复制的拒绝处理程序。

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

只要您意识到您使用的有界阻塞队列,您就应该能够使用核心/最大线程数 第一 在核心线程之上创建任何线程之前填充。因此,如果您有10个核心线程并且您希望第11个作业启动第11个线程,那么您将需要一个大小为0的阻塞队列(可能是 SynchronousQueue)。我觉得这是一个真正的限制,否则很棒 ExecutorService 类。


4
2018-06-28 16:58





当增长和缩小与线程一起出现时,我脑海中只有一个名字: 来自java.util.concurrent的CachedThreadPool 包。

ExecutorService executor = Executors.newCachedThreadPool();

CachedThreadPool()可以 重用线程, 以及 需要时创建新线程。 是的,如果是 线程空闲60秒,CachedThreadPool将终止它。 所以这很轻巧 - 用你的话来说是成长和缩小!


8
2018-06-28 17:40



是的,但没有限制。 - Gray
您可以手动使用底层ThreadPoolExecutor并设置maximumPoolSize,甚至可以在运行时使用 - Askar Kalykov


maximumPoolSize 至 Integer.MAX_VALUE。如果你有超过20亿个线程......那么,祝你好运。

无论如何,Javadoc ThreadPoolExecutor 状态:

通过将maximumPoolSize设置为基本无限制的值(例如Integer.MAX_VALUE),可以允许池容纳任意数量的并发任务。最典型的情况是,核心和最大池大小仅在构造时设置,但也可以使用setCorePoolSize(int)和setMaximumPoolSize(int)动态更改。

使用类似于无限制的任务队列 LinkedBlockingQueue,这应该具有任意大容量。


1
2018-06-28 16:58



downvoter会关心解释吗? - Louis Wasserman
谢谢也参考这个 stackoverflow.com/questions/28567238/... 萎缩打击其他问题解决的其他问题 - Vahid
他想让它受限制,而不是无限制...... - Xerus