JavaRush /Курсы /JAVA 25 SELF /Конвейеры обработки файлов: producer–consumer

Конвейеры обработки файлов: producer–consumer

JAVA 25 SELF
59 уровень , 4 лекция
Открыта

1. Как скоординировать несколько потоков при обработке файлов

Когда вы работаете с большими файлами или сложной обработкой данных, часто возникает задача: разделить работу между несколькими потоками. Например, один поток читает строки из файла, а другие — обрабатывают эти строки (ищут слова, считают статистику и т.д.).

В чём проблема?

  • Если все потоки будут работать с одним и тем же ресурсом (например, с файлом или общей коллекцией), легко получить ошибки: одни потоки могут «обогнать» другие, возникнут гонки, потеря данных, перегрузка памяти.
  • Если потоков много, а координации нет — одни будут простаивать, другие перегружаться.

Нужно решение, которое:

  • Позволяет безопасно обмениваться данными между потоками.
  • Не допускает переполнения памяти (если обработка идёт медленнее, чем чтение).
  • Позволяет легко завершать работу всех потоков, когда данные закончились.

2. Паттерн «Producer–Consumer» (Поставщик–Потребитель)

Producer–Consumer — это классический паттерн, который помогает потокам работать слаженно, не мешая друг другу.

В нём есть две роли: Producer (поставщик) создаёт данные — например, читает строки из файла или получает сообщения из сети — и кладёт их в общую очередь. Consumer (потребитель) берёт данные из этой очереди и обрабатывает их: считает слова, сохраняет в базу или пишет в другой файл.

Главная идея в том, что эти два типа потоков работают независимо. Поставщик может читать быстрее, чем потребитель успевает обрабатывать, или наоборот — и при этом никто не блокируется. Между ними находится буфер — очередь, которая выравнивает темп работы.

Визуальная схема

[Файл] --(читает)--> [Producer] --(кладёт в очередь)--> [BlockingQueue] --(берёт)--> [Consumer] --(обрабатывает)

3. Реализация с BlockingQueue

В Java для организации такого обмена идеально подходит интерфейс BlockingQueue (например, реализация ArrayBlockingQueue).

Что такое BlockingQueue?

BlockingQueue — это потокобезопасная очередь с ограниченным размером, которая сама заботится о синхронизации. Если поставщик пытается добавить элемент, а очередь уже полна, поток автоматически блокируется и ждёт, пока появится место. Аналогично, если потребитель пытается взять элемент, а очередь пуста, он просто ждёт, пока кто-то положит что-то в очередь.

Такой механизм автоматически решает классическую проблему «перегонки» потоков и переполнения памяти: поставщики не закидывают очередь лишними элементами, а потребители не пытаются работать с пустотой. Все потоки спокойно и слаженно выполняют свою работу.

Пример создания очереди

import java.util.concurrent.*;

BlockingQueue<String> queue = new ArrayBlockingQueue<>(100); // буфер на 100 элементов

100 — максимальное количество элементов в очереди. Если producer работает быстрее, он будет ждать, пока consumer не обработает часть данных.

4. Координация потоков: backpressure и завершение работы

Ограничение размера очереди (backpressure)

Backpressure — это механизм, который не даёт producer'у «залить» всю память, если consumer не успевает обрабатывать данные.

  • Если очередь заполнена, producer автоматически «притормаживает» (метод put() блокируется).
  • Если очередь пуста, consumer ждёт (метод take() блокируется).

Это позволяет системе работать стабильно даже при разной скорости потоков.

Завершение работы: «poison pill» (отравленная пилюля)

Когда producer закончил читать файл, нужно как-то сообщить consumer'ам, что данных больше не будет, и им пора завершаться.

Решение:

  • В очередь кладётся специальный объект — «poison pill» (например, строка "__END__" или иное специальное значение, но не null).
  • Consumer, получив «poison pill», понимает, что пора завершаться.

Если consumer'ов несколько, нужно положить столько «poison pill», сколько consumer'ов!

5. Пример конвейера: чтение файла и обработка строк

Давайте реализуем простой конвейер:

  • Один поток читает строки из файла и кладёт их в очередь.
  • Несколько потоков берут строки из очереди, считают количество слов и выводят результат.

Шаг 1. Producer — читатель файла

import java.io.*;
import java.util.concurrent.*;

public class FileProducer implements Runnable {
    private final BlockingQueue<String> queue;
    private final File file;
    private final int consumerCount;
    private final String POISON_PILL = "__END__";

    public FileProducer(BlockingQueue<String> queue, File file, int consumerCount) {
        this.queue = queue;
        this.file = file;
        this.consumerCount = consumerCount;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line); // если очередь заполнена — ждём
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Кладём poison pill для каждого consumer
            try {
                for (int i = 0; i < consumerCount; i++) {
                    queue.put(POISON_PILL);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

Шаг 2. Consumer — обработчик строк

import java.util.concurrent.BlockingQueue;

public class LineConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final String POISON_PILL = "__END__";

    public LineConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String line = queue.take(); // если очередь пуста — ждём
                if (POISON_PILL.equals(line)) {
                    break; // завершение работы
                }
                int wordCount = line.trim().isEmpty() ? 0 : line.trim().split("\\s+").length;
                System.out.println(Thread.currentThread().getName() + ": " + wordCount + " слов в строке: " + line);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Шаг 3. Запуск конвейера через Executors

import java.io.File;
import java.util.concurrent.*;

public class PipelineDemo {
    public static void main(String[] args) throws Exception {
        int consumerCount = 3;
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

        File file = new File("input.txt"); // ваш файл

        // Запускаем producer
        Thread producer = new Thread(new FileProducer(queue, file, consumerCount));
        producer.start();

        // Запускаем consumers через ExecutorService
        ExecutorService consumers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            consumers.submit(new LineConsumer(queue));
        }

        // Ждём завершения producer
        producer.join();

        // Ждём завершения consumers
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("Обработка завершена!");
    }
}

6. Визуализация: схема работы конвейера

flowchart LR
    A[Producer: читает файл] -- put() --> Q[BlockingQueue]
    Q -- take() --> C1[Consumer 1: считает слова]
    Q -- take() --> C2[Consumer 2: считает слова]
    Q -- take() --> C3[Consumer 3: считает слова]
    A -.->|poison pill| Q
    Q -.->|poison pill| C1
    Q -.->|poison pill| C2
    Q -.->|poison pill| C3

7. Полезные нюансы и best practices

  • Ограничивайте размер очереди — это защищает от переполнения памяти.
  • Используйте «poison pill» для завершения — иначе consumer'ы могут зависнуть навсегда.
  • Не используйте null как «poison pill», если в очереди могут быть реальные null-значения. Лучше специальную строку или объект.
  • Обрабатывайте InterruptedException — это важно для корректного завершения потоков.
  • Используйте ExecutorService для управления пулом потоков — это проще и безопаснее, чем вручную создавать потоки.

8. Типичные ошибки при реализации конвейера

Ошибка №1: Неограниченная очередь → OutOfMemoryError. Если использовать LinkedBlockingQueue без ограничения размера, producer может «залить» всю память, если consumer не успевает.

Ошибка №2: Не положили «poison pill» → consumer'ы зависают. Если забыть положить «отравленные пилюли», потоки-потребители будут ждать новые данные вечно.

Ошибка №3: Положили только одну «poison pill», а consumer'ов несколько. Каждому потоку-потребителю нужна своя «пилюля» — иначе не все завершатся.

Ошибка №4: Не обработали InterruptedException. Если поток прерван, нужно корректно завершить работу (восстановить флаг прерывания), иначе возможны «зависшие» потоки.

Ошибка №5: Использование общей переменной между потоками без синхронизации. Не пытайтесь «изобрести велосипед» — используйте BlockingQueue, а не обычные списки или массивы.

1
Задача
JAVA 25 SELF, 59 уровень, 4 лекция
Недоступна
Использование poison pill для завершения работы consumer
Использование poison pill для завершения работы consumer
1
Задача
JAVA 25 SELF, 59 уровень, 4 лекция
Недоступна
Конвейер из producer и двух consumers
Конвейер из producer и двух consumers
1
Опрос
Паралельная работа с файлами, 59 уровень, 4 лекция
Недоступен
Паралельная работа с файлами
Паралельная работа с файлами
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ