1. «Производитель‑Потребитель» + Concurrent
Мы уже касались паттерна «Производитель‑Потребитель» при обсуждении ConcurrentQueue. Давайте рассмотрим его детальнее для случая с несколькими производителями и потребителями, а также корректной сигнализацией завершения.
Главная прелесть ConcurrentQueue (и других Concurrent коллекций) в том, что она сама заботится о потокобезопасности. Не нужно оборачивать Enqueue или TryDequeue в lock — несколько потоков могут безопасно взаимодействовать через общую очередь.
Пример: Несколько производителей и несколько потребителей
Несколько рабочих потоков генерируют задачи, а несколько других потоков их обрабатывают.
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Для отмены работы потребителей
// Метод для производителя
void Producer(string name, int count)
{
for (int i = 0; i < count; i++)
{
string task = $"Задача_{name}_{i}";
taskQueue.Enqueue(task);
Console.WriteLine($"[P:{name}] Добавил: {task}");
Thread.Sleep(10);
}
}
// Метод для потребителя
void Consumer(string name)
{
while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
{
if (taskQueue.TryDequeue(out string task))
{
Console.WriteLine($"[C:{name}] Обработал: {task}");
Thread.Sleep(20);
}
else
{
Thread.Sleep(50); // Ждем, если очередь пуста
}
}
Console.WriteLine($"[C:{name}] Завершил работу.");
}
// Запуск примера в Main:
Task.Run(() => Producer("A", 10)); // Производитель A
Task.Run(() => Producer("B", 10)); // Производитель B
Task.Run(() => Consumer("1")); // Потребитель 1
Task.Run(() => Consumer("2")); // Потребитель 2
Thread.Sleep(1000); // Даем время для работы
cts.Cancel(); // Сигнал потребителям завершать работу
Thread.Sleep(500); // Даем время потребителям добрать остатки и завершиться
Здесь несколько производителей и потребителей одновременно работают с одной ConcurrentQueue без гонок данных: методы Enqueue и TryDequeue атомарны.
Важность сигналов завершения работы (CancellationTokenSource)
Мы используем CancellationTokenSource (cts) для сигнализации потребителям о необходимости завершения работы. Это критично для паттерна Producer‑Consumer:
- Производители завершили работу. Когда добавление элементов окончено, потребители не должны ждать бесконечно пустую очередь.
- Приложение завершается. Нужно корректно остановить потребителей.
CancellationTokenSource и CancellationToken дают стандартный механизм: потребитель периодически проверяет IsCancellationRequested и при необходимости вызывает ThrowIfCancellationRequested().
2. BlockingCollection<T>
Хотя ConcurrentQueue<T> хорошо подходит для «производитель‑потребитель», она требует ручного ожидания при пустой очереди и самостоятельной сигнализации завершения. Для более удобной реализации в .NET есть BlockingCollection<T> — это не самостоятельная коллекция, а обёртка над любой IProducerConsumerCollection<T> (например, над ConcurrentQueue).
Преимущества BlockingCollection:
- Блокирующие операции. Add()/Take() блокируют поток, если коллекция полна/пуста. Не нужно вручную проверять IsEmpty.
- Ограничение размера. Можно задать емкость (Capacity). Add() заблокируется, когда лимит достигнут, — удобно для контроля памяти.
- Удобная финализация. CompleteAdding() сигнализирует об окончании добавления, а GetConsumingEnumerable() позволяет потребителю обрабатывать элементы до полного завершения.
Пример: Producer‑Consumer с BlockingCollection<T>
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
// BlockingCollection по умолчанию использует ConcurrentQueue
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Очередь с лимитом 10
void ProducerBC(int count)
{
for (int i = 0; i < count; i++)
{
numbers.Add(i); // Блокирует, если очередь полна
Console.WriteLine($"[P] Добавил: {i}");
Thread.Sleep(50);
}
numbers.CompleteAdding(); // Сигнализируем, что производитель закончил
Console.WriteLine("[P] Производитель завершил добавление.");
}
void ConsumerBC()
{
// GetConsumingEnumerable блокирует, пока есть элементы или пока CompleteAdding не вызван
foreach (var item in numbers.GetConsumingEnumerable())
{
Console.WriteLine($"[C] Обработал: {item}");
Thread.Sleep(100);
}
Console.WriteLine("[C] Потребитель завершил работу.");
}
// Запуск примера в Main:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 элементов, лимит 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Ждем завершения
Обратите внимание, насколько чище код потребителя благодаря GetConsumingEnumerable(). Если нужны блокирующие операции или ограничение размера — BlockingCollection ваш инструмент.
3. Дополнительные методы и свойства Concurrent-коллекций
IsEmpty, Count
- IsEmpty (bool): пуста ли коллекция.
- Count (int): текущее число элементов.
Пример: Использование IsEmpty и Count
using System.Collections.Concurrent;
ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Очередь пуста? {q.IsEmpty}"); // True
q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Элементов в очереди: {q.Count}"); // 2
Console.WriteLine($"Очередь пуста? {q.IsEmpty}"); // False
q.TryDequeue(out var itemA);
Console.WriteLine($"Элементов в очереди: {q.Count}"); // 1
Преобразование в массивы (ToArray())
Все Concurrent-коллекции предоставляют метод ToArray(), который возвращает моментальный снимок элементов.
Пример: Использование ToArray()
using System.Collections.Concurrent;
ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);
int[] items = s.ToArray(); // Создаст новый массив: [30, 20, 10] (для стека LIFO)
Console.WriteLine($"Элементы в массиве: {string.Join(", ", items)}");
// Коллекция остается неизменной
Console.WriteLine($"Элементов в стеке после ToArray: {s.Count}"); // 3
Очистка коллекций
В .NET 6+ многие Concurrent-коллекции получили метод Clear() для удаления всех элементов.
Пример: Очистка коллекции
using System.Collections.Concurrent;
ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Элементов в Bag: {bag.Count}"); // 2
bag.Clear(); // Очищаем коллекцию
Console.WriteLine($"Элементов в Bag после очистки: {bag.Count}"); // 0
Console.WriteLine($"Bag пуст? {bag.IsEmpty}"); // True
4. Особенности поведения Concurrent-коллекций
Важно помнить о моментальном снимке данных. Свойства вроде Count и результаты ToArray() отражают состояние коллекции в конкретный момент времени. В условиях параллельных изменений это значение может устареть буквально сразу после получения.
Пример: Count и ToArray() — «моментальный снимок»
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();
void AddItemsContinuously()
{
for (int i = 0; i < 1000; i++)
{
snapshotQueue.Enqueue(i);
Thread.Sleep(1);
}
}
// Запуск примера в Main:
Task.Run(AddItemsContinuously); // Поток, который постоянно добавляет элементы
Thread.Sleep(100); // Даем немного времени для добавления
Console.WriteLine($"Текущий Count: {snapshotQueue.Count}"); // Может быть 50, 80, 120...
Thread.Sleep(100);
Console.WriteLine($"Текущий Count снова: {snapshotQueue.Count}"); // Будет другое значение
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"Количество элементов в ToArray(): {currentItems.Length}"); // Может отличаться от последнего Count
Не полагайтесь на Count как на строгую гарантию актуального количества элементов во время активных изменений.
5. Нюансы итерации по Concurrent-коллекциям
Отдельные операции (Add, TryTake, TryPop, GetOrAdd и т.д.) потокобезопасны. Но итерация с foreach по коллекции, которая параллельно модифицируется другими потоками, не гарантирует, что вы увидите все элементы или только их — возможны пропуски и неожиданности.
Пример: Итерация во время модификации
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();
// Добавляем начальные элементы
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);
// Поток-модификатор
void Modifier()
{
for (int i = 10; i < 20; i++)
{
iterQueue.Enqueue(i); // Добавляем новые элементы
Thread.Sleep(50);
}
}
// Поток-итератор
void Iterator()
{
Console.WriteLine("Начинаем итерацию...");
int count = 0;
foreach (var item in iterQueue) // Итерируем
{
Console.Write($"{item} ");
count++;
Thread.Sleep(30); // Имитация работы, даем модификатору изменить коллекцию
}
Console.WriteLine($"\nИтерация завершена. Прочитано {count} элементов.");
Console.WriteLine($"Текущее количество в очереди: {iterQueue.Count}");
}
// Запуск примера в Main:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // Даем время на работу
Правило: если нужен фиксированный набор элементов (например, для отчёта), сначала возьмите «снимок» через ToArray(), а затем итерируйте по нему:
// Правильный способ итерации, если коллекция может меняться
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
// Теперь вы итерируете по неизменяемому массиву-снимку
}
Это завершает наше погружение в продвинутые паттерны и особенности Concurrent-коллекций: мы посмотрели на Producer‑Consumer с несколькими участниками и BlockingCollection, а также разобрали важные нюансы работы с Count, ToArray() и итерацией.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ