
Async LINQ To Objects Over MongoDB

I’ve been working with MongoDB for the past few years, and lately the team behind the C# driver has been working on some interesting things, especially around async-await support.

This year they released a complete rewrite of the driver with an async-only API. Since IEnumerator doesn’t support asynchronous enumeration (because MoveNext is synchronous), they introduced their own enumerator interface, IAsyncCursor, which looks like this:


    public interface IAsyncCursor<out TDocument> : IDisposable
    {
        IEnumerable<TDocument> Current { get; }
        Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken));
    }
    

To enable asynchronous iteration without having to implement it yourself, they added a subset of the LINQ operators, but only those that materialize the query (e.g. ForEachAsync, ToListAsync, etc.).
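
For example (a minimal sketch reusing the Hamster document and the GetCollection() helper that appear in the snippets below), a query can be materialized asynchronously like this:


    IMongoCollection<Hamster> collection = GetCollection();

    // Materialize the whole result into a list asynchronously.
    List<Hamster> hamsters = await collection.Find(_ => _.FirstName == "Bar").ToListAsync();

    // Or process each document as it arrives, without building a list in memory.
    await collection.Find(_ => _.FirstName == "Bar").ForEachAsync(hamster => Console.WriteLine(hamster.LastName));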

In the latest version of the driver (v2.1) they added full “LINQ to MongoDB” support (i.e. IQueryable) that you can use to create your mongo queries. Just like with LINQ to SQL you can use Where, Select and so forth to build a complex expression-based query and have it sent to the server only when you start enumerating the IQueryable.
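
For example, the following sketch (again assuming the Hamster collection used below) builds a filter and a projection that are sent to the server as a single mongo query only when the results are materialized:


    IMongoQueryable<string> lastNames = GetCollection().AsQueryable()
        .Where(_ => _.FirstName == "Bar")
        .Select(_ => _.LastName);

    // Nothing is sent to the server until this point:
    List<string> results = await lastNames.ToListAsync();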

The problem arises when you need an expression to be executed on the client side rather than sent to the server. This is mainly relevant for expressions that can’t be translated into a mongo query. For example, the following code comparing two of the queried document’s fields (which is unsupported in MongoDB) will throw an exception saying: “Unsupported filter: ([FirstName] == [LastName]).”


    async Task LinqToMongo()
    {
        IMongoCollection<Hamster> collection = GetCollection();
        IMongoQueryable<Hamster> queryable = collection.AsQueryable();
        queryable = queryable.Where(_ => _.FirstName == _.LastName); // unsupported filter
        Console.WriteLine(await queryable.CountAsync());
    }

Normally you would just cast the IQueryable into an IEnumerable (hopefully with AsEnumerable) and use LINQ to Objects, which also supports deferred execution. However, since IEnumerable is synchronous, doing that defeats the whole purpose of using async-await to begin with. You could also materialize the whole collection into memory and then use client-side filters, but that can take too much memory and time.
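
In code, that synchronous workaround looks roughly like this (the client-side filter works, but the enumeration blocks the calling thread on network I/O):


    IEnumerable<Hamster> enumerable = GetCollection().AsQueryable().AsEnumerable();
    int count = enumerable.Count(_ => _.FirstName == _.LastName); // LINQ to Objects, evaluated synchronously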

A former coworker of mine (Tsach Vayness) suggested finding an existing library with async LINQ to Objects support and plugging it into the MongoDB C# driver. That enables using all the LINQ to Objects operators over MongoDB. There are a few of these libraries and the best, in my opinion, is Reactive Extensions’ Interactive Extensions (Ix-Async on nuget.org).

All that’s needed is an adapter from mongo’s IAsyncCursorSource and IAsyncCursor to Interactive Extensions’ IAsyncEnumerable and IAsyncEnumerator (which are already pretty similar), and then you can use all of Ix’s operators on MongoDB cursors. Here’s the previous example, fixed by moving the filter comparing the two fields to the client side:


    async Task LinqToObjects()
    {
        IMongoCollection<Hamster> collection = GetCollection();
        IMongoQueryable<Hamster> queryable = collection.AsQueryable();
        IAsyncEnumerable<Hamster> asyncEnumerable = queryable.ToAsyncEnumerable();
        asyncEnumerable = asyncEnumerable.Where(_ => _.FirstName == _.LastName);
        Console.WriteLine(await asyncEnumerable.Count());
    }

Most of the “magic” enabling this happens in AsyncEnumeratorAdapter.MoveNext. First, you create an IAsyncCursor out of the IAsyncCursorSource in an async fashion with ToCursorAsync (which is possible because the adapter’s MoveNext itself returns a Task). Then you call (and await) MoveNextAsync on the created _asyncCursor. If it returns true, _asyncCursor.Current contains a batch of items you can enumerate, and you call _asyncCursor.MoveNextAsync again once the batch is exhausted. You repeat that until the underlying MoveNextAsync returns false, meaning there are no more items to enumerate.


    public async Task<bool> MoveNext(CancellationToken cancellationToken)
    {
        if (_asyncCursor == null)
        {
            _asyncCursor = await _asyncCursorSource.ToCursorAsync(cancellationToken);
        }
        if (_batchEnumerator != null && _batchEnumerator.MoveNext())
        {
            return true;
        }
        if (_asyncCursor != null && await _asyncCursor.MoveNextAsync(cancellationToken))
        {
            _batchEnumerator?.Dispose();
            _batchEnumerator = _asyncCursor.Current.GetEnumerator();
            return _batchEnumerator.MoveNext();
        }
        return false;
    }
 

Here is the full code for the adapters:


    public static class AsyncCursorExtensions
    {
        public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IAsyncCursorSource<T> asyncCursorSource) => 
            new AsyncEnumerableAdapter<T>(asyncCursorSource);

        private class AsyncEnumerableAdapter<T> : IAsyncEnumerable<T>
        {
            private readonly IAsyncCursorSource<T> _asyncCursorSource;

            public AsyncEnumerableAdapter(IAsyncCursorSource<T> asyncCursorSource)
            {
                _asyncCursorSource = asyncCursorSource;
            }

            public IAsyncEnumerator<T> GetEnumerator() => 
                new AsyncEnumeratorAdapter<T>(_asyncCursorSource);
        }

        private class AsyncEnumeratorAdapter<T> : IAsyncEnumerator<T>
        {
            private readonly IAsyncCursorSource<T> _asyncCursorSource;
            private IAsyncCursor<T> _asyncCursor;
            private IEnumerator<T> _batchEnumerator;

            public T Current => _batchEnumerator.Current;

            public AsyncEnumeratorAdapter(IAsyncCursorSource<T> asyncCursorSource)
            {
                _asyncCursorSource = asyncCursorSource;
            }

            public async Task<bool> MoveNext(CancellationToken cancellationToken)
            {
                if (_asyncCursor == null)
                {
                    _asyncCursor = await _asyncCursorSource.ToCursorAsync(cancellationToken);
                }
                if (_batchEnumerator != null && _batchEnumerator.MoveNext())
                {
                    return true;
                }
                if (_asyncCursor != null && await _asyncCursor.MoveNextAsync(cancellationToken))
                {
                    _batchEnumerator?.Dispose();
                    _batchEnumerator = _asyncCursor.Current.GetEnumerator();
                    return _batchEnumerator.MoveNext();
                }
                return false;
            }

            public void Dispose()
            {
                _asyncCursor?.Dispose();
                _asyncCursor = null;
            }
        }
    }


On The Efficiency Of ValueTask

The corefxlab repository contains library suggestions for corefx which itself is a repo containing the .NET Core foundational libraries. One of the gems hidden among these libraries is ValueTask<T> that was added by Stephen Toub as part of the System.Threading.Tasks.Channels library but may be extremely useful on its own. The full implementation of ValueTask<T> can be found here, but this is an interesting subset of the API:


    public struct ValueTask<TResult>
    {
        public ValueTask(TResult result);
        public ValueTask(Task<TResult> task);
        public static implicit operator ValueTask<TResult>(Task<TResult> task);
        public static implicit operator ValueTask<TResult>(TResult result);
        public Task<TResult> AsTask();
        public bool IsRanToCompletion { get; }
        public TResult Result { get; }
        public ValueTaskAwaiter GetAwaiter();
        public ValueTaskAwaiter ConfigureAwait(bool continueOnCapturedContext);
        // ...
    }

I first noticed ValueTask<T> in the documented API when reviewing the channels PR made to corefxlab. I suggested adding a short explanation which Stephen quickly provided:

“ValueTask<T> is a discriminated union of a T and a Task<T>, making it allocation-free for ReadAsync<T> to synchronously return a T value it has available (in contrast to using Task.FromResult<T>, which needs to allocate a Task<T> instance). ValueTask<T> is awaitable, so most consumption of instances will be indistinguishable from with a Task<T>.”

ValueTask<T>, being a struct, enables writing async methods that do not allocate memory when they run synchronously, without compromising API consistency. Imagine having an interface with a Task-returning method. Each class implementing this interface must return a Task even if it happens to execute synchronously (hopefully using Task.FromResult). You could of course have two different methods on the interface, a synchronous one and an async one, but that requires two different implementations to avoid “sync over async” and “async over sync”.
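
For comparison, here is a sketch of the Task-based equivalent of the provider interface used below (ITaskHamsterProvider and LocalTaskHamsterProvider are hypothetical names); even the purely synchronous, in-memory implementation has to allocate a Task<Hamster> on every call:


    interface ITaskHamsterProvider
    {
        Task<Hamster> GetHamsterAsync(string name);
    }

    class LocalTaskHamsterProvider : ITaskHamsterProvider
    {
        readonly ConcurrentDictionary<string, Hamster> _dictionary; // ...
        public Task<Hamster> GetHamsterAsync(string name) =>
            Task.FromResult(_dictionary[name]); // allocates a new Task<Hamster> per call
    }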

ValueTask<T> has implicit conversions from both T and Task<T> and can be awaited by itself which makes it extremely simple to use. Consider this possibly async provider API returning a ValueTask<T>:


    interface IHamsterProvider
    {
        ValueTask<Hamster> GetHamsterAsync(string name);
    }

This provider interface can be implemented synchronously (for in-memory hamsters) by performing a lookup in a Dictionary and returning the Hamster instance (which is implicitly converted into a ValueTask<Hamster> without any additional allocations):


    class LocalHamsterProvider : IHamsterProvider
    {
        readonly ConcurrentDictionary<string, Hamster> _dictionary; // ...
        public ValueTask<Hamster> GetHamsterAsync(string name)
        {
            Hamster hamster = _dictionary[name];
            return hamster;
        }
    }

Or asynchronously (for hamsters stored in MongoDB) by performing an asynchronous query and returning the Task<Hamster> instance (which is implicitly converted into ValueTask<Hamster> as well):


    class MongoHamsterProvider : IHamsterProvider
    {
        IMongoCollection<Hamster> _collection; // ...
        public ValueTask<Hamster> GetHamsterAsync(string name)
        {
            Task<Hamster> task = _collection.Find(_ => _.Name == name).SingleAsync();
            return task;
        }
    }

The consumer of this API can await the ValueTask<Hamster> as if it were a Task<Hamster>, without knowing whether it was performed asynchronously or not, and with the benefit of no added allocations:


    Hamster hamster = await Locator.Get<IHamsterProvider>().GetHamsterAsync("bar");

While this example shows 2 different implementations, one synchronous and the other asynchronous, they could easily be combined. Imagine a provider using a local cache for hamsters and falling back to the DB when needed.
The few truly asynchronous calls to GetHamsterAsync would indeed require allocating a Task<Hamster> (which will be implicitly converted to ValueTask<Hamster>) but the rest would complete synchronously and allocation-free:


    class HamsterProvider : IHamsterProvider
    {
        readonly ConcurrentDictionary<string, Hamster> _dictionary; // ...
        IMongoCollection<Hamster> _collection; // ...
        public ValueTask<Hamster> GetHamsterAsync(string name)
        {
            Hamster hamster;
            if (_dictionary.TryGetValue(name, out hamster))
            {
                return hamster;
            }
            Task<Hamster> task = _collection.Find(_ => _.Name == name).SingleAsync();
            task.ContinueWith(_ => _dictionary.TryAdd(_.Result.Name, _.Result));
            return task;
        }
    }
    

This kind of “hybrid” use of async-await is very common, since these outbound operations to a data store, remote API, etc. can usually benefit from some kind of caching.

It’s hard to quantify how much of an improvement widespread usage of ValueTask can bring as it greatly depends on the actual usage but avoiding allocations not only saves the allocation cost but also greatly reduces the garbage collection overhead. That’s why it was requested to be added to .NET Core regardless of the channels library.

LongRunning Is Useless For Task.Run With async-await

Back in the olden days of .Net 4.0 we didn’t have Task.Run. All we had to start a task was the complicated Task.Factory.StartNew. Among its parameters is a TaskCreationOptions, which is often used to specify TaskCreationOptions.LongRunning. That flag gives the TPL a hint that the task you’re about to execute will run longer than usual.
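
For example, a typical pre-async usage of the flag looked roughly like this (FooBlocking and IsEnabled are hypothetical placeholders, in the spirit of the snippets below):

Task task = Task.Factory.StartNew(() =>
{
    while (IsEnabled)
    {
        FooBlocking(); // a long, blocking operation that justifies a dedicated thread
        Thread.Sleep(TimeSpan.FromSeconds(10));
    }
}, TaskCreationOptions.LongRunning);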

Nowadays, with .Net 4.5 and above, we mostly use the simpler and safer Task.Run, but it isn’t uncommon to wonder how to pass TaskCreationOptions.LongRunning as a parameter like we used to do with Task.Factory.StartNew.

The answer is that you can’t. This of course isn’t limited to TaskCreationOptions.LongRunning; you can’t pass any of the TaskCreationOptions values. However, most of them (like TaskCreationOptions.AttachedToParent) are there for extremely esoteric cases, while TaskCreationOptions.LongRunning is there for your run-of-the-mill long-running task.

An often-suggested workaround is to go back to Task.Factory.StartNew, which is perfectly fine for synchronous delegates (i.e. Action, Func<T>); however, for asynchronous delegates (i.e. Func<Task>, Func<Task<T>>) there’s the whole Task<Task> confusion. Since Task.Factory.StartNew doesn’t have specific overloads for async-await, asynchronous delegates map to the Func<T> overloads where T is a Task. That makes the return value a Task<T> where T is a Task, hence Task<Task>.

The .Net team anticipated this issue and it can be easily solved by using TaskExtensions.Unwrap (which is the accepted answer on the relevant Stack Overflow question 1):

Task<Task> task = Task.Factory.StartNew(async () =>
{
    while (IsEnabled)
    {
        await FooAsync();
        await Task.Delay(TimeSpan.FromSeconds(10));
    }
}, TaskCreationOptions.LongRunning);

Task actualTask = task.Unwrap();

However that hides the actual issue which is:

Task.Run with TaskCreationOptions.LongRunning doesn’t make sense for async-await.

The internal implementation 2 creates a new dedicated thread when you use TaskCreationOptions.LongRunning. Here’s the code for ThreadPoolTaskScheduler.QueueTask:

protected internal override void QueueTask(Task task)
{
    if ((task.Options & TaskCreationOptions.LongRunning) != 0)
    {
        // Run LongRunning tasks on their own dedicated thread.
        Thread thread = new Thread(s_longRunningThreadWork);
        thread.IsBackground = true; // Keep this thread from blocking process shutdown
        thread.Start(task);
    }
    else
    {
        // Normal handling for non-LongRunning tasks.
        bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
        ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
    }
}

But when an async method reaches an await for an uncompleted task, the thread it’s running on is released. When the awaited task completes, the rest of the method is scheduled again, this time on a different ThreadPool thread. That means the dedicated thread was created needlessly. This doesn’t just waste the time and resources spent creating the thread, it also hurts performance, as creating new threads is costly (otherwise we wouldn’t need the ThreadPool in the first place).
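
A small sketch (for a console app, where there’s no SynchronizationContext to capture) makes this visible with Thread.CurrentThread.IsThreadPoolThread:

Task<Task> task = Task.Factory.StartNew(async () =>
{
    Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread); // False - the dedicated LongRunning thread
    await Task.Delay(TimeSpan.FromSeconds(1));
    Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread); // True - back on a ThreadPool thread
}, TaskCreationOptions.LongRunning);

task.Unwrap().Wait();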

So, if you ask yourself how to use Task.Run with TaskCreationOptions.LongRunning when your delegate is asynchronous, save yourself and your application some time and keep using Task.Run as it is:

Task task = Task.Run(async () =>
{
    while (IsEnabled)
    {
        await FooAsync();
        await Task.Delay(TimeSpan.FromSeconds(10));
    }
});

  1. Which is the top result when searching for “Task.Run and LongRunning”
  2. Which could possibly change in the future. 

LogicalOperationStack Is Broken With async-await

Trace.CorrelationManager.LogicalOperationStack enables having nested logical operation identifiers where the most common case is logging (NDC). Evidently it doesn’t work with async-await.

The Issue

Here’s a simple example using LogicalFlow, my simple wrapper over the LogicalOperationStack:

static void Main()
{
    OuterOperationAsync().Wait();
}

static async Task OuterOperationAsync()
{
    Console.WriteLine(LogicalFlow.CurrentOperationId);
    using (LogicalFlow.StartScope())
    {
        Console.WriteLine("\t" + LogicalFlow.CurrentOperationId);
        await InnerOperationAsync();
        Console.WriteLine("\t" + LogicalFlow.CurrentOperationId);
        await InnerOperationAsync();
        Console.WriteLine("\t" + LogicalFlow.CurrentOperationId);
    }
    Console.WriteLine(LogicalFlow.CurrentOperationId);
}

static async Task InnerOperationAsync()
{
    using (LogicalFlow.StartScope())
    {
        await Task.Yield();
    }
}

LogicalFlow:

public static class LogicalFlow
{
    public static Guid CurrentOperationId
    {
        get
        {
            return Trace.CorrelationManager.LogicalOperationStack.Count > 0
                ? (Guid) Trace.CorrelationManager.LogicalOperationStack.Peek()
                : Guid.Empty;
        }
    }

    public static IDisposable StartScope()
    {
        Trace.CorrelationManager.StartLogicalOperation();
        return new Stopper();
    }

    private static void StopScope()
    {
        Trace.CorrelationManager.StopLogicalOperation();
    }

    private class Stopper : IDisposable
    {
        private bool _isDisposed;
        public void Dispose()
        {
            if (!_isDisposed)
            {
                StopScope();
                _isDisposed = true;
            }
        }
    }
}

And the output is:

00000000-0000-0000-0000-000000000000
    49985135-1e39-404c-834a-9f12026d9b65
    54674452-e1c5-4b1b-91ed-6bd6ea725b98
    c6ec00fd-bff8-4bde-bf70-e073b6714ae5
54674452-e1c5-4b1b-91ed-6bd6ea725b98

The specific Guid values don’t really matter. Both the outer lines should show Guid.Empty (i.e. 00000000-0000-0000-0000-000000000000) and the inner lines should show the same Guid value.

Multithreading

You might say that LogicalOperationStack is using a Stack which is not thread-safe and that’s why the output is wrong. But while that’s true in general, in this case there’s never more than a single thread accessing the LogicalOperationStack at the same time because every async operation is awaited when called and there’s no use of combinators such as Task.WhenAll.

The Root Cause

The root cause is that LogicalOperationStack is stored in the CallContext, which has copy-on-write behavior. That means that as long as you don’t explicitly set something into the CallContext (and you don’t with StartLogicalOperation, as it just pushes onto an existing stack), you’re using the parent context and not your own.

This can be shown by simply setting anything into the CallContext before adding to the existing stack. For example if we changed StartScope to this:

public static IDisposable StartScope()
{
    CallContext.LogicalSetData("Bar", "Arnon");
    Trace.CorrelationManager.StartLogicalOperation();
    return new Stopper();
}

The output is correct:

00000000-0000-0000-0000-000000000000
    fdc22318-53ef-4ae5-83ff-6c3e3864e37a
    fdc22318-53ef-4ae5-83ff-6c3e3864e37a
    fdc22318-53ef-4ae5-83ff-6c3e3864e37a
00000000-0000-0000-0000-000000000000

I’ve contacted the relevant developer at Microsoft and his response was this:

“I wasn’t aware of this, but it does seem broken. The copy-on-write logic is supposed to behave exactly as if we’d really created a copy of the ExecutionContext on entry into the method. However, copying the ExecutionContext would have created a deep copy of the CorrelationManager context, as it’s special-cased in CallContext.Clone(). We don’t take that into account in the copy-on-write logic.”

Solution

Use an ImmutableStack stored in the CallContext instead of the LogicalOperationStack. It’s both thread-safe and immutable, so when you call Pop you get back a new ImmutableStack that you then must set back into the CallContext, which is exactly what triggers the copy-on-write.

public static class LogicalFlow
{
    private static ImmutableStack<Guid> LogicalStack
    {
        get
        {
            return CallContext.LogicalGetData("LogicalFlow") as ImmutableStack<Guid> ?? ImmutableStack.Create<Guid>();
        }
        set
        {
            CallContext.LogicalSetData("LogicalFlow", value);
        }
    }

    public static Guid CurrentId
    {
        get
        {
            var logicalStack = LogicalStack;
            return logicalStack.IsEmpty ? Guid.Empty : logicalStack.Peek();
        }
    }
    
    public static IDisposable StartScope()
    {
        LogicalStack = LogicalStack.Push(Guid.NewGuid()); // Here's where the CallContext is copied using copy-on-write
        return new Stopper();
    }
    
    private static void StopScope()
    {
        LogicalStack = LogicalStack.Pop();
    }
}

Another option is to store the stack in the new System.Threading.AsyncLocal<T> class added in .Net 4.6 (or Stephen Cleary’s AsyncLocal) instead, which should handle this issue correctly.
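
Here’s roughly how the LogicalStack property above would change with AsyncLocal (a sketch; the rest of LogicalFlow stays the same):

private static readonly AsyncLocal<ImmutableStack<Guid>> _asyncLocalStack = new AsyncLocal<ImmutableStack<Guid>>();

private static ImmutableStack<Guid> LogicalStack
{
    get
    {
        return _asyncLocalStack.Value ?? ImmutableStack.Create<Guid>();
    }
    set
    {
        _asyncLocalStack.Value = value;
    }
}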

Note: This came after I asked (and answered) a question on Stack Overflow: Is LogicalOperationStack incompatible with async in .Net 4.5 which contains further discussion.