1. «Виробник‑Споживач» + Concurrent
Ми вже розглядали патерн «Виробник‑Споживач» під час обговорення ConcurrentQueue. Розгляньмо його детальніше для випадку з кількома виробниками й споживачами, а також коректною сигналізацією завершення.
Головна перевага ConcurrentQueue (і інших Concurrent-колекцій) полягає в тому, що колекція сама забезпечує потокобезпечність. Не потрібно обгортати Enqueue або TryDequeue у lock — кілька потоків можуть безпечно взаємодіяти через спільну чергу.
Приклад: Кілька виробників і кілька споживачів
Кілька робочих потоків генерують завдання, а кілька інших потоків їх обробляють.
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Для скасування роботи споживачів
// Метод для виробника
void Producer(string name, int count)
{
for (int i = 0; i < count; i++)
{
string task = $"Завдання_{name}_{i}";
taskQueue.Enqueue(task);
Console.WriteLine($"[P:{name}] Додав: {task}");
Thread.Sleep(10);
}
}
// Метод для споживача
void Consumer(string name)
{
while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
{
if (taskQueue.TryDequeue(out string task))
{
Console.WriteLine($"[C:{name}] Обробив: {task}");
Thread.Sleep(20);
}
else
{
Thread.Sleep(50); // Чекаємо, якщо черга порожня
}
}
Console.WriteLine($"[C:{name}] Завершив роботу.");
}
// Запуск прикладу в Main:
Task.Run(() => Producer("A", 10)); // Виробник A
Task.Run(() => Producer("B", 10)); // Виробник B
Task.Run(() => Consumer("1")); // Споживач 1
Task.Run(() => Consumer("2")); // Споживач 2
Thread.Sleep(1000); // Даємо час попрацювати
cts.Cancel(); // Сигнал споживачам завершувати роботу
Thread.Sleep(500); // Даємо час споживачам добрати залишки й завершитися
Тут кілька виробників і споживачів одночасно працюють з однією ConcurrentQueue без станів гонки: методи Enqueue і TryDequeue атомарні.
Важливість сигналів завершення роботи (CancellationTokenSource)
Ми використовуємо CancellationTokenSource (cts) для сигналізації споживачам про необхідність завершення роботи. Це критично для патерна Producer‑Consumer:
- Виробники завершили роботу. Коли додавання елементів закінчено, споживачі не повинні безкінечно чекати на порожню чергу.
- Застосунок завершується. Потрібно коректно зупинити споживачів.
CancellationTokenSource і CancellationToken надають стандартний механізм: споживач періодично перевіряє IsCancellationRequested і за потреби викликає ThrowIfCancellationRequested().
2. BlockingCollection<T>
Хоча ConcurrentQueue<T> добре підходить для «виробник‑споживач», вона вимагає ручного очікування під час порожньої черги та самостійної сигналізації завершення. Для зручнішої реалізації у .NET є BlockingCollection<T> — це не окрема колекція, а обгортка над будь‑якою IProducerConsumerCollection<T> (наприклад, над ConcurrentQueue).
Переваги BlockingCollection:
- Блокувальні операції. Add()/Take() блокують потік, якщо колекція повна або порожня. Не потрібно вручну перевіряти IsEmpty.
- Обмеження розміру. Можна задати ємність (Capacity). Add() заблокується, коли досягнуто ліміт, — зручно для контролю пам’яті.
- Зручне завершення. CompleteAdding() сигналізує про завершення додавання, а GetConsumingEnumerable() дозволяє споживачу обробляти елементи до повного завершення.
Приклад: Producer‑Consumer з BlockingCollection<T>
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
// BlockingCollection за замовчуванням використовує ConcurrentQueue
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Черга з лімітом 10
void ProducerBC(int count)
{
for (int i = 0; i < count; i++)
{
numbers.Add(i); // Блокує, якщо черга повна
Console.WriteLine($"[P] Додав: {i}");
Thread.Sleep(50);
}
numbers.CompleteAdding(); // Сигналізуємо, що виробник завершив додавання
Console.WriteLine("[P] Виробник завершив додавання.");
}
void ConsumerBC()
{
// GetConsumingEnumerable блокує, доки є елементи або доки не буде викликано CompleteAdding
foreach (var item in numbers.GetConsumingEnumerable())
{
Console.WriteLine($"[C] Обробив: {item}");
Thread.Sleep(100);
}
Console.WriteLine("[C] Споживач завершив роботу.");
}
// Запуск прикладу в Main:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 елементів, ліміт 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Чекаємо завершення
Зверніть увагу, наскільки чистішим стає код споживача завдяки GetConsumingEnumerable(). Якщо потрібні блокувальні операції або обмеження розміру — BlockingCollection — ваш інструмент.
3. Додаткові методи та властивості Concurrent-колекцій
IsEmpty, Count
- IsEmpty (bool): чи порожня колекція.
- Count (int): поточна кількість елементів.
Приклад: Використання IsEmpty та Count
using System.Collections.Concurrent;
ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Черга порожня? {q.IsEmpty}"); // True
q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Елементів у черзі: {q.Count}"); // 2
Console.WriteLine($"Черга порожня? {q.IsEmpty}"); // False
q.TryDequeue(out var itemA);
Console.WriteLine($"Елементів у черзі: {q.Count}"); // 1
Перетворення в масиви (ToArray())
Усі Concurrent-колекції надають метод ToArray(), який повертає моментальний знімок елементів.
Приклад: Використання ToArray()
using System.Collections.Concurrent;
ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);
int[] items = s.ToArray(); // Створить новий масив: [30, 20, 10] (для LIFO‑стеку)
Console.WriteLine($"Елементи в масиві: {string.Join(", ", items)}");
// Колекція залишається незмінною
Console.WriteLine($"Елементів у стеку після ToArray: {s.Count}"); // 3
Очищення колекцій
У .NET 6+ багато Concurrent-колекцій отримали метод Clear() для видалення всіх елементів.
Приклад: Очищення колекції
using System.Collections.Concurrent;
ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Елементів у мішку: {bag.Count}"); // 2
bag.Clear(); // Очищаємо колекцію
Console.WriteLine($"Елементів у мішку після очищення: {bag.Count}"); // 0
Console.WriteLine($"Мішок порожній? {bag.IsEmpty}"); // True
4. Особливості поведінки Concurrent-колекцій
Важливо пам’ятати про моментальний знімок даних. Властивості на кшталт Count і результати ToArray() відображають стан колекції в конкретний момент часу. В умовах паралельних змін це значення може застаріти буквально одразу після отримання.
Приклад: Count і ToArray() — «моментальний знімок»
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();
void AddItemsContinuously()
{
for (int i = 0; i < 1000; i++)
{
snapshotQueue.Enqueue(i);
Thread.Sleep(1);
}
}
// Запуск прикладу в Main:
Task.Run(AddItemsContinuously); // Потік, який постійно додає елементи
Thread.Sleep(100); // Даємо трохи часу для додавання
Console.WriteLine($"Поточний Count: {snapshotQueue.Count}"); // Може бути 50, 80, 120...
Thread.Sleep(100);
Console.WriteLine($"Поточний Count знову: {snapshotQueue.Count}"); // Буде інше значення
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"Кількість елементів у ToArray(): {currentItems.Length}"); // Може відрізнятися від останнього Count
Не покладайтеся на Count як на строгий гарант актуальної кількості елементів під час активних змін.
5. Нюанси ітерації Concurrent-колекціями
Окремі операції (Add, TryTake, TryPop, GetOrAdd тощо) — потокобезпечні. Але ітерація оператором foreach колекцією, яку паралельно змінюють інші потоки, не гарантує, що ви побачите всі елементи або лише їх — можливі пропуски та несподіванки.
Приклад: Ітерація під час модифікації
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();
// Додаємо початкові елементи
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);
// Потік‑модифікатор
void Modifier()
{
for (int i = 10; i < 20; i++)
{
iterQueue.Enqueue(i); // Додаємо нові елементи
Thread.Sleep(50);
}
}
// Потік‑ітератор
void Iterator()
{
Console.WriteLine("Починаємо ітерацію...");
int count = 0;
foreach (var item in iterQueue) // Ітеруємо
{
Console.Write($"{item} ");
count++;
Thread.Sleep(30); // Імітація роботи, даємо модифікатору змінити колекцію
}
Console.WriteLine($"\nІтерацію завершено. Прочитано {count} елементів.");
Console.WriteLine($"Поточна кількість у черзі: {iterQueue.Count}");
}
// Запуск прикладу в Main:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // Даємо час на роботу
Правило: якщо потрібен фіксований набір елементів (наприклад, для звіту), спершу візьміть «знімок» через ToArray(), а потім ітеруйте цей масив:
// Правильний спосіб ітерації, якщо колекція може змінюватися
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
// Тепер ви перебираєте незмінний масив‑знімок
}
На цьому завершуємо наше занурення у поглиблені патерни й особливості Concurrent-колекцій: ми розглянули Producer‑Consumer з кількома учасниками та BlockingCollection, а також важливі нюанси роботи з Count, ToArray() й ітерацією.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ