1. Паралельні стріми (parallelStream): просто й зручно
Якщо ви вже працювали зі Stream API, то знаєте, як зручно фільтрувати, перетворювати й збирати колекції. Важливий бонус: будь-який стрім можна зробити паралельним буквально одним рядком коду — увімкнути parallel() — і елементи колекції почнуть оброблятися паралельно.
Це особливо корисно для незалежних операцій над набором файлів: порахувати рядки, знайти підрядок, скопіювати, стиснути тощо.
Приклад: паралельний підрахунок рядків у всіх файлах каталогу
Варіант 1: послідовно
import java.nio.file.*;
import java.io.IOException;
import java.util.List;
public class LogLineCounter {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
long totalLines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(logDir, "*.log")) {
for (Path file : stream) {
long lines = Files.lines(file).count();
totalLines += lines;
}
}
System.out.println("Усього рядків у всіх логах: " + totalLines);
}
}
Коментар: усе виконується послідовно, файл за файлом. Якщо файлів багато й вони великі — чекати доведеться довго.
Варіант 2: паралельно!
import java.nio.file.*;
import java.io.IOException;
import java.util.stream.Stream;
public class LogLineCounterParallel {
public static void main(String[] args) throws IOException {
Path logDir = Paths.get("logs");
try (Stream<Path> files = Files.list(logDir)) {
long totalLines = files
.filter(path -> path.toString().endsWith(".log"))
.parallel() // Ось і вся магія!
.mapToLong(file -> {
try (Stream<String> lines = Files.lines(file)) {
return lines.count();
} catch (IOException e) {
e.printStackTrace();
return 0L;
}
})
.sum();
System.out.println("Усього рядків у всіх логах: " + totalLines);
}
}
}
Коментар: ключовий рядок — .parallel(). Зазвичай на багатоядерному процесорі програма спрацює помітно швидше.
Як це працює?
- parallel() перетворює звичайний стрім на паралельний. Під капотом використовується спільний пул ForkJoinPool (кількість потоків за замовчуванням дорівнює кількості ядер).
- Кожен файл обробляється незалежно, результати агрегуються через термінальні операції (наприклад, sum()).
- Якщо файлів мало — прискорення може й не бути; якщо їх сотні — виграш зазвичай відчутний.
Важливо!
- Паралельні стріми не пришвидшують самі I/O-операції; вони дають змогу виконувати кілька операцій одночасно. На швидких носіях (SSD) це допомагає, на повільних (HDD) можна впертися у «вузьке місце» диска.
2. ForkJoinPool: «розділяй і володарюй» на практиці
ForkJoin — фреймворк для паралельних обчислень «розділяй і володарюй»: велику задачу розбиваємо на підзадачі, виконуємо їх паралельно й об’єднуємо результати. Керує цим спеціальний пул — ForkJoinPool. Саме він використовується «за лаштунками» паралельних стрімів, але ним можна керувати безпосередньо для більшої гнучкості.
Особливо ефективна для рекурсивних структур (дерева каталогів), великих масивів даних і задач, які легко декомпозувати на незалежні частини.
Приклад: рекурсивний пошук у дереві каталогів
Знайдемо всі файли ".txt" (включно з вкладеними папками) і порахуємо загальну кількість рядків.
import java.nio.file.*;
import java.util.concurrent.*;
import java.util.*;
import java.io.IOException;
import java.util.stream.Collectors;
public class FolderLineCounter extends RecursiveTask<Long> {
private final Path dir;
public FolderLineCounter(Path dir) {
this.dir = dir;
}
@Override
protected Long compute() {
List<FolderLineCounter> subTasks = new ArrayList<>();
long lines = 0;
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
for (Path entry : stream) {
if (Files.isDirectory(entry)) {
FolderLineCounter task = new FolderLineCounter(entry);
task.fork(); // Запускаємо підзадачу
subTasks.add(task);
} else if (entry.toString().endsWith(".txt")) {
try (Stream<String> fileLines = Files.lines(entry)) {
lines += fileLines.count();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
// Збираємо результати з підзадач
for (FolderLineCounter task : subTasks) {
lines += task.join();
}
return lines;
}
public static void main(String[] args) {
Path root = Paths.get("big_folder");
ForkJoinPool pool = new ForkJoinPool();
FolderLineCounter counter = new FolderLineCounter(root);
long totalLines = pool.invoke(counter);
System.out.println("Усього рядків у всіх .txt-файлах: " + totalLines);
}
}
Що тут відбувається:
- Для кожної папки створюється окреме завдання (FolderLineCounter), для підкаталогів — свої підзадачі (fork()).
- Файли рахуються на місці, результати підсумовуються після join() усіх підзадач.
У чому перевага ForkJoin?
- Ефективно працює з великими ієрархіями (дерева каталогів).
- Максимально завантажує ядра процесора.
- Дозволяє точно контролювати розпаралелювання та межі задач.
3. Практичні сценарії застосування
Масова обробка файлів
Наприклад, потрібно скопіювати тисячі фотографій у резервну папку.
import java.nio.file.*;
import java.util.List;
import java.util.stream.Collectors;
public class ParallelFileCopier {
public static void main(String[] args) throws Exception {
Path sourceDir = Paths.get("photos");
Path destDir = Paths.get("photos_backup");
Files.createDirectories(destDir);
List<Path> files = Files.list(sourceDir)
.filter(Files::isRegularFile)
.collect(Collectors.toList());
files.parallelStream().forEach(file -> {
try {
Path destFile = destDir.resolve(file.getFileName());
Files.copy(file, destFile, StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
e.printStackTrace();
}
});
System.out.println("Усі файли скопійовано!");
}
}
Коментар: кожен файл копіюється в окремому потоці. За великої кількості файлів прискорення помітне.
Паралельне стискання/розпакування
Аналогічно можна розпаралелити стискання, перерахунок хешів, перетворення формату зображень тощо через parallelStream() або свій ForkJoinPool.
4. Важливі зауваження та обмеження
- I/O-операції не завжди виграють від паралелізму. Якщо диск або мережа — «вузьке місце», сотня паралельних задач лише збільшить конкуренцію за ресурс.
- Не запускайте занадто багато потоків. За замовчуванням паралельні стріми використовують спільний пул ForkJoinPool.commonPool() з рівнем паралелізму ≈ кількості ядер. Його можна змінити через властивість "java.util.concurrent.ForkJoinPool.common.parallelism", але робіть це свідомо.
- Не забувайте про синхронізацію. Якщо кілька потоків пишуть в один і той самий файл/об’єкт — використовуйте синхронізацію та черги; для незалежних файлів — синхронізація не потрібна.
5. Коротке знайомство з FileChannel і позиційним доступом
Для складніших сценаріїв (наприклад, паралельне читання різних частин одного великого файлу) використовуйте java.nio.channels.FileChannel, який підтримує позиційне читання та запис.
Приклад: читання різних частин файлу в різних потоках
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.ByteBuffer;
public class FileChunkReader implements Runnable {
private final Path path;
private final long position;
private final int size;
public FileChunkReader(Path path, long position, int size) {
this.path = path;
this.position = position;
this.size = size;
}
@Override
public void run() {
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
ByteBuffer buffer = ByteBuffer.allocate(size);
channel.read(buffer, position);
// Обробка даних
System.out.println("Прочитано " + buffer.position() + " байт з позиції " + position);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Коментар: запускайте кілька таких задач — кожна читає свій діапазон. Але обережно: не всі диски та файлові системи добре переносять надмірне паралельне навантаження.
6. Типові помилки під час паралельної обробки файлів
Помилка № 1: паралельний запис в один файл без синхронізації. Дані перемішуються й псуються. Використовуйте черги, буферизацію та синхронізацію запису.
Помилка № 2: занадто багато паралелізму. Паралельні стріми на слабкому «залізі»/маленьких файлах дають накладні витрати й можуть уповільнити виконання.
Помилка № 3: ігнорування помилок I/O. У паралельних стрімах винятки легко «загубити» — обробляйте їх усередині лямбд, логуйте, враховуйте збої.
Помилка № 4: незакриті ресурси. Завжди використовуйте try-with-resources для потоків/каналів, інакше отримаєте витоки та дивні помилки.
Помилка № 5: очікування «магії» від parallel(). Паралельність пришвидшує лише за достатнього обсягу роботи та наявних ресурсів (CPU, швидкий диск). Сам по собі виклик parallel() — не срібна куля.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ