Introducing the new Task type

Tue, December 30, 2008, 06:46 AM under ParallelComputing
In a previous post I made the case for finely partitioning our compute-bound operations and enumerated the benefits of fine-grained parallelism. In another post I showed why it is a mistake to use threads directly to achieve fine-grained parallelism: the unit of partitioning in our user-mode app was also the unit of scheduling of the OS.

System.Threading.Tasks.Task
In .NET 4 we are introducing, in mscorlib, the System.Threading.Tasks.Task type, which represents a lightweight unit of work. With Tasks, the code from my previous post would look like this (and it does not suffer from any of the 3 problems that the original code suffers from):
static void WalkTree(Tree tree)
{
    if (tree == null) return;
    // Create a task per subtree and schedule it
    Task left = new Task((o) => WalkTree(tree.Left));
    left.Start();
    Task righ = new Task((o) => WalkTree(tree.Righ));
    righ.Start();
    // Block until both subtrees have been walked, then process this node
    left.Wait();
    righ.Wait();
    ProcessItem(tree.Data);
}
Tasks run on the new, improved CLR 4 ThreadPool engine. I will not repeat its performance and load-balancing benefits in this post; instead I will focus on the rich API itself.
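Because the API above comes from pre-release bits, here is a minimal runnable sketch of the same WalkTree pattern against the Task API as it later shipped in .NET 4, where the constructor takes a parameterless Action instead of a lambda with an object parameter. The Tree class, its Create helper, the WalkDemo class and the ProcessItem stand-in (which only counts visited nodes) are hypothetical; the field names Left, Righ and Data mirror the post.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal Tree type; the post's real Tree class comes from an earlier post.
public class Tree
{
    public Tree Left, Righ;
    public int Data;

    // Builds a complete binary tree with 'depth' levels; every node holds 'data'.
    public static Tree Create(int depth, int data)
    {
        if (depth <= 0) return null;
        return new Tree { Left = Create(depth - 1, data), Righ = Create(depth - 1, data), Data = data };
    }
}

public static class WalkDemo
{
    // Number of nodes processed so far; incremented atomically because tasks run concurrently.
    public static int Processed;

    // Hypothetical stand-in for the post's ProcessItem: just counts the node.
    static void ProcessItem(int data)
    {
        Interlocked.Increment(ref Processed);
    }

    public static void WalkTree(Tree tree)
    {
        if (tree == null) return;
        // Shipped .NET 4 constructor: takes a parameterless Action.
        Task left = new Task(() => WalkTree(tree.Left));
        left.Start();
        Task righ = new Task(() => WalkTree(tree.Righ));
        righ.Start();
        // Wait for both subtrees before processing this node.
        Task.WaitAll(left, righ);
        ProcessItem(tree.Data);
    }

    public static void Main()
    {
        WalkTree(Tree.Create(10, 1));
        Console.WriteLine("Processed " + Processed + " nodes");
    }
}
```

A complete tree with d levels has 2^d - 1 nodes, so the counter gives a quick sanity check that every node was visited exactly once.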

Creation and Scheduling
One example of the API is what we saw above, where we used Task with the same pattern we use for threads (create, then start later). You can see another example of the creation API if we modify the original Main method to look like this:
static void Main()
{
    Tree tr = Tree.CreateSomeTree(9, 1);
    Stopwatch sw = Stopwatch.StartNew();
    Task t = Task.StartNew(delegate { WalkTree(tr); });
    t.Wait();
    Console.WriteLine("Elapsed= " + sw.ElapsedMilliseconds.ToString());
    Console.ReadLine();
}
Notice how we can create a Task and start it with a single statement (StartNew), which is similar to how we use ThreadPool.QueueUserWorkItem, with the added benefit of holding a reference to the work in the variable 't'.
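To compare the one-statement creation styles side by side, here is a small sketch using the shipped names: the post's Task.StartNew became Task.Factory.StartNew in .NET 4, and .NET 4.5 added the Task.Run shorthand. The CreationDemo class and its Work method are hypothetical stand-ins for compute-bound work such as WalkTree(tr).

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class CreationDemo
{
    // Hypothetical stand-in for compute-bound work: sums 1..n.
    public static long Work(int n)
    {
        long sum = 0;
        for (int i = 1; i <= n; i++) sum += i;
        return sum;
    }

    public static void Main()
    {
        Stopwatch sw = Stopwatch.StartNew();

        // Create and schedule in one statement (.NET 4 name for the post's Task.StartNew).
        Task t1 = Task.Factory.StartNew(() => Work(10000000));

        // .NET 4.5 shorthand with the same effect for most cases.
        Task t2 = Task.Run(() => Work(10000000));

        // QueueUserWorkItem also queues the work, but returns no handle we could wait on.
        ThreadPool.QueueUserWorkItem(_ => Work(10000000));

        // Only the two Tasks can be waited on directly.
        Task.WaitAll(t1, t2);
        Console.WriteLine("Elapsed= " + sw.ElapsedMilliseconds);
    }
}
```

The handle is what makes the later features in this post (waiting, continuations, cancellation, results) possible; the ThreadPool call offers none of them.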

Waiting
Also notice above how we preserve the semantics of the code prior to the change by waiting for the work to complete before the Console.WriteLine statement. We saw this Wait method earlier in WalkTree. In fact, in WalkTree we can replace the two calls (left.Wait and righ.Wait) with the more flexible Task.WaitAll(left, righ), and there are other options such as WaitAny, which blocks only until any one of the tasks you pass to it completes.
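The difference between WaitAny and WaitAll can be seen in a short sketch, assuming two tasks of deliberately different lengths (simulated with Thread.Sleep; the WaitDemo class and Delayed helper are hypothetical). It uses the shipped Task.Factory.StartNew rather than the pre-release Task.StartNew.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitDemo
{
    // Starts a task that sleeps for 'ms' milliseconds to simulate work of a given length.
    static Task Delayed(int ms)
    {
        return Task.Factory.StartNew(() => Thread.Sleep(ms));
    }

    public static int FirstFinished()
    {
        Task fast = Delayed(50);
        Task slow = Delayed(1000);

        // WaitAny blocks until any one task completes and returns that task's index.
        int index = Task.WaitAny(fast, slow);

        // WaitAll blocks until every task has completed.
        Task.WaitAll(fast, slow);
        return index;
    }

    public static void Main()
    {
        Console.WriteLine("First finished: index " + FirstFinished());
    }
}
```

WaitAny returns an index into the array of tasks passed in, so the caller can tell which piece of work finished first.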

Continuations
We can further change the body of the Main method as follows:
Tree tr = Tree.CreateSomeTree(9, 1);
Stopwatch sw = Stopwatch.StartNew();
Task t = Task.StartNew(delegate { WalkTree(tr); });
t.ContinueWith(tt => Console.WriteLine("Done"), TaskContinuationKind.OnAny);
t.Wait(2500);
Console.WriteLine("Elapsed= " + sw.ElapsedMilliseconds.ToString());
Notice how we are waiting with a timeout this time, which means that after 2.5 seconds we will see "Elapsed..." on the console (assuming our WalkTree work takes longer than that to complete). However, at that point CPU usage will remain at 100% because our work is still executing. When it completes and CPU usage drops again, we will also see "Done" on the console. This should match your expectation of the self-explanatory ContinueWith method. It is a very powerful method (more here) that enables patterns such as pipelining. You can attach many continuations to the same task, and you can configure the circumstances under which each one runs via TaskContinuationKind, which I encourage you to explore along with the various overloads.
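Here is a sketch of the timed wait plus continuations against the shipped API, where the options enum is TaskContinuationOptions (TaskContinuationOptions.None runs the continuation whatever the outcome, which appears to be the closest counterpart of OnAny). The second continuation illustrates pipelining by consuming the antecedent's result. The ContinuationDemo class and the simulated 21-returning work are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Starts work lasting 'workMs', attaches two continuations, then waits at most 'timeoutMs'.
    public static int Run(int workMs, int timeoutMs, out bool finishedInTime)
    {
        Task<int> t = Task.Factory.StartNew(() => { Thread.Sleep(workMs); return 21; });

        // None: run however 't' ends (completed, faulted or canceled).
        Task done = t.ContinueWith(tt => Console.WriteLine("Done"), TaskContinuationOptions.None);

        // Pipelining: consume the antecedent's result and produce a new one.
        Task<int> doubled = t.ContinueWith(tt => tt.Result * 2, TaskContinuationOptions.OnlyOnRanToCompletion);

        // Wait(timeout) returns false if 't' has not finished in time; the work keeps running regardless.
        finishedInTime = t.Wait(timeoutMs);
        Console.WriteLine("Finished within timeout: " + finishedInTime);

        int result = doubled.Result; // blocks until the pipeline has completed
        done.Wait();
        return result;
    }

    public static void Main()
    {
        bool inTime;
        Console.WriteLine("Result: " + Run(3000, 2500, out inTime));
    }
}
```

With 3 seconds of work and a 2.5-second timeout, the timed wait gives up first, just as in the post, while "Done" appears only once the work actually finishes.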

Cancellation
Cancellation is well integrated into the API. Cancelling a task that is scheduled in some queue and has not started executing yet means that it will never execute. For a task that is already running, cooperation is needed: the task can check a boolean property (IsCancellationRequested) to see whether cancellation was requested and act accordingly. Finally, you can check whether a task was actually cancelled via another boolean property on the Task type (IsCanceled). If we modify the code above as follows, adding a call to Cancel after the timed wait:
t.ContinueWith(tt => Console.WriteLine("Done"));
t.Wait(2500);
t.Cancel();
...we will see the "Elapsed" message followed immediately by a drop in CPU utilization and the "Done" message.
Note that for the cancellation above to behave as expected, we are assuming that when we cancel a Task, all tasks created in its scope also get cancelled; that is, when we cancel 't', all the tasks created in WalkTree also get cancelled. This is not the default, but we can easily configure it by changing the constructor call in WalkTree for both left and right as follows:
...= new Task((o) => WalkTree(tree.Left), TaskCreationOptions.RespectParentCancellation);
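In the API as it shipped in .NET 4, Task has no Cancel method and there is no RespectParentCancellation option; cancellation flows through a CancellationTokenSource and its CancellationToken, and passing the same token to child tasks is one way to get the "cancel the whole scope" effect. Here is a minimal sketch of cooperative cancellation on that model; the CancellationDemo class and LongWork loop are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Simulated long-running work that cooperates by checking the token on every iteration.
    static void LongWork(CancellationToken token)
    {
        while (true)
        {
            // Throws OperationCanceledException once cancellation has been requested.
            token.ThrowIfCancellationRequested();
            Thread.Sleep(10);
        }
    }

    // Cancels a running task and reports its IsCanceled flag.
    public static bool Run()
    {
        var cts = new CancellationTokenSource();

        // Passing the same token to StartNew lets the task end in the Canceled state
        // (and keeps it from running at all if cancellation happens before it starts).
        Task t = Task.Factory.StartNew(() => LongWork(cts.Token), cts.Token);
        Task done = t.ContinueWith(tt => Console.WriteLine("Done"));

        t.Wait(250);   // times out: the loop never finishes on its own
        cts.Cancel();  // roughly what the post's t.Cancel() expresses
        done.Wait();   // a default continuation runs whether 't' completes, faults or is canceled
        return t.IsCanceled;
    }

    public static void Main()
    {
        Console.WriteLine("IsCanceled: " + Run());
    }
}
```

The task only stops because LongWork polls the token, which is the cooperation the post describes for tasks that are already running.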

Parent Child Relationships
The above correctly implies that there is a parent-child relationship between an executing task and the tasks created in its scope. It is worth noting that parent tasks implicitly wait for their children to complete, which is why the waiting worked as expected earlier. If we want to opt out of that, we can create detached children via the TaskCreationOptions.Detached option. I encourage you to experiment with the other TaskCreationOptions.
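Note that in the API as it shipped in .NET 4 the default is reversed: child tasks are detached unless created with TaskCreationOptions.AttachedToParent. Here is a sketch of an attached parent-child relationship on that model. The parent is started with Task.Factory.StartNew because Task.Run (.NET 4.5) denies child attachment; the ParentChildDemo class and its counter are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParentChildDemo
{
    // Number of children that have finished; incremented atomically.
    public static int ChildrenFinished;

    public static void Run()
    {
        ChildrenFinished = 0;
        Task parent = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 4; i++)
            {
                // AttachedToParent: the parent does not complete until this child completes.
                Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(100);
                    Interlocked.Increment(ref ChildrenFinished);
                }, TaskCreationOptions.AttachedToParent);
            }
            // The parent's own delegate returns here without waiting explicitly.
        });

        // Waiting on the parent implicitly waits for its attached children too.
        parent.Wait();
    }

    public static void Main()
    {
        Run();
        Console.WriteLine("Children finished: " + ChildrenFinished);
    }
}
```

Removing AttachedToParent makes the children detached, and parent.Wait() may then return before they finish.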

Task with Result
Let's go way back, peek at the original serial implementation of WalkTree, and modify it so that it actually returns a result:
static int WalkTree(Tree tree)
{
    if (tree == null) return 0;
    int left = WalkTree(tree.Left);
    int righ = WalkTree(tree.Righ);
    return ProcessItem(tree.Data) + left + righ;
}
...as we ponder the question "How do we parallelize that?", take another look at the code at the top of this post that parallelized the version that did not return results.
We can change that code to return 0 when it reaches a null node and to return the result of ProcessItem, but we have an issue with how to obtain the results of WalkTree(tree.Left) and WalkTree(tree.Righ) and add them to our return value. In other words, we are passing the Task constructor a delegate that returns a result, and we need somewhere to store that result. The obvious place to store it is the Task itself! However, we want this to be strongly typed, so we use generics: we have a type that inherits from Task, namely Task<T> (in the CTP bits it is called Future<T>). This new type has a Value property that returns the result: reading it blocks if the task is still executing, or returns immediately if the task has already executed and the value is stored. So the code can be modified as follows:
static int WalkTree(Tree tree)
{
    if (tree == null) return 0;
    // Each Task<int> stores the result of walking one subtree
    Task<int> left = new Task<int>((o) => WalkTree(tree.Left), TaskCreationOptions.RespectParentCancellation);
    left.Start();
    Task<int> righ = new Task<int>((o) => WalkTree(tree.Righ), TaskCreationOptions.RespectParentCancellation);
    righ.Start();
    // Reading Value blocks until the corresponding subtree result is available
    return ProcessItem(tree.Data) + left.Value + righ.Value;
}
Note that if we did not want to block on Value, we could have first queried the IsCompleted property of the Task.
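Here is a runnable sketch of the result-returning version using the API as it shipped in .NET 4, where the blocking property is named Result rather than Value. It also polls IsCompleted on the top-level task instead of blocking straight away. The Tree class, Create helper, SumDemo class and the ProcessItem stand-in (which simply returns the node's data) are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal Tree type, mirroring the field names used in the post.
public class Tree
{
    public Tree Left, Righ;
    public int Data;

    // Builds a complete binary tree with 'depth' levels; every node holds 'data'.
    public static Tree Create(int depth, int data)
    {
        if (depth <= 0) return null;
        return new Tree { Left = Create(depth - 1, data), Righ = Create(depth - 1, data), Data = data };
    }
}

public static class SumDemo
{
    // Hypothetical stand-in for ProcessItem: returns the node's data unchanged.
    static int ProcessItem(int data)
    {
        return data;
    }

    public static int WalkTree(Tree tree)
    {
        if (tree == null) return 0;
        // Each Task<int> stores the result of walking one subtree.
        Task<int> left = Task.Factory.StartNew(() => WalkTree(tree.Left));
        Task<int> righ = Task.Factory.StartNew(() => WalkTree(tree.Righ));
        // Result blocks until the task has produced its value.
        return ProcessItem(tree.Data) + left.Result + righ.Result;
    }

    public static void Main()
    {
        Task<int> total = Task.Factory.StartNew(() => WalkTree(Tree.Create(10, 1)));

        // IsCompleted lets us check for the result without blocking.
        while (!total.IsCompleted)
        {
            Thread.Sleep(1); // other useful work could go here instead
        }
        Console.WriteLine("Sum = " + total.Result);
    }
}
```

Since every node holds the same value, the sum is simply the node count times that value, which makes the output easy to check.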

In Summary
Above I have given you a brief glimpse of the rich API that Task exposes (and there is a lot more, such as an exception-handling model that aggregates exceptions thrown in parallel into a single AggregateException). Combined with my other posts referenced above, this should make you feel comfortable (if not compelled) to use the new Task API in all parallelism scenarios where you previously considered using threads or the ThreadPool directly. Furthermore, I hope the rich API entices you to use it even if you had not considered the ThreadPool or threads before.
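The exception-handling model mentioned above can be sketched briefly: when several tasks fault, waiting on them surfaces a single AggregateException whose InnerExceptions hold each original exception. This uses the shipped .NET 4 API; the ExceptionDemo class and its Fail helper are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class ExceptionDemo
{
    // Hypothetical helper whose only job is to throw from inside a task.
    static void Fail(Exception e)
    {
        throw e;
    }

    // Runs two failing tasks and returns how many exceptions were aggregated.
    public static int CountFailures()
    {
        Task a = Task.Factory.StartNew(() => Fail(new InvalidOperationException("a failed")));
        Task b = Task.Factory.StartNew(() => Fail(new ArgumentException("b failed")));
        try
        {
            Task.WaitAll(a, b);
            return 0;
        }
        catch (AggregateException ae)
        {
            // Exceptions thrown in parallel arrive together in InnerExceptions.
            foreach (Exception e in ae.InnerExceptions)
                Console.WriteLine(e.GetType().Name + ": " + e.Message);
            return ae.InnerExceptions.Count;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Failures: " + CountFailures());
    }
}
```

Because WaitAll waits for every task before throwing, no failure is lost even when several tasks fault at roughly the same time.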