1. Backpressure: что это такое
Когда вы только начинаете писать конкурентный код, очень хочется сделать так, чтобы «ничего никогда не блокировалось». Кажется, будто блокировки — это баг, а неблокирующая работа — признак крутизны. В Go всё чуть хитрее: блокировка на отправке в канал часто является полезным сигналом, который стабилизирует систему и не даёт ей раздуваться по памяти.
Backpressure (буквально «обратное давление») — это ситуация, когда производитель (producer) вынужден замедлиться, потому что потребители (workers/collector) не успевают. В Go это чаще всего выражается очень просто: отправка в канал блокируется, потому что канал заполнен (если он buffered) или потому что нет читателя (если он unbuffered). И это нормально: программа таким образом «саморегулируется».
Представьте конвейер на заводе. Если упаковщики не успевают, конвейер должен замедлиться, иначе коробки начнут падать на пол, а потом придёт бухгалтер и скажет: «А почему у нас склад теперь размером с космодром?» Вот backpressure — это «конвейер замедляется сам».
Буфер канала как очередь: cap(ch) — это лимит
Канал в Go можно воспринимать как трубу, но для понимания памяти полезнее думать про очередь. Если канал buffered, он может хранить некоторое количество элементов «внутри себя». И вот это количество — не абстракция, а очень конкретное число: cap(ch).
Важно сразу зафиксировать: буфер канала хранит значения. Если вы отправляете в канал Task, то в буфере лежат Task. Если вы отправляете *Task, то в буфере лежат указатели. И это напрямую влияет на память: буфер удерживает эти значения «живыми», и сборщик мусора не сможет освободить то, на что они ссылаются.
Мини-демо, чтобы почувствовать len/cap руками:
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 10
ch <- 20
fmt.Println(len(ch), cap(ch)) // 2 2
// ch <- 30 // тут бы заблокировались: буфер полон
}
Здесь cap(ch) — это «сколько мы можем накопить», а len(ch) — «сколько уже накопили прямо сейчас». Да, len(ch) в конкурентной программе меняется «на лету» и не должен быть управляющим условием, но как диагностический термометр он полезен: показывает, где образуется пробка.
2. Почему большой буфер может съесть память
Часто разработчик видит блокировку и думает: «О, надо добавить буфер побольше — и станет быстрее». Иногда станет. А иногда вы просто переносите проблему в память: вместо того чтобы блокироваться, producer начинает накапливать данные в очереди, и программа «вежливо» умирает от OOM (out of memory), не мешая вам наслаждаться отсутствием блокировок.
Есть три механизма, из-за которых буферы могут внезапно стать пожирателями памяти.
-
Прямолинейный механизм: cap(jobs) * размер(Task). Если задача тяжёлая (например, содержит большую строку, []byte, или ссылается на большой объект), то даже очередь в 10_000 задач — это уже серьёзно.
-
Удержание backing array через «лёгкие заголовки»: даже если в канал вы кладёте небольшой «заголовок», например []byte (который сам по себе маленький), он содержит указатель на backing array. То есть буфер канала удерживает ссылку, а значит удерживает и всю память backing array.
Это похоже на историю со слайсами, где «невидимый хвост» может удерживать память. В стандартной библиотеке даже пришлось доработать функции вроде slices.Delete, чтобы они очищали хвост, иначе старые ссылки мешали сборщику мусора освобождать объекты. С каналами логика похожая: пока значение лежит в очереди, память «не отпустит».
-
Очередь может быть не в канале, а в goroutine: если вы делаете go func(){ ... acquire sem ... }(), то goroutine могут копиться тысячами и ждать токен. Параллельность вроде ограничена, а память всё равно растёт — потому что очередь спряталась не в канале, а в планировщике.
4. Учебная мини-система: конвейер задач и «склад»
Чтобы не обсуждать backpressure в вакууме, давайте продолжим наш общий стиль примеров: у нас есть конвейер “producer → workers → collector”. Представим, что в нашем учебном приложении (условно: таск-трекер/обработчик пачки задач) появился режим «массовой обработки», где нужно применить однотипную операцию к большому списку входных элементов.
Начнём с типов. Они максимально простые, но уже позволяют говорить о памяти осмысленно:
package main
type Task struct {
ID int
Payload string // представим, что это может быть "тяжёлой" строкой
}
type Result struct {
ID int
Size int
Err error
}
Теперь воркер. Он делает “работу” — здесь просто считает длину строки, но на реальной задаче это мог бы быть парсинг, валидация, компрессия и т.п.
package main
import "sync"
func worker(jobs <-chan Task, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for t := range jobs {
results <- Result{ID: t.ID, Size: len(t.Payload)}
}
}
Обратите внимание: здесь results <- ... потенциально блокирует воркера. Это и есть точка backpressure между worker и collector. Если collector читает медленно, воркеры начинают ждать, а значит перестают разбирать новые задачи. И это может быть как плохо (падение throughput), так и хорошо (система сама ограничивается и не раздувает очередь дальше).
5. Практика: где буферить и как не съесть память
Где ставить буфер: jobs, results и перенос пробки
Обычно новичок спрашивает: «Так буфер нужен или нет?» Правильный ответ: «Буфер — это не “нужен/не нужен”, а где и какого размера».
Если jobs unbuffered, producer будет очень тесно связан с воркерами: отправил задачу — кто-то должен тут же её принять. Это даёт сильный backpressure вверх по конвейеру: producer не сможет «убежать вперёд».
Если jobs buffered, то producer может «налить задач в очередь» и уйти дальше. Это повышает загрузку воркеров (меньше простоя), но создаёт склад задач в памяти.
Если results unbuffered, воркеры тесно связаны с collector’ом. Если вывод/агрегация результатов медленные, воркеры будут простаивать. Если results buffered, воркеры смогут отправить несколько результатов «вперёд», но буфер результатов тоже является складом и тоже съедает память.
Наглядная схема, где буферы — это склады:
flowchart LR P[producer] -->|send| J[(jobs buffer)] J --> W[workers x N] W -->|send| R[(results buffer)] R --> C[collector]
Самый важный вывод здесь такой: буфер не убирает пробку, он переносит её. Пробка либо станет пробкой на заполнении буфера, либо превратится в рост памяти. Это не “плохо”, это просто физика системы.
Bounded queue и осознанные размеры
Теперь к практической части: что делать, если задач много (десятки тысяч, миллионы), а память хочется оставить живой.
Главный приём — сделать очередь задач bounded, то есть ограниченной. Для worker pool это обычно означает: jobs := make(chan Task, queueSize), где queueSize — не «бесконечность», а разумная величина.
Пример “разумного склада”: пусть очередь вмещает 100 задач, а воркеров 4. Тогда максимум задач, которые одновременно «живут в системе» (и ещё не обработаны), примерно равен очереди плюс то, что воркеры держат в руках прямо сейчас. Producer не сможет улететь в космос и залить миллион задач, потому что на 101-й задаче он блокируется и ждёт.
package main
import "sync"
func run(workers, queueSize int, tasks []Task) []Result {
jobs := make(chan Task, queueSize)
results := make(chan Result, queueSize)
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go worker(jobs, results, &wg)
}
go func() {
wg.Wait()
close(results)
}()
go func() {
defer close(jobs)
for _, t := range tasks {
jobs <- t // backpressure: если очередь полна, producer ждёт
}
}()
var out []Result
for r := range results {
out = append(out, r)
}
return out
}
Здесь специально сделано так, чтобы queueSize влиял и на jobs, и на results. В реальности размеры могут быть разными, но для первого понимания это хорошая модель: мы ограничили “склад задач” и “склад результатов”.
Теперь антипример — то, что кажется «ускорением», но на больших объёмах превращается в «память, прощай»:
package main
func badIdea(tasks []Task) chan Task {
jobs := make(chan Task, 1_000_000) // звучит “круто”, но это огромный склад
for _, t := range tasks {
jobs <- t
}
return jobs
}
Проблема даже не в самом числе 1_000_000. Проблема в том, что это необоснованный склад, и вы потеряли backpressure. Если Task тяжёлый, вы буквально попросили программу держать в памяти «миллион тяжёлых объектов», даже если воркеры успевают обрабатывать только 100 в секунду.
Буфер на 1: «чтобы не зависло»
Есть отдельный класс ситуаций, когда маленький буфер — это не про производительность, а про корректность завершения.
Классический пример: у вас есть goroutine, которая по таймауту пытается отправить сигнал, и ей не важно, прочитал ли кто-то этот сигнал. Тогда канал часто делают buffered на 1, чтобы goroutine не зависла на send и могла завершиться. Это ровно та логика, почему в схемах “first error” канал ошибок часто make(chan error, 1): мы хотим гарантировать, что первая ошибка “влезет”, и воркер не повиснет, если читатель ещё не готов.
Вот очень маленькая иллюстрация из нашей темы. Допустим, collector хочет «принять первую ошибку» и дальше остановить систему. Канал errCh часто делают буфером 1, чтобы отправитель не завис:
package main
import "errors"
var ErrBoom = errors.New("boom")
func reportFirst(errCh chan<- error, err error) {
if err == nil {
return
}
select {
case errCh <- err:
default:
// ошибка уже записана, не блокируемся
}
}
Здесь важный баланс: мы используем буфер (и select + default) не чтобы “ускорить”, а чтобы не получить зависшую goroutine. Но если вы начнёте так же “не блокироваться” везде, вы можете случайно превратить систему в “теряющую данные” (drop) и потом долго искать, куда пропали результаты.
Как выбрать размер буфера без гадания
Очень хочется универсальную формулу: “ставь queueSize = workers * 10 и будь счастлив”. Иногда так и делают, но важно понимать смысл.
Размер буфера — это компромисс между тремя вещами: памятью, задержками и загрузкой воркеров. Маленький буфер усиливает backpressure: producer чаще ждёт, зато очередь не растёт, и память спокойна. Большой буфер сглаживает рывки скорости: producer может быстро «налить» данные, воркеры не простаивают, но растёт задержка (задача может долго лежать в очереди), и растёт память.
Удобно думать так:
| Параметр | Если увеличить cap(jobs) | Если уменьшить cap(jobs) |
|---|---|---|
| Память | растёт склад задач | уменьшается склад задач |
| Latency (время ожидания задачи) | может вырасти (длиннее очередь) | обычно меньше очередь |
| Backpressure | слабее (producer меньше ждёт) | сильнее (producer чаще ждёт) |
| Устойчивость к всплескам | выше | ниже |
И ещё одна мысль, которую важно проговорить: backpressure — это не наказание, а механизм устойчивости. Когда у вас большие объёмы данных, вы почти всегда хотите, чтобы система умела сказать producer’у: «Стоп, я не успеваю, подожди». Если producer продолжит «пихать» без ограничений, вы не ускорите обработку — вы просто перенесёте данные из входа в память программы.
6. Типичные ошибки
Ошибка №1: делать огромный буфер “чтобы никогда не блокировалось”.
Это выглядит как оптимизация, но на больших объёмах часто превращается в “очередь-склад”, которая держит данные в памяти часами. В результате вы выигрываете пару миллисекунд на send, но проигрываете весь процесс из‑за роста памяти и длинных очередей.
Ошибка №2: считать, что buffered channel “всегда быстрее”.
Буфер часто просто переносит место, где вы блокируетесь: вместо того чтобы блокироваться сразу на jobs <- t, вы блокируетесь позже, когда буфер заполнится. Если потребители медленнее производителя, физику вы не обманете — вы только решаете, где копить.
Ошибка №3: не понимать, что канал удерживает память через ссылки.
Если в очереди лежит []byte, string, map или указатель на структуру, то буфер удерживает эти ссылки, а значит — удерживает backing array/объекты, пока значение не будет прочитано. Это очень похоже на ситуацию со слайсами и “хвостом”, который удерживает объекты и мешает сборщику мусора.
Ошибка №4: пытаться управлять конкурентной логикой через len(ch).
len(ch) полезен в отладке и логах, но как управляющее условие в конкурентной программе он почти всегда приводит к гонкам и странным зависаниям. Завершение конвейера обычно выражается протоколом закрытия каналов (close) и ожиданием воркеров (WaitGroup), а не “когда len стал 0”.
Ошибка №5: “collector устал и ушёл”, а воркеры продолжают писать в results.
Если consumer перестаёт читать, то воркеры могут зависнуть на отправке. Иногда это лечится буфером, но буфер лишь отсрочит проблему. Более надёжный путь — чёткий протокол остановки (обычно через context), где воркеры уважают ctx.Done() и имеют путь корректно завершиться, а не упираться в блокирующий send.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ