Task Parallel Library (пространство имен System.Threading.Tasks) появилась еще в .NET 4, но большой популярностью не пользовалась. На подходе компилятор C# 5, который поддерживает конструкцию async\await. В скомпилированом коде async метод превращается в метод возвращающий Task\Task<T>.  Поэтому в скором времени использование TPL станет повсеместным.

Task Parallel Library – имеет очень мощные механизмы: вложенные (nested) задачи, локальные очереди для потоков, чтобы уменьшить латентность, work-stealing алгоритмы для работы, кучи опций запуска задач и продолжений, различные планировщики. Все создано в лучших традициях оптимизации асинхронного кода.

Но, TPL создавался до мозга костей практиками и цели сделать библиотеку удобной для потребителей даже не ставилось судя по всему.

  1. Как это не смешно, то в стандартной библиотеке нету метода, позволяющего обернуть значение T в Task<T>. (Только в .NET 4.5 появился Task.FromResult)
  2. Сам объект Task представляет из себя “задачу”, которая может быть как еще не запущена, так и уже выполняться.  Поэтому работа с разными Task_ами будет осуществляться по-разному.
  3. Task не является immutable объектом, метод ContinueWith изменяет саму задачу.
  4. Функция ContinueWith срабатывает при любом завершении задачи, в том числе отмене или ошибке. Необходимо каждый раз указывать флаги.
  5. Исключения внутри задач собираются в один объект, что затрудняет структурированную их обработку.
  6. Метод ContinueWith принимает Func<Task, T>, а не Func<Task, Task>. Это значит что единственный способ связать две задачи – создать вложенную (nested), что может привести к переполнению стека при завершении цепочки вложенных задач.

Последний пункт особенно актуален.  Даже был сделан extension-метод Unwrap, который позволяет писать код в виде:

var task = SomeFuncReturningTask(...)
                .ContinueWith(t => SomeOtherFuncReturningTask(t.Result))
                .Unwrap().ContinueWith(...)
                .Unwrap().ContinueWith(...);

Тут надо обратить внимание на пункт 4, вызовы методов станут еще более многословными.

Async\await в C# 5 частично решает проблему, позволяя писать в виде:

var r1 = await SomeFuncReturningTask();
var r2 = await SomeOtherFuncReturningTask();

Но есть еще другие варианты использования где даже async\await не помогает, например рекурсивные функции.

Также для .NET существует библиотека Rx, которая создавалась теоретиками (которые Linq придумали). Причем основная часть Rx была получена составлением монады двойственной к IEnumerable.

Для сравнения “теоретического” и “практического” подхода попробую написать функцию вычисления высоты дерева.

Само дерево:

class Tree<T>
{
    public Tree()
    {
        this.Children = new List<Tree<T>>();
    }

    public T Data { get; set; }
    public List<Tree<T>> Children { get; private set; }
}

Код тестов очень простой:

var t = new Tree<int> { Data = 0 };
for (int i = 1; i < 10 * 1000; i++)
{
    t = new Tree<int>
    {
        Data = i,
        Children = { t }
    };
}

Console.WriteLine(HeightObservable(t, x => x.Children).Wait());
Console.WriteLine(HeightTask(t, x => x.Children).Result);

Высота дерева специально выбрана большой – 10000, чтобы проверить насколько эффективно библиотека работает со стеком.

Функция вычисления высоты на Rx:

static IObservable<int> HeightObservable<T>(T element, Func<T, IEnumerable<T>> childSelector)
{
    return childSelector(element)
            .ToObservable()
            .SelectMany(e => HeightObservable(e, childSelector))
            .Aggregate(0, Math.Max)
            .Select(x => x + 1)
            .ObserveOn(Scheduler.ThreadPool);
}

Последний ObserveOn нужен чтобы не возникало stack overflow. Rx стремится как можно меньше использовать concurrency внутри себя и по-умолчанию большинство вызовов “продолжений”(continuation) выполняется синхронно, что приводит к переполнению стека в рекурсивном вызове.
Если использовать Rx v2, то можно без ObserveOn обойтись.

Теперь на Task\C# 5. Попытка №1

static async Task<int> HeightTask<T>(T element, Func<T, IEnumerable<T>> childSelector)
{
    var max = 0;
    foreach (var child in childSelector(element))
    {
        var v = await HeightTask(child, childSelector);
        if (v > max)
        {
            max = v;
        }
    }
    return await Task.FromResult(max + 1);
}

Падает со stack overflow . Await вычисляет аргумент и получает awaiter у результата. Таким образом идет полный обход всего дерева в первом же await. Кроме этого вычисление высот поддеревьев идет по-очереди. Код не распараллелен.

Чтобы распараллелить код нужна функция IEnumerable<Task<T>> –> Task<IEnumerable<T>>, которая параллельно выполняет задачи и собирает результат в одну последовательность. Такая функция называется ForkJoin.

Первое что пришло в голову написать:

static Task<IEnumerable<T>> ForkJoin<T>(IEnumerable<Task<T>> tasks)
{
    var result = new List<T>();
    //...
    foreach (var task in tasks)
    {
        task.ContinueWith(t => result.Add(t.Result) /*...*/);
    }
    //...
}

Такой код не работает. Падает со stackoverflow на foreach, потому что при получении первого элемента пытается синхронно обойти все дерево.

Нужно написать функцию, которая асинхронно обходит IEnumerable

static Task<int> EnumerateWithTask<T>(IEnumerable<Task<T>> tasks, Action<Task<T>> continuation)
{
    var tcs = new TaskCompletionSource<int>();
    var enumerator = tasks.GetEnumerator();
    Action recursive = null;
    var count = 0;

    recursive = () =>
        {
            Task.Factory
                .StartNew<bool>(enumerator.MoveNext)
                .ContinueWith(t =>
                    {
                        if (t.IsFaulted)
                        {
                            tcs.TrySetException(t.Exception.InnerExceptions);
                        }
                        else if (t.IsCompleted)
                        {
                            if (!t.Result)
                            {
                                tcs.TrySetResult(count);
                            }
                            else
                            {
                                count++;
                                enumerator.Current.ContinueWith(continuation, TaskContinuationOptions.ExecuteSynchronously);
                                recursive();
                            }
                        }
                    }, TaskContinuationOptions.ExecuteSynchronously);
        };

    recursive();
    return tcs.Task;
}

В этой функции вместо итерации используется рекурсия. Так как рекурсия вызывается внутри продолжения Task, то переполнения стека не получается. Функция возвращает Task, которая считает число элементов после окончания обхода последовательности.

Теперь можно написать и ForkJoin:

static Task<IEnumerable<T>> ForkJoin<T>(IEnumerable<Task<T>> tasks)
{
    var result = new List<T>();
    var tcs = new TaskCompletionSource<IEnumerable<T>>();

    Task<int> countTask = null;
    var completedCount = 0;
    var isCompleted = false;

    countTask = EnumerateWithTask(tasks, t =>
            {
                if (!isCompleted)
                {
                    if (t.IsCanceled)
                    {
                        tcs.TrySetCanceled();
                        isCompleted = true;
                    }
                    else if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                        isCompleted = true;
                    }
                    else if (t.IsCompleted)
                    {
                        result.Add(t.Result);
                        completedCount++;
                    }

                    if ((countTask.IsCompleted) && completedCount == countTask.Result)
                    {
                        tcs.TrySetResult(result.AsReadOnly());
                        isCompleted = true;
                    }
                }
            });

    countTask.ContinueWith(t =>
        {
            if (!isCompleted)
            {
                if (completedCount == t.Result)
                {
                    tcs.TrySetResult(result.AsReadOnly());
                    isCompleted = true;
                }
            }
        });
    return tcs.Task;
}

Когда писал этот код получил разрыв мозга. Боюсь даже представить что будет если придется такой код читать. Скорее всего в этом коде есть ошибки и он не обрабатывает отмену. Так что для production придется помучаться еще сильнее.

Ну и наконец сам код расчета высоты дерева на Task_ах, попытка №2:

static Task<int> HeightTask<T>(T element, Func<T, IEnumerable<T>> childSelector)
{
    return ForkJoin(childSelector(element).Select(e => HeightTask(e, childSelector)))
            .ContinueWith(t => t.Result.Aggregate(0, Math.Max, x => x + 1));
}

Замеры показывают что Rx проигрывает по времени выполнения Task_ам примерно в 2 раза (на Rx v2). Это при том что реально асинхронных операций нет. Реальная асинхронность сделает разницу незаметной. Сложность кода Rx на порядок (в 10 раз) меньше, чем Tasks.

Новомодные фишки типа async\await не помогли в этой задаче вообще никак. Причиной этому служит то, что  Rx спроектирован на основе монад (как и async в F#). Монады позволяют комбинировать вычисления с некоторым контекстом (в данном случае с “продолжениями”) с помощью небольшого набора функций. На базе этих функций можно построить много других.

Async\await – не более чем переписывание кода в компиляторе, никаких монадических конструкций само по себе оно не создает, а TPL не предоставляет средства для композиции. Вот и получаются проблемы на любом коде, сложнее того что в примерах.

Как ни странно но сейчас не существует библиотеки, которая содержит код, решающий проблемы. Максимум некоторые расширения можно найти в примерах на MSDN.

Заключение

Крайне не рекомендую использовать Tasks для высокоуровневого кода. Rx подходит на эту роль гораздо лучше.

Теги : Reactive Extensions, .NET, TPL, Architecture