
Using Tasks with IObservable

This is a follow-up to my post on using Async/Await with IEnumerable and Yield-Return.

Jump to the code on Github.

In my previous post we looked at using tasks with IEnumerable. But what if you are returning tasks from an IObservable instead of an IEnumerable? How would you run them and how do you limit the concurrency?  Better yet, can we take in an IObservable<Task<T>>, run the tasks automatically, and then produce an IObservable<T>?

The short answer is yes, but it’s not trivial due to the concurrency requirement.  Let’s solve it without that requirement first. I’ll be using a fictional Account type as my “T” just like in my previous post.

   var repo = new AccountRepository();
   var tasks = repo.GetAccountWithSubs("123123123"); // returns IEnumerable<Task<Account>>
   var tasksSource = tasks.ToObservable();           // converts it to IObservable<Task<Account>>

   var accountSource = tasksSource.Select(ts => ts.Result); // blocks on each task in turn; yields IObservable<Account>
   var subscription = accountSource.Subscribe(
        acc => {
                  Console.WriteLine(acc.AccountNumber);
               },
        ex =>  { },
        () => { }
     );

Thanks to the LINQ extensions for IObservable it’s pretty easy to “run” the task and return the result as a new IObservable. Examples like this really show the beautiful parity between IEnumerable and IObservable.

Unfortunately, this example waits on every task synchronously, one at a time. What if we want to run multiple tasks in parallel? To accomplish this, we need an observer that receives calls from the first IObservable (i.e. the parent), waits for the tasks to complete, and then calls the observer of the second IObservable with the account object. On top of that, we need to propagate any exceptions captured by a task to the subscriber. We also have to support the user canceling the IObservable, via the Dispose method, before the sequence is done.

In order to implement all this functionality, we’ll have to do it “the long way”, which is to say, we’ll have to implement our own IObservable for the job: we’ll be given an IObservable<Task<T>> and we need to create and return an IObservable<T>. To get the ball rolling, let’s re-implement the same functionality as the code above, but with our own made-from-scratch IObservable. (Side note: I tried to do this using the Observable.Create method, but it became too complex for delegates to handle.)

    internal class TObservable<T> : IObservable<T>, IDisposable
    {
        private readonly IObservable<Task<T>> taskObservable;
        private IDisposable taskObservableSubscriber;
        private IObserver<T> observer;

        public TObservable(IObservable<Task<T>> taskObservable)
        {
            this.taskObservable = taskObservable;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            this.observer = observer;
            this.taskObservableSubscriber =
                             taskObservable.Subscribe(OnNext, OnError, OnCompleted);
            return this;
        }

        private void OnNext(Task<T> task)
        {
            this.observer.OnNext(task.Result);
        }

        private void OnError(Exception exception)
        {
            this.observer.OnError(exception);
        }

        private void OnCompleted()
        {
            this.observer.OnCompleted();
        }

        public void Dispose()
        {
            if (this.taskObservableSubscriber != null)
            {
                this.taskObservableSubscriber.Dispose();
                this.taskObservableSubscriber = null;
            }
        }
    }

Most of what’s happening here is just listening for events from the “parent” IObservable and then echoing those events back out to the registered observer. The only exception is the OnNext method, which receives a task, synchronously waits for it (by reading the Result property), and sends the value to the observer. Again, this mirrors the functionality we achieved with the Select call in the first code snippet, but now we have a framework we can expand with additional functionality.

Before we move on, I wanted to point out that our class also implements IDisposable. This saves us from having to create a new object that implements IDisposable just to be returned to the subscriber. Another option is to use the new Disposable.Create method in the Subscribe method:

return Disposable.Create(() =>
{
   if (this.taskObservableSubscriber != null)
   {
      this.taskObservableSubscriber.Dispose();
      this.taskObservableSubscriber = null;
   }
});

This seems like a nice terse way to code it, and the code that’s executed is identical, but having our own dispose method to call will turn out to be more convenient as we add additional functionality.

The first change is to support raising exceptions that are contained in the task. As a reminder, a task captures any exception it throws and holds on to it until the task is awaited (or until Wait or Result is called, which rethrows it wrapped in an AggregateException). Since we aren’t using async/await in our code, we need to check each task to see if it’s faulted. If there’s an exception, we should call the subscriber’s OnError and end processing of the IObservable. Here are the changes to OnNext and OnError to support this:

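        private void OnNext(Task<T> task)
        {
            // Block until the task finishes. Unlike Wait() or Result, WaitAny does not
            // throw if the task faulted, so the check below sees the final state.
            Task.WaitAny(task);
            if (task.IsFaulted)
            {
                this.OnError(task.Exception);
            }
            else
            {
                this.observer.OnNext(task.Result);
            }
        }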

        private void OnError(Exception exception)
        {
            this.Dispose();
            this.observer.OnError(exception);
        }

We check the task for an error condition and then raise the error to the subscriber. Before raising the error, however, we call our own Dispose method, which disposes the subscription to the “parent” IObservable and so prevents OnNext from being called again with additional tasks. This is important for two reasons. Once OnError (or OnCompleted) is called, the IObservable is not allowed to make any further calls to the observer. And as a result of the error the observer may take some kind of drastic measure (e.g. kill the process), so we want to give the parent IObservable an opportunity to clean up first (e.g. close files, write logs, etc.).
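To make the exception handling above concrete, here is a small sketch that uses only the base class library (no Rx). It shows that a faulted task exposes its error through the Exception property as an AggregateException, and that Task.WaitAny waits for completion without rethrowing. The FaultedTaskDemo class and its Inspect method are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedTaskDemo
{
    // Returns a short description of how a faulted task reports its error.
    public static string Inspect()
    {
        // A TaskCompletionSource lets us fault a task by hand.
        var tcs = new TaskCompletionSource<int>();
        tcs.SetException(new InvalidOperationException("boom"));
        Task<int> task = tcs.Task;

        // WaitAny blocks until the task finishes but, unlike Wait() or Result,
        // it does not rethrow the task's exception.
        Task.WaitAny(task);

        // task.Exception is an AggregateException wrapping the original error.
        AggregateException aggregate = task.Exception;
        Exception inner = aggregate.InnerException;
        return task.IsFaulted + ":" + inner.GetType().Name + ":" + inner.Message;
    }

    public static void Main()
    {
        Console.WriteLine(Inspect()); // prints "True:InvalidOperationException:boom"
    }
}
```

Because the subscriber receives the AggregateException itself, it may want to look at InnerException (or call Flatten()) to get at the underlying error.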

Next we need to support the scenario of a task being cancelled. This requires some discretion on the part of the coder: How do we raise that as an IObservable event? You may want to change this to suit your needs. I’ll be raising it as an exception, but you may want to call OnCompleted instead.

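        private void OnNext(Task<T> task)
        {
            // Block until the task finishes. Unlike Wait() or Result, WaitAny does not
            // throw on a faulted or cancelled task, so the checks below see the final state.
            Task.WaitAny(task);
            if (task.IsFaulted)
            {
                this.OnError(task.Exception);
            }
            else if (task.IsCanceled)
            {
                // you may want to call OnCompleted instead depending on if the cancellation is expected or not.
                this.OnError(new OperationCanceledException("The task was cancelled."));
            }
            else
            {
                this.observer.OnNext(task.Result);
            }
        }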
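As a rough illustration of why the separate IsCanceled branch is needed, the sketch below (BCL only) inspects a cancelled task. A cancelled task is not faulted and its Exception property is null, yet reading Result still throws. CanceledTaskDemo and Inspect are hypothetical names for this example.

```csharp
using System;
using System.Threading.Tasks;

public static class CanceledTaskDemo
{
    // Describes the state of a cancelled task and what reading Result does.
    public static string Inspect()
    {
        // A TaskCompletionSource lets us cancel a task by hand.
        var tcs = new TaskCompletionSource<int>();
        tcs.SetCanceled();
        Task<int> task = tcs.Task;

        string viaResult;
        try
        {
            viaResult = "value " + task.Result;
        }
        catch (AggregateException ex)
        {
            // Result rethrows the cancellation wrapped in an AggregateException.
            viaResult = ex.InnerException.GetType().Name;
        }

        // Format: IsCanceled : IsFaulted : Exception is null : what Result did
        return task.IsCanceled + ":" + task.IsFaulted + ":"
             + (task.Exception == null) + ":" + viaResult;
    }

    public static void Main()
    {
        Console.WriteLine(Inspect()); // prints "True:False:True:TaskCanceledException"
    }
}
```

So an IsFaulted check alone would let a cancelled task fall through to the Result call, which is why the cancellation case gets its own branch.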

Now we can move on to the core functionality: running multiple tasks in parallel. This gets a little trickier because we can’t just parrot events like we’ve been doing. We have to track tasks that are in progress, and then call the subscriber’s OnNext with the task’s result as they complete. Most of this code is directly converted from my extension methods that do the same thing with IEnumerable. First let’s add some local variables for tracking tasks and initialize them in the constructor.

        ....
        private readonly int maxConcurrency;
        private Task<T>[] currentTasks;
        private int taskCount = 0;
        private int nextIndex = 0;

        public TObservable(IObservable<Task<T>> taskObservable, int maxConcurrency)
        {
            this.taskObservable = taskObservable;
            this.maxConcurrency = maxConcurrency;
            this.currentTasks = new Task<T>[maxConcurrency];
        }

Next, let’s take the code that calls the subscriber and put it in its own function. We’ll need to call it from multiple places soon.

        private void CallSubscriber(Task<T> task)
        {
            if (task.IsFaulted)
            {
                this.OnError(task.Exception);
            }
            else if (task.IsCanceled)
            {
                // you may want to call OnCompleted instead based on how you're using the cancellation token.
                this.OnError(new OperationCanceledException("The task was cancelled."));
            }
            else
            {
                this.observer.OnNext(task.Result);
            }
        }

Now we can use the OnNext method to track our tasks in an array and wait for them to finish:

        private void OnNext(Task<T> task)
        {
            currentTasks[nextIndex] = task;
            taskCount++;
            if (taskCount == maxConcurrency)
            {
                nextIndex = Task.WaitAny(currentTasks);
                CallSubscriber(currentTasks[nextIndex]);
                currentTasks[nextIndex] = null;
                taskCount--;
            }
            else
            {
                nextIndex++;
            }
        }

The task is added to an array. If the array is full, it waits for the first task to finish (via WaitAny). Once a task is done, it signals the subscriber (via CallSubscriber) and then removes it from the array. The next time OnNext is called, that empty spot in the array will be taken by the new task. This continues until OnError or OnCompleted is called. Let’s run down the happy path and look at OnCompleted first.
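The slot-reuse behavior described above can be traced with a small standalone sketch. It runs the same bookkeeping over three hand-completed tasks with a window of two; there is no Rx here, and the WindowDemo class, its Run method, and the task names are assumptions made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WindowDemo
{
    // Feeds three tasks through a window of size 2 and records what is emitted.
    public static List<string> Run()
    {
        const int maxConcurrency = 2;
        var window = new Task<string>[maxConcurrency];
        int taskCount = 0, nextIndex = 0;
        var emitted = new List<string>();

        // Manually completed tasks stand in for real asynchronous work.
        var a = new TaskCompletionSource<string>();
        var b = new TaskCompletionSource<string>();
        var c = new TaskCompletionSource<string>();
        b.SetResult("B"); // B and C are already finished;
        c.SetResult("C"); // A never finishes in this demo.

        foreach (var task in new[] { a.Task, b.Task, c.Task })
        {
            window[nextIndex] = task;
            taskCount++;
            if (taskCount == maxConcurrency)
            {
                // Window is full: wait for any task, emit it, and free its slot.
                nextIndex = Task.WaitAny(window);
                emitted.Add(window[nextIndex].Result);
                window[nextIndex] = null;
                taskCount--;
            }
            else
            {
                nextIndex++;
            }
        }
        return emitted; // A is still pending in slot 0 at this point
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints "B,C"
    }
}
```

Task A occupies slot 0 the whole time, while slot 1 is filled by B, freed, and then reused by C. Results come out in completion order, not in the order the tasks arrived.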

        private void OnCompleted()
        {
            while (taskCount > 0)
            {
                currentTasks = currentTasks.Where(t => t != null).ToArray();
                nextIndex = Task.WaitAny(currentTasks);
                CallSubscriber(currentTasks[nextIndex]);
                currentTasks[nextIndex] = null;
                taskCount--;
            }
            this.Dispose();
            this.observer.OnCompleted();
        }

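The WaitAny method doesn’t like null values (it throws an ArgumentException), so we have to rebuild our array to remove them. Then we wait for a task to complete, notify the subscriber, and remove it from the array, same as above. Once we are done running the tasks, we dispose the parent subscription and then call OnCompleted on our subscriber to let them know we’ve completed. One caveat: if a task faults while we are draining, CallSubscriber raises OnError but the loop keeps going, so a production version would want a flag that stops all further notifications once an error has been sent.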
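Here is a minimal sketch of the draining step in isolation, using only the BCL. It first confirms that WaitAny rejects an array containing null, then drains a window by filtering out the empty slots. As a variation on the code above, it keeps the remaining tasks in a list instead of rebuilding an array with nulls; DrainDemo and its method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DrainDemo
{
    // True if WaitAny refuses an array that contains a null element.
    public static bool NullElementThrows()
    {
        try
        {
            Task.WaitAny(new Task[] { null, Task.FromResult(1) });
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }

    // Waits for every non-null task in the window and collects the results.
    public static List<int> Drain(Task<int>[] window)
    {
        var results = new List<int>();
        var remaining = window.Where(t => t != null).ToList();
        while (remaining.Count > 0)
        {
            // Wait for whichever task finishes next, record it, and drop it.
            int index = Task.WaitAny(remaining.ToArray());
            results.Add(remaining[index].Result);
            remaining.RemoveAt(index);
        }
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(NullElementThrows());
        var window = new Task<int>[] { Task.FromResult(1), null, Task.FromResult(2) };
        Console.WriteLine(Drain(window).Count);
    }
}
```

This sketch assumes every task completes successfully; a faulted or cancelled task would make the Result call throw, which is what the status checks in CallSubscriber guard against.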

The OnError case is straightforward. We just dispose the parent subscription and then echo the exception to our subscriber like we did before:

        private void OnError(Exception exception)
        {
            this.Dispose();
            this.observer.OnError(exception);
        }

One problem: what if we have tasks still in progress? Do we orphan them? We could possibly add in a cancellation token and then take care of it in the Dispose method:

        public void Dispose()
        {
            if (taskCount > 0)
            {
                //cancel running tasks?
            }
        }

I’ll leave it up to you how to handle that one.
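For what it’s worth, here is one possible shape for the cancellation-token idea, as a sketch only. It assumes the code that creates the tasks can be handed a CancellationToken and actually honors it, which nothing in the repository code above shows. CancelOnDispose and CancelDemo are hypothetical names, and Task.Delay stands in for real work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: owns a token source and cancels it when disposed.
public sealed class CancelOnDispose : IDisposable
{
    private readonly CancellationTokenSource cts = new CancellationTokenSource();

    // Hand this token to whatever creates the tasks.
    public CancellationToken Token { get { return cts.Token; } }

    public void Dispose()
    {
        // Signal cancellation; tasks that observe Token can stop early.
        cts.Cancel();
    }
}

public static class CancelDemo
{
    // Returns true if disposing the scope cancelled the in-flight task.
    public static bool Run()
    {
        var scope = new CancelOnDispose();

        // Task.Delay stands in for real work that honors the token.
        Task work = Task.Delay(Timeout.Infinite, scope.Token);

        scope.Dispose();

        // WaitAny waits for the task to reach a final state without throwing.
        Task.WaitAny(work);
        return work.IsCanceled;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "True"
    }
}
```

In the TObservable class, the equivalent would be to call Cancel on such a token source from Dispose when taskCount is greater than zero. Tasks that ignore the token will still run to completion, so this only helps when the task producer cooperates.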

 

Here’s the full code of the gist: