1. Пригадуємо Stream API
Ви вже знайомі зі Stream API — це зручний спосіб роботи з колекціями, який дозволяє писати компактний і зрозумілий код для обробки даних: фільтрації, сортування, підрахунку тощо.
Ось класичний приклад:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.stream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
System.out.println(sum); // 6 (2 + 4)
У цьому прикладі колекція перетворюється на стрім (stream()), з нього відбираються лише парні числа, потім вони перетворюються на int, і результат підсумовується викликом sum().
Stream API робить код коротшим і виразнішим: замість того щоб описувати покроково, як саме все відбувається, ви просто кажете, що хочете отримати. До того ж за потреби можна легко перемкнутися на паралельну обробку — лише одним рядком коду.
2. Паралельні стріми: синтаксис і принцип роботи
Як зробити стрім паралельним?
Усе просто: замість stream() використовуйте parallelStream(). Або викличте метод .parallel() для вже наявного стріму.
List<Integer> numbers = ...;
int sum = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
Або так:
numbers.stream()
.parallel() // перетворюємо на паралельний стрім
.filter(...)
.map(...)
.sum();
Що відбувається під капотом?
- Колекція автоматично розбивається на частини.
- Кожна частина обробляється в окремому потоці (використовується ForkJoinPool — спеціальний пул потоків).
- Результати об’єднуються в підсумкове значення.
Тобто якщо у вас багатоядерний процесор, обробка дійсно відбувається паралельно — наприклад, фільтрація та підрахунок суми можуть виконуватися одночасно на кількох ядрах.
Де це особливо корисно?
- Обробка великих колекцій (десятки тисяч елементів і більше).
- Складні обчислення для кожного елемента.
- Немає потреби зберігати суворий порядок обробки.
Приклад: порівняння послідовного та паралельного стріму
Подивімося на простий приклад з обробкою великого масиву.
import java.util.*;
import java.util.stream.*;
public class ParallelStreamDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());
// Послідовний стрім
long time1 = System.currentTimeMillis();
long count1 = numbers.stream()
.filter(n -> n % 2 == 0)
.count();
long time2 = System.currentTimeMillis();
System.out.println("Послідовно: " + (time2 - time1) + " мс, парних: " + count1);
// Паралельний стрім
long time3 = System.currentTimeMillis();
long count2 = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.count();
long time4 = System.currentTimeMillis();
System.out.println("Паралельно: " + (time4 - time3) + " мс, парних: " + count2);
}
}
Спробуйте цей код на своєму комп’ютері — швидше за все, паралельний стрім обробить колекцію швидше (особливо якщо у вас багатоядерний процесор). Але не завжди! Про нюанси — нижче.
3. Як це працює: ForkJoinPool і автоматичне розбиття
Паралельні стріми використовують під капотом ForkJoinPool.commonPool(), який автоматично керує кількістю потоків (зазвичай — за кількістю доступних процесорних ядер).
Схематично:
+-----------------------------+
| Ваша колекція |
+-----------------------------+
| 1 | 2 | 3 | ... | 10 млн |
+----+----+----+-----+--------+
| | | |
v v v v
[Потік1][Потік2]...[ПотікN]
| | | |
+----+----+-----------+
|
[Об'єднання результату]
Кожен потік обробляє свою частину, а потім результати об’єднуються.
4. Обмеження та підводні камені
Паралельні стріми — це не чарівна кнопка «прискорити все». Іноді вони навіть сповільнюють виконання!
Коли запускати паралельно невигідно:
- Колекція маленька (до ~1000 елементів).
- Операція над кожним елементом дуже швидка (наприклад, просто n * 2).
- Вам важливий суворий порядок обробки (наприклад, для послідовного запису у файл).
Чому? Створення та синхронізація потоків теж займає час. Якщо саме завдання «дрібне», накладні витрати можуть перевищувати вигоду від розпаралелювання.
Побічні ефекти — ворог паралелізму
Якщо ваші операції всередині стріму змінюють зовнішні змінні, будьте обережні!
Поганий приклад:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] sum = {0};
numbers.parallelStream().forEach(n -> sum[0] += n);
System.out.println(sum[0]); // ??? (очікуєте 15, а отримаєте що завгодно)
Чому? Тому що кілька потоків одночасно змінюють одну змінну — виникає race condition (стан гонки). Підсумкове значення може бути неправильним.
Правильний спосіб — використовувати методи стріму, що повертають результат:
int sum = numbers.parallelStream().mapToInt(n -> n).sum();
Не всі колекції однаково добре розпаралелюються
Деякі колекції (наприклад, звичайний ArrayList) добре розбиваються на частини. А от LinkedList або стрім з нескінченною кількістю елементів (наприклад, Stream.generate(...)) — не дуже.
5. Практика: порівняння продуктивності
Приклад: пошук максимального числа
import java.util.*;
import java.util.stream.*;
public class ParallelMaxDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 30_000_000)
.boxed()
.collect(Collectors.toList());
// Послідовно
long t1 = System.currentTimeMillis();
int max1 = numbers.stream().max(Integer::compareTo).get();
long t2 = System.currentTimeMillis();
System.out.println("Послідовно: " + (t2 - t1) + " мс, max = " + max1);
// Паралельно
long t3 = System.currentTimeMillis();
int max2 = numbers.parallelStream().max(Integer::compareTo).get();
long t4 = System.currentTimeMillis();
System.out.println("Паралельно: " + (t4 - t3) + " мс, max = " + max2);
}
}
Що побачимо? На сучасних багатоядерних процесорах паралельний стрім зазвичай швидший. Але якщо замінити 30_000_000 на 1000, різниці не буде — а інколи паралельний навіть повільніший!
6. Приклади використання: фільтрація, агрегація, сортування
Фільтрація та підрахунок
List<String> names = Arrays.asList("Аня", "Борис", "Вася", "Гриша", "Даша", "Єгор", "Женя");
long count = names.parallelStream()
.filter(name -> name.length() == 4)
.count();
System.out.println("Імен довжиною 4: " + count);
Групування
List<String> words = Arrays.asList("кіт", "кит", "кіт", "пес", "кит", "кіт");
Map<String, Long> freq = words.parallelStream()
.collect(Collectors.groupingBy(
w -> w,
Collectors.counting()
));
System.out.println(freq); // {пес=1, кит=2, кіт=3}
Сортування (але тут паралелізм не завжди дає приріст!)
List<Integer> bigList = IntStream.rangeClosed(1, 5_000_000)
.boxed()
.collect(Collectors.toList());
long t1 = System.currentTimeMillis();
List<Integer> sorted = bigList.parallelStream()
.sorted()
.collect(Collectors.toList());
long t2 = System.currentTimeMillis();
System.out.println("Паралельне сортування: " + (t2 - t1) + " мс");
7. Важливі нюанси та рекомендації
Коли варто використовувати parallelStream()
- Колекція велика (десятки тисяч елементів і більше).
- Операція над елементом «важка» (складні обчислення, робота з файлами/мережею).
- Немає залежності від порядку елементів.
- Немає побічних ефектів (не змінюються зовнішні змінні).
Коли НЕ варто використовувати parallelStream()
- Колекція маленька.
- Операція швидка.
- Потрібне суворе збереження порядку.
- Є доступ до спільних змінних (розгляньте потокобезпечні колекції або інші підходи).
Як дізнатися, скільки потоків використовується?
За замовчуванням — за кількістю ядер процесора: Runtime.getRuntime().availableProcessors(). Можна змінити цю поведінку через системну властивість:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
Робіть це лише якщо розумієте наслідки — інакше можна «забити» процесор і отримати уповільнення.
8. Типові помилки під час роботи з паралельними стрімами
Помилка № 1: Побічні ефекти всередині forEach
Багато хто думає: «Зараз я паралельно заповню список!»
List<Integer> result = new ArrayList<>();
IntStream.range(0, 1_000)
.parallel()
.forEach(result::add); // НЕБЕЗПЕЧНО!
System.out.println(result.size()); // Результат — випадковий!
Чому це погано? ArrayList не потокобезпечний, і за одночасного додавання з різних потоків результат непередбачуваний: можуть бути пропуски, дублікати, винятки.
Рішення: Використовуйте методи збирання стріму (collect), які самі забезпечують безпеку, або спеціальні колекції.
List<Integer> result = IntStream.range(0, 1_000)
.parallel()
.boxed()
.collect(Collectors.toList());
Помилка № 2: Очікування прискорення на невеликих завданнях
Паралелізм — не безкоштовний! Якщо колекція маленька, паралельний стрім може працювати повільніше через накладні витрати на планування та синхронізацію.
Помилка № 3: Порушення порядку
Якщо вам важливий порядок елементів (наприклад, під час запису у файл), не використовуйте паралельні стріми — порядок не гарантується (або працюватиме повільніше).
Помилка № 4: Використання «невдалих» колекцій
Деякі колекції (наприклад, LinkedList, нестандартні структури) погано розбиваються на частини — ефективність паралелізму знижується.
Помилка № 5: Ігнорування потокобезпеки під час збирання результатів
Якщо ви збираєте результати вручну (наприклад, додаєте в список), використовуйте потокобезпечні колекції (CopyOnWriteArrayList, ConcurrentLinkedQueue) або методи збирання стріму.
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ