1. Навіщо потрібен пул воркерів
Коли ви вперше дізнаєтеся про goroutine, дуже легко зробити цілком природний висновок: «Чудово! Тоді на кожне завдання — по goroutine». Це схоже на момент, коли людина вперше дізналася про нескінченні макарони: хочеться перевірити, чи вони справді нескінченні. Проблема в тому, що нескінченність зазвичай закінчується на слові memory.
Якщо завдань небагато (наприклад, 10–100), підхід «по одній goroutine на завдання» часто справді працює і виглядає гарно. Але якщо завдань багато — тисячі чи сотні тисяч, — ви раптом створюєте величезну армію goroutine, яка:
- по-перше, має накладні витрати (небільші, але не нульові);
- по-друге, її складніше коректно завершити, особливо якщо є канали результатів;
- по-третє, ви втрачаєте контроль: питання «скільки завдань зараз виконується одночасно» стає питанням віри.
Worker pool розв’язує це: ми запускаємо фіксовану кількість воркерів (наприклад, 4, 8, 16), які беруть завдання зі спільної черги та обробляють їх. Це приблизно як каси в супермаркеті: покупців може бути багато, а кас — скінченна кількість, тож черга просто вирівнює навантаження.
2. Ролі та протокол worker pool
Перед тим як писати код, важливо не кидатися з головою в make(chan ...) , а спершу домовитися про ролі. У пулі воркерів зручно мислити кількома «персонажами»: хтось кладе завдання в чергу, хтось їх забирає, хтось закриває вихід, а хтось збирає підсумки.
Нижче — мінісхема, щоб зафіксувати картинку. Ми ще повернемося до неї, коли збиратимемо все в одну програму:
flowchart LR
P[Виробник<br/>створює завдання] -->|канал jobs| W1[Воркер 1]
P -->|канал jobs| W2[Воркер 2]
P -->|канал jobs| WN[Воркер N]
W1 -->|канал results| C[Збирач<br/>збирає результати]
W2 -->|канал results| C
WN -->|канал results| C
Найважливіше правило, яке рятує від половини панік і зависань, таке:
Канал закриває той, хто в нього пише, і закриває рівно один раз.
А якщо тих, хто пише, багато — тобто багато воркерів пишуть у results, — закриття виконує окрема сутність, координатор, який не пише результати, а лише чекає завершення воркерів і закриває канал. Це правило дозволяє уникати класичного panic: close of closed channel.
Про close корисно пам’ятати головне: закриття каналу означає «більше значень надсилатися не буде». Жодної «магії видалення даних» там немає: раніше надіслані значення все одно можна дочитати.
3. Підготуємо приклад: Task, Job і Result
Щоб приклади не були абстрактними «job #123», продовжимо на прикладі умовного застосунку «список завдань». Уявімо, що в нас є Task, і ми хочемо порахувати статистику за заголовками: наприклад, кількість слів у Title. Завдання навмисно просте: нам не потрібні ні файлова система, ні HTTP, ні бази — лише конкурентна обробка.
Модель Task
package main
type Task struct {
ID int
Title string
Done bool
}
Типи TaskJob і TaskResult
Визначимо, що таке «робота» і що таке «результат». Важливо: ми вводимо окремі типи не тому, що Go змушує, а тому, що так читабельніше. Канал chan int без імені — це загадка. Канал chan TaskJob — це вже майже документація.
package main
type TaskJob struct {
Task Task
}
type TaskResult struct {
TaskID int
WordCount int
Err error
}
Зверніть увагу: TaskResult містить Err error. Сьогодні ми не будуватимемо складної стратегії помилок, але вже зараз корисно звикати до того, що результат роботи може бути або успішним, або з помилкою. І зручно передавати це одним об’єктом.
Помилка ErrEmptyTitle
package main
import "errors"
var ErrEmptyTitle = errors.New("empty title")
4. Базові версії без worker pool
Будь-яка конкурентність стає зрозумілішою, якщо спочатку побачити звичайний послідовний варіант. А вже потім — те, на що його перетворюють канали та goroutine.
Послідовний варіант
package main
import (
"fmt"
"strings"
)
func wordCount(title string) int {
return len(strings.Fields(title))
}
func main() {
tasks := []Task{
{ID: 1, Title: "buy milk"},
{ID: 2, Title: "write go code"},
}
for _, t := range tasks {
fmt.Println(t.ID, wordCount(t.Title)) // 1 2; 2 3 (порядок стабільний)
}
}
Послідовний варіант дає стабільний порядок, простоту й мінімум деталей. Але якщо обробка важка (наприклад, парсинг, хешування, валідація великого імпорту), послідовний підхід може бути повільним. І тоді з’являється бажання розпаралелити його.
Наївна конкурентність: goroutine на кожне завдання
Тепер напишемо наївний варіант: запустимо goroutine для кожного Task. Це виглядає красиво… рівно до того моменту, коли завдань стає забагато.
package main
import (
"fmt"
"strings"
"sync"
)
func main() {
tasks := []Task{{ID: 1, Title: "buy milk"}, {ID: 2, Title: "write go code"}}
var wg sync.WaitGroup
for _, t := range tasks {
wg.Add(1)
go func(task Task) {
defer wg.Done()
n := len(strings.Fields(task.Title))
fmt.Println(task.ID, n)
}(t)
}
wg.Wait()
}
Тут уже з’являється важлива пастка для новачків: ми передаємо t у функцію як task, щоб не потрапити в пастку змінної циклу. Ви це вже бачили раніше: навіть у Go 1.22+ є сценарії, де можна випадково розділити одну змінну між ітераціями, якщо оголосити її не в тому місці.
Однак головна проблема не в цьому. Проблема в тому, що якщо tasks стане, скажімо, 200_000, ви створите 200_000 goroutine. Вони й справді легкі, але не настільки, щоб сприймати це як свято. Worker pool робить таку конструкцію керованою.
5. Реалізація worker pool: код крок за кроком
Зараз зберемо ключові елементи: воркер, чергу завдань, координатора закриття results, producer і collector, а потім — main, який усе з’єднує.
Worker: читає з jobs і пише в results
Ключовий елемент — worker. Він має брати завдання з jobs, доки канал не закриють. Це ідеально лягає на for range, тому що range по каналу завершується, коли канал закрито й усі значення з нього вже прочитано.
Зверніть увагу на сигнатуру: jobs <-chan TaskJob (лише читати), results chan<- TaskResult (лише писати). Спрямовані канали — невелика, але дуже корисна «страховка для мозку»: функція не зможе випадково закрити чужий канал або прочитати не там, де треба.
package main
import (
"strings"
"sync"
)
func worker(jobs <-chan TaskJob, results chan<- TaskResult, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
title := j.Task.Title
if title == "" {
results <- TaskResult{TaskID: j.Task.ID, Err: ErrEmptyTitle}
continue
}
n := len(strings.Fields(title))
results <- TaskResult{TaskID: j.Task.ID, WordCount: n}
}
}
Важливо: воркер не закриває results. Він не власник каналу результатів, тому що воркерів багато. Якщо кожен воркер спробує закрити results, буде гонка за право закриття, і решта натрапить на паніку.
Запуск пулу: створюємо N воркерів і WaitGroup
Пул воркерів починається з черги, але технічно зручніше спершу написати функцію, яка запускає воркерів.
package main
import "sync"
func startPool(n int, jobs <-chan TaskJob, results chan<- TaskResult) *sync.WaitGroup {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go worker(jobs, results, &wg)
}
return &wg
}
Зверніть увагу на дисципліну WaitGroup: Add(1) роблять до запуску goroutine. Це не бюрократія, а спосіб не зловити рідкісну, але дуже неприємну гонку, коли goroutine встигла завершитися й викликати Done(), а лічильник ще не збільшили.
Координатор: закриваємо results після завершення воркерів
Collector читає for r := range results { ... }. Щоб цикл завершився, канал results має бути закритий. Але зробити це вчасно може лише той, хто знає, що всі воркери завершилися.
Це і є роль координатора: дочекатися wg.Wait() і після цього викликати close(results).
package main
func closeResultsWhenDone(wg *sync.WaitGroup, results chan<- TaskResult) {
go func() {
wg.Wait()
close(results)
}()
}
Чому ми закриваємо results в окремій goroutine? Тому що якщо викликати wg.Wait() прямо в main, ви можете заблокуватися зарано й не встигнути, наприклад, почати читати результати. У простих прикладах це не завжди критично, але як звичка — дуже корисно: координатор живе окремо й закриває канал, коли роботу завершено.
Producer: надсилаємо завдання та закриваємо jobs
Producer кладе завдання в чергу. Тут працює правило володіння: якщо producer пише в jobs, отже producer і закриває jobs.
package main
func produceTasks(tasks []Task, jobs chan<- TaskJob) {
defer close(jobs)
for _, t := range tasks {
jobs <- TaskJob{Task: t}
}
}
defer close(jobs) — хороший прийом: навіть якщо ви пізніше додасте ранній вихід або обробку помилок, ймовірність забути закрити канал буде помітно меншою.
Collector: читаємо results до закриття та агрегуємо
Collector читає results і робить із ними щось корисне: друкує, складає у слайс, будує звіт.
Зробімо простий збір: виведемо на екран і порахуємо, скільки було помилок.
package main
import "fmt"
func collect(results <-chan TaskResult) int {
errCount := 0
for r := range results {
if r.Err != nil {
fmt.Println("error:", r.TaskID, r.Err) // error: 3 empty title
errCount++
continue
}
fmt.Println("ok:", r.TaskID, r.WordCount)
}
return errCount
}
Тут важливо морально прийняти факт: порядок результатів не гарантується. Воркер №2 може обробити завдання №10 швидше, ніж воркер №1 — завдання №1. Якщо вам потрібен стабільний порядок, це розв’язується окремо (зазвичай сортуванням або розміщенням результатів за індексом), але це вже наступний шар логіки.
Збираємо все в main
main робить п’ять речей:
- створює канали,
- запускає воркерів,
- запускає координатора закриття results,
- запускає producer,
- читає результати.
package main
import "fmt"
func main() {
tasks := []Task{
{ID: 1, Title: "buy milk"},
{ID: 2, Title: "write go code"},
{ID: 3, Title: ""},
}
jobs := make(chan TaskJob)
results := make(chan TaskResult)
wg := startPool(2, jobs, results)
closeResultsWhenDone(wg, results)
go produceTasks(tasks, jobs)
errCount := collect(results)
fmt.Println("errors:", errCount) // errors: 1
}
Зверніть увагу на кілька моментів, які разом дають «схему без зависань»:
- produceTasks запускається в goroutine, щоб main не блокувався на надсиланні завдань і міг паралельно почати збір результатів.
- jobs закриває producer, і воркери природно завершуються: for range jobs закінчується.
- results закриває координатор, який дочекався всіх воркерів.
- collector читає results через range, і цикл завершується автоматично після закриття каналу.
Це і є протокол worker pool: ніхто не чекає магічної кількості результатів, ніхто не закриває канали навмання, а завершення випливає з домовленостей.
Чому схема не зависає
Коли новачок уперше бачить worker pool, він зазвичай думає: «Тут же купа блокувань: канали, очікування, горутини… чому воно взагалі не зависає?» Спокійна відповідь така: для кожного блокувального місця є гарантований спосіб розблокування.
- Producer може зависнути на jobs <- ..., але воркери читають jobs і розвантажують чергу.
- Воркер може зависнути на results <- ..., але collector читає results.
- Collector може «чекати нескінченно», але координатор закриває results, коли воркери завершилися.
- Воркери можуть «чекати нескінченно» на читанні jobs, але producer закриває jobs.
Тобто завершення не залежить ні від таймера, ні від удачі: воно логічно випливає із закриття каналів і WaitGroup.
6. Типові помилки під час роботи з worker pool
Помилка № 1: забули закрити jobs.
Тоді воркери назавжди залишаться в for range jobs, тому що range по каналу завершується лише після close(jobs). А раз воркери не завершилися, координатор не закриє results, і collector теж чекатиме нескінченно. Зазвичай це виглядає як «програма нічого не робить і не завершується», і ви починаєте підозрювати, що комп’ютер образився.
Помилка № 2: закривають results усередині воркера.
Це майже гарантований panic: close of closed channel, тому що воркерів кілька, і кожен вважає себе головним. Якщо закриття каналу залежить від завершення кількох goroutine, його закриває не будь-який учасник, а координатор після wg.Wait().
Помилка № 3: роблять wg.Add(1) після go worker(...).
Іноді це «працює», а іноді ви ловите паніку на кшталт sync: negative WaitGroup counter або дивні зависання. Причина в тому, що goroutine може встигнути викликати Done(), поки ви ще не зробили Add(1). Дисципліна проста: Add завжди до запуску goroutine.
Помилка № 4: collector читає не через range results, а «рівно N разів».
Такий код крихкий: ви змінюєте кількість завдань, додаєте ранній вихід у producer, додаєте фільтрацію — і раптом або недочитуєте результати, або чекаєте зайві. Читання через range по results прив’язується до реального протоколу завершення, а не до припущень.
Помилка № 5: очікують, що результати прийдуть у тому самому порядку, що й завдання.
У worker pool порядок виконання не детермінований. Якщо вам потрібен порядок, його слід явно відновити: наприклад, складати результати за TaskID у слайс або мапу, а потім виводити в потрібному порядку. Важливо не боротися з конкурентністю, очікуючи стабільного порядку, а сприймати порядок як окреме завдання форматування.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ