JavaRush /Курсы /C++ SELF /Producer–consumer — очередь задач и поток‑работник

Producer–consumer — очередь задач и поток‑работник

C++ SELF
70 уровень , 2 лекция
Открыта

1. Идея producer–consumer и роли

Если посмотреть на реальные программы, то очень часто один кусок кода «производит работу», а другой — «переваривает» её. Например, пользователь нажимает кнопки (производит события), а обработчик событий должен их разбирать; сеть приносит пакеты, а бизнес-логика должна их обрабатывать; логгер получает сообщения, а потом записывает их в файл. В таких ситуациях хочется, чтобы производитель не простаивал и не ждал обработку, а потребитель не крутился в цикле «ну где там моя работа?» и не жрал CPU, как будто ему платят за это зарплату.

Именно здесь появляется модель producer–consumer. Producer (производитель) складывает «задачи» в очередь, consumer (потребитель) забирает их и выполняет. Между ними стоит очередь задач — общий ресурс, который мы защищаем mutex-ом и «оживляем» condition_variable, чтобы consumer мог спать, пока работы нет.

Три роли: producer, очередь и consumer

Чтобы не превращать многопоточность в магию и шаманские танцы, удобно мысленно разложить систему на три части. Producer — это тот, кто добавляет элементы в очередь. Consumer — тот, кто извлекает элементы и делает полезную работу. Очередь — это «буфер» между ними и одновременно источник истины: есть ли работа прямо сейчас или нет. Уведомления (notify_*) — это не «работа», а всего лишь аккуратный пинок: «проснись и перепроверь состояние».

Небольшая табличка, чтобы зафиксировать роли:

Роль Что делает С чем работает Чем рискует, если сделать неправильно
Producer
push
задачи
queue
под
mutex
потерять уведомление / не разбудить consumer
Очередь хранит задачи
queue + mutex + condition_variable
гонки данных, повреждение структуры, зависания
Consumer ждёт и
pop
задачи
wait
на условии «очередь не пуста»
busy-wait, чтение пустой очереди, дедлок

Схематично это выглядит так:

flowchart LR
    P[Producer<br/>кладёт задачи] -->|push под mutex| Q[Очередь задач<br/>общее состояние]
    Q -->|pop под mutex| C[Consumer / Worker<br/>обрабатывает]
    P -->|notify_one| CV[condition_variable]
    CV -->|будит| C

Ключевая мысль: consumer ждёт не notify, а состояние очереди (например, !q.empty()). Notify — просто сигнал «условие могло стать истинным».

Очередь задач как защищаемое состояние

Очень хочется написать что-то вроде «проверим q.empty(), если пусто — подождём, если нет — заберём». На одном потоке это нормально. В многопоточном мире это превращается в сюжет «я посмотрел — там было пусто, моргнул — и всё поменялось». Поэтому в producer–consumer есть железное правило: любые операции с очередью — только под одним и тем же mutex. Даже empty() — это чтение общего состояния, а чтение общего состояния без дисциплины ничуть не «безопаснее», чем запись.

Обычно мы защищаем такие операции:

  • Producer: lockq.push(x)unlocknotify_one
  • Consumer: lockwait(пока !q.empty()) → взять элемент → q.pop()unlock → обработать элемент

Обратите внимание на слово «обработать»: обработка должна быть вне замка. Под замком мы делаем только минимально необходимое: проверить условие, переместить данные из очереди в локальную переменную, обновить очередь.

Чтобы почувствовать, почему empty() тоже нельзя трогать без замка, вот мини-антипример (не делайте так дома, даже если очень просит):

#include <queue>

std::queue<int> q;

bool has_work_unsafe() {
    return !q.empty(); // ПЛОХО: в многопоточности так нельзя без mutex
}

Даже если «вам кажется», что empty() — это «просто посмотреть», в многопоточности это всё равно чтение общего ресурса без гарантии согласованности. А дальше обычно случается классика: вы увидели false (не пусто), полезли в front(), а очередь уже стала пустой — и привет, логическая ошибка.

2. Протокол: замок, ожидание и выполнение

Граница критической секции: «взял задачу — отпустил замок»

Новички часто делают одну из двух крайностей. Первая крайность — держать mutex вообще всегда, «чтобы было безопасно». Тогда producer не может ничего добавить, пока consumer долго обрабатывает задачу, и весь смысл многопоточности грустно уходит пить чай. Вторая крайность — держать mutex «почти никогда» и надеяться, что оно «как-то пронесёт». Обычно не проносит.

Правильный стиль: под mutex мы делаем только «манипуляции очередью», а работу выполняем после выхода из критической секции.

Сравним два фрагмента. Сначала плохой (замок держим слишком долго):

#include <mutex>
#include <queue>
#include <iostream>

std::mutex m;
std::queue<int> q;

void process_bad() {
    std::lock_guard<std::mutex> lock(m);
    int x = q.front();
    q.pop();

    // ПЛОХО: под mutex делаем долгую работу
    std::cout << "Processing " << x << "\n";
}

Теперь хороший (замок короткий, обработка снаружи):

#include <mutex>
#include <queue>
#include <iostream>

std::mutex m;
std::queue<int> q;

void process_good() {
    int x = 0;

    {
        std::lock_guard<std::mutex> lock(m);
        x = q.front();
        q.pop();
    } // mutex отпущен здесь

    std::cout << "Processing " << x << "\n"; // уже без mutex
}

Вторая версия лучше тем, что producer может продолжать класть новые задачи, пока consumer обрабатывает старую. Мы как бы говорим: «я быстро взял заказ с кассы, отошёл в сторону и готовлю кофе, не блокируя очередь».

Минимальная очередь задач: проектируем TaskQueue

Сейчас мы соберём маленький кирпичик, из которого потом легко построить worker. Наша цель — сделать класс, который инкапсулирует три вещи: std::queue, std::mutex, std::condition_variable. Так правила протокола не размазываются по коду, а живут в одном месте. Это не «ООП ради ООП», а просто способ не забыть, где какие замки и кто кому что должен.

Начнём с модели задачи. Для учебного примера пусть задача — это id и текст, который worker «выполнит», напечатав его:

#include <string>

struct Task {
    int id = 0;
    std::string text;
};

Теперь каркас очереди:

#include <condition_variable>
#include <mutex>
#include <queue>

class TaskQueue {
public:
    void push(Task t);

    Task pop_blocking();

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<Task> q_;
};

Реализуем push. Мы меняем состояние (q_) под mutex, а затем уведомляем ожидающего consumer-а. Уведомление обычно делают уже после выхода из области lock_guard, чтобы не будить поток «в замок», но в простых сценариях даже уведомление под замком не ломает корректность — просто иногда чуть хуже по производительности.

#include <utility> // std::move

void TaskQueue::push(Task t) {
    {
        std::lock_guard<std::mutex> lock(m_);
        q_.push(std::move(t));
    }
    cv_.notify_one();
}

Теперь pop_blocking. Здесь нам нужен std::unique_lock, потому что condition_variable::wait должна уметь временно отпустить mutex и снова захватить его при пробуждении. lock_guard так не умеет — он «взял и держит до конца scope», без вариантов.

Task TaskQueue::pop_blocking() {
    std::unique_lock<std::mutex> lock(m_);

    cv_.wait(lock, [&] {
        return !q_.empty();
    });

    Task t = std::move(q_.front());
    q_.pop();
    return t;
}

Обратите внимание: мы возвращаем Task по значению. Это удобно и безопасно: мы забрали задачу из очереди под замком, дальше worker работает с собственной копией/перемещённым значением. Мы не возвращаем ссылку на элемент очереди, потому что элемент очереди исчезает после pop(), а «ссылка на исчезнувшее» — это такой себе подарок.

Поток‑работник: как выглядит consumer loop

Поток-работник обычно живёт в виде бесконечного цикла: «жду задачу → забираю → выполняю». Это очень похоже на кассира, который сидит и ждёт следующего клиента: он не бегает по магазину каждые 10 миллисекунд с вопросом «ну вы уже пришли?», он спокойно ждёт. В коде роль «спокойно ждать» выполняет condition_variable::wait.

Сделаем worker-функцию. Чтобы нам не пришлось сейчас проектировать полноценный протокол завершения (это отдельная тема), мы сделаем учебное ограничение: worker обработает ровно N задач и завершится. Это не «как в проде», но это честный способ собрать рабочую модель без зависаний.

#include <chrono>
#include <iostream>
#include <thread>

void worker_loop(TaskQueue& queue, int tasks_to_process) {
    for (int i = 0; i < tasks_to_process; ++i) {
        Task t = queue.pop_blocking(); // ждём, пока появится задача

        // обработка вне очереди: имитируем работу
        std::cout << "[worker] task #" << t.id << ": " << t.text << "\n";
        // пример вывода: [worker] task #1: say hello

        std::this_thread::sleep_for(std::chrono::milliseconds{50});
    }

    std::cout << "[worker] done\n"; // [worker] done
}

Здесь важно, что pop_blocking() блокирует поток без busy-wait. Worker не «крутится», не ест процессор, а спит, пока producer не добавит задачу и не вызовет notify_one().

Producer: запускаем worker и кладём задачи в очередь

Теперь соберём маленькое консольное приложение. Пусть главный поток будет producer-ом: он создаёт задачи и кладёт их в очередь. Второй поток — consumer: он забирает и печатает. Это уже очень похоже на реальные вещи: главный поток «генерирует события», worker «обрабатывает».

Минимальный main() будет выглядеть так:

#include <iostream>
#include <thread>
#include <vector>

int main() {
    TaskQueue queue;

    const std::vector<Task> tasks = {
        {1, "say hello"},
        {2, "compute something (pretend)"},
        {3, "write log line"},
        {4, "send email (pretend)"},
        {5, "cleanup"},
    };

    std::thread worker(worker_loop, std::ref(queue), (int)tasks.size());

    for (const auto& t : tasks) {
        std::cout << "[main] push task #" << t.id << "\n";
        // пример вывода: [main] push task #1
        queue.push(t);
    }

    worker.join();
    std::cout << "[main] program finished\n"; // [main] program finished
}

Пара деталей, которые здесь хочется проговорить вслух (и желательно без крика). std::ref(queue) нужен, чтобы передать очередь по ссылке, иначе std::thread попытается скопировать queue, а наш TaskQueue копировать нельзя (внутри mutex, а mutex — не копируемый).

Также обратите внимание на порядок: мы сначала запускаем worker, а потом кладём задачи. Если сделать наоборот, ничего «логически» не сломается, но смысл демонстрации чуть потеряется: worker всё равно справится, потому что pop_blocking() правильно ждёт состояние. Это, кстати, очень важная черта хорошего протокола: он должен быть корректным независимо от того, кто «успел первым».

Где именно держится mutex: «рентген» протокола

Когда вы только начинаете, многопоточность кажется чёрным ящиком: «оно ждёт, потом просыпается, потом что-то там…». Чтобы стало спокойнее, полезно разложить один цикл consumer-а по шагам и прямо подписать, где держится замок, а где нет. Тогда condition_variable перестаёт выглядеть как магия и становится обычным протоколом.

Нарисуем последовательность событий при одном push и одном pop:

sequenceDiagram
    participant P as Producer (main)
    participant M as Mutex
    participant Q as Queue
    participant CV as condition_variable
    participant C as Consumer (worker)

    C->>M: lock (unique_lock)
    C->>CV: wait(lock, pred)<br/>mutex отпускается внутри wait
    Note over C,CV: Поток worker спит

    P->>M: lock (lock_guard)
    P->>Q: push(task)
    P->>M: unlock
    P->>CV: notify_one()

    CV-->>C: wake up
    C->>M: lock (возвращается из wait с захваченным mutex)
    C->>Q: pop(task)
    C->>M: unlock
    C->>C: process task (без mutex)

Если свести смысл в одну фразу, получится почти дзен: под замком мы меняем и читаем состояние, а wait — это “уснуть, временно отпустив замок, и проснуться снова с замком”.

Очень частая ошибка — сделать обработку задачи под замком. Диагностика простая: если вы видите, что worker держит mutex и при этом делает sleep_for, печатает большие объёмы, или, не дай бог, ходит в сеть, значит очередь у вас превращается в «узкое горлышко».

3. Типичные ошибки

Ошибка №1: «Я проверю empty() без mutex, это же просто посмотреть».
Так обычно начинается история про редкие, неуловимые баги. В многопоточности «просто посмотреть» — это уже участие в гонке данных, если другой поток может одновременно модифицировать очередь. Правильная дисциплина такая: любые empty()/front()/pop()/push() выполняются под одним и тем же mutex, без исключений.

Ошибка №2: consumer делает front()/pop() без гарантии !empty().
Даже если вам кажется, что «producer точно уже положил», реальный планировщик потоков не подписывал контракт «делать как вам удобно». Consumer обязан ждать состояние «очередь не пуста» и только потом извлекать элемент. В корректной модели это выражается либо через cv.wait(lock, predicate), либо через эквивалентный цикл ожидания.

Ошибка №3: держать mutex во время обработки задачи.
Это убивает параллелизм. В итоге producer стоит в очереди за вашим worker-ом, как в поликлинике за справкой. Правильный паттерн: под замком только “взял задачу из очереди”, затем замок отпустили, и только потом выполняем работу. Если обработка долгая — тем более важно делать её снаружи.

Ошибка №4: вызывать notify_one() «где-нибудь потом» или вообще забыть.
Если producer меняет состояние так, что предикат ожидания становится истинным (например, очередь перестала быть пустой), но не делает notify_*, consumer может остаться спать слишком долго. В хорошей реализации push всегда заканчивается уведомлением. Это не «оптимизация», а часть протокола.

Ошибка №5: возвращать наружу ссылку/указатель на элемент очереди.
Очередь — контейнер, а элемент в контейнере живёт ровно до тех пор, пока вы его не удалили. Если вы сделали front(), потом pop(), а снаружи где-то храните ссылку на “тот самый front”, вы получите ссылку на уже несуществующий объект. Надёжная практика: забрать значение (копией или std::move) в локальную переменную под замком, а дальше работать с локальной переменной.

1
Задача
C++ SELF, 70 уровень, 2 лекция
Недоступна
Одно сообщение
Одно сообщение
1
Задача
C++ SELF, 70 уровень, 2 лекция
Недоступна
Очередь поддержки
Очередь поддержки
1
Задача
C++ SELF, 70 уровень, 2 лекция
Недоступна
Делим сумму
Делим сумму
1
Задача
C++ SELF, 70 уровень, 2 лекция
Недоступна
Два источника
Два источника
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ