1. Введение
Начнём с основ. Очередь — это фундаментальная структура данных, работающая по принципу FIFO (First-In, First-Out), то есть "первый пришёл – первый ушёл". Представьте обычную очередь в супермаркете: кто первым встал, тот первым и обслуживается.
В многопоточном программировании паттерн Производитель-Потребитель (Producer-Consumer) — один из самых распространённых и мощных.
- Производители (Producers) — это потоки или части приложения, которые создают данные или задачи и помещают их в общую очередь. Они "производят" работу.
- Потребители (Consumers) — это потоки или части приложения, которые берут данные или задачи из очереди и обрабатывают их. Они "потребляют" работу.
Этот паттерн помогает управлять потоком данных, развязывает компоненты системы (производителю не нужно знать, кто и как будет обрабатывать данные), делает её более отзывчивой и помогает равномерно распределять нагрузку между потоками.
Пример: ConcurrentQueue — добавление и извлечение
Посмотрим, как добавлять и извлекать элементы из ConcurrentQueue<T>.
using System.Collections.Concurrent;
ConcurrentQueue<string> tasks = new ConcurrentQueue<string>();
// Добавление элементов (производитель)
tasks.Enqueue("Скачать файл");
tasks.Enqueue("Обработать изображение");
Console.WriteLine($"Задач в очереди: {tasks.Count}"); // Вывод: Задач в очереди: 2
// Извлечение элементов (потребитель)
if (tasks.TryDequeue(out string task1))
{
Console.WriteLine($"Выполнена задача: {task1}"); // Вывод: Выполнена задача: Скачать файл
}
if (tasks.TryDequeue(out string task2))
{
Console.WriteLine($"Выполнена задача: {task2}"); // Вывод: Выполнена задача: Обработать изображение
}
if (!tasks.TryDequeue(out string emptyTask))
{
Console.WriteLine("Очередь пуста, новых задач нет."); // Вывод: Очередь пуста, новых задач нет.
}
Основы работы: Enqueue(), TryDequeue()
Enqueue(T item): Используется для добавления элемента в конец очереди. Эта операция потокобезопасна. Вы можете одновременно вызывать Enqueue из 10 разных потоков, и все элементы будут корректно добавлены.
TryDequeue(out T item): Используется для попытки извлечения элемента из начала очереди. Это ключевой метод для потребителей. Он возвращает true, если элемент был успешно извлечён (значение попадает в выходной параметр item), и false, если очередь пуста. Важно, что TryDequeue не блокирует поток, если очередь пуста.
2. Важность TryDequeue() и атомарность операций
Метод TryDequeue() не просто удобен; он критически важен для корректной потокобезопасной работы. Он атомарен: проверка на пустоту очереди и само извлечение элемента происходят как одна неделимая операция.
Если бы у нас были отдельные методы IsEmpty (проверить, пуста ли очередь) и Dequeue (извлечь элемент), то между их вызовами другой поток мог бы опустошить очередь. В итоге ваш Dequeue выбросил бы исключение или вернул некорректные данные. TryDequeue полностью исключает такую ситуацию.
Пример: Producer-Consumer с несколькими потоками
Здесь мы запускаем два потока-производителя и один поток-потребитель.
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
ConcurrentQueue<int> dataQueue = new ConcurrentQueue<int>();
bool producersDone = false; // Флаг для сигнализации потребителю
void Producer(int start, int count)
{
for (int i = 0; i < count; i++)
{
dataQueue.Enqueue(start + i);
Console.WriteLine($"[П] Добавил: {start + i}");
Thread.Sleep(10);
}
}
void Consumer()
{
while (!producersDone || dataQueue.Count > 0) // Продолжаем, пока есть данные или производители работают
{
if (dataQueue.TryDequeue(out int item))
{
Console.WriteLine($"[К] Обработал: {item}");
}
else
{
Thread.Sleep(50); // Ждем, если очередь пуста
}
}
Console.WriteLine("[К] Завершил работу.");
}
// Запуск примера в Main:
// Task.Run(() => Producer(1, 5));
// Task.Run(() => Producer(100, 5)); // Второй производитель
// Task.Run(() => Consumer());
// Thread.Sleep(600); // Даем время потокам поработать
// producersDone = true; // Сигнализируем, что производители закончили
// Thread.Sleep(200); // Даем время потребителю забрать остаток
Обратите внимание, что в этом простом примере флаг producersDone и Thread.Sleep используются для имитации завершения. В реальных приложениях для более надёжной синхронизации завершения часто применяются CancellationTokenSource или BlockingCollection<T>.
ConcurrentQueue<T> идеально подходит для сценариев, где:
- Порядок обработки элементов важен (FIFO).
- Множество потоков добавляют элементы, и/или множество потоков их забирают.
- Нужна высокая производительность без ручного управления блокировками.
3. Стек для producer-consumer (LIFO)
Стек — это другая фундаментальная структура данных, работающая по принципу LIFO (Last-In, First-Out), то есть "последний пришёл – первый ушёл". Представьте стопку тарелок: вы всегда берёте самую верхнюю, и новую тарелку всегда кладёте на самый верх стопки.
ConcurrentStack<T> так же потокобезопасен, как и ConcurrentQueue<T>, и тоже может использоваться в паттерне "производитель-потребитель", но с обратным порядком обработки.
Пример: ConcurrentStack — добавление и извлечение
using System.Collections.Concurrent;
ConcurrentStack<string> commandStack = new ConcurrentStack<string>();
// Добавление команд (производитель)
commandStack.Push("Выделить текст");
commandStack.Push("Изменить шрифт");
commandStack.Push("Сохранить документ");
Console.WriteLine($"Команд в стеке: {commandStack.Count}"); // Вывод: Команд в стеке: 3
// Извлечение команд (потребитель)
if (commandStack.TryPop(out string cmd1))
{
Console.WriteLine($"Отменена команда: {cmd1}"); // Вывод: Отменена команда: Сохранить документ
}
if (commandStack.TryPop(out string cmd2))
{
Console.WriteLine($"Отменена команда: {cmd2}"); // Вывод: Отменена команда: Изменить шрифт
}
if (!commandStack.TryPop(out string emptyCmd))
{
Console.WriteLine("Стек команд пуст."); // Вывод: Стек команд пуст.
}
4. Основы работы: Push(), TryPop()
Push(T item): Используется для добавления элемента на вершину стека. Операция потокобезопасна.
TryPop(out T item): Используется для попытки извлечения элемента с вершины стека. Возвращает true, если элемент был успешно извлечён, и false, если стек пуст. Как и TryDequeue, это атомарная операция, предотвращающая гонки данных.
Пример: использование ConcurrentStack для пула объектов
Стек отлично подходит для реализации пулов объектов: взяли — использовали — вернули.
using System.Collections.Concurrent;
class Connection { /* Простая заглушка */ public Guid Id { get; } = Guid.NewGuid(); }
ConcurrentStack<Connection> connectionPool = new ConcurrentStack<Connection>();
// Заполняем пул начальными соединениями
for (int i = 0; i < 3; i++)
{
connectionPool.Push(new Connection());
}
Console.WriteLine($"Соединений в пуле: {connectionPool.Count}"); // Вывод: Соединений в пуле: 3
void UseConnection()
{
if (connectionPool.TryPop(out Connection conn))
{
Console.WriteLine($"[Пул] Использовано соединение: {conn.Id}");
// Имитация работы с соединением
Thread.Sleep(50);
connectionPool.Push(conn); // Возвращаем в пул
Console.WriteLine($"[Пул] Возвращено соединение: {conn.Id}. В пуле: {connectionPool.Count}");
}
else
{
Console.WriteLine("[Пул] Пул соединений пуст. Создаем новое.");
// Обычно здесь создают новое соединение, если пул пуст
connectionPool.Push(new Connection());
}
}
// Запуск примера в Main:
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Thread.Sleep(500);
В этом примере несколько потоков могут безопасно брать и возвращать соединения в общий пул.
5. Примеры применения и сравнение с ConcurrentQueue
ConcurrentStack<T> применяется, когда:
- Порядок LIFO критичен (например, история операций для функции "Отменить").
- Нужен быстрый доступ к самым последним добавленным элементам (часто остаются "горячими" в кеше процессора).
- Реализуются алгоритмы на стеках (обход графов в глубину, синтаксический анализ выражений).
Сравнение и выбор подходящей коллекции
| Коллекция | Порядок | Преимущества | Типичные сценарии |
|---|---|---|---|
|
FIFO (Первый пришёл, первый ушёл) | Обеспечивает справедливую обработку в порядке поступления | Очереди задач, логирование, обработка входящих запросов, событийные шины |
|
LIFO (Последний пришёл, первый ушёл) | Быстрый доступ к недавно добавленным элементам | История действий (Undo/Redo), пулы объектов, алгоритмы обхода (DFS) |
Выбор между ConcurrentQueue и ConcurrentStack полностью зависит от требуемого порядка обработки элементов в вашем producer-consumer сценарии. Обе коллекции обеспечивают высокую производительность и потокобезопасность "из коробки", освобождая вас от забот о ручной синхронизации и помогая строить масштабируемые многопоточные системы.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ