JavaRush /Курси /C# SELF /Поглиблені патерни та особливості

Поглиблені патерни та особливості Concurrent-колекцій

C# SELF
Рівень 58 , Лекція 3
Відкрита

1. «Виробник‑Споживач» + Concurrent

Ми вже розглядали патерн «Виробник‑Споживач» під час обговорення ConcurrentQueue. Розгляньмо його детальніше для випадку з кількома виробниками й споживачами, а також коректною сигналізацією завершення.

Головна перевага ConcurrentQueue (і інших Concurrent-колекцій) полягає в тому, що колекція сама забезпечує потокобезпечність. Не потрібно обгортати Enqueue або TryDequeue у lock — кілька потоків можуть безпечно взаємодіяти через спільну чергу.

Приклад: Кілька виробників і кілька споживачів

Кілька робочих потоків генерують завдання, а кілька інших потоків їх обробляють.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Для скасування роботи споживачів

// Метод для виробника
void Producer(string name, int count)
{
    for (int i = 0; i < count; i++)
    {
        string task = $"Завдання_{name}_{i}";
        taskQueue.Enqueue(task);
        Console.WriteLine($"[P:{name}] Додав: {task}");
        Thread.Sleep(10); 
    }
}

// Метод для споживача
void Consumer(string name)
{
    while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
    {
        if (taskQueue.TryDequeue(out string task))
        {
            Console.WriteLine($"[C:{name}] Обробив: {task}");
            Thread.Sleep(20); 
        }
        else
        {
            Thread.Sleep(50); // Чекаємо, якщо черга порожня
        }
    }
    Console.WriteLine($"[C:{name}] Завершив роботу.");
}

// Запуск прикладу в Main:
Task.Run(() => Producer("A", 10)); // Виробник A
Task.Run(() => Producer("B", 10)); // Виробник B
Task.Run(() => Consumer("1"));    // Споживач 1
Task.Run(() => Consumer("2"));    // Споживач 2

Thread.Sleep(1000); // Даємо час попрацювати
cts.Cancel();       // Сигнал споживачам завершувати роботу
Thread.Sleep(500); // Даємо час споживачам добрати залишки й завершитися

Тут кілька виробників і споживачів одночасно працюють з однією ConcurrentQueue без станів гонки: методи Enqueue і TryDequeue атомарні.

Важливість сигналів завершення роботи (CancellationTokenSource)

Ми використовуємо CancellationTokenSource (cts) для сигналізації споживачам про необхідність завершення роботи. Це критично для патерна Producer‑Consumer:

  • Виробники завершили роботу. Коли додавання елементів закінчено, споживачі не повинні безкінечно чекати на порожню чергу.
  • Застосунок завершується. Потрібно коректно зупинити споживачів.

CancellationTokenSource і CancellationToken надають стандартний механізм: споживач періодично перевіряє IsCancellationRequested і за потреби викликає ThrowIfCancellationRequested().

2. BlockingCollection<T>

Хоча ConcurrentQueue<T> добре підходить для «виробник‑споживач», вона вимагає ручного очікування під час порожньої черги та самостійної сигналізації завершення. Для зручнішої реалізації у .NET є BlockingCollection<T> — це не окрема колекція, а обгортка над будь‑якою IProducerConsumerCollection<T> (наприклад, над ConcurrentQueue).

Переваги BlockingCollection:

  • Блокувальні операції. Add()/Take() блокують потік, якщо колекція повна або порожня. Не потрібно вручну перевіряти IsEmpty.
  • Обмеження розміру. Можна задати ємність (Capacity). Add() заблокується, коли досягнуто ліміт, — зручно для контролю пам’яті.
  • Зручне завершення. CompleteAdding() сигналізує про завершення додавання, а GetConsumingEnumerable() дозволяє споживачу обробляти елементи до повного завершення.

Приклад: Producer‑Consumer з BlockingCollection<T>

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// BlockingCollection за замовчуванням використовує ConcurrentQueue
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Черга з лімітом 10

void ProducerBC(int count)
{
    for (int i = 0; i < count; i++)
    {
        numbers.Add(i); // Блокує, якщо черга повна
        Console.WriteLine($"[P] Додав: {i}");
        Thread.Sleep(50);
    }
    numbers.CompleteAdding(); // Сигналізуємо, що виробник завершив додавання
    Console.WriteLine("[P] Виробник завершив додавання.");
}

void ConsumerBC()
{
    // GetConsumingEnumerable блокує, доки є елементи або доки не буде викликано CompleteAdding
    foreach (var item in numbers.GetConsumingEnumerable())
    {
        Console.WriteLine($"[C] Обробив: {item}");
        Thread.Sleep(100);
    }
    Console.WriteLine("[C] Споживач завершив роботу.");
}

// Запуск прикладу в Main:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 елементів, ліміт 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Чекаємо завершення

Зверніть увагу, наскільки чистішим стає код споживача завдяки GetConsumingEnumerable(). Якщо потрібні блокувальні операції або обмеження розміру — BlockingCollection — ваш інструмент.

3. Додаткові методи та властивості Concurrent-колекцій

IsEmpty, Count

  • IsEmpty (bool): чи порожня колекція.
  • Count (int): поточна кількість елементів.

Приклад: Використання IsEmpty та Count

using System.Collections.Concurrent;

ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Черга порожня? {q.IsEmpty}"); // True

q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Елементів у черзі: {q.Count}"); // 2
Console.WriteLine($"Черга порожня? {q.IsEmpty}"); // False

q.TryDequeue(out var itemA);
Console.WriteLine($"Елементів у черзі: {q.Count}"); // 1

Перетворення в масиви (ToArray())

Усі Concurrent-колекції надають метод ToArray(), який повертає моментальний знімок елементів.

Приклад: Використання ToArray()

using System.Collections.Concurrent;

ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);

int[] items = s.ToArray(); // Створить новий масив: [30, 20, 10] (для LIFO‑стеку)
Console.WriteLine($"Елементи в масиві: {string.Join(", ", items)}");

// Колекція залишається незмінною
Console.WriteLine($"Елементів у стеку після ToArray: {s.Count}"); // 3

Очищення колекцій

У .NET 6+ багато Concurrent-колекцій отримали метод Clear() для видалення всіх елементів.

Приклад: Очищення колекції

using System.Collections.Concurrent;

ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Елементів у мішку: {bag.Count}"); // 2

bag.Clear(); // Очищаємо колекцію
Console.WriteLine($"Елементів у мішку після очищення: {bag.Count}"); // 0
Console.WriteLine($"Мішок порожній? {bag.IsEmpty}"); // True

4. Особливості поведінки Concurrent-колекцій

Важливо пам’ятати про моментальний знімок даних. Властивості на кшталт Count і результати ToArray() відображають стан колекції в конкретний момент часу. В умовах паралельних змін це значення може застаріти буквально одразу після отримання.

Приклад: Count і ToArray() — «моментальний знімок»

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();

void AddItemsContinuously()
{
    for (int i = 0; i < 1000; i++)
    {
        snapshotQueue.Enqueue(i);
        Thread.Sleep(1); 
    }
}

// Запуск прикладу в Main:
Task.Run(AddItemsContinuously); // Потік, який постійно додає елементи

Thread.Sleep(100); // Даємо трохи часу для додавання
Console.WriteLine($"Поточний Count: {snapshotQueue.Count}"); // Може бути 50, 80, 120...
Thread.Sleep(100);
Console.WriteLine($"Поточний Count знову: {snapshotQueue.Count}"); // Буде інше значення
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"Кількість елементів у ToArray(): {currentItems.Length}"); // Може відрізнятися від останнього Count

Не покладайтеся на Count як на строгий гарант актуальної кількості елементів під час активних змін.

5. Нюанси ітерації Concurrent-колекціями

Окремі операції (Add, TryTake, TryPop, GetOrAdd тощо) — потокобезпечні. Але ітерація оператором foreach колекцією, яку паралельно змінюють інші потоки, не гарантує, що ви побачите всі елементи або лише їх — можливі пропуски та несподіванки.

Приклад: Ітерація під час модифікації

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();

// Додаємо початкові елементи
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);

// Потік‑модифікатор
void Modifier()
{
    for (int i = 10; i < 20; i++)
    {
        iterQueue.Enqueue(i); // Додаємо нові елементи
        Thread.Sleep(50);
    }
}

// Потік‑ітератор
void Iterator()
{
    Console.WriteLine("Починаємо ітерацію...");
    int count = 0;
    foreach (var item in iterQueue) // Ітеруємо
    {
        Console.Write($"{item} ");
        count++;
        Thread.Sleep(30); // Імітація роботи, даємо модифікатору змінити колекцію
    }
    Console.WriteLine($"\nІтерацію завершено. Прочитано {count} елементів.");
    Console.WriteLine($"Поточна кількість у черзі: {iterQueue.Count}");
}

// Запуск прикладу в Main:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // Даємо час на роботу

Правило: якщо потрібен фіксований набір елементів (наприклад, для звіту), спершу візьміть «знімок» через ToArray(), а потім ітеруйте цей масив:

// Правильний спосіб ітерації, якщо колекція може змінюватися
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
    // Тепер ви перебираєте незмінний масив‑знімок
}

На цьому завершуємо наше занурення у поглиблені патерни й особливості Concurrent-колекцій: ми розглянули Producer‑Consumer з кількома учасниками та BlockingCollection, а також важливі нюанси роботи з Count, ToArray() й ітерацією.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ