问题 如何将其转换为异步任务?


鉴于以下代码......

static void DoSomething(int id) {
    Thread.Sleep(50);
    Console.WriteLine(@"DidSomething({0})", id);
}

我知道我可以将其转换为异步任务,如下所示......

static async Task DoSomethingAsync(int id) {
    await Task.Delay(50);
    Console.WriteLine(@"DidSomethingAsync({0})", id);
}

通过这样做,如果我多次调用(Task.WhenAll),一切都会比使用Parallel.Foreach或甚至在循环内调用更快更高效。

但是有一分钟,让我们假装Task.Delay()不存在,我实际上必须使用Thread.Sleep();我知道实际情况并非如此,但这是概念代码,延迟/睡眠通常是IO操作,其中没有异步选项(例如早期EF)。

我试过以下......

static async Task DoSomethingAsync2(int id) {
    await Task.Run(() => {
        Thread.Sleep(50);
        Console.WriteLine(@"DidSomethingAsync({0})", id);
    });
}

但是,尽管它运行没有错误,但据说 Lucien Wischik 这实际上是不好的做法,因为它只是从池中启动线程来完成每个任务(使用以下控制台应用程序也会更慢 - 如果在DoSomethingAsync和DoSomethingAsync2调用之间切换,您可以看到它在时间上的显着差异需要完成)...

static void Main(string[] args) {
    MainAsync(args).Wait();
}

static async Task MainAsync(String[] args) {

    List<Task> tasks = new List<Task>();
    for (int i = 1; i <= 1000; i++)
        tasks.Add(DoSomethingAsync2(i)); // Can replace with any version
    await Task.WhenAll(tasks);

}

然后我尝试了以下......

static async Task DoSomethingAsync3(int id) {
    await new Task(() => {
        Thread.Sleep(50);
        Console.WriteLine(@"DidSomethingAsync({0})", id);
    });
}

移植它代替原始的DoSomethingAsync,测试永远不会完成,屏幕上不会显示任何内容!

我还尝试了其他多种不能编译或不完整的变体!

因此,考虑到您无法调用任何现有异步方法的约束并且必须在异步任务中完成Thread.Sleep和Console.WriteLine,您如何以与原始代码一样高效的方式执行此操作?

对于那些感兴趣的人来说,这里的目标是让我更好地理解如何创建我自己的异步方法,而不是任何人。尽管进行了许多搜索,但这似乎是缺少示例的一个领域 - 虽然有成千上万的调用异步方法的示例依次调用其他异步方法我找不到任何将现有void方法转换为异步任务的方法除了那些使用Task.Run(()=> {})方法之外,没有调用进一步的异步任务。


6371
2018-02-15 16:37


起源

您的最后一个示例永远不会完成 Start 在你创建的任务上。 - Lee
@lee:如果我等待新的Task(()=> {})。Start()是不能编译的,因为Task.Start()返回void。 - Martin Robins
如果您需要以有效的方式等待而不阻塞线程,您可以使用 TaskCompletionSource 代替。 - Lee
您需要在尝试之前启动任务 await 它。 var task = new Task(...); task.Start(); await task; - Lee
@lee:还尝试了静态异步任务DoSomethingAsync2(int id){Task task = new Task(()=> {Thread.Sleep(50); Console.WriteLine(@“DidSomethingAsync({0})”,id);} ); task.Start();等待任务;但这和DoSomethingAsync2示例一样慢。 - Martin Robins


答案:


有两种任务:执行代码的任务(例如, Task.Run 和朋友),以及那些回应某些外部事件的人(例如, TaskCompletionSource<T> 和朋友)。

你在寻找什么 TaskCompletionSource<T>。常见情况有各种“速记”形式,因此您不必总是使用 TaskCompletionSource<T> 直。例如, Task.FromResult 要么 TaskFactory.FromAsyncFromAsync 如果您有现有的,则最常用 *Begin/*End 实施您的I / O;否则,你可以使用 TaskCompletionSource<T> 直。

有关更多信息,请参阅的“I / O绑定任务”部分 实现基于任务的异步模式

Task 构造函数(不幸的是)是基于任务的并行性的保留,不应该在异步代码中使用。它只能用于创建基于代码的任务,而不能用于创建外部事件任务。

因此,考虑到您无法调用任何现有异步方法的约束并且必须在异步任务中完成Thread.Sleep和Console.WriteLine,您如何以与原始代码一样高效的方式执行此操作?

我会使用某种计时器,让它完成一个 TaskCompletionSource<T>当计时器开火时。我几乎肯定是实际的 Task.Delay 无论如何都要实现。


7
2018-02-15 17:08



谢谢,但您对Timer和TaskCompletionSource的引用可能会忽略这一点; Thread.Sleep只是一个长期运行的任务的例子,我想完成我没有异步方法调用的地方。也许我应该用一个长时间运行的循环替换它以获得更清晰。 - Martin Robins
@MartinRobins:问题是“是什么让你的任务长期运行”?如果它实际上正在运行 码, 那么正确的解决方案是同步执行或使用 Task.Run 如果你想在线程池线程上运行它。如果它是基于I / O的,或由于外部事件而完成,则使用 TaskFactory.FromAsync 要么 TaskCompletionSource<T>。没有其他可能性。 - Stephen Cleary
您能否详细说明TaskCompletionSource选项,因为我可以找到的示例似乎不适合我的场景。 - Martin Robins
@MartinRobins:你能描述一下你的情景吗? TCS用于手动触发任务完成。你设置你想要的任何回调(完成TCS),然后返回 Task 属性。 - Stephen Cleary
@MartinRobins, Task.Delay 和 Task.Run+Thread.Sleep 不具有可比性,因为最后一个涉及线程的创建/调度。每种场景都有自己的最佳实现。 - Paulo Morgado


所以,考虑到你不能调用任何现有的约束   异步方法,必须完成Thread.Sleep和   在异步任务中Console.WriteLine,你是如何在一个   与原始代码一样有效的方式?

IMO,这是一个 非常 你真正需要坚持的综合约束 Thread.Sleep。在这种约束下,你仍然可以略微改善你的 Thread.Sleep基于代码。而不是这个:

static async Task DoSomethingAsync2(int id) {
    await Task.Run(() => {
        Thread.Sleep(50);
        Console.WriteLine(@"DidSomethingAsync({0})", id);
    });
}

你可以这样做:

static Task DoSomethingAsync2(int id) {
    return Task.Run(() => {
        Thread.Sleep(50);
        Console.WriteLine(@"DidSomethingAsync({0})", id);
    });
}

这样,您就可以避免编译器生成的状态机类的开销。这两个代码片段之间存在细微差别 如何传播异常

无论如何,这是  减速的瓶颈在哪里。

(使用以下控制台应用程序也会更慢 - 如果你   在DoSomethingAsync和DoSomethingAsync2调用之间交换,你可以看到一个   完成所需时间的显着差异)

让我们再看一下你的主循环代码:

static async Task MainAsync(String[] args) {

    List<Task> tasks = new List<Task>();
    for (int i = 1; i <= 1000; i++)
        tasks.Add(DoSomethingAsync2(i)); // Can replace with any version
    await Task.WhenAll(tasks);

}

从技术上讲,它要求并行运行1000个任务,每个任务都应该在自己的线程上运行。在理想的宇宙中,你期望执行 Thread.Sleep(50) 平行1000次,在大约50ms完成整个事情。

然而, 这个要求永远不会满足 由TPL的默认任务调度程序,有一个很好的理由:线程是一种宝贵且昂贵的资源。而且,并发操作的实际数量限于CPU /核心数。所以实际上,默认大小为 ThreadPool, 我越来越 21个泳池线程(峰值时) 并行提供此操作。这就是为什么 DoSomethingAsync2 / Thread.Sleep 花了这么长时间 DoSomethingAsync / Task.DelayDoSomethingAsync 不阻塞池线程,它只在超时完成时请求一个。因此,更多 DoSomethingAsync 任务实际上可以并行运行 DoSomethingAsync2 那些。

考试 (控制台应用程序):

// https://stackoverflow.com/q/21800450/1768303

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21800450
{
    public class Program
    {
        static async Task DoSomethingAsync(int id)
        {
            await Task.Delay(50);
            UpdateMaxThreads();
            Console.WriteLine(@"DidSomethingAsync({0})", id);
        }

        static async Task DoSomethingAsync2(int id)
        {
            await Task.Run(() =>
            {
                Thread.Sleep(50);
                UpdateMaxThreads();
                Console.WriteLine(@"DidSomethingAsync2({0})", id);
            });
        }

        static async Task MainAsync(Func<int, Task> tester)
        {
            List<Task> tasks = new List<Task>();
            for (int i = 1; i <= 1000; i++)
                tasks.Add(tester(i)); // Can replace with any version
            await Task.WhenAll(tasks);
        }

        volatile static int s_maxThreads = 0;

        static void UpdateMaxThreads()
        {
            var threads = Process.GetCurrentProcess().Threads.Count;
            // not using locks for simplicity
            if (s_maxThreads < threads)
                s_maxThreads = threads;
        }

        static void TestAsync(Func<int, Task> tester)
        {
            s_maxThreads = 0;
            var stopwatch = new Stopwatch();
            stopwatch.Start();

            MainAsync(tester).Wait();

            Console.WriteLine(
                "time, ms: " + stopwatch.ElapsedMilliseconds +
                ", threads at peak: " + s_maxThreads);
        }

        static void Main()
        {
            Console.WriteLine("Press enter to test with Task.Delay ...");
            Console.ReadLine();
            TestAsync(DoSomethingAsync);
            Console.ReadLine();

            Console.WriteLine("Press enter to test with Thread.Sleep ...");
            Console.ReadLine();
            TestAsync(DoSomethingAsync2);
            Console.ReadLine();
        }

    }
}

输出:

按Enter键以使用Task.Delay进行测试...
...
时间,ms:1077,峰值线程:13

按Enter键以使用Thread.Sleep进行测试...
...
时间,ms:8684,高峰期线程:21

是否有可能改善时间数据 Thread.SleepDoSomethingAsync2?我能想到的唯一方法就是使用 TaskCreationOptions.LongRunning 同 Task.Factory.StartNew

在任何实际应用程序中执行此操作之前,您应该三思而后行

static async Task DoSomethingAsync2(int id)
{
    await Task.Factory.StartNew(() =>
    {
        Thread.Sleep(50);
        UpdateMaxThreads();
        Console.WriteLine(@"DidSomethingAsync2({0})", id);
    }, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness);
}

// ...

static void Main()
{
    Console.WriteLine("Press enter to test with Task.Delay ...");
    Console.ReadLine();
    TestAsync(DoSomethingAsync);
    Console.ReadLine();

    Console.WriteLine("Press enter to test with Thread.Sleep ...");
    Console.ReadLine();
    TestAsync(DoSomethingAsync2);
    Console.ReadLine();
}

输出:

按Enter键以使用Thread.Sleep进行测试...
...
时间,ms:3600,峰值线程:163

时间越来越好,但价格却很高。此代码要求任务计划程序为每个新任务创建一个新线程。做  期望这个帖子来自池:

Task.Factory.StartNew(() =>
{
    Thread.Sleep(1000);
    Console.WriteLine("Thread pool: " + 
        Thread.CurrentThread.IsThreadPoolThread); // false!
}, TaskCreationOptions.LongRunning).Wait();

5
2018-02-16 01:32