问题 .NET Custom Threadpool包含单独的实例


什么是最推荐的.NET自定义线程池,它可以有单独的实例,即每个应用程序有多个线程池? 我需要一个无限的队列大小(构建一个爬虫),并且需要为我正在爬行的每个站点并行运行一个单独的线程池。

编辑: 我需要尽可能快地挖掘这些站点以获取信息,为每个站点使用单独的线程池将使我能够控制在任何给定时间在每个站点上工作的线程数。 (不超过2-3)

谢谢 Roey


8905
2018-06-23 10:31


起源

为什么你需要为每个站点运行一个单独的线程池? - AnthonyWJones
花些时间观看Daniel Moth的视频: channel9.msdn.com/pdc2008/TL26。你会发现无限制的队列不是无限的表现之路。 - Groo
我同意安东尼的观点。我认为你不需要多个线程池。线程池用于管理和平衡整个系统的工作。如果你有多个游泳池,你最终会超载你的机器。不要忘记线程创建的开销。这就是线程池的用途 - 它小心地将线程数量管理到最佳数量,并在这些线程之间划分工作。 - Simon P Stevens
我需要为每个站点提供一个单独的线程池,以便在每个站点上使用超过2-3个爬行线程。使用.NET threadpool无法保证这一点(因为池中的所有线程可能同时处理来自同一站点的任务。)实现此目的的任何替代方案? - Roey


答案:


我相信 智能线程池 可以做到这一点。它的ThreadPool类被实例化,因此您应该能够根据需要创建和管理单独的站点特定实例。


7
2018-06-23 10:36





Ami bar写了一个优秀的Smart线程池,可以实例化。

看一看 这里


3
2018-06-23 10:38



-1表示重复答案。 AdamRalph已经发布了这个帖子。 - Robert MacLean
+1答案是2分钟后。 AdamRalphs的答案可能尚未解决。 - Simon


问Jon Skeet: http://www.yoda.arachsys.com/csharp/miscutil/

.Net(TPL)的并行扩展 如果你想要大量的并行运行任务,实际上应该工作得更好。


1
2018-06-23 10:34



这用于并行化到多个CPU核心,但它并没有解决我的问题,因为单个TaskManagers无法告诉我他们何时完成所有任务。 - Roey


这个免费的nuget库在这里: CodeFluentRuntimeClient 有一个可以重用的CustomThreadPool类。它是非常可配置的,您可以更改池线程优先级,数字,COM单元状态,甚至名称(用于调试),以及文化。


0
2018-02-14 09:30





使用BlockingCollection可以用作线程的队列。 这是它的实现。 更新于2018-04-23:

public class WorkerPool<T> : IDisposable
{
    BlockingCollection<T> queue = new BlockingCollection<T>();
    List<Task> taskList;
    private CancellationTokenSource cancellationToken;
    int maxWorkers;
    private bool wasShutDown;

    int waitingUnits;

    public WorkerPool(CancellationTokenSource cancellationToken, int maxWorkers)
    {
        this.cancellationToken = cancellationToken;
        this.maxWorkers = maxWorkers;
        this.taskList = new List<Task>();
    }
    public void enqueue(T value)
    {
        queue.Add(value);
        waitingUnits++;
    }
    //call to signal that there are no more item
    public void CompleteAdding()
    {
        queue.CompleteAdding();          
    }

    //create workers and put then running
    public void startWorkers(Action<T> worker)
    {
        for (int i = 0; i < maxWorkers; i++)
        {
            taskList.Add(new Task(() =>
            {
                string myname = "worker " + Guid.NewGuid().ToString();

                try
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {                     
                        var value = queue.Take();
                        waitingUnits--;
                        worker(value);
                    }
                }
                catch (Exception ex) when (ex is InvalidOperationException)  //throw when collection is closed with  CompleteAdding method. No pretty way to do this.
                {
                    //do nothing
                }
            }));
        }

        foreach (var task in taskList)
        {
            task.Start();
        }
    }

    //wait for all workers to be finish their jobs
    public void await()
    {
        while (waitingUnits >0 || !queue.IsAddingCompleted)
            Thread.Sleep(100);

        shutdown();
    }

    private void shutdown()
    {
        wasShutDown = true;
        Task.WaitAll(taskList.ToArray());            
    }

    //case something bad happen dismiss all pending work
    public void Dispose()
    {
        if (!wasShutDown)
        {
            queue.CompleteAdding();
            shutdown();
        }
    }
}

然后像这样使用:

WorkerPool<int> workerPool = new WorkerPool<int>(new CancellationTokenSource(), 5);

workerPool.startWorkers(value =>
{
    log.Debug(value);
});
//enqueue all the work
for (int i = 0; i < 100; i++)
{
    workerPool.enqueue(i);
}
//Signal no more work
workerPool.CompleteAdding();

//wait all pending work to finish
workerPool.await();

您只需创建新的WorkPool对象就可以进行多次民意调查。


0
2018-04-16 16:57