1. Идея producer–consumer и роли
Если посмотреть на реальные программы, то очень часто один кусок кода «производит работу», а другой — «переваривает» её. Например, пользователь нажимает кнопки (производит события), а обработчик событий должен их разбирать; сеть приносит пакеты, а бизнес-логика должна их обрабатывать; логгер получает сообщения, а потом записывает их в файл. В таких ситуациях хочется, чтобы производитель не простаивал и не ждал обработку, а потребитель не крутился в цикле «ну где там моя работа?» и не жрал CPU, как будто ему платят за это зарплату.
Именно здесь появляется модель producer–consumer. Producer (производитель) складывает «задачи» в очередь, consumer (потребитель) забирает их и выполняет. Между ними стоит очередь задач — общий ресурс, который мы защищаем mutex-ом и «оживляем» condition_variable, чтобы consumer мог спать, пока работы нет.
Три роли: producer, очередь и consumer
Чтобы не превращать многопоточность в магию и шаманские танцы, удобно мысленно разложить систему на три части. Producer — это тот, кто добавляет элементы в очередь. Consumer — тот, кто извлекает элементы и делает полезную работу. Очередь — это «буфер» между ними и одновременно источник истины: есть ли работа прямо сейчас или нет. Уведомления (notify_*) — это не «работа», а всего лишь аккуратный пинок: «проснись и перепроверь состояние».
Небольшая табличка, чтобы зафиксировать роли:
| Роль | Что делает | С чем работает | Чем рискует, если сделать неправильно |
|---|---|---|---|
| Producer | задачи |
под |
потерять уведомление / не разбудить consumer |
| Очередь | хранит задачи | |
гонки данных, повреждение структуры, зависания |
| Consumer | ждёт и задачи |
на условии «очередь не пуста» |
busy-wait, чтение пустой очереди, дедлок |
Схематично это выглядит так:
flowchart LR
P[Producer<br/>кладёт задачи] -->|push под mutex| Q[Очередь задач<br/>общее состояние]
Q -->|pop под mutex| C[Consumer / Worker<br/>обрабатывает]
P -->|notify_one| CV[condition_variable]
CV -->|будит| C
Ключевая мысль: consumer ждёт не notify, а состояние очереди (например, !q.empty()). Notify — просто сигнал «условие могло стать истинным».
Очередь задач как защищаемое состояние
Очень хочется написать что-то вроде «проверим q.empty(), если пусто — подождём, если нет — заберём». На одном потоке это нормально. В многопоточном мире это превращается в сюжет «я посмотрел — там было пусто, моргнул — и всё поменялось». Поэтому в producer–consumer есть железное правило: любые операции с очередью — только под одним и тем же mutex. Даже empty() — это чтение общего состояния, а чтение общего состояния без дисциплины ничуть не «безопаснее», чем запись.
Обычно мы защищаем такие операции:
- Producer: lock → q.push(x) → unlock → notify_one
- Consumer: lock → wait(пока !q.empty()) → взять элемент → q.pop() → unlock → обработать элемент
Обратите внимание на слово «обработать»: обработка должна быть вне замка. Под замком мы делаем только минимально необходимое: проверить условие, переместить данные из очереди в локальную переменную, обновить очередь.
Чтобы почувствовать, почему empty() тоже нельзя трогать без замка, вот мини-антипример (не делайте так дома, даже если очень просит):
#include <queue>
std::queue<int> q;
bool has_work_unsafe() {
return !q.empty(); // ПЛОХО: в многопоточности так нельзя без mutex
}
Даже если «вам кажется», что empty() — это «просто посмотреть», в многопоточности это всё равно чтение общего ресурса без гарантии согласованности. А дальше обычно случается классика: вы увидели false (не пусто), полезли в front(), а очередь уже стала пустой — и привет, логическая ошибка.
2. Протокол: замок, ожидание и выполнение
Граница критической секции: «взял задачу — отпустил замок»
Новички часто делают одну из двух крайностей. Первая крайность — держать mutex вообще всегда, «чтобы было безопасно». Тогда producer не может ничего добавить, пока consumer долго обрабатывает задачу, и весь смысл многопоточности грустно уходит пить чай. Вторая крайность — держать mutex «почти никогда» и надеяться, что оно «как-то пронесёт». Обычно не проносит.
Правильный стиль: под mutex мы делаем только «манипуляции очередью», а работу выполняем после выхода из критической секции.
Сравним два фрагмента. Сначала плохой (замок держим слишком долго):
#include <mutex>
#include <queue>
#include <iostream>
std::mutex m;
std::queue<int> q;
void process_bad() {
std::lock_guard<std::mutex> lock(m);
int x = q.front();
q.pop();
// ПЛОХО: под mutex делаем долгую работу
std::cout << "Processing " << x << "\n";
}
Теперь хороший (замок короткий, обработка снаружи):
#include <mutex>
#include <queue>
#include <iostream>
std::mutex m;
std::queue<int> q;
void process_good() {
int x = 0;
{
std::lock_guard<std::mutex> lock(m);
x = q.front();
q.pop();
} // mutex отпущен здесь
std::cout << "Processing " << x << "\n"; // уже без mutex
}
Вторая версия лучше тем, что producer может продолжать класть новые задачи, пока consumer обрабатывает старую. Мы как бы говорим: «я быстро взял заказ с кассы, отошёл в сторону и готовлю кофе, не блокируя очередь».
Минимальная очередь задач: проектируем TaskQueue
Сейчас мы соберём маленький кирпичик, из которого потом легко построить worker. Наша цель — сделать класс, который инкапсулирует три вещи: std::queue, std::mutex, std::condition_variable. Так правила протокола не размазываются по коду, а живут в одном месте. Это не «ООП ради ООП», а просто способ не забыть, где какие замки и кто кому что должен.
Начнём с модели задачи. Для учебного примера пусть задача — это id и текст, который worker «выполнит», напечатав его:
#include <string>
struct Task {
int id = 0;
std::string text;
};
Теперь каркас очереди:
#include <condition_variable>
#include <mutex>
#include <queue>
class TaskQueue {
public:
void push(Task t);
Task pop_blocking();
private:
std::mutex m_;
std::condition_variable cv_;
std::queue<Task> q_;
};
Реализуем push. Мы меняем состояние (q_) под mutex, а затем уведомляем ожидающего consumer-а. Уведомление обычно делают уже после выхода из области lock_guard, чтобы не будить поток «в замок», но в простых сценариях даже уведомление под замком не ломает корректность — просто иногда чуть хуже по производительности.
#include <utility> // std::move
void TaskQueue::push(Task t) {
{
std::lock_guard<std::mutex> lock(m_);
q_.push(std::move(t));
}
cv_.notify_one();
}
Теперь pop_blocking. Здесь нам нужен std::unique_lock, потому что condition_variable::wait должна уметь временно отпустить mutex и снова захватить его при пробуждении. lock_guard так не умеет — он «взял и держит до конца scope», без вариантов.
Task TaskQueue::pop_blocking() {
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&] {
return !q_.empty();
});
Task t = std::move(q_.front());
q_.pop();
return t;
}
Обратите внимание: мы возвращаем Task по значению. Это удобно и безопасно: мы забрали задачу из очереди под замком, дальше worker работает с собственной копией/перемещённым значением. Мы не возвращаем ссылку на элемент очереди, потому что элемент очереди исчезает после pop(), а «ссылка на исчезнувшее» — это такой себе подарок.
Поток‑работник: как выглядит consumer loop
Поток-работник обычно живёт в виде бесконечного цикла: «жду задачу → забираю → выполняю». Это очень похоже на кассира, который сидит и ждёт следующего клиента: он не бегает по магазину каждые 10 миллисекунд с вопросом «ну вы уже пришли?», он спокойно ждёт. В коде роль «спокойно ждать» выполняет condition_variable::wait.
Сделаем worker-функцию. Чтобы нам не пришлось сейчас проектировать полноценный протокол завершения (это отдельная тема), мы сделаем учебное ограничение: worker обработает ровно N задач и завершится. Это не «как в проде», но это честный способ собрать рабочую модель без зависаний.
#include <chrono>
#include <iostream>
#include <thread>
void worker_loop(TaskQueue& queue, int tasks_to_process) {
for (int i = 0; i < tasks_to_process; ++i) {
Task t = queue.pop_blocking(); // ждём, пока появится задача
// обработка вне очереди: имитируем работу
std::cout << "[worker] task #" << t.id << ": " << t.text << "\n";
// пример вывода: [worker] task #1: say hello
std::this_thread::sleep_for(std::chrono::milliseconds{50});
}
std::cout << "[worker] done\n"; // [worker] done
}
Здесь важно, что pop_blocking() блокирует поток без busy-wait. Worker не «крутится», не ест процессор, а спит, пока producer не добавит задачу и не вызовет notify_one().
Producer: запускаем worker и кладём задачи в очередь
Теперь соберём маленькое консольное приложение. Пусть главный поток будет producer-ом: он создаёт задачи и кладёт их в очередь. Второй поток — consumer: он забирает и печатает. Это уже очень похоже на реальные вещи: главный поток «генерирует события», worker «обрабатывает».
Минимальный main() будет выглядеть так:
#include <iostream>
#include <thread>
#include <vector>
int main() {
TaskQueue queue;
const std::vector<Task> tasks = {
{1, "say hello"},
{2, "compute something (pretend)"},
{3, "write log line"},
{4, "send email (pretend)"},
{5, "cleanup"},
};
std::thread worker(worker_loop, std::ref(queue), (int)tasks.size());
for (const auto& t : tasks) {
std::cout << "[main] push task #" << t.id << "\n";
// пример вывода: [main] push task #1
queue.push(t);
}
worker.join();
std::cout << "[main] program finished\n"; // [main] program finished
}
Пара деталей, которые здесь хочется проговорить вслух (и желательно без крика). std::ref(queue) нужен, чтобы передать очередь по ссылке, иначе std::thread попытается скопировать queue, а наш TaskQueue копировать нельзя (внутри mutex, а mutex — не копируемый).
Также обратите внимание на порядок: мы сначала запускаем worker, а потом кладём задачи. Если сделать наоборот, ничего «логически» не сломается, но смысл демонстрации чуть потеряется: worker всё равно справится, потому что pop_blocking() правильно ждёт состояние. Это, кстати, очень важная черта хорошего протокола: он должен быть корректным независимо от того, кто «успел первым».
Где именно держится mutex: «рентген» протокола
Когда вы только начинаете, многопоточность кажется чёрным ящиком: «оно ждёт, потом просыпается, потом что-то там…». Чтобы стало спокойнее, полезно разложить один цикл consumer-а по шагам и прямо подписать, где держится замок, а где нет. Тогда condition_variable перестаёт выглядеть как магия и становится обычным протоколом.
Нарисуем последовательность событий при одном push и одном pop:
sequenceDiagram
participant P as Producer (main)
participant M as Mutex
participant Q as Queue
participant CV as condition_variable
participant C as Consumer (worker)
C->>M: lock (unique_lock)
C->>CV: wait(lock, pred)<br/>mutex отпускается внутри wait
Note over C,CV: Поток worker спит
P->>M: lock (lock_guard)
P->>Q: push(task)
P->>M: unlock
P->>CV: notify_one()
CV-->>C: wake up
C->>M: lock (возвращается из wait с захваченным mutex)
C->>Q: pop(task)
C->>M: unlock
C->>C: process task (без mutex)
Если свести смысл в одну фразу, получится почти дзен: под замком мы меняем и читаем состояние, а wait — это “уснуть, временно отпустив замок, и проснуться снова с замком”.
Очень частая ошибка — сделать обработку задачи под замком. Диагностика простая: если вы видите, что worker держит mutex и при этом делает sleep_for, печатает большие объёмы, или, не дай бог, ходит в сеть, значит очередь у вас превращается в «узкое горлышко».
3. Типичные ошибки
Ошибка №1: «Я проверю empty() без mutex, это же просто посмотреть».
Так обычно начинается история про редкие, неуловимые баги. В многопоточности «просто посмотреть» — это уже участие в гонке данных, если другой поток может одновременно модифицировать очередь. Правильная дисциплина такая: любые empty()/front()/pop()/push() выполняются под одним и тем же mutex, без исключений.
Ошибка №2: consumer делает front()/pop() без гарантии !empty().
Даже если вам кажется, что «producer точно уже положил», реальный планировщик потоков не подписывал контракт «делать как вам удобно». Consumer обязан ждать состояние «очередь не пуста» и только потом извлекать элемент. В корректной модели это выражается либо через cv.wait(lock, predicate), либо через эквивалентный цикл ожидания.
Ошибка №3: держать mutex во время обработки задачи.
Это убивает параллелизм. В итоге producer стоит в очереди за вашим worker-ом, как в поликлинике за справкой. Правильный паттерн: под замком только “взял задачу из очереди”, затем замок отпустили, и только потом выполняем работу. Если обработка долгая — тем более важно делать её снаружи.
Ошибка №4: вызывать notify_one() «где-нибудь потом» или вообще забыть.
Если producer меняет состояние так, что предикат ожидания становится истинным (например, очередь перестала быть пустой), но не делает notify_*, consumer может остаться спать слишком долго. В хорошей реализации push всегда заканчивается уведомлением. Это не «оптимизация», а часть протокола.
Ошибка №5: возвращать наружу ссылку/указатель на элемент очереди.
Очередь — контейнер, а элемент в контейнере живёт ровно до тех пор, пока вы его не удалили. Если вы сделали front(), потом pop(), а снаружи где-то храните ссылку на “тот самый front”, вы получите ссылку на уже несуществующий объект. Надёжная практика: забрать значение (копией или std::move) в локальную переменную под замком, а дальше работать с локальной переменной.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ