JavaRush /Курси /Swift SELF /Обмеження паралелізму: не більше ніж N одночасних завдань...

Обмеження паралелізму: не більше ніж N одночасних завдань

Swift SELF
Рівень 69 , Лекція 4
Відкрита

1. Навіщо обмежувати паралелізм

Якщо ви щойно відкрили для себе конкурентність, хочеться влаштувати вечірку: «Дивіться, мамо, я все запускаю паралельно!». У TaskGroup це справді дуже просто: у циклі додаєте завдання для кожного елемента, а потім збираєте результати. Але реальний світ має почуття гумору: мережа має ліміти, API може заблокувати за надто часті запити, пам’ять не безмежна, а планувальник завдань — не ваш особистий дворецький.

Уявіть наш LibraryCLI: команда fetch-many отримує список BookID і завантажує дані через мережу. Якщо в користувача 500 ID і ми запустимо 500 мережевих запитів одночасно, можемо самі собі створити проблеми: від «429 Забагато запитів» до нестабільних тайм-аутів і просто некрасивих піків навантаження.

З технічного боку це ще й питання «скільки завдань одночасно живе в системі». TaskGroup уміє динамічно додавати завдання через addTask. Але сам по собі він не змушує вас бути обережними. Обережність — це вже наша робота.

Базовий анти‑приклад: «заповнили групу всім входом»

Спершу подивімося на підхід, який часто пишуть першим: він логічний, але на великих вхідних даних може бути занадто агресивним.

import Foundation

func naiveParallelFetch(ids: [Int]) async -> [Int] {
    await withTaskGroup(of: Int.self, returning: [Int].self) { group in
        for id in ids {
            group.addTask { id * 10 } // Уявімо, що тут мережевий запит
        }

        var results: [Int] = []
        while let value = await group.next() {
            results.append(value)
        }
        return results
    }
}

Код коректний: ми додали завдання й зібрали результати. Але проблема в тому, що кількість одночасно активних завдань дорівнює розміру ids. За ids.count == 10 це мило. За ids.count == 10_000 це вже схоже на спробу зʼїсти слона цілком — і ще й виделкою.

Що таке ліміт паралелізму

Ліміт паралелізму — це правило: у будь-який момент часу виконується не більше N завдань. При цьому ми все ще використовуємо TaskGroup, все ще збираємо результати через next() і все ще живемо в structured concurrency, тобто не відпускаємо завдання «в космос».

Ідея проста, але дуже інженерна: ми хочемо використовувати ресурси ефективно, але без перевантаження. У TaskGroup результати надходять у міру завершення, тобто в порядку завершення (completion order). І це зручно: щойно одне завдання завершилося, ми можемо запустити наступне. Такий патерн часто називають «one-in-one-out».

У Swift Evolution цей шаблон прямо описують як спосіб примусово обмежити максимальну конкурентність групи: спочатку заповнити групу до ліміту, а потім дочекатися завершення й додати наступне завдання.

Шаблон «заповнили → одне завершилося → додали наступне»

Зараз — центральна техніка лекції. Я рекомендую читати цей розділ як рецепт: не тому, що без нього ви не програміст, а тому, що згодом писатимете це на автоматі (як if let, тільки менш драматично).

Спершу зафіксуймо мінімальний набір змінних, які потрібні в батьківському коді:

Змінна Сенс
limit
нормалізований ліміт (мінімум 1)
nextIndex
індекс наступного елемента, який ще не надіслано в групу
results/out
накопичувач результатів — масив або словник, який ми заповнюємо в батьківському коді

А тепер — схема:

flowchart TD
    A[Є вхідний масив items] --> B[Заповнюємо групу першими limit завданнями]
    B --> C{Є готовий результат?}
    C -->|"await next()"| D[Отримали результат і зберегли його]
    D --> E{Залишилися елементи?}
    E -->|так| F[Додали наступне завдання]
    F --> C
    E -->|ні| G{"Група порожня? next() повернув nil"}
    G --> H[Повертаємо підсумок]

Зверніть увагу на смислову красу: у нас є конвеєр, який підтримує ширину паралелізму, але не роздуває її безконтрольно.

2. Мініприклади: як реалізувати ліміт

«Іграшкове» завдання, щоб побачити механіку

Перед тим як переходити до нашого LibraryCLI, зробімо маленький приклад на Task.sleep, щоб побачити, що ліміт справді працює.

import Foundation

func mockWork(id: Int) async -> Int {
    let delay = UInt64((id % 5) + 1) * 50_000_000
    try? await Task.sleep(nanoseconds: delay)
    return id * 2
}

Тепер функція з лімітом:

import Foundation

func parallelMapWithLimit(ids: [Int], maxConcurrency: Int) async -> [Int] {
    let limit = max(1, maxConcurrency)

    return await withTaskGroup(of: Int.self, returning: [Int].self) { group in
        var nextIndex = 0

        while nextIndex < ids.count && nextIndex < limit {
            let id = ids[nextIndex]
            group.addTask { await mockWork(id: id) }
            nextIndex += 1
        }

        var results: [Int] = []
        while let value = await group.next() {
            results.append(value)

            if nextIndex < ids.count {
                let id = ids[nextIndex]
                group.addTask { await mockWork(id: id) }
                nextIndex += 1
            }
        }
        return results
    }
}

Тут важливий момент: group.next() повертає наступний готовий результат у порядку завершення. Саме тому ми можемо «підкидати» нове завдання рівно тоді, коли одне завершилося.

Ліміт + збереження порядку

Коли ми збираємо results.append(value), порядок буде порядком завершення, а не вхідним. Це нормально, але іноді нам потрібно повернути результати в початковому порядку ID.

У попередніх лекціях ми вже використовували трюк (index, value). Тут поєднаємо його з обмеженням паралелізму.

import Foundation

func parallelMapOrderedWithLimit(ids: [Int], maxConcurrency: Int) async -> [Int] {
    let limit = max(1, maxConcurrency)

    return await withTaskGroup(of: (Int, Int).self, returning: [Int].self) { group in
        var nextIndex = 0

        while nextIndex < ids.count && nextIndex < limit {
            let index = nextIndex
            let id = ids[index]
            group.addTask { (index, await mockWork(id: id)) }
            nextIndex += 1
        }

        var out = Array<Int?>(repeating: nil, count: ids.count)
        while let (index, value) = await group.next() {
            out[index] = value

            if nextIndex < ids.count {
                let index = nextIndex
                let id = ids[index]
                group.addTask { (index, await mockWork(id: id)) }
                nextIndex += 1
            }
        }

        return out.compactMap { $0 }
    }
}

Так, compactMap { $0 } виглядає як маленький фінальний ритуал — але він дуже зручний: ми перетворили [Int?] назад на [Int], бо впевнені, що все заповнено.

3. Вбудовуємо ліміт у застосунок: LibraryCLI і fetch-many

Тепер давайте зробимо крок від «іграшкового id * 2» до реальної логіки курсу. Нагадаю контекст: у нас є мережевий шар (умовно ApiClient), і команда fetch-many хоче отримати книги за кількома BookID.

Ми не будемо зараз заглиблюватися в URLSession і декодування — це вже було раніше. Наше завдання: правильно розпаралелити велику кількість запитів, але не надто широко.

«Одна книга» як Result для часткового успіху

Почнімо з маленького допоміжного методу: «отримай одну книгу, але поверни Result».

import Foundation

struct Book {
    let id: String
    let title: String
}

enum FetchError: Error {
    case notFound(String)
}

func fetchBookResult(id: String) async -> Result<Book, Error> {
    // Уявімо, що тут: try await apiClient.fetch(...)
    if id == "bad" { return .failure(FetchError.notFound(id)) }
    return .success(Book(id: id, title: "Книга \(id)"))
}

«Багато книг» з лімітом і збереженням порядку

Тепер — «fetch many» з лімітом. Зверніть увагу: ми одночасно розв’язуємо три задачі: динамічну кількість запитів, ліміт паралелізму і збереження порядку.

import Foundation

func fetchManyBooks(ids: [String], maxConcurrency: Int) async -> [Result<Book, Error>] {
    let limit = max(1, maxConcurrency)

    return await withTaskGroup(of: (Int, Result<Book, Error>).self,
                              returning: [Result<Book, Error>].self) { group in
        var nextIndex = 0

        while nextIndex < ids.count && nextIndex < limit {
            let index = nextIndex
            let id = ids[index]
            group.addTask { (index, await fetchBookResult(id: id)) }
            nextIndex += 1
        }

        var out = Array<Result<Book, Error>?>(repeating: nil, count: ids.count)

        while let (index, result) = await group.next() {
            out[index] = result

            if nextIndex < ids.count {
                let index = nextIndex
                let id = ids[index]
                group.addTask { (index, await fetchBookResult(id: id)) }
                nextIndex += 1
            }
        }

        return out.compactMap { $0 }
    }
}

Чому тут Result, а не throws? Тому що ми хочемо частковий успіх: якщо 2 книги не знайшлися, але 18 знайшлися, ми хочемо повернути 18 успіхів і 2 помилки, а не завалити все на першій помилці. Ми свідомо перетворюємо помилку на дані.

4. Практика: вибрати N і не зламатися на реальності

Як вибрати N

Значення maxConcurrency майже ніколи не буває ідеальним назавжди. Воно залежить від природи роботи.

Якщо завдання CPU-bound (багато обчислень), надто велике N може призвести до конкуренції за процесорний час і до погіршення загальної продуктивності. Якщо завдання I/O-bound (як мережеві запити), трохи більше N часто пришвидшує виконання загалом, тому що поки один запит чекає мережу, інший може працювати.

Але є ще зовнішні обмеження: rate limiting API, обмеження сервера, обмеження вашої власної машини, та й просто щоб логи не перетворилися на салат.

На практиці в CLI-інструментах часто починають із невеликих значень на кшталт 4, 8 або 16, а потім, якщо потрібно, роблять це параметром команди. Головне, що тепер у вас є механізм, який коректно реалізує «не більше N одночасно», а не просто надія на вдачу.

До речі, важливий контраст: async let добре підходить для фіксованої кількості завдань, але погано, точніше, ніяк, не виражає «динамічний пул завдань». async let не призначений для ситуації, коли кількість паралельних завдань залежить від розміру масиву під час виконання. Тому TaskGroup + ліміт — природне рішення.

Граничні випадки, які ламають «красивий код»

У цьому місці багато рішень «майже працюють», але падають на реальності. Давайте проговоримо їх спокійно, як дорослі люди, які вже бачили баг «чому воно зависло».

Якщо вхід порожній (ids.isEmpty), наш код усе одно працює коректно: ми не додамо жодного завдання, а цикл while let ... = await group.next() одразу завершиться, тому що next() поверне nil на порожній групі. Корисно пам’ятати про це, щоб не будувати зайву перевірку заради перевірки.

Якщо maxConcurrency дорівнює 0 (або від’ємний), то логічно ви заборонили запускати взагалі все. На практиці це перетворюється на «нічого не відбувається». Тому ми нормалізуємо ліміт через max(1, maxConcurrency). Це маленький захист від дивних вхідних даних.

Якщо ви помилилися в межах початкового заповнення (наприклад, написали while nextIndex <= limit), ви отримаєте класичну проблему off-by-one: або запустите на одне завдання більше за ліміт, або пропустите останній елемент. У такій логіці краще тримати дві перевірки разом: nextIndex < ids.count && nextIndex < limit, як у прикладах.

Чи можна зробити простіше

Іноді хочеться зробити чергу всередині дочірніх завдань або змінювати спільний лічильник активних завдань. Це виглядає як спроба вручну написати планувальник. Погана новина: ви пишете планувальник гірший за runtime Swift, ще й із більшим шансом отримати гонки або зламану логіку прогресу.

Саме тому ми тримаємося простого патерну: завдання нічого не знають про ліміти, вони просто виконують роботу. Ліміт реалізує батьківський код, який читає результати через next(). Сам устрій TaskGroup передбачає, що завдання додаються через addTask, а результати забираються через next().

5. Типові помилки

Помилка № 1: ліміт «0», і жодного завдання не запускається.
Це виглядає як «нічого не відбувається» і легко сплутати з зависанням мережі. На практиці проблема найчастіше в тому, що maxConcurrency прийшов з аргументу команди й виявився нулем. Нормалізуйте ліміт хоча б до 1, інакше ви буквально заборонили системі працювати.

Помилка № 2: неправильне початкове заповнення групи, через що частина елементів не обробляється.
Коли ви заповнюєте групу першими limit завданнями, легко помилитися в умові циклу й не додати, наприклад, останній елемент масиву. Протиотрута проста: в умові тримайте одночасно «не вийшли за вхід» і «не вийшли за ліміт», тобто nextIndex < ids.count && nextIndex < limit, а індекс завжди збільшуйте лише в одному місці.

Помилка № 3: ви додаєте нові завдання не в момент завершення старих, і ліміт перестає бути лімітом.
Сам сенс «не більше N одночасно» тримається на дисципліні: нове завдання додаємо лише після того, як отримали готовий результат через group.next(). Якщо ви почнете addTask з інших гілок коду (або спробуєте робити це з дочірнього завдання), ви дуже швидко перестанете розуміти реальну кількість активних завдань.

Помилка № 4: ви очікуєте, що порядок результатів збережеться сам.
З лімітом чи без ліміту TaskGroup віддає значення в порядку завершення. Якщо вам потрібен початковий порядок, повертайте з завдання індекс (index, value) і розкладіть результати за індексами. Інакше ви отримаєте «правильні дані в неправильному порядку» — один із найнеприємніших різновидів майже перемоги.

Помилка № 5: побічні ефекти відбуваються всередині addTask, і ви ловите «напівзавершений стан».
Якщо дочірні завдання пишуть у спільний репозиторій, файл або змінюють спільний масив, то в разі помилок чи скасування ви ризикуєте отримати стан «половина записалася, половина ні, а що саме — невідомо». Набагато надійніше збирати результати (або Result) у батька, а застосовувати зміни окремим послідовним кроком.

1
Опитування
Пріоритети задач, рівень 69, лекція 4
Недоступний
Пріоритети задач
Swift Concurrency: пріоритети й групи задач
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ