1. Як скоординувати кілька потоків під час обробки файлів
Коли ви працюєте з великими файлами або складною обробкою даних, часто виникає завдання: розподілити роботу між кількома потоками. Наприклад, один потік читає рядки з файлу, а інші — обробляють ці рядки (шукають слова, рахують статистику тощо).
У чому проблема?
- Якщо всі потоки працюватимуть із одним і тим самим ресурсом (наприклад, із файлом або спільною колекцією), легко отримати помилки: одні потоки можуть «випередити» інші, виникнуть умови гонки, втрата даних, перевантаження пам’яті.
- Якщо потоків багато, а координації немає — одні простоюватимуть, інші перевантажуватимуться.
Потрібне рішення, яке:
- Дозволяє безпечно обмінюватися даними між потоками.
- Не допускає переповнення пам’яті (якщо обробка йде повільніше, ніж читання).
- Дозволяє легко завершувати роботу всіх потоків, коли дані закінчилися.
2. Патерн «Producer–Consumer» (Постачальник–Споживач)
Producer–Consumer — це класичний патерн, який допомагає потокам працювати злагоджено, не заважаючи одне одному.
У ньому є дві ролі: Producer (постачальник) створює дані — наприклад, читає рядки з файлу або отримує повідомлення з мережі — і кладе їх у спільну чергу. Consumer (споживач) бере дані з цієї черги й обробляє їх: рахує слова, зберігає в базу або пише в інший файл.
Головна ідея в тому, що ці два типи потоків працюють незалежно. Постачальник може читати швидше, ніж споживач встигає обробляти, або навпаки — і при цьому вони не блокують одне одного. Між ними знаходиться буфер — черга, яка вирівнює темп роботи.
Візуальна схема
[Файл] --(читає)--> [Producer] --(кладає в чергу)--> [BlockingQueue] --(бере)--> [Consumer] --(обробляє)
3. Реалізація з BlockingQueue
У Java для організації такого обміну ідеально підходить інтерфейс BlockingQueue (наприклад, реалізація ArrayBlockingQueue).
Що таке BlockingQueue?
BlockingQueue — це потокобезпечна черга з обмеженим розміром, яка сама дбає про синхронізацію. Якщо постачальник намагається додати елемент, а черга вже заповнена, потік автоматично блокується і чекає, доки з’явиться місце. Аналогічно, якщо споживач намагається взяти елемент, а черга порожня, він просто чекає, доки хтось покладе щось у чергу.
Такий механізм автоматично розв’язує класичну проблему умов гонки між потоками і переповнення пам’яті: постачальники не закидають чергу зайвими елементами, а споживачі не намагаються працювати з порожнечею. Усі потоки спокійно та злагоджено виконують свою роботу.
Приклад створення черги
import java.util.concurrent.*;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // буфер на 100 елементів
100 — максимальна кількість елементів у черзі. Якщо постачальник працює швидше, він чекатиме, доки споживач не обробить частину даних.
4. Координація потоків: backpressure і завершення роботи
Обмеження розміру черги (backpressure)
Backpressure — це механізм, який не дозволяє постачальнику «залити» всю пам’ять, якщо споживач не встигає обробляти дані.
- Якщо черга заповнена, постачальник автоматично «пригальмовує» (метод put() блокується).
- Якщо черга порожня, споживач чекає (метод take() блокується).
Це дозволяє системі працювати стабільно навіть за різної швидкості потоків.
Завершення роботи: «poison pill» (отруйна пігулка)
Коли постачальник закінчив читати файл, потрібно якось повідомити споживачам, що даних більше не буде, і їм пора завершуватися.
Рішення:
- У чергу кладеться спеціальний об’єкт — «poison pill» (наприклад, рядок "__END__" або інше спеціальне значення, але не null).
- Споживач, отримавши «poison pill», розуміє, що пора завершуватися.
Якщо споживачів кілька, потрібно покласти стільки «poison pill», скільки споживачів!
5. Приклад конвеєра: читання файлу та обробка рядків
Давайте реалізуємо простий конвеєр:
- Один потік читає рядки з файлу й кладе їх у чергу.
- Кілька потоків беруть рядки з черги, рахують кількість слів і виводять результат.
Крок 1. Producer — зчитувач файлу
import java.io.*;
import java.util.concurrent.*;
public class FileProducer implements Runnable {
private final BlockingQueue<String> queue;
private final File file;
private final int consumerCount;
private final String POISON_PILL = "__END__";
public FileProducer(BlockingQueue<String> queue, File file, int consumerCount) {
this.queue = queue;
this.file = file;
this.consumerCount = consumerCount;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
while ((line = reader.readLine()) != null) {
queue.put(line); // якщо черга заповнена — чекаємо
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
// Кладемо poison pill для кожного споживача
try {
for (int i = 0; i < consumerCount; i++) {
queue.put(POISON_PILL);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Крок 2. Consumer — обробник рядків
import java.util.concurrent.BlockingQueue;
public class LineConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final String POISON_PILL = "__END__";
public LineConsumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
String line = queue.take(); // якщо черга порожня — чекаємо
if (POISON_PILL.equals(line)) {
break; // завершення роботи
}
int wordCount = line.trim().isEmpty() ? 0 : line.trim().split("\\s+").length;
System.out.println(Thread.currentThread().getName() + ": " + wordCount + " слів у рядку: " + line);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Крок 3. Запуск конвеєра через Executors
import java.io.File;
import java.util.concurrent.*;
public class PipelineDemo {
public static void main(String[] args) throws Exception {
int consumerCount = 3;
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
File file = new File("input.txt"); // ваш файл
// Запускаємо producer
Thread producer = new Thread(new FileProducer(queue, file, consumerCount));
producer.start();
// Запускаємо consumers через ExecutorService
ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
for (int i = 0; i < consumerCount; i++) {
consumers.submit(new LineConsumer(queue));
}
// Чекаємо завершення producer
producer.join();
// Чекаємо завершення consumers
consumers.shutdown();
consumers.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("Оброблення завершено!");
}
}
6. Візуалізація: схема роботи конвеєра
7. Корисні нюанси та найкращі практики
- Обмежуйте розмір черги — це захищає від переповнення пам’яті.
- Використовуйте «poison pill» для завершення — інакше споживачі можуть зависнути назавжди.
- Не використовуйте null як «poison pill», якщо в черзі можуть бути справжні null-значення. Краще спеціальний рядок або об’єкт.
- Обробляйте InterruptedException — це важливо для коректного завершення потоків.
- Використовуйте ExecutorService для керування пулом потоків — це простіше й безпечніше, ніж вручну створювати потоки.
8. Типові помилки під час реалізації конвеєра
Помилка № 1: Необмежена черга → OutOfMemoryError. Якщо використовувати LinkedBlockingQueue без обмеження розміру, постачальник може «залити» всю пам’ять, якщо споживач не встигає.
Помилка № 2: Не поклали «poison pill» → споживачі зависають. Якщо забути покласти «отруйні пігулки», потоки-споживачі чекатимуть нові дані вічно.
Помилка № 3: Поклали лише одну «poison pill», а споживачів кілька. Кожному потоку-споживачу потрібна своя «пігулка» — інакше завершаться не всі.
Помилка № 4: Не обробили InterruptedException. Якщо потік перервано, потрібно коректно завершити роботу (відновити прапор переривання), інакше можливі «завислі» потоки.
Помилка № 5: Використання спільної змінної між потоками без синхронізації. Не намагайтеся «винаходити велосипед» — використовуйте BlockingQueue, а не звичайні списки чи масиви.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ