JavaRush /Курси /C# SELF /Channel: Виробник–споживач (

Channel: Виробник–споживач ( Channel)

C# SELF
Рівень 62 , Лекція 3
Відкрита

1. Вступ

Сьогодні ми виходимо на новий рівень! Час познайомитися з особливим інструментом — Channel. Цей інструмент створено спеціально для сучасних асинхронних застосунків у .NET: там, де звичайні блокування або не допомагають, або суттєво уповільнюють роботу.

На вас чекає знайомство з патерном «Виробник–споживач» (Producer–Consumer), який не втрачає популярності з 60‑х років XX століття. Ви створите простий асинхронний «конвеєр», де одні потоки або завдання щось виробляють (наприклад, завантажують файли, обчислюють числа, очікують події), а інші потоки їх обробляють (наприклад, зберігають, записують у базу даних, відображають в інтерфейсі користувача UI).

Чому з’явився Channel?

  1. Патерн «Виробник–споживач» давно розв’язували чергами: виробник кладе завдання в чергу, споживач їх дістає. Але BlockingCollection<T>, черги на основі ConcurrentQueue<T>, навіть ручна синхронізація за допомогою lock — усе це не асинхронні підходи. Тобто потік може лише блокуватися в очікуванні даних, а не повертати керування планувальнику async/await.
  2. Асинхронність у .NET — це фундамент сучасної архітектури. Блокувати потоки заради очікування елементів — дорого й неефективно. Потрібно вміти очікувати появи даних без блокування — саме це й розв’язує Channel.
  3. Гнучкість: за допомогою каналів можна побудувати складні конвеєри обробки даних, розділяти логіку потоків, додавати проміжні кроки й балансування навантаження — і все це без болю низькорівневої синхронізації.

Що таке Channel? (Аналогія та архітектура)

Уявімо, що у вас є естафетна паличка (або стрічка на транспортері), якою можна передавати об’єкти з одного місця в інше — і нікому не треба особисто зустрічатися з колегою із сусіднього відділу. Головне — щоб паличка не загубилася дорогою.

Channel — це вбудований у .NET засіб для асинхронної передачі даних між різними завданнями (tasks), потоками або частинами програми. Він реалізує асинхронну чергу з підтримкою очікування як вставляння, так і вилучення елементів.

  • Виробник додає до каналу елементи (наприклад, запити на обробку);
  • Споживач дістає елементи — і справу зроблено!

2. Клас Channel<T> і як він влаштований

Усе починається з простору імен:

using System.Threading.Channels;

На відміну від звичних колекцій, Channel — це фабрика, яка створює спеціальні об’єкти для передачі даних.

Основні типи:

  • ChannelWriter<T> — «записувач» (виробник). Лише додає елементи.
  • ChannelReader<T> — «зчитувач» (споживач). Лише витягує елементи.
  • Канал (Channel) розділяє відповідальність: записувач нічого не знає про зчитувача — і навпаки.

У .NET є кілька реалізацій каналу, кожна зі своїми особливостями: unbounded (без обмежень за розміром), bounded (з обмеженням на кількість елементів), single‑producer‑single‑consumer (SPSC), multi‑producer‑multi‑consumer (MPMC) тощо. Почнемо з найуніверсальнішої.

Простий приклад: асинхронна черга завдань

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        // Створюємо канал без обмеження розміру
        var channel = Channel.CreateUnbounded<int>();

        // Завдання-виробник
        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                Console.WriteLine($"Виробник: додає {i} до каналу");
                await channel.Writer.WriteAsync(i); // Асинхронний запис!
                await Task.Delay(100); // Імітація роботи
            }
            channel.Writer.Complete(); // Повідомляємо, що більше не додаватимемо
        });

        // Завдання-споживач
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Споживач: отримав {item} з каналу");
                await Task.Delay(200); // Імітація обробки
            }
            Console.WriteLine("Споживач: канал закрито");
        });

        await Task.WhenAll(producer, consumer);
    }
}

Що тут відбувається?

  • Channel.CreateUnbounded<int>() — створюємо канал без обмежень за розміром черги.
  • Виробник записує числа від 0 до 9 у канал за допомогою WriteAsync.
  • Після завершення запису викликається Complete() — сигнал «Більше елементів не буде!».
  • Споживач перебирає всі елементи через ReadAllAsync() (також асинхронно), доки канал не закриється.
  • Затримки (Task.Delay) імітують реальну роботу: бачите — числа пишуться швидше, ніж читаються.

3. Чому все це працює асинхронно?

Звичайні блокувальні черги (наприклад, BlockingCollection або ті, що захищені lock) можуть лише блокувати потік. А це означає, що ми втрачаємо цінні ресурси, якщо маємо багато завдань або прагнемо максимальної продуктивності.

З каналами:

  • Якщо виробник швидший, канал накопичує елементи (обмеження — лише пам’ять або максимальний розмір, якщо його задано).
  • Якщо споживач швидший, він чекатиме, доки щось з’явиться — і не блокуватиме потік, а віддасть його планувальнику.

Це ідеально для сценаріїв, коли ви не знаєте заздалегідь, хто буде швидшим — виробники чи споживачі.

Застосування в реальному житті

  • Асинхронне логування: запис повідомлень у файл або базу даних виконується в окремому потоці;
  • Обробка веб‑запитів: одне завдання завантажує добірку сторінок, інше аналізує їхній вміст;
  • Сканування та індексування каталогів: одні завдання обходять файлову систему, інші обчислюють статистику по файлах;
  • Складні конвеєри обробки даних: наприклад, в ETL (Extract–Transform–Load) завданнях один крок перетворює сировину на напівфабрикати, інший — на готовий продукт.

4. Обмежений канал (Bounded Channel)

«Безлімітні» канали — це, звісно, зручно, але пам’ять усе ж не безмежна (навіть якщо ваш комп’ютер здається потужним).

Обмежений канал (bounded) дозволяє встановити максимальну кількість елементів, які можуть одночасно перебувати всередині. Якщо канал заповнений — виробник чекає, поки споживач щось витягне.

Приклад:

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.Wait // (за замовчуванням) — чекати, поки звільниться місце
});

Тут лише три елементи можуть одночасно перебувати в каналі. Якщо виробник спробує записати четвертий — він чекатиме.

Кілька виробників і споживачів

var channel = Channel.CreateUnbounded<int>();

// 2 виробники
for (int producerId = 0; producerId < 2; producerId++)
{
    Task.Run(async () =>
    {
        for (int i = 0; i < 5; i++)
        {
            int value = producerId * 100 + i;
            Console.WriteLine($"Виробник {producerId}: додає {value}");
            await channel.Writer.WriteAsync(value);
            await Task.Delay(50);
        }
        // Кожен виробник викликає Complete() — це небезпечно!
    });
}
// Хитрість: Complete() має викликатися лише один раз, коли ВСІ виробники завершили роботу.
// Для прикладу залишимо одне завдання-споживач:
Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Споживач отримав {item}");
        await Task.Delay(100);
    }
});

Увага! Канал слід закривати (через Complete()) лише після завершення роботи всіх виробників. Інакше хтось іще може спробувати записати, а канал уже закритий! У реальних застосунках для цього зазвичай використовують лічильник завдань або Task.WhenAll.

5. Практика: обробка зображень через канал

Давайте трохи ускладнимо завдання. Уявімо, що маємо каталог із зображеннями. Одне завдання шукає зображення й поміщає їхні шляхи в канал, інше — бере шлях і робить із цим файлом щось корисне (наприклад, рахує розмір або перетворює).

Пояснення: для простоти приклад працюватиме з іменами файлів (без роботи із зображеннями), але суть ідентична.

using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateBounded<string>(5);

        // Виробник: шукає файли .jpg у каталозі
        var producer = Task.Run(async () =>
        {
            foreach (var file in Directory.EnumerateFiles(@"images", "*.jpg"))
            {
                await channel.Writer.WriteAsync(file);
                Console.WriteLine($"Додано в чергу: {file}");
                await Task.Delay(50); // Імітуємо затримку пошуку
            }
            channel.Writer.Complete(); // Кінець черги
        });

        // Споживач: читає й "обробляє" файли
        var consumer = Task.Run(async () =>
        {
            await foreach (var file in channel.Reader.ReadAllAsync())
            {
                Console.WriteLine($"Оброблення файла: {file}");
                await Task.Delay(200); // Імітація обробки
            }
            Console.WriteLine("Усі зображення оброблено!");
        });

        await Task.WhenAll(producer, consumer);
    }
}

6. Конфігурація Channel: опції та нюанси

Канали можна налаштовувати за допомогою опцій під час створення — ось головні параметри для обмежених каналів:

Опція Опис
Capacity
Максимальна кількість елементів, які можуть одночасно бути в каналі
SingleWriter
true, якщо у вас лише один виробник (прискорює роботу)
SingleReader
true, якщо у вас лише один споживач (прискорює роботу)
FullMode
Що робити, якщо канал заповнився? Можливі значення: Wait, DropWrite, DropOldest, DropNewest

Приклад з опціями:

var options = new BoundedChannelOptions(10)
{
    SingleWriter = false,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
};
var channel = Channel.CreateBounded<string>(options);

7. Асинхронні методи: ReadAsync, WriteAsync, ReadAllAsync

Чому async такий важливий?

Методи WriteAsync і ReadAsync не блокують потік. Якщо немає що читати — завдання ставиться на паузу, звільняючи потік для інших завдань. Це особливо важливо для серверних та UI‑застосунків, де зайве блокування може призвести до зависань.

ReadAllAsync — зручно для сучасного C#

Можна ітеруватися асинхронно:

await foreach (var item in channel.Reader.ReadAllAsync())
{
    // Працюємо з item
}

Channel<T> і потокобезпечні колекції: у чому різниця?

ConcurrentQueue<T> і BlockingCollection<T> добре підходять для сценаріїв із потоками, але не для чистої асинхронності (await-сценарії).

Channel<T> спроєктовано саме для асинхронних конвеєрних (pipeline) застосунків. З точки зору потокобезпеки, і ті, і інші колекції чудово справляються, але канали дають гнучкість та інтегруються із сучасними можливостями C# (IAsyncEnumerable тощо).

8. Помилки й типові пастки

Не забувайте викликати Complete() у записувача, коли всі елементи додано. Інакше споживач зависатиме в очікуванні нових елементів безкінечно.

Не викликайте Complete() кілька разів, якщо записувачів багато — робіть це лише після того, як абсолютно всі виробники завершили роботу.

Після закриття каналу писати елементи більше не можна, але читати залишки — можна.

Стан гонки під час одночасного запису: якщо канал закрито, а хтось іще намагається записати — отримаєте виняток.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ