JavaRush /Курсы /JAVA 25 SELF /Параллельная обработка файлов: ForkJoin, Parallel Streams...

Параллельная обработка файлов: ForkJoin, Parallel Streams

JAVA 25 SELF
59 уровень , 1 лекция
Открыта

1. Параллельные стримы (parallelStream): просто и удобно

Если вы уже работали со Stream API, то знаете, как удобно фильтровать, преобразовывать и собирать коллекции. Важный бонус: любой поток можно сделать параллельным буквально одной строкой — включить parallel(), и элементы коллекции начнут обрабатываться конкурентно.

Это особенно полезно для независимых операций над набором файлов: подсчитать строки, найти подстроку, скопировать, сжать и т.д.

Пример: параллельный подсчёт строк во всех файлах каталога

Вариант 1: последовательно

import java.nio.file.*;
import java.io.IOException;
import java.util.List;

public class LogLineCounter {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        long totalLines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
            for (Path file : stream) {
                long lines = Files.lines(file).count();
                totalLines += lines;
            }
        }
        System.out.println("Всего строк во всех логах: " + totalLines);
    }
}

Комментарий: всё делается по очереди, один файл за другим. Если файлов много и они большие, ждать придётся долго.

Вариант 2: параллельно!

import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;

public class LogLineCounterParallel {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        try (Stream<Path> files = Files.list(logDir)) {
            long totalLines = files
                .filter(path -> path.toString().endsWith(".log"))
                .parallel() // вот и вся магия!
                .mapToLong(file -> {
                    try (Stream<String> lines = Files.lines(file)) {
                        return lines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                })
                .sum();
            System.out.println("Всего строк во всех логах: " + totalLines);
        }
    }
}

Комментарий: ключевая строка — .parallel(). На многоядерном процессоре программа, как правило, сработает заметно быстрее.

Как это работает?

  • parallel() превращает обычный стрим в параллельный. Под капотом используется общий пул ForkJoinPool (потоков по умолчанию столько, сколько ядер).
  • Каждый файл обрабатывается независимо, результаты агрегируются через терминальные операции (например, sum()).
  • Если файлов мало — ускорения может не быть; если их сотни — выигрыш обычно заметен.

Важно!

  • Параллельные стримы не ускоряют сами операции I/O; они позволяют делать несколько операций одновременно. На быстрых носителях (SSD) это помогает, на медленных (HDD) можно упереться в «узкое место» диска.

2. ForkJoinPool: «разделяй и властвуй» на практике

ForkJoin — фреймворк для параллельных вычислений «разделяй и властвуй»: большую задачу разбиваем на подзадачи, выполняем их параллельно и объединяем результаты. Управляет этим специальный пул — ForkJoinPool. Именно он используется «за кулисами» параллельных стримов, но им можно управлять и напрямую для большей гибкости.

Такая модель особенно хороша для рекурсивных структур (деревья каталогов), крупных массивов данных и задач, которые легко декомпозировать на независимые части.

Пример: рекурсивный поиск по дереву каталогов

Найдём все ".txt"-файлы (включая вложенные папки) и посчитаем суммарное количество строк.

import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;

public class FolderLineCounter extends RecursiveTask<Long> {
    private final Path dir;

    public FolderLineCounter(Path dir) {
        this.dir = dir;
    }

    @Override
    protected Long compute() {
        List<FolderLineCounter> subTasks = new ArrayList<>();
        long lines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    FolderLineCounter task = new FolderLineCounter(entry);
                    task.fork(); // Запускаем подзадачу
                    subTasks.add(task);
                } else if (entry.toString().endsWith(".txt")) {
                    try (Stream<String> fileLines = Files.lines(entry)) {
                        lines += fileLines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Собираем результаты из подзадач
        for (FolderLineCounter task : subTasks) {
            lines += task.join();
        }
        return lines;
    }

    public static void main(String[] args) {
        Path root = Paths.get("big_folder");
        ForkJoinPool pool = new ForkJoinPool();
        FolderLineCounter counter = new FolderLineCounter(root);
        long totalLines = pool.invoke(counter);
        System.out.println("Всего строк во всех .txt-файлах: " + totalLines);
    }
}

Что здесь происходит:

  • Для каждой папки создаётся отдельная задача (FolderLineCounter), для подкаталогов — свои подзадачи (fork()).
  • Файлы считаются на месте, результаты суммируются после join() всех подзадач.

В чём преимущество ForkJoin?

  • Эффективно работает с большими иерархиями (деревья каталогов).
  • Максимально загружает ядра процессора.
  • Позволяет точно контролировать распараллеливание и границы задач.

3. Практические сценарии применения

Массовая обработка файлов

Например, нужно скопировать тысячи фотографий в резервную папку.

import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelFileCopier {
    public static void main(String[] args) throws Exception {
        Path sourceDir = Paths.get("photos");
        Path destDir = Paths.get("photos_backup");
        Files.createDirectories(destDir);

        List<Path> files = Files.list(sourceDir)
                .filter(Files::isRegularFile)
                .collect(Collectors.toList());

        files.parallelStream().forEach(file -> {
            try {
                Path destFile = destDir.resolve(file.getFileName());
                Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        System.out.println("Все файлы скопированы!");
    }
}

Комментарий: каждый файл копируется в отдельном потоке. При большом числе файлов ускорение заметно.

Параллельное сжатие/распаковка

Аналогично можно распараллелить сжатие, пересчёт хэшей, преобразование формата изображений и т.п. через parallelStream() или свой ForkJoinPool.

4. Важные замечания и ограничения

  • I/O-операции не всегда выигрывают от параллелизма. Если диск или сеть — «узкое место», сотня параллельных задач только увеличит конкуренцию за ресурс.
  • Не запускайте слишком много потоков. По умолчанию параллельные стримы используют общий пул ForkJoinPool.commonPool() с уровнем параллелизма ≈ числу ядер. Его можно изменить через свойство "java.util.concurrent.ForkJoinPool.common.parallelism", но делайте это осознанно.
  • Не забывайте про синхронизацию. Если несколько потоков пишут в один и тот же файл/объект — используйте синхронизацию и очереди; для независимых файлов — синхронизация не нужна.

5. Краткое знакомство с FileChannel и позиционным доступом

Для продвинутых сценариев (например, параллельное чтение разных частей одного большого файла) используйте java.nio.channels.FileChannel, который поддерживает позиционное чтение/запись.

Пример: чтение разных частей файла в разных потоках

import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;

public class FileChunkReader implements Runnable {
    private final Path path;
    private final long position;
    private final int size;

    public FileChunkReader(Path path, long position, int size) {
        this.path = path;
        this.position = position;
        this.size = size;
    }

    @Override
    public void run() {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            channel.read(buffer, position);
            // Обработка данных
            System.out.println("Прочитано " + buffer.position() + " байт с позиции " + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Комментарий: запускайте несколько таких задач — каждая читает свой диапазон. Но осторожно: не все диски и ФС любят сильную параллельную нагрузку.

6. Типичные ошибки при параллельной обработке файлов

Ошибка №1: параллельная запись в один файл без синхронизации. Данные перемешиваются и портятся. Используйте очереди, буферизацию и синхронизацию записи.

Ошибка №2: слишком много параллелизма. Параллельные стримы на слабом железе/маленьких файлах дают накладные расходы и могут замедлить выполнение.

Ошибка №3: игнорирование ошибок I/O. В параллельных стримах исключения легко «потерять» — обрабатывайте их внутри лямбд, логируйте, учитывайте сбои.

Ошибка №4: не закрытые ресурсы. Всегда используйте try-with-resources для потоков/каналов, иначе получите утечки и странные ошибки.

Ошибка №5: ожидание «магии» от parallel(). Параллельность ускоряет только при достаточном объёме работы и доступных ресурсах (CPU, быстрый диск). Сам по себе вызов parallel() — не серебряная пуля.

1
Задача
JAVA 25 SELF, 59 уровень, 1 лекция
Недоступна
Подсчёт общего размера файлов в каталоге с помощью ForkJoin
Подсчёт общего размера файлов в каталоге с помощью ForkJoin
1
Задача
JAVA 25 SELF, 59 уровень, 1 лекция
Недоступна
Параллельная обработка частей одного файла
Параллельная обработка частей одного файла
Комментарии
ЧТОБЫ ПОСМОТРЕТЬ ВСЕ КОММЕНТАРИИ ИЛИ ОСТАВИТЬ КОММЕНТАРИЙ,
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ