JavaRush /Курси /C# SELF /Асинхронні потоки даних

Асинхронні потоки даних

C# SELF
Рівень 62 , Лекція 0
Відкрита

1. Вступ

Ви вже знайомі з async і await. Вони чудово працюють для «окремих» асинхронних дій, на кшталт завантаження одного файла. Але що, якщо дані надходять потоком або ресурс потребує асинхронного звільнення?

Проблема з асинхронними «колекціями»: Уявіть, що потрібно отримати мільйони записів із бази даних. Якщо метод повертає Task<List<T>>, ви чекаєте, поки усі дані завантажаться в памʼять. Це неефективно і створює затримки. Синхронний IEnumerable<T> теж не підходить, якщо кожен елемент треба отримувати асинхронно.

Проблема з асинхронним звільненням ресурсів: IDisposable і using чудово справляються зі синхронним очищенням. Але що робити, якщо закриття мережевого зʼєднання чи запис буферів у файл саме по собі асинхронне? Викликати await усередині синхронного Dispose() не можна, що призводить до блокування потоку або некоректного очищення.

Щоб розвʼязати ці проблеми, було запроваджено IAsyncEnumerable<T> та IAsyncDisposable.

2. Асинхронні потоки даних

IAsyncEnumerable<T> — це асинхронний аналог IEnumerable<T>. Він дає змогу асинхронно повертати елементи послідовності один за одним, не чекаючи, доки буде готово все.

Коли це потрібно?

  • Читання великих файлів построково, асинхронно: наприклад, гігабайтні журнали.
  • Стримінг даних із мережі або бази даних: результати запиту до API, що надходять частинами.
  • Реалізація серверних потокових API: наприклад, gRPC Streaming.
  • У будь-якому сценарії, де дані генеруються або надходять асинхронно, і їх потрібно обробляти поступово.

Як це працює?

  1. IAsyncEnumerable<T>: інтерфейс із методом GetAsyncEnumerator(CancellationToken cancellationToken). Токен скасування важливий.
  2. IAsyncEnumerator<T>: інтерфейс із ValueTask<bool> MoveNextAsync() (перехід до наступного) і Current (поточний елемент). Також наслідує IAsyncDisposable.
  3. await foreach: дає змогу легко перебирати IAsyncEnumerable<T>. Компілятор сам викликає MoveNextAsync() і звертається до Current. Головне: await foreach гарантує виклик DisposeAsync() для ітератора після завершення ітерації, навіть у разі помилок.

Створення IAsyncEnumerable<T> за допомогою async yield return

Ви можете використовувати yield return усередині async-методу, який повертає IAsyncEnumerable<T>. Це дає змогу створювати асинхронні генератори. Ваш метод може використовувати await, щоб призупинити генерацію, дочекатися асинхронної операції, а потім відновити роботу.

Приклад: простий асинхронний генератор


async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine($"Генерую: {i}");
        await Task.Delay(100); // Імітація асинхронної роботи
        yield return i; 
    }
}

// Використання:
async Task ConsumeAsyncNumbers()
{
    await foreach (var number in GenerateNumbersAsync())
    {
        Console.WriteLine($"Отримано: {number}");
    }
}
// Виклик: await ConsumeAsyncNumbers();

Приклад: читання файлу построково, асинхронно


async IAsyncEnumerable<string> ReadFileLinesAsync(string filePath)
{
    using var reader = new StreamReader(filePath); // 'using' тут (StreamReader реалізує IDisposable)
    string? line;
    while ((line = await reader.ReadLineAsync()) != null) 
    {
        yield return line;
    }
}

// Використання:
async Task ProcessFileAsync()
{
    await File.WriteAllLinesAsync("data.txt", new[] { "Рядок 1", "Рядок 2", "Рядок 3" });
    await foreach (var line in ReadFileLinesAsync("data.txt")) 
    {
        Console.WriteLine($"Оброблений рядок: {line}");
    }
}
// Виклик: await ProcessFileAsync();

Приклад: асинхронний генератор зі скасуванням (CancellationToken)


async IAsyncEnumerable<int> GetCancelableSequence(
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i < 10; i++)
    {
        token.ThrowIfCancellationRequested(); // Перевіряємо, чи скасовано
        await Task.Delay(200, token); // Task.Delay також підтримує скасування через токен
        yield return i;
    }
}

// Використання:
async Task ConsumeAndCancel()
{
    var cts = new CancellationTokenSource(500); // Скасування через 500 мс
    try
    {
        await foreach (var num in GetCancelableSequence(cts.Token))
        {
            Console.WriteLine($"Отримано: {num}");
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Генерацію скасовано!");
    }
}
// Виклик: await ConsumeAndCancel();

Атрибут [EnumeratorCancellation] дає змогу передати CancellationToken до асинхронного генератора. Це дозволяє скасувати ітерацію, якщо код, що викликає, ініціює скасування через CancellationTokenSource. Без цього атрибута токен не буде автоматично передано у GetAsyncEnumerator.

3. Асинхронне керування ресурсами

Проблема синхронного IDisposable

Метод Dispose() у IDisposable синхронний (void Dispose()). Ви не можете використовувати await усередині нього. Якщо закриття зʼєднання з базою даних або скидання буферів у файл — тривалі та асинхронні операції, синхронний Dispose() заблокує потік, а це погано для асинхронних застосунків.

Рішення: IAsyncDisposable

IAsyncDisposable розвʼязує цю проблему. Він містить єдиний метод: ValueTask DisposeAsync() — асинхронний метод для очищення.

await using

Це асинхронний аналог using. Він призначений для обʼєктів, що реалізують IAsyncDisposable.

  • await using гарантує, що DisposeAsync() буде викликано після завершення блоку коду, у якому оголошено ресурс (або під час виходу з нього через виняток).
  • Дозволяє коректно звільняти ресурси асинхронно, уникаючи блокувань.

Приклад: базовий IAsyncDisposable і await using


class MyAsyncResource : IAsyncDisposable
{
    public MyAsyncResource() => Console.WriteLine("Ресурс відкрито.");
    
    public async ValueTask DisposeAsync()
    {
        Console.WriteLine("Починаю асинхронне очищення...");
        await Task.Delay(200); // Імітація асинхронного очищення
        Console.WriteLine("Асинхронне очищення завершено.");
    }
}

// Використання await using
async Task UseAndDisposeResource()
{
    await using var resource = new MyAsyncResource(); 
    Console.WriteLine("Ресурс використовується...");
} // Тут автоматично викликається resource.DisposeAsync()

// Виклик: await UseAndDisposeResource();

Приклад: кілька блоків await using


async Task UseMultipleResources()
{
    await using var res1 = new MyAsyncResource();
    await using var res2 = new MyAsyncResource();
    Console.WriteLine("Використовую обидва ресурси...");
} // ресурси звільняються в порядку LIFO (Last In, First Out): res2.DisposeAsync() викликається першим, потім res1.DisposeAsync().
// Виклик: await UseMultipleResources();

Сумісність IAsyncEnumerable<T> з IAsyncDisposable

Важливо: IAsyncEnumerator<T> (який використовує await foreach) сам по собі наслідує IAsyncDisposable. Це означає, що якщо ваш асинхронний генератор використовує ресурси (як StreamReader у прикладі вище), які можуть очищатися асинхронно, await foreach подбає про це. Він викличе DisposeAsync() у ітератора, коли ітерацію буде завершено.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ