JavaRush /Курси /JAVA 25 SELF /Паралельна обробка файлів: ForkJoin, Parallel Streams

Паралельна обробка файлів: ForkJoin, Parallel Streams

JAVA 25 SELF
Рівень 59 , Лекція 1
Відкрита

1. Паралельні стріми (parallelStream): просто й зручно

Якщо ви вже працювали зі Stream API, то знаєте, як зручно фільтрувати, перетворювати й збирати колекції. Важливий бонус: будь-який стрім можна зробити паралельним буквально одним рядком коду — увімкнути parallel() — і елементи колекції почнуть оброблятися паралельно.

Це особливо корисно для незалежних операцій над набором файлів: порахувати рядки, знайти підрядок, скопіювати, стиснути тощо.

Приклад: паралельний підрахунок рядків у всіх файлах каталогу

Варіант 1: послідовно

import java.nio.file.*;
import java.io.IOException;
import java.util.List;

public class LogLineCounter {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        long totalLines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
            for (Path file : stream) {
                long lines = Files.lines(file).count();
                totalLines += lines;
            }
        }
        System.out.println("Усього рядків у всіх логах: " + totalLines);
    }
}

Коментар: усе виконується послідовно, файл за файлом. Якщо файлів багато й вони великі — чекати доведеться довго.

Варіант 2: паралельно!

import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;

public class LogLineCounterParallel {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get("logs");
        try (Stream<Path> files = Files.list(logDir)) {
            long totalLines = files
                .filter(path -> path.toString().endsWith(".log"))
                .parallel() // Ось і вся магія!
                .mapToLong(file -> {
                    try (Stream<String> lines = Files.lines(file)) {
                        return lines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                })
                .sum();
            System.out.println("Усього рядків у всіх логах: " + totalLines);
        }
    }
}

Коментар: ключовий рядок — .parallel(). Зазвичай на багатоядерному процесорі програма спрацює помітно швидше.

Як це працює?

  • parallel() перетворює звичайний стрім на паралельний. Під капотом використовується спільний пул ForkJoinPool (кількість потоків за замовчуванням дорівнює кількості ядер).
  • Кожен файл обробляється незалежно, результати агрегуються через термінальні операції (наприклад, sum()).
  • Якщо файлів мало — прискорення може й не бути; якщо їх сотні — виграш зазвичай відчутний.

Важливо!

  • Паралельні стріми не пришвидшують самі I/O-операції; вони дають змогу виконувати кілька операцій одночасно. На швидких носіях (SSD) це допомагає, на повільних (HDD) можна впертися у «вузьке місце» диска.

2. ForkJoinPool: «розділяй і володарюй» на практиці

ForkJoin — фреймворк для паралельних обчислень «розділяй і володарюй»: велику задачу розбиваємо на підзадачі, виконуємо їх паралельно й об’єднуємо результати. Керує цим спеціальний пул — ForkJoinPool. Саме він використовується «за лаштунками» паралельних стрімів, але ним можна керувати безпосередньо для більшої гнучкості.

Особливо ефективна для рекурсивних структур (дерева каталогів), великих масивів даних і задач, які легко декомпозувати на незалежні частини.

Приклад: рекурсивний пошук у дереві каталогів

Знайдемо всі файли ".txt" (включно з вкладеними папками) і порахуємо загальну кількість рядків.

import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;

public class FolderLineCounter extends RecursiveTask<Long> {
    private final Path dir;

    public FolderLineCounter(Path dir) {
        this.dir = dir;
    }

    @Override
    protected Long compute() {
        List<FolderLineCounter> subTasks = new ArrayList<>();
        long lines = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                if (Files.isDirectory(entry)) {
                    FolderLineCounter task = new FolderLineCounter(entry);
                    task.fork(); // Запускаємо підзадачу
                    subTasks.add(task);
                } else if (entry.toString().endsWith(".txt")) {
                    try (Stream<String> fileLines = Files.lines(entry)) {
                        lines += fileLines.count();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Збираємо результати з підзадач
        for (FolderLineCounter task : subTasks) {
            lines += task.join();
        }
        return lines;
    }

    public static void main(String[] args) {
        Path root = Paths.get("big_folder");
        ForkJoinPool pool = new ForkJoinPool();
        FolderLineCounter counter = new FolderLineCounter(root);
        long totalLines = pool.invoke(counter);
        System.out.println("Усього рядків у всіх .txt-файлах: " + totalLines);
    }
}

Що тут відбувається:

  • Для кожної папки створюється окреме завдання (FolderLineCounter), для підкаталогів — свої підзадачі (fork()).
  • Файли рахуються на місці, результати підсумовуються після join() усіх підзадач.

У чому перевага ForkJoin?

  • Ефективно працює з великими ієрархіями (дерева каталогів).
  • Максимально завантажує ядра процесора.
  • Дозволяє точно контролювати розпаралелювання та межі задач.

3. Практичні сценарії застосування

Масова обробка файлів

Наприклад, потрібно скопіювати тисячі фотографій у резервну папку.

import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelFileCopier {
    public static void main(String[] args) throws Exception {
        Path sourceDir = Paths.get("photos");
        Path destDir = Paths.get("photos_backup");
        Files.createDirectories(destDir);

        List<Path> files = Files.list(sourceDir)
                .filter(Files::isRegularFile)
                .collect(Collectors.toList());

        files.parallelStream().forEach(file -> {
            try {
                Path destFile = destDir.resolve(file.getFileName());
                Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        System.out.println("Усі файли скопійовано!");
    }
}

Коментар: кожен файл копіюється в окремому потоці. За великої кількості файлів прискорення помітне.

Паралельне стискання/розпакування

Аналогічно можна розпаралелити стискання, перерахунок хешів, перетворення формату зображень тощо через parallelStream() або свій ForkJoinPool.

4. Важливі зауваження та обмеження

  • I/O-операції не завжди виграють від паралелізму. Якщо диск або мережа — «вузьке місце», сотня паралельних задач лише збільшить конкуренцію за ресурс.
  • Не запускайте занадто багато потоків. За замовчуванням паралельні стріми використовують спільний пул ForkJoinPool.commonPool() з рівнем паралелізму ≈ кількості ядер. Його можна змінити через властивість "java.util.concurrent.ForkJoinPool.common.parallelism", але робіть це свідомо.
  • Не забувайте про синхронізацію. Якщо кілька потоків пишуть в один і той самий файл/об’єкт — використовуйте синхронізацію та черги; для незалежних файлів — синхронізація не потрібна.

5. Коротке знайомство з FileChannel і позиційним доступом

Для складніших сценаріїв (наприклад, паралельне читання різних частин одного великого файлу) використовуйте java.nio.channels.FileChannel, який підтримує позиційне читання та запис.

Приклад: читання різних частин файлу в різних потоках

import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;

public class FileChunkReader implements Runnable {
    private final Path path;
    private final long position;
    private final int size;

    public FileChunkReader(Path path, long position, int size) {
        this.path = path;
        this.position = position;
        this.size = size;
    }

    @Override
    public void run() {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(size);
            channel.read(buffer, position);
            // Обробка даних
            System.out.println("Прочитано " + buffer.position() + " байт з позиції " + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Коментар: запускайте кілька таких задач — кожна читає свій діапазон. Але обережно: не всі диски та файлові системи добре переносять надмірне паралельне навантаження.

6. Типові помилки під час паралельної обробки файлів

Помилка № 1: паралельний запис в один файл без синхронізації. Дані перемішуються й псуються. Використовуйте черги, буферизацію та синхронізацію запису.

Помилка № 2: занадто багато паралелізму. Паралельні стріми на слабкому «залізі»/маленьких файлах дають накладні витрати й можуть уповільнити виконання.

Помилка № 3: ігнорування помилок I/O. У паралельних стрімах винятки легко «загубити» — обробляйте їх усередині лямбд, логуйте, враховуйте збої.

Помилка № 4: незакриті ресурси. Завжди використовуйте try-with-resources для потоків/каналів, інакше отримаєте витоки та дивні помилки.

Помилка № 5: очікування «магії» від parallel(). Паралельність пришвидшує лише за достатнього обсягу роботи та наявних ресурсів (CPU, швидкий диск). Сам по собі виклик parallel() — не срібна куля.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ