Peregrine's View

Yet another C# / WPF / MVVM blog …


C# – Async / Await (Part 3)

Experts say you can’t concentrate on more than one task at a time.
Marilyn vos Savant

In the previous couple of posts I’ve shown how tasks can be used to run a block of code in the background without degrading the performance of the application user interface. Using Task.WhenAll() / Task.WhenAny() it is possible to run multiple tasks in parallel, and then wait for them all / the first one to complete.

For example, given a method such as

private static async Task<string> GetWebContent(string url, IProgress<string> progress)
{
    string result;
    progress.Report($"{DateTime.Now:HH:mm:ss.fff} - Downloading {url}");
    var downloadResult = await perIOAsync.ReadAllBytesFromUrlAsync(url, TimeSpan.FromSeconds(5));

    if (downloadResult.IsCompletedOk)
    {
        result = downloadResult.Data.ToUtf8String();
    }
    else if (downloadResult.IsCancelled)
    {
        result = "Cancelled";
    }
    else if (downloadResult.IsTimedOut)
    {
        result = "Timed Out";
    }
    else
    {
        result = downloadResult.ErrorMessage + "\r\n\r\n" + downloadResult.Exception.GetText();
    }

    progress.Report($"{DateTime.Now:HH:mm:ss.fff} - Completed {url}");

    return result;
}

which downloads the contents of a web URL as a string, multiple web sites can be downloaded at the same time as a grouped operation.
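As a rough sketch of that grouped operation, the snippet below starts one task per URL and waits for them all with Task.WhenAll(). WhenAllSketch and FakeDownloadAsync are hypothetical names: FakeDownloadAsync stands in for GetWebContent() and simply waits for a moment instead of touching the network.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for GetWebContent(): waits briefly instead of downloading.
    private static async Task<string> FakeDownloadAsync(string url)
    {
        await Task.Delay(100).ConfigureAwait(false);
        return $"content of {url}";
    }

    public static async Task<string[]> DownloadAllAsync(params string[] urls)
    {
        // ToArray() forces every task to start straight away; nothing limits how many run at once.
        var tasks = urls.Select(FakeDownloadAsync).ToArray();

        // WhenAll returns the results in the same order as the tasks array.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

Every download here is started up front, which is fine for a handful of URLs but is exactly what stops scaling as the list grows.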

But what if you have a hundred web sites to download, or a thousand, or a million … ? If you try to download them all at once, then at some point you’re going to overload the capabilities of your PC / network and the entire process will grind to a halt. In this post I’m going to introduce a method that executes a collection of tasks while limiting the number that are running at any one time.

perTaskThrottle.ForEachAsyncThrottled() takes a collection of input parameters, and an asynchronous function to be executed on each one.

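/// <summary>
/// Run func once for each item in sourceItems - up to concurrentTasks tasks may run at any one time
/// </summary>
/// <typeparam name="TInput">Type of the input items - also used as the key of the result dictionary</typeparam>
/// <typeparam name="TResult">Type of the value produced for each item</typeparam>
/// <param name="sourceItems">The items to process</param>
/// <param name="func">The asynchronous function to execute for each item</param>
/// <param name="token">Token used to cancel the operation</param>
/// <param name="concurrentTasks">The maximum number of tasks that may be running at once</param>
/// <returns>A dictionary of results, keyed by input item, with an entry for each task that ran to completion</returns>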
public static async Task<IDictionary<TInput, TResult>> ForEachAsyncThrottled<TInput, TResult>(
    this IEnumerable<TInput> sourceItems,
    Func<TInput, Task<TResult>> func,
    CancellationToken token,
    int concurrentTasks = 1)
{
    var result = new ConcurrentDictionary<TInput, TResult>();

    var tasksList = new List<Task>();
    using (var semaphoreSlim = new SemaphoreSlim(concurrentTasks))
    {
        foreach (var item in sourceItems)
        {
            token.ThrowIfCancellationRequested();

            // if there are already concurrentTasks tasks executing, pause until one has completed ( semaphoreSlim.Release() )
            await semaphoreSlim.WaitAsync(perTimeSpanHelper.Forever, token).ConfigureAwait(false);

            token.ThrowIfCancellationRequested();

            // This continuation only runs when the task ran to completion, so task.Result returns
            // immediately and cannot throw. A synchronous lambda also avoids the async void
            // delegate that an async lambda assigned to Action<T> would create.
            Action<Task<TResult>> okContinuation = task => result[item] = task.Result;

            // ReSharper disable once AccessToDisposedClosure
            Action<Task> allContinuation = task => semaphoreSlim.Release();

            tasksList.Add(func.Invoke(item)
                .ContinueWith(okContinuation, TaskContinuationOptions.OnlyOnRanToCompletion)
                .ContinueWith(allContinuation, token));

            token.ThrowIfCancellationRequested();
        }

        if (!token.IsCancellationRequested)
        {
            await Task.WhenAll(tasksList).ConfigureAwait(false);
        }
    }

    return result;
}

The result type is a dictionary because, when multiple tasks are running in parallel, there is no guarantee that their results will be generated in the same order as the input list.

The key to throttling the number of tasks executing at once is SemaphoreSlim. This is a lightweight alternative to the regular .NET Semaphore class that can also be waited on asynchronously. The constructor specifies the number of tasks that can be running at any one time. For each item in the input source list, the method calls semaphoreSlim.WaitAsync(). If there is a free slot, then the next task is triggered immediately. If not, then the code pauses, waiting for one of the previous tasks to complete, at which point its continuation calls semaphoreSlim.Release().
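To see the semaphore doing its job in isolation, here is a minimal sketch that runs a number of simulated jobs and records the highest number that were ever running together. It is not the library method: SemaphoreSketch, RunAsync and the 50 ms delay are all made up for the illustration, and a try / finally block is used to release the slot in place of the continuations shown earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSketch
{
    // Runs itemCount simulated jobs, at most maxConcurrent at a time,
    // and returns the highest number that were ever running together.
    public static async Task<int> RunAsync(int itemCount, int maxConcurrent)
    {
        int running = 0, peak = 0;
        var gate = new object();
        var tasks = new List<Task>();

        using (var semaphore = new SemaphoreSlim(maxConcurrent))
        {
            Func<Task> work = async () =>
            {
                try
                {
                    lock (gate) { running++; peak = Math.Max(peak, running); }
                    await Task.Delay(50).ConfigureAwait(false); // simulated work
                    lock (gate) { running--; }
                }
                finally
                {
                    semaphore.Release(); // frees a slot whether the job succeeded or not
                }
            };

            for (var i = 0; i < itemCount; i++)
            {
                // Completes immediately while a slot is free, otherwise waits for a Release().
                await semaphore.WaitAsync().ConfigureAwait(false);
                tasks.Add(work());
            }

            // Keep the semaphore alive until every job has released its slot.
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }

        return peak;
    }
}
```

Calling SemaphoreSketch.RunAsync(10, 3) should never report a peak above 3, however many items are queued.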

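Note how two continuations are used for each task executed. The first, which adds to the result dictionary, is only called when the task completes cleanly. The second, which is called after each task terminates, whether it completed OK or not, releases the semaphore so that the next task can begin. One consequence is that an item whose task failed or was cancelled simply has no entry in the result dictionary.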
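The behaviour of the two chained continuations can be checked with a small stand-alone sketch. ContinuationSketch and RunAsync are hypothetical names, and the flags take the place of the dictionary update and the semaphore release.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationSketch
{
    // Reports which of the two continuations ran for a task that either succeeds or throws.
    public static async Task<(bool okRan, bool alwaysRan)> RunAsync(bool shouldFail)
    {
        bool okRan = false, alwaysRan = false;

        Task<int> work = Task.Run(() =>
        {
            if (shouldFail) throw new InvalidOperationException("simulated failure");
            return 42;
        });

        Task chained = work
            // Skipped (its task ends up cancelled) unless work ran to completion.
            .ContinueWith(t => { okRan = true; }, TaskContinuationOptions.OnlyOnRanToCompletion)
            // Default options: runs however the previous continuation ended.
            .ContinueWith(t => { alwaysRan = true; });

        await chained.ConfigureAwait(false);
        return (okRan, alwaysRan);
    }
}
```

With shouldFail set to false both flags come back true; with it set to true only alwaysRan is set, and the exception thrown by the task is never observed by the caller.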

The exact number of tasks that can be executed in parallel will depend greatly on the specific scenario – you should experiment with different values of the concurrentTasks parameter to see which gives the best performance in your case.

The demo application for this post shows this task throttling in action. For proof of concept, I’ve limited the concurrent tasks to 3 – a modern PC / network would certainly cope with the entire URL list all at once without blinking an eye.

As usual, the code samples for this blog are available on GitHub, along with my own personal C# / WPF library.

If you find this article useful, or you have any other feedback, please leave a comment below.
