Перевод статьи Стефэна Тоуба "Обработка задач по мере их завершения"

Данный пост является переводом статьи Stephen Toub “Processing tasks as they complete”.

Недавно несколько человек спросили меня, как обрабатывать результаты выполнения задач по мере их завершения.

Допустим у разработчика есть несколько задач, представляющих асинхронные операции которые он инициировал, и он хочет обработать результаты выполнения этих задач, например:

1
2
3
4
5
6
List<Task<T>> tasks = …;
foreach(var t in tasks) {
try { Process(await t); }
catch(OperationCanceledException) {}
catch(Exception exc) { Handle(exc); }
}

Такой подход подходит для многих ситуаций. Однако он накладывает дополнительное ограничение при обработке, которое на самом деле не подразумевалось в исходной постановке задачи: этот код в конечном итоге обрабатывает задачи в той последовательности, в которой они были вызваны, а не в порядке их завершения, что означает, что некоторые задачи, которые уже были завершены, могут быть недоступны для обработки, поскольку предыдущая задача в последовательности может быть еще не завершена.

Есть много способов реализовать подобное решение. Один из подходов включает простое использование метода ContinueWith у Task, например:

1
2
3
4
5
6
7
8
List<Task<T>> tasks = …;
foreach(var t in tasks)
t.ContinueWith(completed => {
switch(completed.Status) {
case TaskStatus.RanToCompletion: Process(completed.Result); break;
case TaskStatus.Faulted: Handle(completed.Exception.InnerException); break;
}
}, TaskScheduler.Default);

Это решение устраняет дополнительное ограничение, заключавшееся в том, что исходное решение заставляло обработку всех продолжений выполняться последовательно (и в исходном SynchronizationContext, если он был), тогда как данное решение позволяет выполнять обработку параллельно и в ThreadPool. Если вы хотите использовать данный подход, но также заставить задачи выполняться последовательно, можно сделать это, подставив сериализующий планировщик для метода ContinueWith (то есть планировщик, который заставит продолжения выполняться исключительно с оглядкой друг на друга). Например, вы можете использовать ConcurrentExclusiveSchedulerPair:

1
2
3
4
5
6
7
8
9
List<Task<T>> tasks = …;
var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
foreach(var t in tasks)
t.ContinueWith(completed => {
switch(completed.Status) {
case TaskStatus.RanToCompletion: Process(completed.Result); break;
case TaskStatus.Faulted: Handle(completed.Exception.InnerException); break;
}
}, scheduler);

или если вы были в потоке пользовательского интерфейса вашего приложения вы могли бы предоставить планировщик, который представляет данный поток пользовательского интерфейса, например:

1
var scheduler = TaskScheduler.FromCurrentSynchronizationContext();

Подход, основанный на методе ContinueWith, заставляет вас использовать модель, основанную на обратных вызовах (callbacks), для выполнения вашей обработки результатов. Если вы хотите, чтобы обработка выполнялась последовательно по мере завершения задач, но с использованием async/await, а не с использованием ContinueWith, это также возможно.

Есть несколько способов добиться этого. Относительно простой способ — использовать Task.WhenAny. WhenAny принимает набор задач и асинхронно предоставляет первую из них, которая завершится. Таким образом, вы можете многократно вызывать WhenAny, каждый раз удаляя ранее выполненную задачу, чтобы асинхронно ожидать завершения следующей:

1
2
3
4
5
6
7
8
List<Task<T>> tasks = …;
while(tasks.Count > 0) {
var t = await Task.WhenAny(tasks);
tasks.Remove(t);
try { Process(await t); }
catch(OperationCanceledException) {}
catch(Exception exc) { Handle(exc); }
}

Функционально это норм, и пока количество задач невелико, производительность тоже должна быть в порядке. Однако, если количество задач становится велико, то это может привести к непренебрежимому снижению производительности. Здесь мы фактически создали алгоритм O(N^2): для каждой задачи мы ищем в списке задачу для ее удаления, что является операцией O(N), и мы регистрируем продолжение для каждой задачи, что тоже является операцией O(N). Например, если бы у нас было 10 000 задач, в течение всей этой операции мы бы в конечном итоге зарегистрировали и отменили регистрацию более 50 миллионов продолжений как части вызовов WhenAny. Правда не всё так плохо как кажется, поскольку WhenAny разумно управляет своими ресурсами, например: не регистрируя продолжения уже завершенных задач; останавливаясь, как только находит завершенную задачу; повторно используя один и тот же объект продолжения для всех задач и т. д. Тем не менее, здесь есть работа, которую мы можем избежать, если профилирование сочтёт этот код проблематичным.

Альтернативный подход заключается в создании нового метода «комбинатора», специально предназначенного для этой цели. При работе с коллекцией экземпляров задач типа Task<T> метод WhenAny возвращает Task<Task<T>>; это задача, которая завершится, когда завершится первая из поставленных на выполнение задач, и результатом задачи Task<Task<T>> будет первая завершившаяся задача коллекции. В нашем случае нам нужна не только первая задача, но и все задачи, упорядоченные по мере их завершения. Мы можем представить это с помощью Task<Task<T>>[]. Воспринимайте это как массив корзин, куда мы будем помещать входящие задачи по мере их завершения, по одной задаче на корзину. Таким образом, если вы хотите, чтобы первая задача была завершена, вы можете дождаться (await) первой корзины этого массива, а если вы хотите, чтобы шестая задача была завершена, вы можете дождаться (await) шестой корзины этого массива.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static Task<Task<T>> [] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
var inputTasks = tasks.ToList();

var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
var results = new Task<Task<T>>[buckets.Length];
for (int i = 0; i < buckets.Length; i++)
{
buckets[i] = new TaskCompletionSource<Task<T>>();
results[i] = buckets[i].Task;
}

int nextTaskIndex = -1;
Action<Task<T>> continuation = completed =>
{
var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
bucket.TrySetResult(completed);
};

foreach (var inputTask in inputTasks)
inputTask.ContinueWith(continuation,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);

return results;
}

Итак, что здесь происходит? Сначала мы преобразуем наше перечисление задач в список List<Task<T>>; это делается для того, чтобы любые задачи, которые могут быть созданы ленивым отложенным способом путем перечисления перечисляемого, были материализованы один раз. Затем мы создаем экземпляры TaskCompletionSource<Task<T>> для представления корзин, по одной корзине на каждую задачу, которая в конечном итоге будет завершена. Затем мы цепляем продолжение к каждой входящей задаче: это продолжение получит следующую доступную корзину и сохранит в ней только что выполненную задачу. С помощью такого комбинатора можно переписать исходный код следующим образом:

1
2
3
4
5
6
7
List<Task<T>> tasks = …;
foreach(var bucket in Interleaved(tasks)) {
var t = await bucket;
try { Process(await t); }
catch(OperationCanceledException) {}
catch(Exception exc) { Handle(exc); }
}

Чтобы закрыть данное обсуждение, давайте посмотрим на то, что это даёт на практике. Рассмотрим:

1
2
3
4
5
6
7
8
9
10
11
12
var tasks = new[] { 
Task.Delay(3000).ContinueWith(_ => 3),
Task.Delay(1000).ContinueWith(_ => 1),
Task.Delay(2000).ContinueWith(_ => 2),
Task.Delay(5000).ContinueWith(_ => 5),
Task.Delay(4000).ContinueWith(_ => 4),
};
foreach (var bucket in Interleaved(tasks)) {
var t = await bucket;
int result = await t;
Console.WriteLine(“{0}: {1}”, DateTime.Now, result);
}

У нас есть массив задач Task, каждая из которых завершится через N секунд и вернет целое число N (например, первая задача в массиве завершится через 3 секунды и вернет число 3). Затем мы перебираем эти задачи, используя наш самодельный метод Interleaved, распечатывая результаты по мере их получения. Когда я запускаю этот код, я вижу следующий вывод:

1
2
3
4
5
8/2/2012 7:37:48 AM: 1
8/2/2012 7:37:49 AM: 2
8/2/2012 7:37:50 AM: 3
8/2/2012 7:37:51 AM: 4
8/2/2012 7:37:52 AM: 5

и это именно то поведение, которое мы хотели. Обратите внимание на время вывода каждого элемента. Все задачи были запущены одновременно, поэтому все их таймеры работают одновременно. По мере того как каждая задача завершается, наш цикл может обработать её, и в результате мы получаем по одной строке вывода каждую секунду.

Для контраста рассмотрим тот же пример, но без использования Interleaved:

1
2
3
4
5
6
7
8
9
10
11
var tasks = new[] { 
Task.Delay(3000).ContinueWith(_ => 3),
Task.Delay(1000).ContinueWith(_ => 1),
Task.Delay(2000).ContinueWith(_ => 2),
Task.Delay(5000).ContinueWith(_ => 5),
Task.Delay(4000).ContinueWith(_ => 4),
};
foreach (var t in tasks) {
int result = await t;
Console.WriteLine(“{0}: {1}”, DateTime.Now, result);
}

При запуске этого варианта видим:

1
2
3
4
5
8/2/2012 7:42:08 AM: 3
8/2/2012 7:42:08 AM: 1
8/2/2012 7:42:08 AM: 2
8/2/2012 7:42:10 AM: 5
8/2/2012 7:42:10 AM: 4

А теперь взгляните на время. Поскольку идёт обработка задач по порядку, мы не можем распечатать результаты для задачи 1 или задачи 2 до тех пор, пока задача 3 не будет завершена (поскольку она была перед ними в массиве). Точно так же мы не можем распечатать результат для задачи 4, пока задача 5 не будет завершена.