Using Async/Await with IEnumerable and Yield-Return

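Have you tried doing a yield return inside an async method? It doesn’t work. In fact, it won’t even compile. This isn’t a bug or a missing feature; the two are fundamentally incompatible. When you use yield return, the compiler generates a state machine that implements IEnumerable<T>, so the method has to return IEnumerable<T> (or IEnumerator<T>). The async/await pattern also makes the compiler generate a state machine, but an async method has to return a task type such as Task or Task<T> (or void), and its run-time behavior is even more complicated. One method can’t satisfy both sets of rules, so an iterator that returns IEnumerable<T> can’t await anything. (C# 8.0 later added async streams, IAsyncEnumerable<T> with await foreach, but a method that returns a plain IEnumerable<T> still can’t be async.) This has caused me more than a little grief over the past few months, and I’m hoping to share some thoughts (and some code) that came out of it.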

Jump to the code on GitHub.

Background

A lot of what I do is “back end” programming, usually running as a Windows service or in response to a queued message. I’ve recently been working on a process that has to sift through tens of thousands of accounts and crunch some numbers. Sometimes information is fetched by an API call, or by hitting one or more databases. All these request-and-wait operations are a prime target for async/await; that’s what it was designed for. In fact, by using async/await I can process multiple accounts in parallel and increase my throughput anywhere from 4 to 10 times. Hurrah! But we are getting ahead of ourselves…

Of course there’s a catch: we have quite a few large enterprise customers that have thousands of sub-accounts that must be processed as part of the main account, and many of those sub-accounts have thousands of services assigned to them. When we initially ran a non-optimized version of our code, one of the larger accounts actually generated a stack overflow. That’s the first stack overflow I’ve seen in years that wasn’t caused by an infinite loop. The overflow was caused by loading every service for every sub-account, along with all the services for the main account. This obviously wasn’t going to work for our current customers, and we had a mandate to handle even bigger customers in the future. (I’m obliged to point out that these memory issues happened on a small development server. The code may have run fine on our massive prod boxes, but any time your software runs out of memory, it’s time to go back and do some refactoring.)

To solve the memory consumption issue, we modified our code to process the services one sub-account at a time. To do this, we now passed in an IEnumerable<Account>. (Previously it was a List<Service>.) The repository class that delivers accounts to our system was quite complex (part API and part database), and adding any kind of pagination to it would have been very difficult. Thankfully, that wasn’t an issue, since we were able to refactor it to use yield return, which returns sub-accounts as they are fetched. Each sub-account is processed, and then the next sub-account is fetched. This allows the garbage collector to clean up account objects after they have been processed. This met our scaling requirements, since only one account object needed to be in memory at any time, and we could now process our large customers. Great. However, we were still limited in how many accounts we could process in parallel. We could do some parallelization, but the most important I/O, fetching an account, wasn’t taking advantage of async/await, and it wasn’t clear how to use it since we were heavily reliant on yield return. We had solved our memory issues, but now we had a performance issue because we were always synchronously waiting on I/O.

So, here’s where we are with our code. Notice we are yield-returning each account as we fetch it. Because each account is only referenced within one iteration of the foreach, sub-account objects become eligible for garbage collection once they have been processed. Nothing too difficult.


public void ProcessAccount(string accountNumber) {
   foreach (var account in GetAccountWithSubAccounts(accountNumber)) {
       // ... process the accounts....
   }
}
public IEnumerable<Account> GetAccountWithSubAccounts(string accountNumber) {
    //this is an async repository, so we have to call .Result.
    var parentAccount = accountRepository.GetAccount(accountNumber).Result;
    yield return parentAccount; //this returns an account object.

    foreach (var childAccountNumber in parentAccount.ChildAccountNumbers) {
        var childAccount = accountRepository.GetAccount(childAccountNumber).Result;
        yield return childAccount;
    }
}
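To see the deferred execution in action, here is a small self-contained sketch. Account (with a Number property) and FakeAccountRepository, an in-memory dictionary that logs every fetch, are hypothetical stand-ins assumed only for illustration; the iterator has the same shape as GetAccountWithSubAccounts.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the real Account type.
public class Account
{
    public string Number { get; set; } = "";
    public List<string> ChildAccountNumbers { get; set; } = new List<string>();
}

// Hypothetical repository: "fetches" from a dictionary and logs each call.
public class FakeAccountRepository
{
    private readonly Dictionary<string, Account> accounts;
    public List<string> Log { get; } = new List<string>();

    public FakeAccountRepository(Dictionary<string, Account> accounts)
    {
        this.accounts = accounts;
    }

    public Task<Account> GetAccount(string accountNumber)
    {
        Log.Add("fetch " + accountNumber);
        return Task.FromResult(accounts[accountNumber]);
    }
}

public static class LazyFetchDemo
{
    // Same shape as GetAccountWithSubAccounts.
    public static IEnumerable<Account> GetAccountWithSubAccounts(
        FakeAccountRepository accountRepository, string accountNumber)
    {
        var parentAccount = accountRepository.GetAccount(accountNumber).Result;
        yield return parentAccount;

        foreach (var childAccountNumber in parentAccount.ChildAccountNumbers)
        {
            // This fetch doesn't run until the caller's foreach asks for the next item.
            yield return accountRepository.GetAccount(childAccountNumber).Result;
        }
    }

    public static List<string> Run()
    {
        var repo = new FakeAccountRepository(new Dictionary<string, Account>
        {
            ["P"] = new Account { Number = "P", ChildAccountNumbers = { "C1", "C2" } },
            ["C1"] = new Account { Number = "C1" },
            ["C2"] = new Account { Number = "C2" },
        });

        foreach (var account in GetAccountWithSubAccounts(repo, "P"))
        {
            repo.Log.Add("process " + account.Number);
        }
        return repo.Log;
    }
}
```

LazyFetchDemo.Run() returns the log fetch P, process P, fetch C1, process C1, fetch C2, process C2: each child account is fetched only when the loop is ready for it.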

Problem 1: How to combine Async/Await with IEnumerable

The bottom line is you can’t. So we’ll do the next best thing: instead of returning an IEnumerable of Account, we’ll return an IEnumerable of Task<Account>.  Let’s look at the updated code and then we’ll discuss:


public void ProcessAccount(string accountNumber) {
   foreach (var accountTask in GetTasksForAccountWithSubAccounts(accountNumber)) {
       var account = accountTask.Result;
       // ... process the accounts....
   }
}

//notice, no async keyword...
public IEnumerable<Task<Account>> GetTasksForAccountWithSubAccounts(string accountNumber) {
    //get the parent account from the repo....
    var parentAccountTask = accountRepository.GetAccount(accountNumber);
    yield return parentAccountTask;

    var parentAccount = parentAccountTask.Result;
    foreach (var childAccountNumber in parentAccount.ChildAccountNumbers) {
        //notice there is no await. we want to return the task, not the account.
        var childAccountTask = accountRepository.GetAccount(childAccountNumber);
        yield return childAccountTask;
    }
}

The first thing to notice is that GetTasksForAccountWithSubAccounts has neither the async nor the await keyword. As we’ve already discussed, it uses yield return, so it can’t be an async method. Instead, we get the task from the repository and use yield return to hand the task back to the caller. Something else that’s important to note: because we are returning a true IEnumerable (vs. a List that’s impersonating IEnumerable), each task isn’t created until the foreach statement calls MoveNext() on the enumerator. This means the scope of each task is one iteration of the foreach loop, so this strategy will scale well and meet our growth requirements.
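A quick way to check the MoveNext() claim is to count repository calls while walking the sequence. This is a sketch under assumptions: CountingRepository and its Calls counter are hypothetical, and GetAccount simulates I/O with Task.Delay. Because an async method runs synchronously up to its first await, Calls goes up at the moment each task is created.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the real Account type.
public class Account
{
    public string Number { get; set; } = "";
    public List<string> ChildAccountNumbers { get; set; } = new List<string>();
}

// Hypothetical async repository that counts how many fetches have been started.
public class CountingRepository
{
    public int Calls { get; private set; }

    public async Task<Account> GetAccount(string accountNumber)
    {
        Calls++;                 // runs synchronously, before the first await
        await Task.Delay(10);    // simulated I/O latency
        return accountNumber == "P"
            ? new Account { Number = "P", ChildAccountNumbers = { "C1", "C2", "C3" } }
            : new Account { Number = accountNumber };
    }
}

public static class LazyTaskDemo
{
    // Same shape as GetTasksForAccountWithSubAccounts.
    public static IEnumerable<Task<Account>> GetTasksForAccountWithSubAccounts(
        CountingRepository repo, string accountNumber)
    {
        var parentAccountTask = repo.GetAccount(accountNumber);
        yield return parentAccountTask;

        var parentAccount = parentAccountTask.Result;
        foreach (var childAccountNumber in parentAccount.ChildAccountNumbers)
        {
            yield return repo.GetAccount(childAccountNumber);
        }
    }

    // Records the repository call count before enumeration and after each MoveNext().
    public static List<int> Run()
    {
        var repo = new CountingRepository();
        var counts = new List<int>();
        var tasks = GetTasksForAccountWithSubAccounts(repo, "P");
        counts.Add(repo.Calls); // nothing has been requested yet
        using (var e = tasks.GetEnumerator())
        {
            while (e.MoveNext())
            {
                counts.Add(repo.Calls);
                _ = e.Current.Result; // wait on each task, as ProcessAccount does
            }
        }
        return counts;
    }
}
```

LazyTaskDemo.Run() returns 0, 1, 2, 3, 4: no calls before enumeration starts, then exactly one more per MoveNext().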

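One odd thing you may have noticed: we have to “run” the parentAccountTask (via the Result property) to get the child account numbers. When the caller waits on each task before asking for the next one, as ProcessAccount does, that task has already completed by the time the enumerator resumes, so referencing Result won’t block. If the caller moves on without waiting (as a parallel runner will), Result blocks until the parent account arrives, which is unavoidable here: we can’t request any child accounts until we know their numbers.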

We are headed in the right direction but this doesn’t solve our core problem: we aren’t running anything in parallel.

Problem 2: How to run X tasks at a time from an IEnumerable<Task> without reifying the enumerable

We need to be able to start multiple tasks at the same time and wait for them to finish.  Let’s see if one of the static helper methods in the Task class can help us:

public static Task WhenAll(IEnumerable<Task> tasks)

Hey cool! It takes IEnumerable. Maybe we can use this method and rely on the task scheduler to throttle our requests. Let’s take a look at what WhenAll is doing under the hood:

//This code block Copyright (c) Microsoft Corporation.  All rights reserved.
public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks)
{
    .....
    List<Task<TResult>> taskList = new List<Task<TResult>>();
    foreach (Task<TResult> task in tasks) {
        ....
        taskList.Add(task);
    }
    // Delegate the rest to InternalWhenAll<TResult>().
    return InternalWhenAll<TResult>(taskList.ToArray());
}

What! It reifies the enumerable to a list, then converts it to an array. (An array! What is this, .NET 2.0?) That does us no good. Since each of our tasks is created when MoveNext() is called, enumerating the whole sequence starts every GetAccount request at once, and WhenAll simply waits for all of them; nothing gets throttled. The WhenAny method performs the same operations. The WaitAll and WaitAny methods don’t even take IEnumerable, only an array.
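The following sketch shows the effect, using a hypothetical FakeFetch as a stand-in for GetAccount: by the time Task.WhenAll returns its task, every fetch in the lazy sequence has already been started.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    private static int started;

    // Hypothetical I/O call: records that it started, then waits.
    private static async Task<int> FakeFetch(int id)
    {
        Interlocked.Increment(ref started); // runs before the first await
        await Task.Delay(50);
        return id;
    }

    // Lazily yields one task per id, like GetTasksForAccountWithSubAccounts.
    private static IEnumerable<Task<int>> LazyTasks(int count)
    {
        for (int i = 0; i < count; i++)
        {
            yield return FakeFetch(i);
        }
    }

    // Returns how many fetches had started by the time WhenAll handed back its task.
    public static int StartedWhenWhenAllReturns(int count)
    {
        started = 0;
        Task<int[]> all = Task.WhenAll(LazyTasks(count)); // enumerates the whole sequence here
        int startedSoFar = Volatile.Read(ref started);
        all.Wait();
        return startedSoFar;
    }
}
```

StartedWhenWhenAllReturns(100) returns 100. That result is deterministic: WhenAll enumerates the sequence synchronously, and each FakeFetch bumps the counter before its first await.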

The Parallel class (System.Threading.Tasks.Parallel) has a lot of static methods that look promising, but you have to wrap each task in an Action, which, when you look under the hood, is wrapped in a new task. Ugh. I already have tasks. Why not just run the tasks I already have? Also, Parallel was not made with async operations in mind; it was designed for CPU-bound parallel processing.

Okay, enough of this. Time to crank out some code.

So, the extension methods I wrote are a little dense. Instead of dissecting those, I’m going to build the idea out the way I did initially (as a class) for easier discussion, and then we’ll look at how it was refactored into extension methods.


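    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    // Keeps at most maxConcurrency tasks in flight; the caller feeds tasks in one at a time.
    public class ParallelTaskRunner<T>
    {
        private int taskCount;           // number of occupied slots
        private Task<T>[] currentTasks;  // fixed-size set of in-flight tasks
        private int maxConcurrency;

        public ParallelTaskRunner(int maxConcurrency)
        {
            this.maxConcurrency = maxConcurrency;
            currentTasks = new Task<T>[maxConcurrency];
        }

        // Adds a task. When every slot is full, blocks until one task finishes and returns
        // its result; otherwise returns default(T) (null for reference types).
        public T AddTask(Task<T> task)
        {
            var returnValue = default(T);
            AddTaskToArray(task);
            taskCount++;
            if (taskCount == maxConcurrency)
            {
                // WaitAny returns the index of the first task to complete.
                var taskindex = Task.WaitAny(currentTasks);
                // Result rethrows (inside an AggregateException) if that task faulted.
                returnValue = currentTasks[taskindex].Result;
                currentTasks[taskindex] = null; // free the slot for the next task
                taskCount--;
            }
            return returnValue;
        }

        // Blocks until every remaining task has finished, then returns their results.
        public IEnumerable<T> WaitRemainingTasks()
        {
            var runningTasks = currentTasks.Where(t => t != null).ToArray();
            if (taskCount > 0)
            {
                Task.WaitAll(runningTasks);
            }
            return runningTasks.Select(t => t.Result);
        }

        // Places the task in the first empty slot.
        private void AddTaskToArray(Task<T> task)
        {
            for (int i = 0; i < currentTasks.Length; i++)
            {
                if (currentTasks[i] == null)
                {
                    currentTasks[i] = task;
                    break;
                }
            }
        }
    }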

And here’s how we use it…

   var accountTasks = GetTasksForAccountWithSubAccounts(someAccountNumber);
   var taskRunner = new ParallelTaskRunner<Account>(5);
   foreach (var task in accountTasks) {
      var account = taskRunner.AddTask(task);
      if (account != null) {
         // do stuff with account....
      }
   }
   foreach (var account in taskRunner.WaitRemainingTasks()) {
      // do stuff with account....
   }

The constructor just initializes a few things. The AddTask method is where all the magic happens. It adds a task to the array, and if the array is full it calls Task.WaitAny, which blocks until at least one of the tasks has completed and returns that task’s index. The value for that task is then returned, the caller processes it, and then adds the next task by calling AddTask again. While the array isn’t full yet, AddTask returns default(T) (null for a reference type like Account), which is why the loop checks for null. This continues until all the tasks have been added. Then the remaining tasks are processed and returned by calling WaitRemainingTasks.

The extension methods are a little more efficient, especially when adding new tasks to the array.  There are two almost-identical methods: one for Task and another for Task<T>.  I tried to combine them but had issues with the type of the Action parameter.  If anyone has suggestions for combining those let me know.

One of the big differences between my original code above and the extension methods (shown in full below) is what gets handed back to you. ParallelTaskRunner returns T, so it reads Result for you, and a faulted task rethrows its exception (wrapped in an AggregateException) right there. The extension methods instead pass the Task or Task<T> itself to your delegate. This is important because, as written, exceptions stay captured inside the task and are never thrown for you. You should always check the status of the task to see if it’s faulted before reading Result.
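Here is a minimal sketch of the two behaviors, using a hypothetical FailingFetch that always faults. Reading Result rethrows the failure wrapped in an AggregateException, while Task.WaitAny (the wait these runners rely on) returns without throwing, leaving you to inspect Status and Exception yourself.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultDemo
{
    // Hypothetical fetch that always fails after yielding once.
    private static async Task<string> FailingFetch()
    {
        await Task.Yield();
        throw new InvalidOperationException("account service unavailable");
    }

    // Runner style: reading Result rethrows, wrapped in an AggregateException.
    public static string ReadResult()
    {
        var task = FailingFetch();
        try
        {
            return task.Result;
        }
        catch (AggregateException ex)
        {
            return "threw: " + ex.InnerException.Message;
        }
    }

    // Extension-method style: wait without throwing, then inspect the task.
    public static string InspectTask()
    {
        var task = FailingFetch();
        Task.WaitAny(task); // does not throw for a faulted task
        if (task.Status == TaskStatus.Faulted)
        {
            return "faulted: " + task.Exception.InnerException.Message;
        }
        return "ok: " + task.Result;
    }
}
```

ReadResult() returns "threw: account service unavailable", and InspectTask() returns "faulted: account service unavailable".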

Let’s see the same example using the generic extension method:

   //this will return IEnumerable<Task<Account>>
   var accountTasks = GetTasksForAccountWithSubAccounts(someAccountNumber);
   accountTasks.RunTasks(5, task =>
      {
         if (task.Status == TaskStatus.Faulted)
         {
            Console.WriteLine(task.Exception);
            // ....deal with exception....
         }
         else
         {
            var account = task.Result;
            //.... do stuff....
         }
      });
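The gist itself isn’t reproduced in this section, so the following is only a sketch of how an extension method with this call shape could work, not the author’s code. The signature RunTasks<T>(IEnumerable<Task<T>>, int, Action<Task<T>>) is inferred from the usage above, and ThrottleDemo with its FakeFetch is a hypothetical harness that records the peak number of fetches running at once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TaskEnumerableExtensions
{
    // Sketch: pulls tasks lazily, keeps at most maxConcurrency of them unprocessed,
    // and hands each finished task (faulted or not) to taskComplete.
    public static void RunTasks<T>(this IEnumerable<Task<T>> tasks, int maxConcurrency,
        Action<Task<T>> taskComplete)
    {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        var running = new List<Task<T>>(maxConcurrency);

        foreach (var task in tasks) // the next task is only created after a slot frees up
        {
            running.Add(task);
            if (running.Count == maxConcurrency) CompleteOne(running, taskComplete);
        }
        while (running.Count > 0) CompleteOne(running, taskComplete); // drain the rest
    }

    private static void CompleteOne<T>(List<Task<T>> running, Action<Task<T>> taskComplete)
    {
        // WaitAny doesn't throw for faulted tasks; it just reports which one finished.
        int index = Task.WaitAny(running.ToArray());
        var finished = running[index];
        running.RemoveAt(index);
        taskComplete(finished);
    }
}

public static class ThrottleDemo
{
    private static readonly object Gate = new object();
    private static int inFlight, peak;

    // Hypothetical fetch that tracks how many calls are running at once.
    private static async Task<int> FakeFetch(int id)
    {
        lock (Gate) { inFlight++; if (inFlight > peak) peak = inFlight; }
        await Task.Delay(20);
        lock (Gate) { inFlight--; }
        return id;
    }

    private static IEnumerable<Task<int>> LazyTasks(int count)
    {
        for (int i = 0; i < count; i++) yield return FakeFetch(i);
    }

    public static (int processed, int sum, int peak) Run(int count, int maxConcurrency)
    {
        inFlight = 0;
        peak = 0;
        int processed = 0, sum = 0;
        LazyTasks(count).RunTasks(maxConcurrency, task =>
        {
            if (task.Status == TaskStatus.Faulted)
            {
                Console.WriteLine(task.Exception);
                return;
            }
            processed++;
            sum += task.Result;
        });
        return (processed, sum, peak);
    }
}
```

ThrottleDemo.Run(10, 3) handles all 10 tasks (results summing to 45), and the recorded peak never exceeds 3, because a new task is only pulled from the sequence after a slot frees up.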

Possibilities for changes & improvements:

  • I’ll probably create a variation that uses WhenAny instead of WaitAny. WhenAny doesn’t return an index; it returns a task whose result is the task that finished. That would allow the entire extension method to be async, and the delegate could become a Func<Task<T>, Task> so the callback can be awaited too, which could certainly be desirable.
  • The delegate is currently an Action.  It could be changed to a Func to allow a bool to be returned to determine if the entire process should be cancelled. The other option is to have an overload that includes a cancellation token, which would be consistent with the async/await pattern.
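Both ideas can be sketched together. This is a hypothetical RunTasksAsync, not part of the gist: it awaits Task.WhenAny (which completes with the finished task itself rather than an index), takes a Func<Task<T>, Task> callback so the handler can be awaited, and accepts a CancellationToken that stops new tasks from being pulled. AsyncThrottleDemo is an illustrative harness.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class TaskEnumerableAsyncExtensions
{
    public static async Task RunTasksAsync<T>(
        this IEnumerable<Task<T>> tasks,
        int maxConcurrency,
        Func<Task<T>, Task> taskComplete,
        CancellationToken cancellationToken = default)
    {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        var running = new List<Task<T>>(maxConcurrency);

        // Awaits whichever task finishes first, removes it, and hands it to the callback.
        async Task CompleteOneAsync()
        {
            Task<T> finished = await Task.WhenAny(running).ConfigureAwait(false);
            running.Remove(finished);
            await taskComplete(finished).ConfigureAwait(false);
        }

        using (var enumerator = tasks.GetEnumerator())
        {
            // Checking the token before MoveNext() means cancellation stops new tasks being created.
            while (!cancellationToken.IsCancellationRequested && enumerator.MoveNext())
            {
                running.Add(enumerator.Current);
                if (running.Count == maxConcurrency) await CompleteOneAsync().ConfigureAwait(false);
            }
        }

        // Tasks that already started are still awaited and reported before cancellation surfaces.
        while (running.Count > 0) await CompleteOneAsync().ConfigureAwait(false);
        cancellationToken.ThrowIfCancellationRequested();
    }
}

public static class AsyncThrottleDemo
{
    // Hypothetical fetch: completes after a short delay.
    private static async Task<int> FakeFetch(int id)
    {
        await Task.Delay(10).ConfigureAwait(false);
        return id;
    }

    private static IEnumerable<Task<int>> LazyTasks(int count, Action onCreate)
    {
        for (int i = 0; i < count; i++)
        {
            onCreate();
            yield return FakeFetch(i);
        }
    }

    // Sums the results and cancels once 'cancelAfter' of them have been handled.
    public static async Task<(int sum, int handled, int created, bool cancelled)> RunAsync(
        int count, int maxConcurrency, int cancelAfter)
    {
        using var cts = new CancellationTokenSource();
        int sum = 0, handled = 0, created = 0;
        bool cancelled = false;
        try
        {
            await LazyTasks(count, () => created++).RunTasksAsync(maxConcurrency, async task =>
            {
                int value = await task; // already complete, so this doesn't wait
                sum += value;
                if (++handled == cancelAfter) cts.Cancel();
            }, cts.Token);
        }
        catch (OperationCanceledException)
        {
            cancelled = true;
        }
        return (sum, handled, created, cancelled);
    }
}
```

RunAsync(10, 2, 100) handles all 10 results (sum 45) without cancelling. RunAsync(10, 2, 1) cancels after the first result: only 2 tasks are ever created, the one still in flight is handled, and then an OperationCanceledException surfaces. Letting in-flight tasks finish before throwing is a design choice in this sketch; it avoids leaving faulted tasks unobserved.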


Also check out Concurrency in C# Cookbook, by Stephen Cleary. I just finished the pre-release edition and it was extremely helpful.

Here’s the full source of the gist: