1. Навіщо потрібні fan-out і fan-in
Коли ви тільки починаєте писати конкурентний код, майже завжди з’являється один наївний план: «запущу N горутин, вони щось порахують, і я якось потім зберу результати». Наївність тут у слові «якось»: у конкурентному коді воно часто означає, що в п’ятницю ввечері ви дивитиметеся на завислу програму й думатимете, що це карма.
Fan-out/fan-in — це два дуже практичні шаблони, які дають вам зрозумілу схему: як роздати незалежну роботу кільком горутинам (fan-out), і як потім зібрати їхні відповіді в одне місце (fan-in). Це не магія й не фреймворк, а лише дисципліна: хто пише, хто читає, хто закриває канал і що відбувається, якщо хтось «зник».
Щоб простіше тримати це в голові, уявіть ресторан. Fan-out — ви роздаєте замовлення різним кухарям. Fan-in — збираєте готові страви на одну стійку видачі. Важливо: кухарі не повинні стояти з тарілкою вічно, якщо стійка вже закрита.
Намалюємо загальну картинку:
flowchart LR
A[Вхідні дані] -->|роздати| B1[виконавець 1]
A -->|роздати| B2[виконавець 2]
A -->|роздати| B3[виконавець 3]
B1 -->|результат| C[спільний результат]
B2 -->|результат| C
B3 -->|результат| C
C --> D[збирач / main]
2. Fan-out: роздаємо роботу горутинам
Fan-out зазвичай починається з простого: у нас є список завдань, наприклад числа, рядки або елементи. Ми хочемо обробити кожен елемент незалежно. Логіка обробки може бути будь-якою: порахувати, розібрати, перевірити, нормалізувати рядок, обчислити хеш тощо. У навчальних прикладах ми робитимемо щось просте, але мислитимемо як інженери: «що буде, якщо обробник працює швидко або повільно, а читач результатів — теж по-різному?»
Найпростіший fan-out: квадрати чисел
Ось «скелет»: створюємо results, запускаємо по горутині на елемент, а потім читаємо N результатів.
package main
import (
"fmt"
)
func main() {
values := []int{2, 3, 4}
results := make(chan int, len(values)) // буфер під усі результати
for _, v := range values {
go func(v int) {
results <- v * v
}(v) // важливо: передали v параметром
}
for i := 0; i < len(values); i++ {
fmt.Println(<-results) // порядок не гарантується
}
}
Зверніть увагу на дві важливі ідеї.
По‑перше, results буферизований на len(values). Це означає: кожна горутина зможе записати результат і завершитися, навіть якщо main ще не встиг почати читання. Це дуже корисна страховка від випадкових блокувань.
По‑друге, ми передаємо v в анонімну функцію параметром: }(v). Навіть у Go 1.22+ багато типових пасток із range стали менш небезпечними, але звичка робити вхід у горутину явним — це як звичка пристібатися. Можливо, ви й так доїдете, але пристебнутим спокійніше.
Чому надсилання результату може блокуватися
Тепер важливий момент, який багато хто недооцінює: надсилання results <- x — це теж синхронізація. Якщо канал без буфера або буфер заповнений, відправник чекатиме читача. А якщо читача немає і не буде — горутина зависає.
З погляду роботи програми це виглядає так: ваш код ніби завершився, але процес чомусь тримає десятки, сотні або тисячі горутин, які стоять на надсиланні. Це один із найчастіших шляхів до витоків горутин.
3. Fan-in: збираємо результати
Fan-in — це зворотний бік історії. Ми хочемо, щоб результати стікалися в один канал, який читає main або «збирач». На практиці fan-in трапляється у двох варіантах.
Перший варіант: у нас уже є один спільний results, як у прикладі з квадратами, і ми просто читаємо з нього N значень. Це коректно працює, якщо ми знаємо N і гарантуємо, що кожен виконавець надсилає рівно один результат.
Другий варіант: у нас кілька вхідних каналів, наприклад різні джерела даних, і ми хочемо об’єднати їх в один вихідний. Ось тут і з’являється класична функція merge.
merge: об’єднуємо два канали в один
Нижче — мінімальна версія: дві горутини пересилають значення, а окрема горутина-координатор закриває out, коли обидві пересилки завершилися.
package main
import (
"sync"
)
func merge(a, b <-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(2)
forward := func(in <-chan int) {
defer wg.Done()
for v := range in {
out <- v
}
}
go forward(a)
go forward(b)
go func() {
wg.Wait()
close(out)
}()
return out
}
Тут ключова дисципліна: out закриває один координатор. Не «кожен, хто втомився», не «хто перший здогадався», не «давайте всі закриємо, а потім подивимося». У Go закриття каналу — це подія, яку не можна робити двічі.
Оскільки нам важливо не просто «скопіювати код», а зрозуміти протокол, сформулюймо його словами.
forward читає in через range. Це означає: forward завершиться лише тоді, коли вхідний канал закриють. Отже, закриття in — відповідальність того, хто пише в in (producer). Коли обидва forward завершилися, координатор робить close(out), і читач out може завершити читання через range.
4. Комбінуємо й розширюємо патерни
Fan-out + fan-in: паралельний обробник задач
Тепер зберемо обидва шаблони в одну мініісторію, яка схожа на реальні задачі в застосунках. Припустімо, у нас є список завдань, у нашому навчальному стилі — це просто числа, і ми хочемо обробити їх паралельно, а потім зібрати результати.
Оголосімо тип результату, щоб було зрозуміліше, що саме ми повертаємо.
package main
import (
"fmt"
"time"
)
type Result struct {
Input int
Value int
}
func work(x int) Result {
time.Sleep(20 * time.Millisecond) // імітуємо роботу
return Result{Input: x, Value: x * x}
}
func main() {
values := []int{2, 3, 4}
results := make(chan Result, len(values))
for _, v := range values {
go func(v int) {
results <- work(v)
}(v)
}
for i := 0; i < len(values); i++ {
r := <-results
fmt.Println(r.Input, "->", r.Value) // наприклад: 3 -> 9
}
}
Це вже повноцінний fan-out: по горутині на завдання. І fan-in: один канал results, який збирає відповіді.
Зверніть увагу: порядок виводу буде довільним. Якщо вам потрібен стабільний порядок, ви зазвичай або сортуєте результати потім, або використовуєте інший протокол, наприклад індекс у результаті. Конкурентність рідко дає «все швидко й у тому порядку, який мені подобається», тому інженерна звичка не сподіватися на порядок економить нерви.
Перший результат виграє
Іноді вам не потрібні усі результати. Вам потрібен перший успішний або просто той, що прийшов першим. Типовий приклад: «запитаємо кілька джерел і візьмемо відповідь, яка прийшла швидше».
У Go це зазвичай робиться каналом ємності 1 і неблокувальним надсиланням через select з default. Неблокувальне надсилання гарантує, що горутина не зависне, навіть якщо читач уже «пішов» або результат більше не потрібен.
package main
import (
"fmt"
)
func main() {
values := []int{2, 3, 4}
out := make(chan int, 1) // потрібна лише одна відповідь
for _, v := range values {
go func(v int) {
r := v * v
select {
case out <- r:
// переможець надіслав результат
default:
// канал уже зайнятий: той, хто програв, просто йде
}
}(v)
}
fmt.Println(<-out) // один із: 4, 9, 16
}
Чому тут важливий буфер 1? Тому що інакше ви можете потрапити в гонку: «переможець» спробував надіслати, але читач ще не почав читання. Ідея цього патерну така: буфер гарантує, що першому відправнику є куди покласти значення, а default гарантує, що решта не зависнуть.
Тут, до речі, добре видно, що default — небезпечний інструмент. Він робить операцію неблокувальною, але ціною того, що ви можете втратити дані. У цьому патерні втрата даних — це фіча за контрактом: нам потрібна одна відповідь. Якщо за контрактом втрата неприпустима, default використовувати не можна.
5. Протоколи: хто закриває канал і коли
З конкурентністю часто проблема не в синтаксисі, а в тому, що люди не проговорюють протокол. Зафіксуймо кілька правил у вигляді невеликої таблиці. Це не абсолютна істина, але це дуже хороший стартовий контракт.
| Питання | Хороша відповідь | Чому |
|---|---|---|
| Хто закриває канал? | Той, хто пише у канал (producer) | Читач не знає, чи будуть ще значення |
| Чи можна закривати канал із кількох місць? | Ні | Подвійне закриття = panic |
| Чи можна закривати results, якщо в нього пишуть N горутин? | Так, але закриває координатор, який дочекався завершення всіх тих, хто пише | Інакше хтось може писати в закритий канал |
| Що краще: читати N значень чи читати range до закриття? | Залежить від протоколу | Якщо N відоме — можна N разів; якщо потік «доки не закінчиться» — range |
Як це виглядає на практиці у fan-out, коли тих, хто пише, багато? Часто використовують WaitGroup, а окрема горутина закриває results, коли всі завершили роботу. Це майже той самий прийом, який ми бачили в merge.
package main
import (
"sync"
)
func fanOutSquares(values []int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(values))
for _, v := range values {
go func(v int) {
defer wg.Done()
out <- v * v
}(v)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
Тепер споживач може писати так:
for x := range fanOutSquares([]int{2, 3, 4}) {
_ = x
}
Це зручно, тому що читання «доки потік не закінчиться» зазвичай легше підтримувати, ніж «рівно N разів». Але знову ж: це працює лише тоді, коли ви справді закриваєте канал коректно.
6. Типові помилки
Помилка №1: небуферизований results + «прочитаю потім» → відправники зависають.
Новачок часто створює results := make(chan int) і запускає десяток горутин, кожна з яких пише результат. А читання результатів відкладає «на потім», до завершення запуску всіх горутин. Але небуферизований канал потребує одночасного читача: горутини почнуть блокуватися на results <- ... уже на перших надсиланнях. Це лікується або буфером під очікувану кількість, або тим, що читач починає читати одразу, або окремим збирачем.
Помилка №2: «перший результат виграє», але надсилання зроблено звичайним out <- r.
Здається логічним: «я прочитаю один результат і піду». Проблема в тому, що горутини, які програли, усе ще спробують надіслати й зависнуть, бо читача більше немає. Правильний протокол — або неблокувальне надсилання через select { case out <- r: default: }, або окремий сигнал скасування.
Помилка №3: закривають out із кількох горутин.
Це одна з найчастіших панік на рівному місці. У fan-in merge хочеться: «щойно мій вхід закінчився — закрию вихід». Але якщо входів два, то вихід спробують закрити двічі. Рішення просте й нудне, а отже правильне: вихід закриває один координатор, який чекає завершення всіх учасників (наприклад, через WaitGroup).
Помилка №4: плутають «канал порожній» і «канал закритий».
Іноді роблять неблокувальне читання й думають: «якщо не прочиталося — значить, канал закритий». Ні: «не прочиталося» означає лише те, що просто зараз значення немає. Закриття визначається або через v, ok := <-ch (де ok == false), або через завершення range. Якщо ви будуєте логіку на хибному розрізненні, поведінка буде нестабільною: то працює, то ні — класика конкурентних багів.
Помилка №5: розраховують на порядок результатів.
Fan-out майже гарантує, що порядок «як у вихідному списку» не збережеться. Якщо порядок важливий, його потрібно зробити частиною даних (наприклад, додати індекс у Result) і потім упорядкувати або збирати результати в структуру за індексом. І так, це той момент, коли багато хто вперше розуміє, що конкурентність — не кнопка «зроби швидко», а контракт «зроби паралельно, але думай».
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ