JavaRush /Курси /Go SELF /Пул воркерів — черга завдань + N воркерів

Пул воркерів — черга завдань + N воркерів

Go SELF
Рівень 69 , Лекція 0
Відкрита

1. Навіщо потрібен пул воркерів

Коли ви вперше дізнаєтеся про goroutine, дуже легко зробити цілком природний висновок: «Чудово! Тоді на кожне завдання — по goroutine». Це схоже на момент, коли людина вперше дізналася про нескінченні макарони: хочеться перевірити, чи вони справді нескінченні. Проблема в тому, що нескінченність зазвичай закінчується на слові memory.

Якщо завдань небагато (наприклад, 10–100), підхід «по одній goroutine на завдання» часто справді працює і виглядає гарно. Але якщо завдань багато — тисячі чи сотні тисяч, — ви раптом створюєте величезну армію goroutine, яка:

  • по-перше, має накладні витрати (небільші, але не нульові);
  • по-друге, її складніше коректно завершити, особливо якщо є канали результатів;
  • по-третє, ви втрачаєте контроль: питання «скільки завдань зараз виконується одночасно» стає питанням віри.

Worker pool розв’язує це: ми запускаємо фіксовану кількість воркерів (наприклад, 4, 8, 16), які беруть завдання зі спільної черги та обробляють їх. Це приблизно як каси в супермаркеті: покупців може бути багато, а кас — скінченна кількість, тож черга просто вирівнює навантаження.

2. Ролі та протокол worker pool

Перед тим як писати код, важливо не кидатися з головою в make(chan ...) , а спершу домовитися про ролі. У пулі воркерів зручно мислити кількома «персонажами»: хтось кладе завдання в чергу, хтось їх забирає, хтось закриває вихід, а хтось збирає підсумки.

Нижче — мінісхема, щоб зафіксувати картинку. Ми ще повернемося до неї, коли збиратимемо все в одну програму:

flowchart LR
    P[Виробник<br/>створює завдання] -->|канал jobs| W1[Воркер 1]
    P -->|канал jobs| W2[Воркер 2]
    P -->|канал jobs| WN[Воркер N]

    W1 -->|канал results| C[Збирач<br/>збирає результати]
    W2 -->|канал results| C
    WN -->|канал results| C

Найважливіше правило, яке рятує від половини панік і зависань, таке:

Канал закриває той, хто в нього пише, і закриває рівно один раз.

А якщо тих, хто пише, багато — тобто багато воркерів пишуть у results, — закриття виконує окрема сутність, координатор, який не пише результати, а лише чекає завершення воркерів і закриває канал. Це правило дозволяє уникати класичного panic: close of closed channel.

Про close корисно пам’ятати головне: закриття каналу означає «більше значень надсилатися не буде». Жодної «магії видалення даних» там немає: раніше надіслані значення все одно можна дочитати.

3. Підготуємо приклад: Task, Job і Result

Щоб приклади не були абстрактними «job #123», продовжимо на прикладі умовного застосунку «список завдань». Уявімо, що в нас є Task, і ми хочемо порахувати статистику за заголовками: наприклад, кількість слів у Title. Завдання навмисно просте: нам не потрібні ні файлова система, ні HTTP, ні бази — лише конкурентна обробка.

Модель Task

package main

type Task struct {
	ID    int
	Title string
	Done  bool
}

Типи TaskJob і TaskResult

Визначимо, що таке «робота» і що таке «результат». Важливо: ми вводимо окремі типи не тому, що Go змушує, а тому, що так читабельніше. Канал chan int без імені — це загадка. Канал chan TaskJob — це вже майже документація.

package main

type TaskJob struct {
	Task Task
}

type TaskResult struct {
	TaskID    int
	WordCount int
	Err       error
}

Зверніть увагу: TaskResult містить Err error. Сьогодні ми не будуватимемо складної стратегії помилок, але вже зараз корисно звикати до того, що результат роботи може бути або успішним, або з помилкою. І зручно передавати це одним об’єктом.

Помилка ErrEmptyTitle

package main

import "errors"

var ErrEmptyTitle = errors.New("empty title")

4. Базові версії без worker pool

Будь-яка конкурентність стає зрозумілішою, якщо спочатку побачити звичайний послідовний варіант. А вже потім — те, на що його перетворюють канали та goroutine.

Послідовний варіант

package main

import (
	"fmt"
	"strings"
)

func wordCount(title string) int {
	return len(strings.Fields(title))
}

func main() {
	tasks := []Task{
		{ID: 1, Title: "buy milk"},
		{ID: 2, Title: "write go code"},
	}

	for _, t := range tasks {
		fmt.Println(t.ID, wordCount(t.Title)) // 1 2; 2 3 (порядок стабільний)
	}
}

Послідовний варіант дає стабільний порядок, простоту й мінімум деталей. Але якщо обробка важка (наприклад, парсинг, хешування, валідація великого імпорту), послідовний підхід може бути повільним. І тоді з’являється бажання розпаралелити його.

Наївна конкурентність: goroutine на кожне завдання

Тепер напишемо наївний варіант: запустимо goroutine для кожного Task. Це виглядає красиво… рівно до того моменту, коли завдань стає забагато.

package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	tasks := []Task{{ID: 1, Title: "buy milk"}, {ID: 2, Title: "write go code"}}

	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(task Task) {
			defer wg.Done()
			n := len(strings.Fields(task.Title))
			fmt.Println(task.ID, n)
		}(t)
	}
	wg.Wait()
}

Тут уже з’являється важлива пастка для новачків: ми передаємо t у функцію як task, щоб не потрапити в пастку змінної циклу. Ви це вже бачили раніше: навіть у Go 1.22+ є сценарії, де можна випадково розділити одну змінну між ітераціями, якщо оголосити її не в тому місці.

Однак головна проблема не в цьому. Проблема в тому, що якщо tasks стане, скажімо, 200_000, ви створите 200_000 goroutine. Вони й справді легкі, але не настільки, щоб сприймати це як свято. Worker pool робить таку конструкцію керованою.

5. Реалізація worker pool: код крок за кроком

Зараз зберемо ключові елементи: воркер, чергу завдань, координатора закриття results, producer і collector, а потім — main, який усе з’єднує.

Worker: читає з jobs і пише в results

Ключовий елемент — worker. Він має брати завдання з jobs, доки канал не закриють. Це ідеально лягає на for range, тому що range по каналу завершується, коли канал закрито й усі значення з нього вже прочитано.

Зверніть увагу на сигнатуру: jobs <-chan TaskJob (лише читати), results chan<- TaskResult (лише писати). Спрямовані канали — невелика, але дуже корисна «страховка для мозку»: функція не зможе випадково закрити чужий канал або прочитати не там, де треба.

package main

import (
	"strings"
	"sync"
)

func worker(jobs <-chan TaskJob, results chan<- TaskResult, wg *sync.WaitGroup) {
	defer wg.Done()

	for j := range jobs {
		title := j.Task.Title
		if title == "" {
			results <- TaskResult{TaskID: j.Task.ID, Err: ErrEmptyTitle}
			continue
		}

		n := len(strings.Fields(title))
		results <- TaskResult{TaskID: j.Task.ID, WordCount: n}
	}
}

Важливо: воркер не закриває results. Він не власник каналу результатів, тому що воркерів багато. Якщо кожен воркер спробує закрити results, буде гонка за право закриття, і решта натрапить на паніку.

Запуск пулу: створюємо N воркерів і WaitGroup

Пул воркерів починається з черги, але технічно зручніше спершу написати функцію, яка запускає воркерів.

package main

import "sync"

func startPool(n int, jobs <-chan TaskJob, results chan<- TaskResult) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go worker(jobs, results, &wg)
	}
	return &wg
}

Зверніть увагу на дисципліну WaitGroup: Add(1) роблять до запуску goroutine. Це не бюрократія, а спосіб не зловити рідкісну, але дуже неприємну гонку, коли goroutine встигла завершитися й викликати Done(), а лічильник ще не збільшили.

Координатор: закриваємо results після завершення воркерів

Collector читає for r := range results { ... }. Щоб цикл завершився, канал results має бути закритий. Але зробити це вчасно може лише той, хто знає, що всі воркери завершилися.

Це і є роль координатора: дочекатися wg.Wait() і після цього викликати close(results).

package main

func closeResultsWhenDone(wg *sync.WaitGroup, results chan<- TaskResult) {
	go func() {
		wg.Wait()
		close(results)
	}()
}

Чому ми закриваємо results в окремій goroutine? Тому що якщо викликати wg.Wait() прямо в main, ви можете заблокуватися зарано й не встигнути, наприклад, почати читати результати. У простих прикладах це не завжди критично, але як звичка — дуже корисно: координатор живе окремо й закриває канал, коли роботу завершено.

Producer: надсилаємо завдання та закриваємо jobs

Producer кладе завдання в чергу. Тут працює правило володіння: якщо producer пише в jobs, отже producer і закриває jobs.

package main

func produceTasks(tasks []Task, jobs chan<- TaskJob) {
	defer close(jobs)

	for _, t := range tasks {
		jobs <- TaskJob{Task: t}
	}
}

defer close(jobs) — хороший прийом: навіть якщо ви пізніше додасте ранній вихід або обробку помилок, ймовірність забути закрити канал буде помітно меншою.

Collector: читаємо results до закриття та агрегуємо

Collector читає results і робить із ними щось корисне: друкує, складає у слайс, будує звіт.

Зробімо простий збір: виведемо на екран і порахуємо, скільки було помилок.

package main

import "fmt"

func collect(results <-chan TaskResult) int {
	errCount := 0
	for r := range results {
		if r.Err != nil {
			fmt.Println("error:", r.TaskID, r.Err) // error: 3 empty title
			errCount++
			continue
		}
		fmt.Println("ok:", r.TaskID, r.WordCount)
	}
	return errCount
}

Тут важливо морально прийняти факт: порядок результатів не гарантується. Воркер №2 може обробити завдання №10 швидше, ніж воркер №1 — завдання №1. Якщо вам потрібен стабільний порядок, це розв’язується окремо (зазвичай сортуванням або розміщенням результатів за індексом), але це вже наступний шар логіки.

Збираємо все в main

main робить п’ять речей:

  • створює канали,
  • запускає воркерів,
  • запускає координатора закриття results,
  • запускає producer,
  • читає результати.
package main

import "fmt"

func main() {
	tasks := []Task{
		{ID: 1, Title: "buy milk"},
		{ID: 2, Title: "write go code"},
		{ID: 3, Title: ""},
	}

	jobs := make(chan TaskJob)
	results := make(chan TaskResult)

	wg := startPool(2, jobs, results)
	closeResultsWhenDone(wg, results)

	go produceTasks(tasks, jobs)

	errCount := collect(results)
	fmt.Println("errors:", errCount) // errors: 1
}

Зверніть увагу на кілька моментів, які разом дають «схему без зависань»:

  • produceTasks запускається в goroutine, щоб main не блокувався на надсиланні завдань і міг паралельно почати збір результатів.
  • jobs закриває producer, і воркери природно завершуються: for range jobs закінчується.
  • results закриває координатор, який дочекався всіх воркерів.
  • collector читає results через range, і цикл завершується автоматично після закриття каналу.

Це і є протокол worker pool: ніхто не чекає магічної кількості результатів, ніхто не закриває канали навмання, а завершення випливає з домовленостей.

Чому схема не зависає

Коли новачок уперше бачить worker pool, він зазвичай думає: «Тут же купа блокувань: канали, очікування, горутини… чому воно взагалі не зависає?» Спокійна відповідь така: для кожного блокувального місця є гарантований спосіб розблокування.

  • Producer може зависнути на jobs <- ..., але воркери читають jobs і розвантажують чергу.
  • Воркер може зависнути на results <- ..., але collector читає results.
  • Collector може «чекати нескінченно», але координатор закриває results, коли воркери завершилися.
  • Воркери можуть «чекати нескінченно» на читанні jobs, але producer закриває jobs.

Тобто завершення не залежить ні від таймера, ні від удачі: воно логічно випливає із закриття каналів і WaitGroup.

6. Типові помилки під час роботи з worker pool

Помилка № 1: забули закрити jobs.
Тоді воркери назавжди залишаться в for range jobs, тому що range по каналу завершується лише після close(jobs). А раз воркери не завершилися, координатор не закриє results, і collector теж чекатиме нескінченно. Зазвичай це виглядає як «програма нічого не робить і не завершується», і ви починаєте підозрювати, що комп’ютер образився.

Помилка № 2: закривають results усередині воркера.
Це майже гарантований panic: close of closed channel, тому що воркерів кілька, і кожен вважає себе головним. Якщо закриття каналу залежить від завершення кількох goroutine, його закриває не будь-який учасник, а координатор після wg.Wait().

Помилка № 3: роблять wg.Add(1) після go worker(...).
Іноді це «працює», а іноді ви ловите паніку на кшталт sync: negative WaitGroup counter або дивні зависання. Причина в тому, що goroutine може встигнути викликати Done(), поки ви ще не зробили Add(1). Дисципліна проста: Add завжди до запуску goroutine.

Помилка № 4: collector читає не через range results, а «рівно N разів».
Такий код крихкий: ви змінюєте кількість завдань, додаєте ранній вихід у producer, додаєте фільтрацію — і раптом або недочитуєте результати, або чекаєте зайві. Читання через range по results прив’язується до реального протоколу завершення, а не до припущень.

Помилка № 5: очікують, що результати прийдуть у тому самому порядку, що й завдання.
У worker pool порядок виконання не детермінований. Якщо вам потрібен порядок, його слід явно відновити: наприклад, складати результати за TaskID у слайс або мапу, а потім виводити в потрібному порядку. Важливо не боротися з конкурентністю, очікуючи стабільного порядку, а сприймати порядок як окреме завдання форматування.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ