JavaRush /Курсы /C# SELF /Очереди и стеки Producer-Consumer

Очереди и стеки Producer-Consumer

C# SELF
58 уровень , 1 лекция
Открыта

1. Введение

Начнём с основ. Очередь — это фундаментальная структура данных, работающая по принципу FIFO (First-In, First-Out), то есть "первый пришёл – первый ушёл". Представьте обычную очередь в супермаркете: кто первым встал, тот первым и обслуживается.

В многопоточном программировании паттерн Производитель-Потребитель (Producer-Consumer) — один из самых распространённых и мощных.

  • Производители (Producers) — это потоки или части приложения, которые создают данные или задачи и помещают их в общую очередь. Они "производят" работу.
  • Потребители (Consumers) — это потоки или части приложения, которые берут данные или задачи из очереди и обрабатывают их. Они "потребляют" работу.

Этот паттерн помогает управлять потоком данных, развязывает компоненты системы (производителю не нужно знать, кто и как будет обрабатывать данные), делает её более отзывчивой и помогает равномерно распределять нагрузку между потоками.

Пример: ConcurrentQueue — добавление и извлечение

Посмотрим, как добавлять и извлекать элементы из ConcurrentQueue<T>.

using System.Collections.Concurrent;

ConcurrentQueue<string> tasks = new ConcurrentQueue<string>();

// Добавление элементов (производитель)
tasks.Enqueue("Скачать файл");
tasks.Enqueue("Обработать изображение");
Console.WriteLine($"Задач в очереди: {tasks.Count}"); // Вывод: Задач в очереди: 2

// Извлечение элементов (потребитель)
if (tasks.TryDequeue(out string task1))
{
    Console.WriteLine($"Выполнена задача: {task1}"); // Вывод: Выполнена задача: Скачать файл
}

if (tasks.TryDequeue(out string task2))
{
    Console.WriteLine($"Выполнена задача: {task2}"); // Вывод: Выполнена задача: Обработать изображение
}

if (!tasks.TryDequeue(out string emptyTask))
{
    Console.WriteLine("Очередь пуста, новых задач нет."); // Вывод: Очередь пуста, новых задач нет.
}

Основы работы: Enqueue(), TryDequeue()

Enqueue(T item): Используется для добавления элемента в конец очереди. Эта операция потокобезопасна. Вы можете одновременно вызывать Enqueue из 10 разных потоков, и все элементы будут корректно добавлены.

TryDequeue(out T item): Используется для попытки извлечения элемента из начала очереди. Это ключевой метод для потребителей. Он возвращает true, если элемент был успешно извлечён (значение попадает в выходной параметр item), и false, если очередь пуста. Важно, что TryDequeue не блокирует поток, если очередь пуста.

2. Важность TryDequeue() и атомарность операций

Метод TryDequeue() не просто удобен; он критически важен для корректной потокобезопасной работы. Он атомарен: проверка на пустоту очереди и само извлечение элемента происходят как одна неделимая операция.

Если бы у нас были отдельные методы IsEmpty (проверить, пуста ли очередь) и Dequeue (извлечь элемент), то между их вызовами другой поток мог бы опустошить очередь. В итоге ваш Dequeue выбросил бы исключение или вернул некорректные данные. TryDequeue полностью исключает такую ситуацию.

Пример: Producer-Consumer с несколькими потоками

Здесь мы запускаем два потока-производителя и один поток-потребитель.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

ConcurrentQueue<int> dataQueue = new ConcurrentQueue<int>();
bool producersDone = false; // Флаг для сигнализации потребителю

void Producer(int start, int count)
{
    for (int i = 0; i < count; i++)
    {
        dataQueue.Enqueue(start + i);
        Console.WriteLine($"[П] Добавил: {start + i}");
        Thread.Sleep(10); 
    }
}

void Consumer()
{
    while (!producersDone || dataQueue.Count > 0) // Продолжаем, пока есть данные или производители работают
    {
        if (dataQueue.TryDequeue(out int item))
        {
            Console.WriteLine($"[К] Обработал: {item}");
        }
        else
        {
            Thread.Sleep(50); // Ждем, если очередь пуста
        }
    }
    Console.WriteLine("[К] Завершил работу.");
}

// Запуск примера в Main:
// Task.Run(() => Producer(1, 5));
// Task.Run(() => Producer(100, 5)); // Второй производитель
// Task.Run(() => Consumer());
// Thread.Sleep(600); // Даем время потокам поработать
// producersDone = true; // Сигнализируем, что производители закончили
// Thread.Sleep(200); // Даем время потребителю забрать остаток

Обратите внимание, что в этом простом примере флаг producersDone и Thread.Sleep используются для имитации завершения. В реальных приложениях для более надёжной синхронизации завершения часто применяются CancellationTokenSource или BlockingCollection<T>.

ConcurrentQueue<T> идеально подходит для сценариев, где:

  • Порядок обработки элементов важен (FIFO).
  • Множество потоков добавляют элементы, и/или множество потоков их забирают.
  • Нужна высокая производительность без ручного управления блокировками.

3. Стек для producer-consumer (LIFO)

Стек — это другая фундаментальная структура данных, работающая по принципу LIFO (Last-In, First-Out), то есть "последний пришёл – первый ушёл". Представьте стопку тарелок: вы всегда берёте самую верхнюю, и новую тарелку всегда кладёте на самый верх стопки.

ConcurrentStack<T> так же потокобезопасен, как и ConcurrentQueue<T>, и тоже может использоваться в паттерне "производитель-потребитель", но с обратным порядком обработки.

Пример: ConcurrentStack — добавление и извлечение

using System.Collections.Concurrent;

ConcurrentStack<string> commandStack = new ConcurrentStack<string>();

// Добавление команд (производитель)
commandStack.Push("Выделить текст");
commandStack.Push("Изменить шрифт");
commandStack.Push("Сохранить документ");
Console.WriteLine($"Команд в стеке: {commandStack.Count}"); // Вывод: Команд в стеке: 3

// Извлечение команд (потребитель)
if (commandStack.TryPop(out string cmd1))
{
    Console.WriteLine($"Отменена команда: {cmd1}"); // Вывод: Отменена команда: Сохранить документ
}

if (commandStack.TryPop(out string cmd2))
{
    Console.WriteLine($"Отменена команда: {cmd2}"); // Вывод: Отменена команда: Изменить шрифт
}

if (!commandStack.TryPop(out string emptyCmd))
{
    Console.WriteLine("Стек команд пуст."); // Вывод: Стек команд пуст.
}

4. Основы работы: Push(), TryPop()

Push(T item): Используется для добавления элемента на вершину стека. Операция потокобезопасна.

TryPop(out T item): Используется для попытки извлечения элемента с вершины стека. Возвращает true, если элемент был успешно извлечён, и false, если стек пуст. Как и TryDequeue, это атомарная операция, предотвращающая гонки данных.

Пример: использование ConcurrentStack для пула объектов

Стек отлично подходит для реализации пулов объектов: взяли — использовали — вернули.

using System.Collections.Concurrent;

class Connection { /* Простая заглушка */ public Guid Id { get; } = Guid.NewGuid(); }

ConcurrentStack<Connection> connectionPool = new ConcurrentStack<Connection>();

// Заполняем пул начальными соединениями
for (int i = 0; i < 3; i++)
{
    connectionPool.Push(new Connection());
}
Console.WriteLine($"Соединений в пуле: {connectionPool.Count}"); // Вывод: Соединений в пуле: 3

void UseConnection()
{
    if (connectionPool.TryPop(out Connection conn))
    {
        Console.WriteLine($"[Пул] Использовано соединение: {conn.Id}");
        // Имитация работы с соединением
        Thread.Sleep(50); 
        connectionPool.Push(conn); // Возвращаем в пул
        Console.WriteLine($"[Пул] Возвращено соединение: {conn.Id}. В пуле: {connectionPool.Count}");
    }
    else
    {
        Console.WriteLine("[Пул] Пул соединений пуст. Создаем новое.");
        // Обычно здесь создают новое соединение, если пул пуст
        connectionPool.Push(new Connection()); 
    }
}

// Запуск примера в Main:
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Task.Run(() => UseConnection());
Thread.Sleep(500);

В этом примере несколько потоков могут безопасно брать и возвращать соединения в общий пул.

5. Примеры применения и сравнение с ConcurrentQueue

ConcurrentStack<T> применяется, когда:

  • Порядок LIFO критичен (например, история операций для функции "Отменить").
  • Нужен быстрый доступ к самым последним добавленным элементам (часто остаются "горячими" в кеше процессора).
  • Реализуются алгоритмы на стеках (обход графов в глубину, синтаксический анализ выражений).

Сравнение и выбор подходящей коллекции

Коллекция Порядок Преимущества Типичные сценарии
ConcurrentQueue
FIFO (Первый пришёл, первый ушёл) Обеспечивает справедливую обработку в порядке поступления Очереди задач, логирование, обработка входящих запросов, событийные шины
ConcurrentStack
LIFO (Последний пришёл, первый ушёл) Быстрый доступ к недавно добавленным элементам История действий (Undo/Redo), пулы объектов, алгоритмы обхода (DFS)

Выбор между ConcurrentQueue и ConcurrentStack полностью зависит от требуемого порядка обработки элементов в вашем producer-consumer сценарии. Обе коллекции обеспечивают высокую производительность и потокобезопасность "из коробки", освобождая вас от забот о ручной синхронизации и помогая строить масштабируемые многопоточные системы.

2
Задача
C# SELF, 58 уровень, 1 лекция
Недоступна
Реализация Producer-Consumer с использованием очереди
Реализация Producer-Consumer с использованием очереди
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ