JavaRush /Курсы /C# SELF /Продвинутые паттерны и особенности

Продвинутые паттерны и особенности Concurrent-коллекций

C# SELF
58 уровень , 3 лекция
Открыта

1. «Производитель‑Потребитель» + Concurrent

Мы уже касались паттерна «Производитель‑Потребитель» при обсуждении ConcurrentQueue. Давайте рассмотрим его детальнее для случая с несколькими производителями и потребителями, а также корректной сигнализацией завершения.

Главная прелесть ConcurrentQueue (и других Concurrent коллекций) в том, что она сама заботится о потокобезопасности. Не нужно оборачивать Enqueue или TryDequeue в lock — несколько потоков могут безопасно взаимодействовать через общую очередь.

Пример: Несколько производителей и несколько потребителей

Несколько рабочих потоков генерируют задачи, а несколько других потоков их обрабатывают.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
CancellationTokenSource cts = new CancellationTokenSource(); // Для отмены работы потребителей

// Метод для производителя
void Producer(string name, int count)
{
    for (int i = 0; i < count; i++)
    {
        string task = $"Задача_{name}_{i}";
        taskQueue.Enqueue(task);
        Console.WriteLine($"[P:{name}] Добавил: {task}");
        Thread.Sleep(10); 
    }
}

// Метод для потребителя
void Consumer(string name)
{
    while (!cts.Token.IsCancellationRequested || taskQueue.Count > 0)
    {
        if (taskQueue.TryDequeue(out string task))
        {
            Console.WriteLine($"[C:{name}] Обработал: {task}");
            Thread.Sleep(20); 
        }
        else
        {
            Thread.Sleep(50); // Ждем, если очередь пуста
        }
    }
    Console.WriteLine($"[C:{name}] Завершил работу.");
}

// Запуск примера в Main:
Task.Run(() => Producer("A", 10)); // Производитель A
Task.Run(() => Producer("B", 10)); // Производитель B
Task.Run(() => Consumer("1"));    // Потребитель 1
Task.Run(() => Consumer("2"));    // Потребитель 2

Thread.Sleep(1000); // Даем время для работы
cts.Cancel();       // Сигнал потребителям завершать работу
Thread.Sleep(500); // Даем время потребителям добрать остатки и завершиться

Здесь несколько производителей и потребителей одновременно работают с одной ConcurrentQueue без гонок данных: методы Enqueue и TryDequeue атомарны.

Важность сигналов завершения работы (CancellationTokenSource)

Мы используем CancellationTokenSource (cts) для сигнализации потребителям о необходимости завершения работы. Это критично для паттерна Producer‑Consumer:

  • Производители завершили работу. Когда добавление элементов окончено, потребители не должны ждать бесконечно пустую очередь.
  • Приложение завершается. Нужно корректно остановить потребителей.

CancellationTokenSource и CancellationToken дают стандартный механизм: потребитель периодически проверяет IsCancellationRequested и при необходимости вызывает ThrowIfCancellationRequested().

2. BlockingCollection<T>

Хотя ConcurrentQueue<T> хорошо подходит для «производитель‑потребитель», она требует ручного ожидания при пустой очереди и самостоятельной сигнализации завершения. Для более удобной реализации в .NET есть BlockingCollection<T> — это не самостоятельная коллекция, а обёртка над любой IProducerConsumerCollection<T> (например, над ConcurrentQueue).

Преимущества BlockingCollection:

  • Блокирующие операции. Add()/Take() блокируют поток, если коллекция полна/пуста. Не нужно вручную проверять IsEmpty.
  • Ограничение размера. Можно задать емкость (Capacity). Add() заблокируется, когда лимит достигнут, — удобно для контроля памяти.
  • Удобная финализация. CompleteAdding() сигнализирует об окончании добавления, а GetConsumingEnumerable() позволяет потребителю обрабатывать элементы до полного завершения.

Пример: Producer‑Consumer с BlockingCollection<T>

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// BlockingCollection по умолчанию использует ConcurrentQueue
BlockingCollection<int> numbers = new BlockingCollection<int>(capacity: 10); // Очередь с лимитом 10

void ProducerBC(int count)
{
    for (int i = 0; i < count; i++)
    {
        numbers.Add(i); // Блокирует, если очередь полна
        Console.WriteLine($"[P] Добавил: {i}");
        Thread.Sleep(50);
    }
    numbers.CompleteAdding(); // Сигнализируем, что производитель закончил
    Console.WriteLine("[P] Производитель завершил добавление.");
}

void ConsumerBC()
{
    // GetConsumingEnumerable блокирует, пока есть элементы или пока CompleteAdding не вызван
    foreach (var item in numbers.GetConsumingEnumerable())
    {
        Console.WriteLine($"[C] Обработал: {item}");
        Thread.Sleep(100);
    }
    Console.WriteLine("[C] Потребитель завершил работу.");
}

// Запуск примера в Main:
Task producerTask = Task.Run(() => ProducerBC(15)); // 15 элементов, лимит 10
Task consumerTask = Task.Run(ConsumerBC);
Task.WaitAll(producerTask, consumerTask); // Ждем завершения

Обратите внимание, насколько чище код потребителя благодаря GetConsumingEnumerable(). Если нужны блокирующие операции или ограничение размера — BlockingCollection ваш инструмент.

3. Дополнительные методы и свойства Concurrent-коллекций

IsEmpty, Count

  • IsEmpty (bool): пуста ли коллекция.
  • Count (int): текущее число элементов.

Пример: Использование IsEmpty и Count

using System.Collections.Concurrent;

ConcurrentQueue<string> q = new ConcurrentQueue<string>();
Console.WriteLine($"Очередь пуста? {q.IsEmpty}"); // True

q.Enqueue("A");
q.Enqueue("B");
Console.WriteLine($"Элементов в очереди: {q.Count}"); // 2
Console.WriteLine($"Очередь пуста? {q.IsEmpty}"); // False

q.TryDequeue(out var itemA);
Console.WriteLine($"Элементов в очереди: {q.Count}"); // 1

Преобразование в массивы (ToArray())

Все Concurrent-коллекции предоставляют метод ToArray(), который возвращает моментальный снимок элементов.

Пример: Использование ToArray()

using System.Collections.Concurrent;

ConcurrentStack<int> s = new ConcurrentStack<int>();
s.Push(10);
s.Push(20);
s.Push(30);

int[] items = s.ToArray(); // Создаст новый массив: [30, 20, 10] (для стека LIFO)
Console.WriteLine($"Элементы в массиве: {string.Join(", ", items)}");

// Коллекция остается неизменной
Console.WriteLine($"Элементов в стеке после ToArray: {s.Count}"); // 3

Очистка коллекций

В .NET 6+ многие Concurrent-коллекции получили метод Clear() для удаления всех элементов.

Пример: Очистка коллекции

using System.Collections.Concurrent;

ConcurrentBag<string> bag = new ConcurrentBag<string>();
bag.Add("Alpha");
bag.Add("Beta");
Console.WriteLine($"Элементов в Bag: {bag.Count}"); // 2

bag.Clear(); // Очищаем коллекцию
Console.WriteLine($"Элементов в Bag после очистки: {bag.Count}"); // 0
Console.WriteLine($"Bag пуст? {bag.IsEmpty}"); // True

4. Особенности поведения Concurrent-коллекций

Важно помнить о моментальном снимке данных. Свойства вроде Count и результаты ToArray() отражают состояние коллекции в конкретный момент времени. В условиях параллельных изменений это значение может устареть буквально сразу после получения.

Пример: Count и ToArray() — «моментальный снимок»

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> snapshotQueue = new ConcurrentQueue<int>();

void AddItemsContinuously()
{
    for (int i = 0; i < 1000; i++)
    {
        snapshotQueue.Enqueue(i);
        Thread.Sleep(1); 
    }
}

// Запуск примера в Main:
Task.Run(AddItemsContinuously); // Поток, который постоянно добавляет элементы

Thread.Sleep(100); // Даем немного времени для добавления
Console.WriteLine($"Текущий Count: {snapshotQueue.Count}"); // Может быть 50, 80, 120...
Thread.Sleep(100);
Console.WriteLine($"Текущий Count снова: {snapshotQueue.Count}"); // Будет другое значение
int[] currentItems = snapshotQueue.ToArray();
Console.WriteLine($"Количество элементов в ToArray(): {currentItems.Length}"); // Может отличаться от последнего Count

Не полагайтесь на Count как на строгую гарантию актуального количества элементов во время активных изменений.

5. Нюансы итерации по Concurrent-коллекциям

Отдельные операции (Add, TryTake, TryPop, GetOrAdd и т.д.) потокобезопасны. Но итерация с foreach по коллекции, которая параллельно модифицируется другими потоками, не гарантирует, что вы увидите все элементы или только их — возможны пропуски и неожиданности.

Пример: Итерация во время модификации

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> iterQueue = new ConcurrentQueue<int>();

// Добавляем начальные элементы
for (int i = 0; i < 10; i++) iterQueue.Enqueue(i);

// Поток-модификатор
void Modifier()
{
    for (int i = 10; i < 20; i++)
    {
        iterQueue.Enqueue(i); // Добавляем новые элементы
        Thread.Sleep(50);
    }
}

// Поток-итератор
void Iterator()
{
    Console.WriteLine("Начинаем итерацию...");
    int count = 0;
    foreach (var item in iterQueue) // Итерируем
    {
        Console.Write($"{item} ");
        count++;
        Thread.Sleep(30); // Имитация работы, даем модификатору изменить коллекцию
    }
    Console.WriteLine($"\nИтерация завершена. Прочитано {count} элементов.");
    Console.WriteLine($"Текущее количество в очереди: {iterQueue.Count}");
}

// Запуск примера в Main:
Task.Run(Modifier);
Task.Run(Iterator);
Thread.Sleep(1500); // Даем время на работу

Правило: если нужен фиксированный набор элементов (например, для отчёта), сначала возьмите «снимок» через ToArray(), а затем итерируйте по нему:

// Правильный способ итерации, если коллекция может меняться
int[] snapshot = iterQueue.ToArray();
foreach (var item in snapshot)
{
    // Теперь вы итерируете по неизменяемому массиву-снимку
}

Это завершает наше погружение в продвинутые паттерны и особенности Concurrent-коллекций: мы посмотрели на Producer‑Consumer с несколькими участниками и BlockingCollection, а также разобрали важные нюансы работы с Count, ToArray() и итерацией.

2
Задача
C# SELF, 58 уровень, 3 лекция
Недоступна
Пример работы с BlockingCollection с ограничением размера
Пример работы с BlockingCollection с ограничением размера
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ