Persistent Connections and F5 iRules

At Rackspace Cloud Office we rely on the amazing power of F5’s BigIP network devices for most of our application routing. In case you don’t know, BigIP (usually referred to simply as ‘F5’) is an application gateway device that can perform a variety of functions to bridge the gap between your network and the application. In our case, we use the F5 to route traffic to a variety of web applications, but really it can be any type of network application. A user’s HTTP request is routed into our data center and terminates at the F5. The F5 owns the resolvable public IP for our URLs, as well as the SSL cert for the domain. The F5 then “reverse proxies” the traffic to an internal server. This may be a single server, but more likely it’s one of multiple servers in a pool (i.e. a web farm), and the F5 will round-robin traffic to those servers.

The F5 can also execute user-created scripts, called iRules, for each request, allowing you to make intelligent decisions based on data in the request, e.g. the User-Agent header, or the path in the URL. One of our applications uses an iRule to select a destination based on the path in the URL: one destination is a pool of servers running an ASP.NET application, the other is a pool of servers that simply serve static files. The iRule is executed for every HTTP request and if, for example, the path starts with `/static/`, then the static-content pool is selected for the request.
Here is an example of an iRule that works as described.

when HTTP_REQUEST {
  if { [string tolower [HTTP::uri]] starts_with "/static/" } {
     pool pool-static
  }
}

If no pool is selected by the iRule, then a configured default pool is selected, which for this example would be the application-server pool. Once the pool is selected, the F5 then selects a server from the pool and forwards the HTTP request to the server, and the response from that server is sent back to the requester. Simple enough.

But what about persistent connections (aka Keep-Alive)? Think about that for a second and you can see how persistent connections could throw a few wrenches into the works. Just to review: the HTTP 1.1 protocol declares that the underlying TCP connection should stay open until the client or server closes it. This saves a lot of time, especially since a client can invoke multiple calls to the server to fetch sub-resources for a page. In this day & age of SSL-everywhere, persistent connections are a godsend. (Side-note: HTTP/2 goes even further and multiplexes multiple concurrent requests over a single persistent connection!) Thankfully the F5 handles HTTP persistent connections quite well, but in order to stay out of trouble you need to know how it works under the hood. More on that in a minute.
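For illustration, here’s a simplified (and entirely hypothetical) HTTP 1.1 exchange with most headers trimmed. In HTTP 1.1 the connection is persistent by default, so the Connection headers are shown only for clarity; the point is that the second request rides the same TCP connection, skipping a fresh TCP (and SSL) handshake:

GET /index.html HTTP/1.1
Host: www.example.com
Connection: keep-alive

HTTP/1.1 200 OK
Content-Length: 512
Connection: keep-alive

GET /static/site.css HTTP/1.1
Host: www.example.com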

Quick digression: The interesting part about how application gateways work is that they are sometimes acting as a true network device, providing a NAT into or out of a network. This is what a consumer wifi router does to provide Internet access to your home network, and quite often this exact same functionality is used to provide internet access to an isolated application network. However, there are other times when an application gateway is simulating the job of a network device. This is the case as outlined above: incoming HTTP requests from the client are terminated at the F5, including any encryption via SSL. Then once a pool & destination server is selected, a totally separate TCP connection is created on a different (internal) network from the F5 to the destination server. Once the second connection is up, a new HTTP request is created that is basically a copy of the incoming request, and sent to the destination server. On the return trip, the response from the server is copied to create an HTTP response back to the client to fulfill the original request. This sounds like a lot of work, and it is! (F5s manage to do a surprising amount of this work at the hardware level.) But the net effect is that it appears as though the F5 is routing the HTTP request at a network layer. However, routing is a network layer 3 function, and the F5 is operating up at layers 5 through 7. A more correct way to think about it is that it’s proxying the request, but even that, as we will see, isn’t entirely accurate.

So what happens to our simulated request routing when we have persistent connections? At this point in the discussion it’s not too complicated: as long as the TCP connection from the client stays open, the F5 will keep the second TCP connection to the internal server open. Any subsequent HTTP request that comes in over the existing external connection will be proxied over the existing internal connection. The F5 keeps a map of which external connections map to which internal connections, and internal connections are never shared between external connections (i.e. no connection pooling) due to obvious security concerns. If the client closes the external connection, then the corresponding internal connection is also closed. I’m almost positive the opposite is true as well: if the internal server closes its connection, the external connection to the client is closed too. (I haven’t had time to verify this, but I can think of some serious security concerns if this wasn’t the case.)

Now let’s take it a step further: What if we have an iRule that splits incoming requests between two pools of servers? What if, for every request, an iRule has to determine whether we are proxying to an application-server pool or a static-content server pool? What happens to the persistent connection? This is where the F5 has to behave in a way that is transparent to the client, but may have an impact on how the request is routed to an internal pool.

Here’s where I have to put a great big disclaimer up: I’m not an expert at F5 routing, nor have I had time to exhaustively research this. What I’m about to state is based on circumstantial evidence from troubleshooting this issue for the past couple of days. I’ll update accordingly if an expert tells me I’m wrong, which is likely. 🙂

Let’s take it step-by-step:

  1. A client creates a connection to the F5 and makes an HTTP request.
  2. The F5 runs an iRule which explicitly selects an application pool server, which fulfills the request. The internal connection is left open and is mapped as the “persistent connection” to the external connection.
  3. Using the existing external connection, the client makes another HTTP request for static content.
  4. The F5 runs the same iRule as before, which explicitly selects a static-content pool server, which fulfills the request. This new internal connection to the static-content pool server is separate and distinct from the internal connection created in step 2. Because it is the internal connection used for the most recent request for this external connection, the F5 now maps the internal connection to the static-content server as the “persistent connection” to the external connection. The other internal connection to the application server is no longer mapped as the “persistent connection”.
  5. The client makes a third request, again over the existing external connection.
  6. Again, the F5 runs the same iRule, but this time a pool is not explicitly selected. Normally that’s not a big deal: the endpoint always has a default pool, which is used whenever the iRule doesn’t explicitly select one. However, this scenario isn’t typical, because there’s already a mapped internal connection. When an established internal connection is mapped to the external connection, the default pool is not used: the F5 will reuse the existing mapped internal connection unless the iRule explicitly selects a different pool.

This is why I said that proxying isn’t exactly an accurate description. Application gateways have to be far more intelligent about how they handle things. It’s not just forwarding bits and it’s not just a store-and-forward algorithm. It has to track state and make some unique choices about how it implements the HTTP standard, but do it in a way that is compatible with the usability and flexibility offered by iRules and other configuration.

I had to figure this out when the web application in our dev environment started returning random 404s. After some investigation we determined that the requests returning 404 had been routed to the static-content server. This was very odd because the URL for these requests didn’t match the criteria in the iRule for that pool. After quite a bit of digging (i.e. Wireshark) we found that the requests were going to the static-content server because an existing connection to that server was already mapped. The iRule wasn’t explicitly selecting the application-server pool at the end; we didn’t think we needed to, since the application-server pool was the default pool for this endpoint. However, the default pool wasn’t being used because there was already an existing connection. When we changed the iRule to select the default pool explicitly at the end of the script, the random 404s stopped occurring. Example:

when HTTP_REQUEST {
  if { [string tolower [HTTP::uri]] starts_with "/static/" } {
     pool pool-static
  } else {
     pool pool-application
  }
}

TL;DR flowchart:
[Flowchart: iRule pool selection with a persistent connection]

List of resources about Enigma and The Bombe

Here’s a list of resources on some of the subjects I touched on in my Enigma-Bombe talk.

(Disclaimer: some of the links below are sponsored, which will help fund future youtube videos on The Bombe.)

The Code Book by Simon Singh is the book that started me down the road to wanting to learn more about the Bombe. It has a great synopsis of both the Babington Plot and the Zimmermann Telegram, and how the use of encryption influenced historical events. It also gives a very coherent explanation of how Rejewski first cracked the Enigma cipher.

Alan Turing: The Enigma is by far the most comprehensive biography of Alan Turing. It goes into great detail on his work on the Bombe, and includes a surprising amount of technical detail.

The amazing Bombe and Enigma simulator, built by Magnus Ekhall and Fredrik Hallenberg.

Home page for the Bombe-rebuild project at Bletchley Park.
Also, their Facebook page.

Build an Enigma machine with a Pringles tube! Hint: If you’re in the US, be careful how you print out the PDF. It’s designed for European A4 paper. If you print it out correctly on 8.5×11, then some of the edges will get cut off.

Solving the Enigma:
History of the Cryptanalytic Bombe by Jennifer Wilcox

This is a great paper on the history of the Bombe with a focus on the American Bombe and how Bletchley Park worked with the US Navy on decrypting 4-rotor Enigma messages.

Cache Comparison: Memcached vs. Sql Server ……… (wait…, what?)

Update 3: Woohoo! Sql Server 2016 has really delivered a lot of improved features for in-memory tables. New post is coming with details….

UPDATE 2: Due to Sql Server 2016 improving many of the features of in-memory tables, I’m postponing some of this work until that version is released. I’ve heard it addresses the row-size limit.

UPDATE: Due to the promising results below, I’ve started an open-source project to do a proper implementation.

So I finally got around to looking at Sql Server’s awesome new feature: in-memory tables. I’ve written a lot of code that moves real-time data through Sql Server to reporting systems, so this is of great interest to me. I really could have used this feature about 10 years ago. After delving into the details I concluded there are definitely some limitations, but lots of exciting possibilities as well. (E.g. we’ll be migrating our ASP session database to persisted in-memory tables soon.) I won’t delve into how in-memory tables work; many others have already done a good job of that. The focus of this post is to explore the idea of using non-persisted in-memory tables as a replacement for memcached. Yes, you read that right. I’m thinking of replacing memcached with Sql Server.

At Rackspace, we use memcached to store a couple different things that don’t need to be persisted to disk (e.g. throttling data). We also use it as a fast caching layer for our main operational database via NHibernate. Those are two very common use-cases, so let’s define them for reference:
1. Canonical data source for non-critical data that doesn’t need to be persisted.
2. Secondary low-latency data source for critical high-use data that is persisted to a DB.

We started using memcached quite a while ago, before other alternatives became available, and due to some of its limitations we’ve been mulling over a replacement. When I started reviewing Sql Server’s in-memory tables it got me thinking: If it’s all in memory… I wonder if it’s as fast as memcached? And if so, would it be a viable replacement?

Let’s start with that first question, because the results will help better answer the second question with respect to the use cases above. All the code for this test is available on my Github repo. Note: you have to set up Sql Server with the correct file structure for in-memory tables. Also, the build of memcached for Windows that I use can be downloaded from this Stack Overflow discussion.
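For reference, here’s a minimal sketch (not the actual repo code; the table name, column names, sizes, and connection string are all hypothetical) of how a non-persisted in-memory table can be declared. DURABILITY = SCHEMA_ONLY is what makes it memory-only like memcached, and note the BIN2 collation on the key and the row-size budget that Sql Server 2014 imposes:

// Hypothetical sketch: create a non-durable in-memory cache table from C#.
using System.Data.SqlClient;

const string createCacheTable = @"
    CREATE TABLE dbo.CacheEntry (
        CacheKey   NVARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL
            PRIMARY KEY NONCLUSTERED HASH WITH (BUCKET_COUNT = 100000),
        CacheValue NVARCHAR(3600) NOT NULL  -- sized to stay under the 8060-byte row limit
    ) WITH (MEMORY_OPTIMIZED = ON, DURABILITY = SCHEMA_ONLY);";

using (var connection = new SqlConnection(connectionString)) // your connection string
{
    connection.Open();
    new SqlCommand(createCacheTable, connection).ExecuteNonQuery();
}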

Notes on this performance test:

  • The test app, Sql Server, and memcached are all running on my laptop, so there’s no network latency.
  • Before the stopwatch starts, I create a client and make an initial write to establish a connection.
  • All read/writes are done sequentially, not asynchronously.
  • The Sql Server tables are in-memory only.
    (in-memory tables can optionally be persisted, but it is not enabled for this test.)
  • I also included writing to regular ol’ disk-based tables for reference.

All times are in microseconds.

                          Writes                                 Reads
# rows      memcached   Sql in-memory   Sql on-disk   memcached   Sql in-memory   Sql on-disk
1000              460             624          1521         448             578           584
5000              660             716          1762         358             616           670
10000             975            1729          4370         933            1590          1772

I would probably get more consistent and real-world metrics with the test app running on one server, and memcached and Sql Server each on their own, but I think this is an adequate back-of-the-envelope kind of test. Here’s the interesting part: over multiple tests there was a good amount of overlap between the best & worst times of each platform, however on each individual test, Sql Server was always a little slower than memcached. This makes sense when you think about it. Memcached has one job: implement a hash in memory. Sql Server is infinitely more complicated, but that’s not a bad thing in this case, because Sql Server gives you an incredibly flexible framework to customize your cache. You could implement different caches in a number of ways that involve multiple tables, or various columns to better fit your caching needs. And you get to manipulate that cache using the incredibly flexible T-SQL. You’d have to be careful not to make it too complex, however: in some additional testing I did, inserting into two tables took about 1.5 times as long as inserting into just one table. Also, in-memory tables don’t support foreign keys or left outer joins. My point is this: if you try to do anything custom with memcached, you’ll end up having to do it in the client code, which is bad technological debt that’s hard to migrate/upgrade, etc. Implementing something custom in natively compiled stored procedures is centrally located and will be easy to modify (or fix!) later on.
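As an example of what “custom” can look like, here’s a hypothetical sketch of a natively compiled “set” procedure for the cache table sketched earlier (again, all names are illustrative). A delete-then-insert upsert keeps the T-SQL inside the restricted surface area that natively compiled procedures allow, and it would be deployed once, the same way as the DDL above:

// Hypothetical sketch: a natively compiled upsert ("set") procedure for the cache.
const string createCacheSetProc = @"
    CREATE PROCEDURE dbo.CacheSet @key NVARCHAR(256), @value NVARCHAR(3600)
    WITH NATIVE_COMPILATION, SCHEMABINDING, EXECUTE AS OWNER
    AS BEGIN ATOMIC WITH
        (TRANSACTION ISOLATION LEVEL = SNAPSHOT, LANGUAGE = N'us_english')
        -- delete-then-insert is a simple upsert that native compilation supports
        DELETE FROM dbo.CacheEntry WHERE CacheKey = @key;
        INSERT dbo.CacheEntry (CacheKey, CacheValue) VALUES (@key, @value);
    END";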

There are of course other considerations than just speed. Storage is a big one. Memcached can store a value up to 1MB by default. (More if you change the config but it’s not recommended.) The new in-memory tables can only store a max of 8060 bytes. If you’re storing unicode text as NVARCHAR (as I did) that will halve it, so you may want to consider a binary serializer and possibly some compression.
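If you do go the compression route, a small helper along these lines (hypothetical, not from the test repo) would shrink the payload before storing it in a VARBINARY column:

// Hypothetical helper: UTF-8 encode (half the size of NVARCHAR's UTF-16), then gzip.
using System.IO;
using System.IO.Compression;
using System.Text;

static byte[] CompressForCache(string value)
{
    byte[] raw = Encoding.UTF8.GetBytes(value);
    using (var output = new MemoryStream())
    {
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(raw, 0, raw.Length); // gzip must be closed to flush its footer
        }
        return output.ToArray(); // store this in a VARBINARY(8000) column
    }
}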

So what does Sql Server have going for it? Security comes to mind, and the complete lack of it in memcached has particularly been on my mind. I really like the fact that using Sql Server as a cache gives me built-in security, and I can control that security in a very granular way. Memcached and most other memory stores can’t even come close to Sql Server’s authentication and authorization capabilities. Sql Server is also good at things like clustering and fail-over models, which simply don’t exist in the memcached world. Memcached relies on the clients to handle redundancy, which is almost worse than having no redundancy at all, since you could potentially have different webheads talking to different memcached servers, delivering stale data or causing stale data to be written back to the persistence layer. Fail-over redundancy should always be implemented at either the server application level or the network level. Another thing I like: I already have a Sql Server Always-On cluster in my environment, so if I want to use Sql Server for my caching then I don’t have to worry about standing up another cluster of servers to run a cache. As long as I have the memory to spare, I get caching for free. There are also a lot of possibilities for how you can segment your data in Sql Server. Right now we have multiple disparate data sets all intermingled in memcached, sharing the same key space. Using Sql Server I could set up different tables for different purposes, which may also have different security requirements. The client and stored procedures could also be modified to execute multiple reads/writes per call, which could significantly reduce time-per-call and possibly (on average) beat memcached response times.

Last thing: I haven’t explored it yet, but there’s a chance you could more tightly integrate your caching layer with your persistence layer by having it all in Sql Server. E.g. you could serve a dashboard from an in-memory cache that’s recompiled every 30 seconds from multiple underlying persistent tables. This is an actual use-case I’ve had to implement many times in the past. One thing though: natively compiled stored procedures are vital to realizing the performance of an in-memory table, but they can’t query regular persisted tables. (Stored procedures that aren’t natively compiled can still access in-memory tables.) This means the natively compiled stored procedures that can quickly access in-memory tables can’t jump over to the tables on disk if the data isn’t cached. That’s a shame, because that would be a great feature for an L2 cache.

Let’s summarize in a table:

Feature                          memcached                            Sql Server in-memory tables
Microseconds per write           ~100-500 (heavily rounded)           ~150-650 (heavily rounded)
Microseconds per read            ~100-500 (heavily rounded)           ~150-650 (heavily rounded)
Max value size (uncompressed)    1 MB                                 ~8 kB
Security                         None.                                Lots.
Clients available                Many, for every language.            None, although you’re just calling
                                                                      stored procs, so kinda. (I’m probably
                                                                      going to write a custom NHibernate L2
                                                                      cache adapter for it soon.)
Customizable                     No, unless you feel like forking     Yes! And it’s as easy as creating
                                 the project and working with         tables and writing SQL.
                                 C++ code.
Support for SQL                  None.                                Of course!

So should you consider this as a viable option? I certainly will, although I have a lot more testing to complete. I really like the security and the flexibility. If I want to change something, I just change my table definition and update (or create new) stored procedures. The size of the value is a significant concern for me, so I may do some tests with compression, although that could have a considerable CPU hit on the client, and with such small payloads the compression header + body may end up being more than the original payload. Again, maybe Sql Server 2016 will improve the size limit. I will post updates as I explore this option more.
-Steve

Using Tasks with IObservable

This is a follow-up to my post on using Async/Await with IEnumerable and Yield-Return.

Jump to the code on Github.

In my previous post we looked at using tasks with IEnumerable. But what if you are returning tasks from an IObservable instead of an IEnumerable? How would you run them and how do you limit the concurrency?  Better yet, can we take in an IObservable<Task<T>>, run the tasks automatically, and then produce an IObservable<T>?

The short answer is yes, but it’s not trivial due to the concurrency requirement. Let’s solve it without that requirement first. I’ll be using a fictional Account type as my “T” just like in my previous post.

   var repo = new AccountRepository();
   var tasks = repo.GetAccountWithSubs("123123123"); //this returns IEnumerable<Task<Account>>
   var tasksSource = tasks.ToObservable();           //this converts it to IObservable<Task<Account>>

   var accountSource = tasksSource.Select(ts => ts.Result); //this returns an IObservable<Account>
   var subscription = accountSource.Subscribe(
        acc => {
                  Console.WriteLine(acc.AccountNumber);
               },
        ex =>  { },
        () => { }
     );

Thanks to the LINQ extensions for IObservable it’s pretty easy to “run” the task and return the result as a new IObservable. Examples like this really show the beautiful parity between IEnumerable and IObservable.

Unfortunately, this example runs all the tasks synchronously. What if we want to run multiple tasks in parallel? To accomplish this, we need an observer that receives calls from the first IObservable (i.e. the parent), waits for the tasks to complete, and then calls the observer of the second IObservable with the account object. On top of that, we need to be able to propagate any exceptions swallowed by the task to the subscriber. We also have to support the user canceling the IObservable before the collection is done, via the Dispose method. In order to implement all this functionality, we’ll have to do it “the long way”, which is to say, we’ll have to actually implement our own IObservable for the job. I.e. we’ll be given an IObservable<Task<T>> and we need to create and return an IObservable<T>. To get the ball rolling, let’s re-implement the same functionality in the code above, but with our own made-from-scratch IObservable. (Side note: I tried to do this using the Observable.Create method, but it became too complex for delegates to handle.)

    internal class TObservable<T> : IObservable<T>, IDisposable
    {
        private readonly IObservable<Task<T>> taskObservable;
        private IDisposable taskObservableSubscriber;
        private IObserver<T> observer;

        public TObservable(IObservable<Task<T>> taskObservable)
        {
            this.taskObservable = taskObservable;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            this.observer = observer;
            this.taskObservableSubscriber =
                             taskObservable.Subscribe(OnNext, OnError, OnCompleted);
            return this;
        }

        private void OnNext(Task<T> task)
        {
            this.observer.OnNext(task.Result);
        }

        private void OnError(Exception exception)
        {
            this.observer.OnError(exception);
        }

        private void OnCompleted()
        {
            this.observer.OnCompleted();
        }

        public void Dispose()
        {
            if (this.taskObservableSubscriber != null)
            {
                this.taskObservableSubscriber.Dispose();
                this.taskObservableSubscriber = null;
            }
        }
    }

Most of what’s happening here is just listening for events from the “parent” IObservable, and then echoing those events back out to the registered observer. The only exception is the OnNext method, which receives a task, then synchronously runs the task (by referencing the Result property) and sends the value to the observer. Again, this mirrors the same functionality we achieved using the Select statement in the first code snippet, but now we have a framework we can expand with additional functionality.

Before we move on, I wanted to point out that our class also implements IDisposable. This saves us from having to create a new object that implements IDisposable just to be returned to the subscriber. Another option is to use the Disposable.Create method in the Subscribe method:

return Disposable.Create(() =>
{
   if (this.taskObservableSubscriber != null)
   {
      this.taskObservableSubscriber.Dispose();
      this.taskObservableSubscriber = null;
   }
});

This seems like a nice terse way to code it, and the code that’s executed is identical, but having our own dispose method to call will turn out to be more convenient as we add additional functionality.

The first change is to support raising exceptions that are contained in the task. As a reminder, tasks will swallow any exceptions until the task is awaited. Since we aren’t using async/await in our code, we need to check each task to see if it’s faulted. If there’s an exception, we should call the subscriber’s OnError and end processing of the IObservable. Here are the changes to OnNext and OnError to support this:

        private void OnNext(Task<T> task)
        {
            if (task.IsFaulted)
            {
                this.OnError(task.Exception);
            }
            else
            {
                this.observer.OnNext(task.Result);
            }
        }

        private void OnError(Exception exception)
        {
            this.Dispose();
            this.observer.OnError(exception);
        }

We check the task for an error condition and then raise the error to the subscriber. Before raising the error, however, we call our own Dispose method, which calls Dispose on the “parent” IObservable’s subscription, preventing OnNext from being called again with additional tasks. This is important because once OnError (or OnCompleted) is called, the IObservable is not allowed to make any further calls to the observer. Also, as a result of raising an error, the observer may take some kind of drastic measure (e.g. kill the process), so we want to give the parent IObservable an opportunity to clean up (e.g. close files, write logs, etc.).

Next we need to support the scenario of a task being cancelled. This requires some discretion on the part of the coder: how do we raise that as an IObservable event? You may want to change this to suit your needs. I’ll be raising it as an exception, but you may want to call OnCompleted instead.

        private void OnNext(Task<T> task)
        {
            if (task.IsFaulted)
            {
                this.OnError(task.Exception);
            }
            else if (task.IsCanceled)
            {
                // you may want to call OnCompleted instead depending on if the cancellation is expected or not.
                this.OnError(new OperationCanceledException("The task was cancelled."));
            }
            else
            {
                this.observer.OnNext(task.Result);
            }
        }

Now we can move on to the core functionality: running multiple tasks in parallel. This gets a little trickier because we can’t just parrot events like we’ve been doing. We have to track tasks that are in progress, and then call the subscriber’s OnNext with the task’s result as they complete. Most of this code is directly converted from my extension methods that do the same thing with IEnumerable. First let’s add some local variables for tracking tasks and initialize them in the constructor.

        ....
        private readonly int maxConcurrency;
        private Task<T>[] currentTasks;
        private int taskCount = 0;
        private int nextIndex = 0;

        public TObservable(IObservable<Task<T>> taskObservable, int maxConcurrency)
        {
            this.taskObservable = taskObservable;
            this.maxConcurrency = maxConcurrency;
            this.currentTasks = new Task<T>[maxConcurrency];
        }

Next, let’s take the code that calls the subscriber and put it in its own function. We’ll need to call it from multiple places soon.

        private void CallSubscriber(Task<T> task)
        {
            if (task.IsFaulted)
            {
                this.OnError(task.Exception);
            }
            else if (task.IsCanceled)
            {
                // you may want to call OnCompleted instead based on how you're using the cancellation token.
                this.OnError(new OperationCanceledException("The task was cancelled."));
            }
            else
            {
                this.observer.OnNext(task.Result);
            }
        }

Now we can use the OnNext method to track our tasks in an array and wait for them to finish:

        private void OnNext(Task<T> task)
        {
            currentTasks[nextIndex] = task;
            taskCount++;
            if (taskCount == maxConcurrency)
            {
                nextIndex = Task.WaitAny(currentTasks);
                CallSubscriber(currentTasks[nextIndex]);
                currentTasks[nextIndex] = null;
                taskCount--;
            }
            else
            {
                nextIndex++;
            }
        }

The task is added to an array. If the array is full, it waits for the first task to finish (via WaitAny). Once a task is done, it signals the subscriber (via CallSubscriber) and then removes it from the array. The next time OnNext is called, that empty spot in the array will be taken by the new task. This continues until OnError or OnCompleted is called. Let’s run down the happy path and look at OnCompleted first.

        private void OnCompleted()
        {
            while (taskCount > 0)
            {
                currentTasks = currentTasks.Where(t => t != null).ToArray();
                nextIndex = Task.WaitAny(currentTasks);
                CallSubscriber(currentTasks[nextIndex]);
                currentTasks[nextIndex] = null;
                taskCount--;
            }
            this.Dispose();
            this.observer.OnCompleted();
        }

The WaitAny method doesn’t like null values, so we have to rebuild our array to remove them. Then we wait for a task to complete, notify the subscriber, and remove the task from the array. Same as above. Once we are done running the tasks, we dispose the parent subscription and then call OnCompleted on our subscriber to let them know we’ve completed.

The OnError case is straightforward. We just dispose the parent subscription and then echo the exception to our subscriber like we did before:

        private void OnError(Exception exception)
        {
            this.Dispose();
            this.observer.OnError(exception);
        }

One problem: what if we have tasks still in progress? Do we orphan them? We could possibly add in a cancellation token and then take care of it in the Dispose method:

        public void Dispose()
        {
            if (taskCount > 0)
            {
                //cancel running tasks?
            }
        }

I’ll leave it up to you how to handle that one.
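To tie it all together, here’s a quick usage sketch (using the same fictional repository as the first snippet) that runs up to four tasks in parallel:

   var repo = new AccountRepository();
   var taskSource = repo.GetAccountWithSubs("123123123").ToObservable();

   // TObservable runs up to 4 tasks at a time and emits results as they complete.
   var accountSource = new TObservable<Account>(taskSource, 4);
   var subscription = accountSource.Subscribe(
        acc => Console.WriteLine(acc.AccountNumber), // called as each task finishes
        ex  => Console.WriteLine("Failed: " + ex),
        ()  => Console.WriteLine("Done"));
   subscription.Dispose();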


Here’s the full code of the gist:

Using Async/Await with IEnumerable and Yield-Return

Have you tried doing a yield return inside an async method? It doesn’t work. In fact, it won’t even compile. This isn’t a bug or missing feature; the two are just fundamentally incompatible. When you use yield return, a lot of code is generated by the compiler. The async/await pattern also generates code, but is even more complicated at run time. Regardless of how much code is generated, you can’t defer execution of an IEnumerable expression. This has caused me more than a little grief over the past few months and I’m hoping to share some thoughts (and some code) that came out of it.

Jump to the code on GitHub.

Background

A lot of what I do is “back end” programming, usually running as a Windows service or in response to a queued message. I’ve recently been working on a process that has to sift through tens-of-thousands of accounts and crunch some numbers. Sometimes information is fetched by an API call, or by hitting one or more databases. All these request-and-wait operations are a prime target for using async/await; that’s what it was designed for. In fact, by using async/await I can process multiple accounts in parallel and increase my throughput anywhere from 4 to 10 times. Hurrah! But we are getting ahead of ourselves…

Of course there’s a catch: we have quite a few large enterprise customers that have thousands of sub-accounts that must be processed as part of the main account, and many of those sub-accounts have thousands of services assigned to them. When we initially ran a non-optimized version of our code, one of the larger accounts actually generated a stack overflow. That’s the first stack overflow I’ve seen in years that wasn’t caused by an infinite loop. The overflow was caused by loading every service for every sub-account, along with all the services for the main account. This obviously wasn’t going to work for our current customers, and we had a mandate to handle even bigger customers in the future. (I’m obliged to point out that these memory issues happened on a small development server. The code may have run fine on our massive prod boxes, but anytime your software runs out of memory, it means it’s time to go back and do some refactoring.)

To solve the memory consumption issue we modified our code to process the services one sub-account at a time. To do this, we now passed in an IEnumerable<Account>. (Previously it was a List<Service>.) The repository class that delivers accounts to our system was actually quite complex (part API and part database), and it would have been very difficult to implement any kind of pagination. Thankfully that wasn’t an issue, since we were able to refactor it to use a yield-return, which returns sub-accounts as they are fetched. Each sub-account is processed, and then the next sub-account is fetched. This allows the garbage collector to clean up account objects after they have been processed. This met our scaling requirements, since only one account object needed to be in memory at any time. We could now process our large customers. Great. However, we were still limited by the number of accounts we could process in parallel. We could do some parallelization, but the most important I/O, fetching an account, didn’t use async/await, and it wasn’t clear how to adopt it since we were heavily reliant on yield-return. We had solved our memory issues, but now we had a performance issue, because we were always synchronously waiting on I/O.

So, here’s where we are with our code. Notice we are yield-returning each account as we fetch it. Because of the scope within the foreach, sub-account objects become eligible for garbage collection once they are processed. Nothing too difficult.


public void ProcessAccount(string accountNumber) {
   foreach (var account in GetAccountWithSubAccounts(accountNumber)) {
       // ... process the accounts....
   }
}
public IEnumerable<Account> GetAccountWithSubAccounts(string accountNumber) {
    //this is an async repository, so we have to call .Result.
    var parentAccount = accountRepository.GetAccount(accountNumber).Result;
    yield return parentAccount; //this returns an account object.

    foreach (var childAccountNumber in parentAccount.ChildAccountNumbers) {
        var childAccount = accountRepository.GetAccount(childAccountNumber).Result;
        yield return childAccount;
    }
}

Problem 1: How to combine Async/Await with IEnumerable

The bottom line is you can’t. So we’ll do the next best thing: instead of returning an IEnumerable of Account, we’ll return an IEnumerable of Task<Account>. Let’s look at the updated code and then we’ll discuss:


public void ProcessAccount(string accountNumber) {
   foreach (var accountTask in GetTasksForAccountWithSubAccounts(accountNumber)) {
       var account = accountTask.Result;
       // ... process the accounts....
   }
}

//notice, no async keyword...
public IEnumerable<Task<Account>> GetTasksForAccountWithSubAccounts(string accountNumber) {
    //get the parent account from the repo....
    var parentAccountTask = accountRepository.GetAccount(accountNumber);
    yield return parentAccountTask;

    var parentAccount = parentAccountTask.Result;
    foreach (var childAccountNumber in parentAccount.ChildAccountNumbers) {
        //notice there is no await. we want to return the task, not the account.
        var childAccountTask = accountRepository.GetAccount(childAccountNumber);
        yield return childAccountTask;
    }
}

The first thing we notice is that GetTasksForAccountWithSubAccounts doesn’t have the async or await keywords. As we’ve already discussed, it uses yield-return, so it can’t defer execution. Instead, we are getting the task from the repository and using yield-return to send the task back to the caller. Something else that’s important to note: because we are returning a true IEnumerable (vs. a List that’s impersonating IEnumerable), each task doesn’t get created until the foreach statement calls MoveNext() on the enumerator. This means the scope of each task is one iteration of the foreach loop. This strategy will scale well and meet our growth requirements.

One odd thing you may have noticed: we have to “run” the parentAccountTask (via the Result property) to get the child account numbers. Because we’ve already performed a yield-return on that task, it will have already been processed by the caller. This means referencing the Result property won’t block.

We are headed in the right direction but this doesn’t solve our core problem: we aren’t running anything in parallel.

Problem 2: How to run X number of tasks from an IEnumerable<Task> without reifying the enumerable.

We need to be able to start multiple tasks at the same time and wait for them to finish. Let’s see if one of the static helper methods in the Task class can help us:

public static Task WhenAll( IEnumerable<Task> tasks )

Hey cool! It takes IEnumerable. Maybe we can use this method and rely on the task manager to throttle our requests. Let’s take a look at what WhenAll is doing under the hood:

//This code block Copyright (c) Microsoft Corporation.  All rights reserved.
public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks)
{
    .....
    List<Task<TResult>> taskList = new List<Task<TResult>>();
    foreach (Task<TResult> task in tasks) {
        ....
        taskList.Add(task);
    }
    // Delegate the rest to InternalWhenAll<TResult>().
    return InternalWhenAll<TResult>(taskList.ToArray());
}

What! It reifies the enumerable to a list, then converts it to an array. (An array! What is this, .NET 2.0?) That does us no good. It will create all the tasks at once and then run them. The WhenAny method performs the same operations. The WaitAll and WaitAny methods don’t even take IEnumerable, only an array.

The Tasks.Parallel class has a lot of static methods that look promising, but you have to wrap each task in an Action, which, when you look under the hood, is wrapped in a new task. Ugh. I already have tasks. Why not just run the tasks I already have? Also, Tasks.Parallel was not made with async operations in mind; it was designed for CPU-bound parallel processing.

Okay, enough of this. Time to crank out some code.

So the extension methods I wrote are a little dense. Instead of dissecting those, I’m going to build it out like I did initially (in class form) for easier discussion, then we’ll look at how it was refactored into extension methods.


    public class ParallelTaskRunner<T>
    {
        private int taskCount;
        private Task<T>[] currentTasks;
        private int maxConcurrency;

        public ParallelTaskRunner(int maxConcurrency)
        {
            this.maxConcurrency = maxConcurrency;
            currentTasks = new Task<T>[maxConcurrency];
        }

        public T AddTask(Task<T> task)
        {
            var returnValue = default(T);
            AddTaskToArray(task);
            taskCount++;
            if (taskCount == maxConcurrency)
            {
                var taskindex = Task.WaitAny(currentTasks);
                returnValue = currentTasks[taskindex].Result;
                currentTasks[taskindex] = null;
                taskCount--;
            }
            return returnValue;
        }

        public IEnumerable<T> WaitRemainingTasks()
        {
            var runningTasks = currentTasks.Where(t => t != null).ToArray();
            if (taskCount > 0)
            {
                Task.WaitAll(runningTasks);
            }
            return runningTasks.Select(t => t.Result);
        }

        private void AddTaskToArray(Task<T> task)
        {
            for (int i = 0; i < currentTasks.Length; i++)
            {
                if (currentTasks[i] == null)
                {
                    currentTasks[i] = task;
                    break;
                }
            }
        }
    }

And here’s how we use it…

   var accountTasks = GetTasksForAccountWithSubAccounts(someAccountNumber);
   var taskRunner = new ParallelTaskRunner<Account>(5);
   foreach (var task in accountTasks) {
      var account = taskRunner.AddTask(task);
      if (account != null) {
         // do stuff with account....
      }
   }
   foreach (var account in taskRunner.WaitRemainingTasks()) {
      // do stuff with account....
   }

The constructor just initializes a few things.  The AddTask method is where all the magic happens.  It adds a task to the array, and if the array is full it will do a Task.WaitAny. This will block and only return when any of the tasks is complete. The value for that task is then returned, at which point the caller can process the return value, and then add the next task by calling AddTask again.  This continues until all the tasks have been added.  Then the remaining tasks are processed and returned by calling WaitRemainingTasks.

The extension methods are a little more efficient, especially when adding new tasks to the array.  There are two almost-identical methods: one for Task and another for Task<T>.  I tried to combine them but had issues with the type of the Action parameter.  If anyone has suggestions for combining those let me know.

One of the big differences between my original code above and the extension methods (shown in full below) is the return type (or the delegate type for the extension methods): the code above returns T. The delegate takes either Task or Task<T>. This is important because, as it’s written, exceptions are swallowed into the task. You should always check the status of the task to see if it’s faulted.

Let’s see the same example using the generic extension method:

   //this will return IEnumerable<Task<Account>>
   var accountTasks = GetTasksForAccountWithSubAccounts(someAccountNumber);
   accountTasks.RunTasks(5, task =>
      {
         if (task.Status == TaskStatus.Faulted)
         {
            Console.WriteLine(task.Exception);
            // ....deal with exception....
         }
         else
         {
            var account = task.Result;
            //.... do stuff....
         }
      });

Possibilities for changes & improvements:

  • I’ll probably create a variation that uses WhenAny instead of WaitAny. WhenAny returns a Task<Task<T>>; awaiting it yields the completed Task<T> instead of an index. Thus, the action parameter would become an async delegate (e.g. a Func<Task<T>, Task>). This would allow the entire extension method to be async, which could certainly be desirable. (A rough sketch of this follows after this list.)
  • The delegate is currently an Action. ¬†It could be changed to a Func to allow a bool to be returned to determine if the entire process should be cancelled. The other option is to have an overload that includes a cancellation token, which would be consistent with the async/await pattern.
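Here’s a rough sketch of what that WhenAny-based variation might look like (hypothetical; not part of the gist below):

    // Hypothetical sketch: a fully async variant that never blocks a thread.
    // Lives in a static class, like any extension method.
    public static async Task RunTasksAsync<T>(
        this IEnumerable<Task<T>> tasks, int maxConcurrency, Func<Task<T>, Task> action)
    {
        var running = new List<Task<T>>(maxConcurrency);
        foreach (var task in tasks)
        {
            running.Add(task);
            if (running.Count == maxConcurrency)
            {
                var completed = await Task.WhenAny(running); // yields the finished Task<T>
                running.Remove(completed);
                await action(completed); // caller still checks for faults, as with RunTasks
            }
        }
        while (running.Count > 0) // drain the remaining tasks
        {
            var completed = await Task.WhenAny(running);
            running.Remove(completed);
            await action(completed);
        }
    }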

Related Blog Articles:

Also check out Concurrency in C# Cookbook, by Stephen Cleary. I just finished the pre-release edition and it was extremely helpful.

Here’s the full source of the gist:

Unit test verification using StringBuilderStream

UPDATE!
Thanks to reddit user Porges, I now know that you *can* access the underlying buffer of a memory stream after it’s disposed. Use the ToArray or GetBuffer methods.
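In other words, something like this works even after the code under test has disposed the stream (a minimal sketch):

     var writableStream = new MemoryStream();
     // ... code under test writes to writableStream and disposes it ...

     // ToArray is documented to work even when the MemoryStream is closed.
     var written = Encoding.UTF8.GetString(writableStream.ToArray());
     Assert.That(written, Is.EqualTo(testString));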

I recently had to write a unit test for a method that called the following private function:


private async Task WriteStreamToFileAsync(Stream stream, string filePath)
{
  // FileSystem.OpenFile is a thin wrapper around System.IO.File.Open...
  using (var fileStream = FileSystem.OpenFile(filePath, FileMode.CreateNew))
  {
    await stream.CopyToAsync(fileStream);
    await fileStream.FlushAsync();
  }
}

The FileSystem object is a very thin wrapper around the System.IO.* methods of the same name. This allows a Mock to be injected for the unit tests. During the Setup() for the unit test, I new’d up a class-level MemoryStream object and had it returned by the Mock call to OpenFile when the FileMode passed in is set to CreateNew. The goal here is to verify what was written to the MemoryStream after each unit test.

     private MemoryStream writableStream;
     private Mock<FileSystem> fileSystem;
     [SetUp]
     public void Setup() {
         writableStream = new MemoryStream();
         fileSystem = new Mock<FileSystem>();
         fileSystem.Setup(f => f.OpenFile(It.IsAny<string>(), FileMode.CreateNew))
                .Returns(writableStream);
         //... other setups omitted.
     }

Then I wrote some unit tests that called the method being tested (which subsequently called the WriteStreamToFileAsync function that writes to the stream), followed by the verification. The verification consisted of seeking back to the start of the stream, and then reading the stream to verify its contents.

However, when I actually ran the test, it threw an exception stating that the stream was closed. This makes sense after inspecting the method that uses the stream: it’s wrapped in a using statement, so of course the stream would be closed, since it’s being disposed. Quite inconveniently, Microsoft decided to stay true to the philosophy of the Dispose method and does not allow the stream to be reopened. I haven’t looked under the hood, but I’m guessing any memory allocated for storage is released upon disposal. Bummer.

We could modify the WriteStreamToFileAsync function, but changing production code to conform to unit tests is bad for your production code (in this case, a potential memory leak) and creates brittle tests. Another option is to feed the unit test a stream that writes to a file on disk, then the verification could read the file to assert its contents. That will work, but it would really slow down the unit tests and introduce an unnecessary external dependency. What’s really needed is a stream that stores data in memory, but still exposes its underlying data storage mechanism after being disposed.

With those requirements in mind, I wrote StringBuilderStream. As the name implies, it uses a StringBuilder object as the underlying storage mechanism. The data I was trying to verify was text, so using a StringBuilder for storage has the added benefit of giving me a string that I could immediately verify without pulling it out of a stream. Also, a StringBuilder will automatically grow as needed, just like a MemoryStream object. (The full gist for StringBuilderStream is at the bottom of this post.)
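The core idea is small enough to sketch here. This is a simplified, write-only version (the real class in the gist at the bottom also supports reading and seeking), and it assumes UTF-8 text; a production version would use a Decoder to handle multi-byte characters split across Write calls:

     // Simplified sketch of the idea: a stream backed by a StringBuilder
     // whose contents survive Dispose().
     public class StringBuilderStream : Stream
     {
         private readonly StringBuilder builder = new StringBuilder();

         public override bool CanRead { get { return false; } }  // the full version supports reads
         public override bool CanSeek { get { return false; } }
         public override bool CanWrite { get { return true; } }
         public override long Length { get { return builder.Length; } }
         public override long Position { get; set; }

         public override void Write(byte[] buffer, int offset, int count)
         {
             // Decode and append; the StringBuilder is untouched by Dispose().
             builder.Append(Encoding.UTF8.GetString(buffer, offset, count));
         }

         // The captured text stays available even after the stream is disposed.
         public override string ToString() { return builder.ToString(); }

         public override void Flush() { }
         public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
         public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
         public override void SetLength(long value) { throw new NotSupportedException(); }
     }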

Now using the StringBuilderStream, here’s my working unit test:

     private const string testString = "ipsum lorem blah blah blah...";
     private ObjectToBeTested objectToBeTested;
     private StringBuilderStream writableStream;
     private Mock<FileSystem> fileSystem;
     [SetUp]
     public void Setup() {
         writableStream = new StringBuilderStream();
         fileSystem = new Mock<FileSystem>();
         fileSystem.Setup(f => f.OpenFile(It.IsAny<string>(), FileMode.CreateNew))
                .Returns(writableStream);
         //... other setups omitted.
         objectToBeTested = new ObjectToBeTested(fileSystem.Object);
     }
     [Test]
     public void WriteToFile() {
        objectToBeTested.WriteToFile(testString);
        // you don't need to seek to the beginning, just use the overridden ToString method....
        Assert.That(writableStream.ToString(), Is.EqualTo(testString));
     }

A few notes:

  • You should still dispose of a StringBuilderStream object; I didn’t include the teardown for brevity.
  • The StringBuilderStream does support seeking and reading via the standard Stream methods.
  • If you seek to a spot other than the end and start writing, everything from that point to the end of the StringBuilder is erased. I know that’s not typical Stream behavior but it was a shortcut to save time. I may change that later but I don’t see a lot of use cases for seeking inside a StringBuilderStream.

Here’s the gist for StringBuilderStream: