JavaRush /Курсы /C# SELF /Асинхронные потоки данных

Асинхронные потоки данных

C# SELF
62 уровень , 0 лекция
Открыта

1. Введение

Вы уже знакомы с async и await. Они отлично работают для "одиночных" асинхронных действий, вроде загрузки одного файла. Но что, если данные приходят потоком, или ресурс требует асинхронной очистки?

Проблема с асинхронными "коллекциями": Представьте, что нужно получить миллионы записей из базы данных. Если метод возвращает Task<List<T>>, вы ждёте, пока все данные загрузятся в память. Это неэффективно и создаёт задержки. Синхронный IEnumerable<T> тоже не подходит, если каждый элемент нужно получать асинхронно.

Проблема с асинхронным освобождением ресурсов: IDisposable и using отлично справляются с синхронной очисткой. Но как быть, если закрытие сетевого соединения или запись буферов в файл сама по себе асинхронна? Вызвать await внутри синхронного Dispose() нельзя, что приводит к блокировке потока или некорректной очистке.

Чтобы решить эти проблемы, и были введены IAsyncEnumerable<T> и IAsyncDisposable.

2. Асинхронные потоки данных

IAsyncEnumerable<T> — это асинхронный аналог IEnumerable<T>. Он позволяет асинхронно производить элементы последовательности один за другим, не дожидаясь, пока будут готовы все данные.

Когда это нужно?

  • Чтение больших файлов построчно асинхронно: например, гигабайтные логи.
  • Стриминг данных из сети или базы данных: результаты запроса к API, которые приходят частями.
  • Реализация серверных потоковых API: например, gRPC Streaming.
  • В любом сценарии, где данные генерируются или поступают асинхронно и должны обрабатываться постепенно.

Как это работает?

  1. IAsyncEnumerable<T>: интерфейс с методом GetAsyncEnumerator(CancellationToken cancellationToken). Токен отмены важен!
  2. IAsyncEnumerator<T>: интерфейс с ValueTask<bool> MoveNextAsync() (переход к следующему) и Current (текущий элемент). Он также наследует IAsyncDisposable.
  3. await foreach: удобно итерируется по IAsyncEnumerable<T>. Компилятор сам вызывает MoveNextAsync() и Current. Главное, await foreach гарантирует вызов DisposeAsync() для итератора после завершения итерации, даже при ошибках.

Создание IAsyncEnumerable<T> с async yield return

Вы можете использовать yield return внутри async-метода, который возвращает IAsyncEnumerable<T>. Это позволяет создавать асинхронные генераторы. Ваш метод может использовать await, чтобы приостановить генерацию, дождаться асинхронной операции, а затем возобновить работу.

Пример: Простой асинхронный генератор


async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine($"Генерирую: {i}");
        await Task.Delay(100); // Имитация асинхронной работы
        yield return i; 
    }
}

// Использование:
async Task ConsumeAsyncNumbers()
{
    await foreach (var number in GenerateNumbersAsync())
    {
        Console.WriteLine($"Получено: {number}");
    }
}
// Вызовите: await ConsumeAsyncNumbers();

Пример: Чтение файла построчно асинхронно


async IAsyncEnumerable<string> ReadFileLinesAsync(string filePath)
{
    using var reader = new StreamReader(filePath); // 'using' здесь (StreamReader реализует IAsyncDisposable)
    string? line;
    while ((line = await reader.ReadLineAsync()) != null) 
    {
        yield return line;
    }
}

// Использование:
async Task ProcessFileAsync()
{
    await File.WriteAllLinesAsync("data.txt", new[] { "Строка 1", "Строка 2", "Строка 3" });
    await foreach (var line in ReadFileLinesAsync("data.txt")) 
    {
        Console.WriteLine($"Обработана строка: {line}");
    }
}
// Вызовите: await ProcessFileAsync();

Пример: Асинхронный генератор с отменой (CancellationToken)


async IAsyncEnumerable<int> GetCancelableSequence(
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 10; i++)
    {
        token.ThrowIfCancellationRequested(); // Проверяем отмену
        await Task.Delay(200, token); // Task.Delay тоже поддерживает отмену через token
        yield return i;
    }
}

// Использование:
async Task ConsumeAndCancel()
{
    var cts = new CancellationTokenSource(500); // Отмена через 500мс
    try
    {
        await foreach (var num in GetCancelableSequence(cts.Token))
        {
            Console.WriteLine($"Получено: {num}");
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Генерация отменена!");
    }
}
// Вызовите: await ConsumeAndCancel();

Атрибут [EnumeratorCancellation] позволяет передать CancellationToken в асинхронный генератор. Это даёт возможность отменить итерацию, если вызывающий код запросит отмену через CancellationTokenSource. Без этого атрибута токен не будет автоматически передан в GetAsyncEnumerator.

3. Асинхронное управление ресурсами

Проблема синхронного IDisposable

Метод Dispose() в IDisposable синхронен (void Dispose()). Вы не можете использовать await внутри него. Если закрытие соединения с БД или сброс буферов в файл — долгие и асинхронные операции, синхронный Dispose() заблокирует поток, что плохо для асинхронных приложений.

Решение: IAsyncDisposable

IAsyncDisposable решает эту проблему. Он содержит единственный метод: ValueTask DisposeAsync() — асинхронный метод для очистки.

await using

Это асинхронный аналог using. Он предназначен для объектов, реализующих IAsyncDisposable.

  • await using гарантирует, что DisposeAsync() будет вызван по завершении блока кода, в котором объявлен ресурс (или при выходе из него из-за исключения).
  • Позволяет корректно освобождать ресурсы асинхронно, избегая блокировок.

Пример: Базовый IAsyncDisposable и await using


class MyAsyncResource : IAsyncDisposable
{
    public MyAsyncResource() => Console.WriteLine("Ресурс открыт.");
    
    public async ValueTask DisposeAsync()
    {
        Console.WriteLine("Начинаю асинхронную очистку...");
        await Task.Delay(200); // Имитация асинхронной очистки
        Console.WriteLine("Асинхронная очистка завершена.");
    }
}

// Использование await using
async Task UseAndDisposeResource()
{
    await using var resource = new MyAsyncResource(); 
    Console.WriteLine("Ресурс используется...");
} // Здесь автоматически вызывается resource.DisposeAsync()

// Вызовите: await UseAndDisposeResource();

Пример: Несколько await using блоков


async Task UseMultipleResources()
{
    await using var res1 = new MyAsyncResource();
    await using var res2 = new MyAsyncResource();
    Console.WriteLine("Использую оба ресурса...");
} // ресурсы освобождаются в порядке LIFO (Last In, First Out): res2.DisposeAsync() вызывается первым, затем res1.DisposeAsync().
// Вызовите: await UseMultipleResources();

Совместимость IAsyncEnumerable<T> с IAsyncDisposable

Важно: IAsyncEnumerator<T> (который используется await foreach) сам по себе наследует IAsyncDisposable. Это значит, что если ваш асинхронный генератор использует ресурсы (как StreamReader в примере выше), которые могут быть очищены асинхронно, await foreach позаботится об этом. Он вызовет DisposeAsync() у итератора, когда итерация завершится.

2
Задача
C# SELF, 62 уровень, 0 лекция
Недоступна
Асинхронное чтение файла
Асинхронное чтение файла
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ