JavaRush /Курси /JAVA 25 SELF /Конвеєри обробки файлів: producer–consumer

Конвеєри обробки файлів: producer–consumer

JAVA 25 SELF
Рівень 59 , Лекція 4
Відкрита

1. Як скоординувати кілька потоків під час обробки файлів

Коли ви працюєте з великими файлами або складною обробкою даних, часто виникає завдання: розподілити роботу між кількома потоками. Наприклад, один потік читає рядки з файлу, а інші — обробляють ці рядки (шукають слова, рахують статистику тощо).

У чому проблема?

  • Якщо всі потоки працюватимуть із одним і тим самим ресурсом (наприклад, із файлом або спільною колекцією), легко отримати помилки: одні потоки можуть «випередити» інші, виникнуть умови гонки, втрата даних, перевантаження пам’яті.
  • Якщо потоків багато, а координації немає — одні простоюватимуть, інші перевантажуватимуться.

Потрібне рішення, яке:

  • Дозволяє безпечно обмінюватися даними між потоками.
  • Не допускає переповнення пам’яті (якщо обробка йде повільніше, ніж читання).
  • Дозволяє легко завершувати роботу всіх потоків, коли дані закінчилися.

2. Патерн «Producer–Consumer» (Постачальник–Споживач)

Producer–Consumer — це класичний патерн, який допомагає потокам працювати злагоджено, не заважаючи одне одному.

У ньому є дві ролі: Producer (постачальник) створює дані — наприклад, читає рядки з файлу або отримує повідомлення з мережі — і кладе їх у спільну чергу. Consumer (споживач) бере дані з цієї черги й обробляє їх: рахує слова, зберігає в базу або пише в інший файл.

Головна ідея в тому, що ці два типи потоків працюють незалежно. Постачальник може читати швидше, ніж споживач встигає обробляти, або навпаки — і при цьому вони не блокують одне одного. Між ними знаходиться буфер — черга, яка вирівнює темп роботи.

Візуальна схема

[Файл] --(читає)--> [Producer] --(кладає в чергу)--> [BlockingQueue] --(бере)--> [Consumer] --(обробляє)

3. Реалізація з BlockingQueue

У Java для організації такого обміну ідеально підходить інтерфейс BlockingQueue (наприклад, реалізація ArrayBlockingQueue).

Що таке BlockingQueue?

BlockingQueue — це потокобезпечна черга з обмеженим розміром, яка сама дбає про синхронізацію. Якщо постачальник намагається додати елемент, а черга вже заповнена, потік автоматично блокується і чекає, доки з’явиться місце. Аналогічно, якщо споживач намагається взяти елемент, а черга порожня, він просто чекає, доки хтось покладе щось у чергу.

Такий механізм автоматично розв’язує класичну проблему умов гонки між потоками і переповнення пам’яті: постачальники не закидають чергу зайвими елементами, а споживачі не намагаються працювати з порожнечею. Усі потоки спокійно та злагоджено виконують свою роботу.

Приклад створення черги

import java.util.concurrent.*;

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // буфер на 100 елементів

100 — максимальна кількість елементів у черзі. Якщо постачальник працює швидше, він чекатиме, доки споживач не обробить частину даних.

4. Координація потоків: backpressure і завершення роботи

Обмеження розміру черги (backpressure)

Backpressure — це механізм, який не дозволяє постачальнику «залити» всю пам’ять, якщо споживач не встигає обробляти дані.

  • Якщо черга заповнена, постачальник автоматично «пригальмовує» (метод put() блокується).
  • Якщо черга порожня, споживач чекає (метод take() блокується).

Це дозволяє системі працювати стабільно навіть за різної швидкості потоків.

Завершення роботи: «poison pill» (отруйна пігулка)

Коли постачальник закінчив читати файл, потрібно якось повідомити споживачам, що даних більше не буде, і їм пора завершуватися.

Рішення:

  • У чергу кладеться спеціальний об’єкт — «poison pill» (наприклад, рядок "__END__" або інше спеціальне значення, але не null).
  • Споживач, отримавши «poison pill», розуміє, що пора завершуватися.

Якщо споживачів кілька, потрібно покласти стільки «poison pill», скільки споживачів!

5. Приклад конвеєра: читання файлу та обробка рядків

Давайте реалізуємо простий конвеєр:

  • Один потік читає рядки з файлу й кладе їх у чергу.
  • Кілька потоків беруть рядки з черги, рахують кількість слів і виводять результат.

Крок 1. Producer — зчитувач файлу

import java.io.*;
import java.util.concurrent.*;

public class FileProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final File file;
    private final int consumerCount;
    private final String POISON_PILL = "__END__";

    public FileProducer(BlockingQueue<String> queue, File file, int consumerCount) {
        this.queue = queue;
        this.file = file;
        this.consumerCount = consumerCount;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line); // якщо черга заповнена — чекаємо
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Кладемо poison pill для кожного споживача
            try {
                for (int i = 0; i < consumerCount; i++) {
                    queue.put(POISON_PILL);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Крок 2. Consumer — обробник рядків

import java.util.concurrent.BlockingQueue;

public class LineConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String POISON_PILL = "__END__";

    public LineConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String line = queue.take(); // якщо черга порожня — чекаємо
                if (POISON_PILL.equals(line)) {
                    break; // завершення роботи
                }
                int wordCount = line.trim().isEmpty() ? 0 : line.trim().split("\\s+").length;
                System.out.println(Thread.currentThread().getName() + ": " + wordCount + " слів у рядку: " + line);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Крок 3. Запуск конвеєра через Executors

import java.io.File;
import java.util.concurrent.*;

public class PipelineDemo {
    public static void main(String[] args) throws Exception {
        int consumerCount = 3;
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        File file = new File("input.txt"); // ваш файл

        // Запускаємо producer
        Thread producer = new Thread(new FileProducer(queue, file, consumerCount));
        producer.start();

        // Запускаємо consumers через ExecutorService
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            consumers.submit(new LineConsumer(queue));
        }

        // Чекаємо завершення producer
        producer.join();

        // Чекаємо завершення consumers
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("Оброблення завершено!");
    }
}

6. Візуалізація: схема роботи конвеєра

flowchart LR A[Producer: читає файл] -- put() --> Q[BlockingQueue] Q -- take() --> C1[Consumer 1: рахує слова] Q -- take() --> C2[Consumer 2: рахує слова] Q -- take() --> C3[Consumer 3: рахує слова] A -.->|poison pill| Q Q -.->|poison pill| C1 Q -.->|poison pill| C2 Q -.->|poison pill| C3

7. Корисні нюанси та найкращі практики

  • Обмежуйте розмір черги — це захищає від переповнення пам’яті.
  • Використовуйте «poison pill» для завершення — інакше споживачі можуть зависнути назавжди.
  • Не використовуйте null як «poison pill», якщо в черзі можуть бути справжні null-значення. Краще спеціальний рядок або об’єкт.
  • Обробляйте InterruptedException — це важливо для коректного завершення потоків.
  • Використовуйте ExecutorService для керування пулом потоків — це простіше й безпечніше, ніж вручну створювати потоки.

8. Типові помилки під час реалізації конвеєра

Помилка № 1: Необмежена черга → OutOfMemoryError. Якщо використовувати LinkedBlockingQueue без обмеження розміру, постачальник може «залити» всю пам’ять, якщо споживач не встигає.

Помилка № 2: Не поклали «poison pill» → споживачі зависають. Якщо забути покласти «отруйні пігулки», потоки-споживачі чекатимуть нові дані вічно.

Помилка № 3: Поклали лише одну «poison pill», а споживачів кілька. Кожному потоку-споживачу потрібна своя «пігулка» — інакше завершаться не всі.

Помилка № 4: Не обробили InterruptedException. Якщо потік перервано, потрібно коректно завершити роботу (відновити прапор переривання), інакше можливі «завислі» потоки.

Помилка № 5: Використання спільної змінної між потоками без синхронізації. Не намагайтеся «винаходити велосипед» — використовуйте BlockingQueue, а не звичайні списки чи масиви.

1
Опитування
Паралельна робота з файлами, рівень 59, лекція 4
Недоступний
Паралельна робота з файлами
Паралельна робота з файлами
Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ