1. Параллельные стримы (parallelStream): просто и удобно
Если вы уже работали со Stream API, то знаете, как удобно фильтровать, преобразовывать и собирать коллекции. Важный бонус: любой поток можно сделать параллельным буквально одной строкой — включить parallel(), и элементы коллекции начнут обрабатываться конкурентно.
Это особенно полезно для независимых операций над набором файлов: подсчитать строки, найти подстроку, скопировать, сжать и т.д.
Пример: параллельный подсчёт строк во всех файлах каталога
Вариант 1: последовательно
import java.nio.file.*;
import java.io.IOException;
import java.util.List;
public class LogLineCounter {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
long totalLines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
for (Path file : stream) {
long lines = Files.lines(file).count();
totalLines += lines;
}
}
System.out.println("Всего строк во всех логах: " + totalLines);
}
}
Комментарий: всё делается по очереди, один файл за другим. Если файлов много и они большие, ждать придётся долго.
Вариант 2: параллельно!
import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;
public class LogLineCounterParallel {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
try (Stream<Path> files = Files.list(logDir)) {
long totalLines = files
.filter(path -> path.toString().endsWith(".log"))
.parallel() // вот и вся магия!
.mapToLong(file -> {
try (Stream<String> lines = Files.lines(file)) {
return lines.count();
} catch (IOException e) {
e.printStackTrace();
return 0L;
}
})
.sum();
System.out.println("Всего строк во всех логах: " + totalLines);
}
}
}
Комментарий: ключевая строка — .parallel(). На многоядерном процессоре программа, как правило, сработает заметно быстрее.
Как это работает?
- parallel() превращает обычный стрим в параллельный. Под капотом используется общий пул ForkJoinPool (потоков по умолчанию столько, сколько ядер).
- Каждый файл обрабатывается независимо, результаты агрегируются через терминальные операции (например, sum()).
- Если файлов мало — ускорения может не быть; если их сотни — выигрыш обычно заметен.
Важно!
- Параллельные стримы не ускоряют сами операции I/O; они позволяют делать несколько операций одновременно. На быстрых носителях (SSD) это помогает, на медленных (HDD) можно упереться в «узкое место» диска.
2. ForkJoinPool: «разделяй и властвуй» на практике
ForkJoin — фреймворк для параллельных вычислений «разделяй и властвуй»: большую задачу разбиваем на подзадачи, выполняем их параллельно и объединяем результаты. Управляет этим специальный пул — ForkJoinPool. Именно он используется «за кулисами» параллельных стримов, но им можно управлять и напрямую для большей гибкости.
Такая модель особенно хороша для рекурсивных структур (деревья каталогов), крупных массивов данных и задач, которые легко декомпозировать на независимые части.
Пример: рекурсивный поиск по дереву каталогов
Найдём все ".txt"-файлы (включая вложенные папки) и посчитаем суммарное количество строк.
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;
public class FolderLineCounter extends RecursiveTask<Long> {
private final Path dir;
public FolderLineCounter(Path dir) {
this.dir = dir;
}
@Override
protected Long compute() {
List<FolderLineCounter> subTasks = new ArrayList<>();
long lines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
for (Path entry : stream) {
if (Files.isDirectory(entry)) {
FolderLineCounter task = new FolderLineCounter(entry);
task.fork(); // Запускаем подзадачу
subTasks.add(task);
} else if (entry.toString().endsWith(".txt")) {
try (Stream<String> fileLines = Files.lines(entry)) {
lines += fileLines.count();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
// Собираем результаты из подзадач
for (FolderLineCounter task : subTasks) {
lines += task.join();
}
return lines;
}
public static void main(String[] args) {
Path root = Paths.get("big_folder");
ForkJoinPool pool = new ForkJoinPool();
FolderLineCounter counter = new FolderLineCounter(root);
long totalLines = pool.invoke(counter);
System.out.println("Всего строк во всех .txt-файлах: " + totalLines);
}
}
Что здесь происходит:
- Для каждой папки создаётся отдельная задача (FolderLineCounter), для подкаталогов — свои подзадачи (fork()).
- Файлы считаются на месте, результаты суммируются после join() всех подзадач.
В чём преимущество ForkJoin?
- Эффективно работает с большими иерархиями (деревья каталогов).
- Максимально загружает ядра процессора.
- Позволяет точно контролировать распараллеливание и границы задач.
3. Практические сценарии применения
Массовая обработка файлов
Например, нужно скопировать тысячи фотографий в резервную папку.
import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelFileCopier {
public static void main(String[] args) throws Exception {
Path sourceDir = Paths.get("photos");
Path destDir = Paths.get("photos_backup");
Files.createDirectories(destDir);
List<Path> files = Files.list(sourceDir)
.filter(Files::isRegularFile)
.collect(Collectors.toList());
files.parallelStream().forEach(file -> {
try {
Path destFile = destDir.resolve(file.getFileName());
Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println("Все файлы скопированы!");
}
}
Комментарий: каждый файл копируется в отдельном потоке. При большом числе файлов ускорение заметно.
Параллельное сжатие/распаковка
Аналогично можно распараллелить сжатие, пересчёт хэшей, преобразование формата изображений и т.п. через parallelStream() или свой ForkJoinPool.
4. Важные замечания и ограничения
- I/O-операции не всегда выигрывают от параллелизма. Если диск или сеть — «узкое место», сотня параллельных задач только увеличит конкуренцию за ресурс.
- Не запускайте слишком много потоков. По умолчанию параллельные стримы используют общий пул ForkJoinPool.commonPool() с уровнем параллелизма ≈ числу ядер. Его можно изменить через свойство "java.util.concurrent.ForkJoinPool.common.parallelism", но делайте это осознанно.
- Не забывайте про синхронизацию. Если несколько потоков пишут в один и тот же файл/объект — используйте синхронизацию и очереди; для независимых файлов — синхронизация не нужна.
5. Краткое знакомство с FileChannel и позиционным доступом
Для продвинутых сценариев (например, параллельное чтение разных частей одного большого файла) используйте java.nio.channels.FileChannel, который поддерживает позиционное чтение/запись.
Пример: чтение разных частей файла в разных потоках
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;
public class FileChunkReader implements Runnable {
private final Path path;
private final long position;
private final int size;
public FileChunkReader(Path path, long position, int size) {
this.path = path;
this.position = position;
this.size = size;
}
@Override
public void run() {
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(size);
channel.read(buffer, position);
// Обработка данных
System.out.println("Прочитано " + buffer.position() + " байт с позиции " + position);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Комментарий: запускайте несколько таких задач — каждая читает свой диапазон. Но осторожно: не все диски и ФС любят сильную параллельную нагрузку.
6. Типичные ошибки при параллельной обработке файлов
Ошибка №1: параллельная запись в один файл без синхронизации. Данные перемешиваются и портятся. Используйте очереди, буферизацию и синхронизацию записи.
Ошибка №2: слишком много параллелизма. Параллельные стримы на слабом железе/маленьких файлах дают накладные расходы и могут замедлить выполнение.
Ошибка №3: игнорирование ошибок I/O. В параллельных стримах исключения легко «потерять» — обрабатывайте их внутри лямбд, логируйте, учитывайте сбои.
Ошибка №4: не закрытые ресурсы. Всегда используйте try-with-resources для потоков/каналов, иначе получите утечки и странные ошибки.
Ошибка №5: ожидание «магии» от parallel(). Параллельность ускоряет только при достаточном объёме работы и доступных ресурсах (CPU, быстрый диск). Сам по себе вызов parallel() — не серебряная пуля.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ