Async/await and the Task Parallel Library don't mix

I found out the hard way a few days ago that async/await and the Task Parallel Library don't mix very well. I was working on a project at work, running code that we hadn't properly exercised since upgrading the project to .NET Framework 4.5. We had gone through that code and converted it from explicit TPL Task-based code to the sweet async/await syntax for proper asynchronous behavior, or so we thought.

What I was doing was fetching a batch of about 500 items, then looping through them and making a web request for each one (a POST request, to be exact). Early on, while our project was on .NET 4.0, we optimized the loop to execute in parallel so we weren't limited to one web request at a time. After converting the code to async/await we had only run it on small batches (about 50 items or so) and had not hit any issues. Once we had a larger data set to work with, we saw that our application was in effect launching a denial-of-service attack against our own web server.

After a short investigation we found the culprit. The TPL's parallel loops do not seem to know how to work with async methods in .NET 4.5: they do not wait for an async body to finish before moving along to the next item. As a result, our code fired off some 500 web requests at a time, in a pretty tight loop. Uh oh!

Here is an example. For simplicity's sake, let's say we have a method like this:

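// Requires: using System; using System.Threading; using System.Threading.Tasks;
public static async Task DoStuff(int i)
{
    int item = i;
    // Simulate a slow web request on a thread-pool thread.
    await Task.Run(() =>
        {
            Console.WriteLine(item + ":" + Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(500);
        });
}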

We were calling it in a method similar to this:

const int ChunkSize = 500;  
var query = Enumerable.Range(0, 100000).ToArray();  
var numChunks = (int)Math.Ceiling((double)query.Length / ChunkSize);  
Console.WriteLine("Splitting the work load into " + numChunks + " chunks");

for (int i = 0; i < numChunks; i++)  
{
    Console.WriteLine("Fetching " + (i * ChunkSize) + " to " + ((i + 1) * ChunkSize));
    query.Skip(i * ChunkSize)
        .Take(ChunkSize)
        .GroupBy(x => x % 40)
        .AsParallel()
        .WithDegreeOfParallelism(40)
        .ForAll(async group => // compiles, but ForAll does not wait for this async lambda
    {
        foreach (var item in group)
        {
            await DoStuff(item);
        }
    });
}

Looks good in theory, but does not work properly in reality. The TPL does not understand async lambdas as parameters to ForAll. ForAll takes an Action<T>, so the async lambda is compiled as an async void method: it returns to ForAll at its first await, and nothing waits for the rest of it to finish. The same problem applies to Parallel.For. The code compiles, but it runs in a pretty tight loop and seems to start execution of all 500 tasks (the ChunkSize) at the same time.
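To see the effect in isolation, here is a minimal sketch that is not taken from our project. The class name AsyncVoidDemo, the item count of 20 and the Task.Delay standing in for the web request are all made up for illustration. It counts how many async bodies have finished at the moment ForAll returns:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; none of these names come from the original code.
public static class AsyncVoidDemo
{
    // Number of async bodies that have run to completion.
    private static int completed;

    public static int CompletedWhenForAllReturned()
    {
        completed = 0;

        // ForAll takes an Action<int>, so this async lambda becomes async void.
        Enumerable.Range(0, 20).AsParallel().WithDegreeOfParallelism(4).ForAll(async i =>
        {
            await Task.Delay(500); // the lambda returns to ForAll at this await
            Interlocked.Increment(ref completed);
        });

        // Read the counter immediately after ForAll has returned.
        return Volatile.Read(ref completed);
    }

    public static void Main()
    {
        Console.WriteLine("Completed when ForAll returned: " + CompletedWhenForAllReturned());
        Thread.Sleep(1500); // give the orphaned continuations time to finish
        Console.WriteLine("Completed a little later: " + Volatile.Read(ref completed));
    }
}
```

On an idle machine the first number should be well below 20 (typically zero) and the second should reach 20, which shows that ForAll handed back control while all of the "requests" were still in flight.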

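There is a workaround, however: the TPL Dataflow library, which is a package that can be fetched from NuGet. Its ActionBlock accepts a delegate that returns a Task, so it can actually wait for an async lambda to finish before counting that item as done. Our method changed to something similar to this, which worked the way it was supposed to: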

const int ChunkSize = 500;
var query = Enumerable.Range(0, 100000).ToArray();
var numChunks = (int)Math.Ceiling((double)query.Length / ChunkSize);
var progressCount = 0; // items processed so far; incremented inside the block below
Console.WriteLine("Splitting the work load into " + numChunks + " chunks");

for (int i = 0; i < query.Length; i += ChunkSize)  
{
    var doStuffBlock = new ActionBlock<IGrouping<int, int>>(
    async group =>
    {
        foreach (var item in group)
        {
            await DoStuff(item);
            Interlocked.Increment(ref progressCount);
        }
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 40
    });

    Console.WriteLine("Fetching " + i + " to " + (i + ChunkSize));
    var groups = query.Skip(i).Take(ChunkSize).GroupBy(x => x % 40);
    foreach (var item in groups)
    {
        doStuffBlock.Post(item);
    }

    // Signal that no more groups are coming, then block until every posted
    // group has been processed before moving on to the next chunk.
    doStuffBlock.Complete();
    doStuffBlock.Completion.Wait();
}
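To check that the block really does cap the number of requests in flight, here is a small self-contained sketch. It assumes the Dataflow package is referenced; ThrottleDemo and MeasurePeakAsync are hypothetical names, and Task.Delay again stands in for the web request. It records the highest number of handlers seen running at once, and it awaits Completion rather than calling Wait(), which is an option when the calling method is itself async:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; none of these names come from the original code.
public static class ThrottleDemo
{
    // Pushes itemCount simulated requests through an ActionBlock and returns
    // the highest number of handlers observed running at the same time.
    public static async Task<int> MeasurePeakAsync(int itemCount, int maxParallel)
    {
        int running = 0;
        int peak = 0;
        var gate = new object();

        var block = new ActionBlock<int>(async item =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }

            await Task.Delay(50); // stand-in for the web request

            lock (gate)
            {
                running--;
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        for (int i = 0; i < itemCount; i++)
        {
            block.Post(i);
        }

        block.Complete();        // no more input
        await block.Completion;  // asynchronously wait for all handlers to finish
        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakAsync(200, 8);
        Console.WriteLine("Peak concurrent handlers: " + peak);
    }
}
```

The reported peak should never exceed the MaxDegreeOfParallelism that was passed in, because the block treats a handler as busy until the Task it returned has completed.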

So, if you want to run asynchronous methods in parallel, take a look at the TPL Dataflow library.

  1. The Task Parallel Library on MSDN
  2. TPL Dataflow Library on NuGet

Stefán Jökull Sigurðarson

Iceland