
Item 32: Compose Asynchronous Work Using Task Objects

Tasks are the abstraction for the work you’ve offloaded to another resource. The Task type and related classes and structs have rich APIs to manipulate tasks and the work that has been offloaded. Tasks are also objects that can be manipulated using their methods and properties. Tasks can be aggregated to compose larger-grained tasks. They can be ordered, or they can run in parallel. You use await expressions to enforce an ordering: Code that follows the await expression will not execute until the awaited task has completed. You can specify that tasks may start only in response to the completion of another task. Overall, the rich set of APIs used by tasks enables you to write elegant algorithms that work with these objects and the work they represent. The more you learn how to use tasks as objects, the more elegant your asynchronous code will be.

Let’s begin with an asynchronous method that starts a number of tasks and awaits each of them in turn. A very naïve implementation would be this:

public static async Task<IEnumerable<StockResult>>
    ReadStockTicker(IEnumerable<string> symbols)
{
    var results = new List<StockResult>();
    foreach (var symbol in symbols)
    {
        var result = await ReadSymbol(symbol);
        results.Add(result);
    }
    return results;
}

These several tasks are independent, and there’s no reason for you to wait for each task to finish before starting the next. One change you could make is to start all tasks, and then wait for all of them to finish before executing the continuations:

public static async Task<IEnumerable<StockResult>>
    ReadStockTicker(IEnumerable<string> symbols)
{
    var resultTasks = new List<Task<StockResult>>();
    foreach (var symbol in symbols)
    {
        resultTasks.Add(ReadSymbol(symbol));
    }
    var results = await Task.WhenAll(resultTasks);
    return results.OrderBy(s => s.Price);
}

This would be the correct implementation if the continuation requires the results of all the tasks to continue effectively. Using WhenAll, you create a new task that completes when all the tasks it is watching have completed. Awaiting that task produces an array of the results, in the same order as the tasks you passed in. If any of the tasks faulted, the await throws instead of returning the array.
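
To make the behavior of WhenAll concrete, here is a minimal sketch. The WhenAllSketch class and its DelayedValue helper are hypothetical names used only for illustration; the helper stands in for a call such as ReadSymbol. The sketch suggests two things: the awaited array follows the order of the input tasks rather than the order in which they finished, and a faulted input surfaces as an exception at the await.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for an asynchronous read such as ReadSymbol.
    private static async Task<int> DelayedValue(int value, int delayMs)
    {
        await Task.Delay(delayMs);
        return value;
    }

    // The slowest task is listed first, yet its result still
    // comes back in position 0 of the array.
    public static async Task<int[]> ResultsFollowInputOrder()
    {
        var tasks = new List<Task<int>>
        {
            DelayedValue(1, 60),
            DelayedValue(2, 30),
            DelayedValue(3, 1),
        };
        return await Task.WhenAll(tasks);
    }

    // If any input task faults, awaiting the WhenAll task throws
    // rather than returning a partial array.
    public static async Task<bool> FaultSurfacesAtAwait()
    {
        var failing = Task.FromException<int>(
            new InvalidOperationException("simulated failure"));
        try
        {
            await Task.WhenAll(DelayedValue(1, 1), failing);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Calling ResultsFollowInputOrder() yields the values 1, 2, 3 in that order even though the task producing 3 finishes first.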

At other times, you might start several different tasks, all of which generate the same result. Your goal in such a case is to try different sources, and continue working with the first task that finishes. The Task.WhenAny() method creates a new task that is complete as soon as any one of the tasks it is awaiting is complete.

Suppose you want to read a single stock symbol from multiple online sources, and return the first result that completes. You could use WhenAny to determine which of the started tasks completed first:

public static async Task<StockResult>
    ReadStockTicker(string symbol, IEnumerable<string> sources)
{
    var resultTasks = new List<Task<StockResult>>();
    foreach (var source in sources)
    {
        resultTasks.Add(ReadSymbol(symbol, source));
    }
    // WhenAny yields the first task to finish, not its result,
    // so await that task to get the StockResult.
    var first = await Task.WhenAny(resultTasks);
    return await first;
}

Sometimes you may want to execute the continuation as each task completes. A naïve implementation might look like this:

public static async Task<IEnumerable<StockResult>>
    ReadStockTicker(IEnumerable<string> symbols)
{
    var resultTasks = new List<Task<StockResult>>();
    var results = new List<StockResult>();
    foreach (var symbol in symbols)
    {
        resultTasks.Add(ReadSymbol(symbol));
    }
    foreach (var task in resultTasks)
    {
        var result = await task;
        results.Add(result);
    }
    return results;
}

There’s no guarantee that the tasks will finish in the order you’ve started them. This could be a very inefficient algorithm: Any number of completed tasks may be stuck in the queue, awaiting processing behind a task that is simply taking longer.

You might try to improve this using Task.WhenAny(). The implementation would look like this:

public static async Task<IEnumerable<StockResult>>
    ReadStockTicker(IEnumerable<string> symbols)
{
    var resultTasks = new List<Task<StockResult>>();
    var results = new List<StockResult>();
    foreach (var symbol in symbols)
    {
        resultTasks.Add(ReadSymbol(symbol));
    }
    while (resultTasks.Any())
    {
        // Each time through the loop, this creates a
        // new task. That can be expensive.
        Task<StockResult> finishedTask = await
            Task.WhenAny(resultTasks);
        var result = await finishedTask;
        resultTasks.Remove(finishedTask);
        results.Add(result);
    }
    // Every task has been removed by now, so return
    // the results collected in completion order.
    return results;
}

As the comment indicates, this strategy is not a good way to create the desired behavior. You create new tasks each time you call Task.WhenAny(). As the number of tasks you want to manage grows, this algorithm performs more and more allocations and becomes increasingly inefficient.

As an alternative, you can use the TaskCompletionSource class. TaskCompletionSource enables you to return a Task object that you manipulate to produce the result at a later point in time. Effectively, you can produce the result for any method asynchronously. The most common use of this strategy is to provide a conduit between a source Task (or Tasks) and a destination Task (or Tasks). You write the code that should execute when the source task completes. Your code then awaits the source task, and updates the destination task by using the TaskCompletionSource.
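
The conduit idea can be sketched in a few lines. The ConduitSketch class and its Relay method are hypothetical names used only for illustration: the method hands back the destination task immediately and completes it from a continuation attached to the source task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConduitSketch
{
    // Returns a destination task whose outcome is copied from the
    // source task once the source finishes.
    public static Task<T> Relay<T>(Task<T> source)
    {
        var destination = new TaskCompletionSource<T>();
        source.ContinueWith(completed =>
        {
            // Check the failure states first: IsCompleted is also
            // true for faulted and canceled tasks.
            if (completed.IsFaulted)
                destination.TrySetException(
                    completed.Exception.InnerExceptions);
            else if (completed.IsCanceled)
                destination.TrySetCanceled();
            else
                destination.TrySetResult(completed.Result);
        },
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
        return destination.Task;
    }
}
```

A caller holding the task returned by Relay sees it stay incomplete until the source task finishes, at which point it takes on the same result, exception, or cancellation.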

In the next example, let’s assume you have an array of source tasks. You’ll create an array of destination TaskCompletionSource objects. As each task finishes, you’ll update one of the destination tasks using its TaskCompletionSource. Here’s the code:

public static Task<T>[] OrderByCompletion<T>(
    this IEnumerable<Task<T>> tasks)
{
    // Copy to List because it gets enumerated multiple times.
    var sourceTasks = tasks.ToList();

    // Allocate the sources; allocate the output tasks.
    // Each output task is the corresponding task from
    // each completion source.
    var completionSources =
        new TaskCompletionSource<T>[sourceTasks.Count];
    var outputTasks = new Task<T>[completionSources.Length];
    for (int i = 0; i < completionSources.Length; i++)
    {
        completionSources[i] = new TaskCompletionSource<T>();
        outputTasks[i] = completionSources[i].Task;
    }

    // Magic, part 1:
    // Each task has a continuation that puts its
    // result in the next open location in the completion
    // sources array.
    int nextTaskIndex = -1;
    Action<Task<T>> continuation = completed =>
    {
        var bucket = completionSources
            [Interlocked.Increment(ref nextTaskIndex)];
        // This first version assumes every source task
        // runs to completion.
        bucket.TrySetResult(completed.Result);
    };

    // Magic, part 2:
    // For each input task, configure the
    // continuation to set the output task.
    // As each task completes, it uses the next location.
    foreach (var inputTask in sourceTasks)
    {
        inputTask.ContinueWith(continuation,
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }

    return outputTasks;
}

There’s quite a bit going on here, so let’s look at it section by section. First, the method allocates an array of TaskCompletionSource objects. Then, it defines the continuation code that will run as each source task completes. This continuation code sets the next slot in the destination TaskCompletionSource objects to complete. It uses the Interlocked.Increment() method to update the next open slot in a thread-safe manner. Finally, it sets the continuation for each Task object to execute this code. Ultimately, the method returns the sequence of tasks from the array of TaskCompletionSource objects.

The caller can now enumerate the list of tasks, which will be ordered by their completion time. Let’s walk through one typical run that starts 10 tasks. Suppose the tasks finish in this order: 3, 7, 2, 0, 4, 9, 1, 6, 5, 8. When task 3 finishes, its continuation will run, placing its Task result in slot 0 of the destination array. Next, task 7 finishes, placing its result in slot 1. Task 2 places its result in slot 2. This process continues until task 8 finishes, placing its result in slot 9. See Figure 3.1.

Figure 3.1 Ordering tasks based on their completion
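
The walkthrough can be reproduced deterministically with a short sketch. The CompletionOrderSketch class and its RunWalkthrough method are hypothetical names used only for illustration. The extension method inside it is a condensed variant of OrderByCompletion that checks the failure states before reading the result. TaskCompletionSource objects stand in for real work so that the finish order can be forced by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CompletionOrderSketch
{
    // Condensed variant of the OrderByCompletion extension method.
    public static Task<T>[] OrderByCompletion<T>(
        this IEnumerable<Task<T>> tasks)
    {
        var sourceTasks = tasks.ToList();
        var completionSources = sourceTasks
            .Select(_ => new TaskCompletionSource<T>())
            .ToArray();
        int nextTaskIndex = -1;
        foreach (var inputTask in sourceTasks)
        {
            inputTask.ContinueWith(completed =>
            {
                var bucket = completionSources[
                    Interlocked.Increment(ref nextTaskIndex)];
                if (completed.IsFaulted)
                    bucket.TrySetException(
                        completed.Exception.InnerExceptions);
                else if (completed.IsCanceled)
                    bucket.TrySetCanceled();
                else
                    bucket.TrySetResult(completed.Result);
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        }
        return completionSources.Select(s => s.Task).ToArray();
    }

    // Forces ten tasks to finish in the order used in the walkthrough
    // and returns the task numbers in the order the slots report them.
    public static async Task<int[]> RunWalkthrough()
    {
        int[] finishOrder = { 3, 7, 2, 0, 4, 9, 1, 6, 5, 8 };
        var work = Enumerable.Range(0, 10)
            .Select(_ => new TaskCompletionSource<int>())
            .ToArray();
        Task<int>[] ordered =
            work.Select(w => w.Task).OrderByCompletion();

        var observed = new List<int>();
        for (int slot = 0; slot < finishOrder.Length; slot++)
        {
            // Each task's result is its own task number.
            int taskNumber = finishOrder[slot];
            work[taskNumber].SetResult(taskNumber);
            // Exactly one more source has finished, so this
            // slot is the next one to complete.
            observed.Add(await ordered[slot]);
        }
        return observed.ToArray();
    }
}
```

RunWalkthrough() returns 3, 7, 2, 0, 4, 9, 1, 6, 5, 8: slot 0 holds the result of task 3, slot 1 the result of task 7, and so on through slot 9, which holds the result of task 8.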

Let’s extend the code so that it handles tasks that end up in the faulted state. The only change necessary is in the continuation:

// Magic, part 1:
// Each task has a continuation that puts its
// result in the next open location in the completion
// sources array.
int nextTaskIndex = -1;
Action<Task<T>> continuation = completed =>
{
    var bucket = completionSources
        [Interlocked.Increment(ref nextTaskIndex)];
    // Check the failure states first: IsCompleted is
    // also true for faulted and canceled tasks.
    if (completed.IsFaulted)
        bucket.TrySetException(
            completed.Exception.InnerExceptions);
    else if (completed.IsCanceled)
        bucket.TrySetCanceled();
    else
        bucket.TrySetResult(completed.Result);
};

A number of methods and APIs are available that support programming with tasks and enabling actions when tasks complete or fault. Using these methods makes it easier to construct elegant algorithms that process the results of asynchronous code when they are ready. These extensions, which are found in the Task library, specify actions that take place when tasks finish. The easily readable code manipulates tasks as they finish in a very efficient fashion.
