JavaRush /Курси /JAVA 25 SELF /Паралельні стріми: синтаксис, застосування

Паралельні стріми: синтаксис, застосування

JAVA 25 SELF
Рівень 54 , Лекція 2
Відкрита

1. Пригадуємо Stream API

Ви вже знайомі зі Stream API — це зручний спосіб роботи з колекціями, який дозволяє писати компактний і зрозумілий код для обробки даних: фільтрації, сортування, підрахунку тощо.

Ось класичний приклад:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

int sum = numbers.stream()
    .filter(n -> n % 2 == 0)
    .mapToInt(n -> n)
    .sum();

System.out.println(sum); // 6 (2 + 4)

У цьому прикладі колекція перетворюється на стрім (stream()), з нього відбираються лише парні числа, потім вони перетворюються на int, і результат підсумовується викликом sum().

Stream API робить код коротшим і виразнішим: замість того щоб описувати покроково, як саме все відбувається, ви просто кажете, що хочете отримати. До того ж за потреби можна легко перемкнутися на паралельну обробку — лише одним рядком коду.

2. Паралельні стріми: синтаксис і принцип роботи

Як зробити стрім паралельним?

Усе просто: замість stream() використовуйте parallelStream(). Або викличте метод .parallel() для вже наявного стріму.

List<Integer> numbers = ...;

int sum = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .mapToInt(n -> n)
    .sum();

Або так:

numbers.stream()
    .parallel() // перетворюємо на паралельний стрім
    .filter(...)
    .map(...)
    .sum();

Що відбувається під капотом?

  • Колекція автоматично розбивається на частини.
  • Кожна частина обробляється в окремому потоці (використовується ForkJoinPool — спеціальний пул потоків).
  • Результати об’єднуються в підсумкове значення.

Тобто якщо у вас багатоядерний процесор, обробка дійсно відбувається паралельно — наприклад, фільтрація та підрахунок суми можуть виконуватися одночасно на кількох ядрах.

Де це особливо корисно?

  • Обробка великих колекцій (десятки тисяч елементів і більше).
  • Складні обчислення для кожного елемента.
  • Немає потреби зберігати суворий порядок обробки.

Приклад: порівняння послідовного та паралельного стріму

Подивімося на простий приклад з обробкою великого масиву.

import java.util.*;
import java.util.stream.*;

public class ParallelStreamDemo {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
                                         .boxed()
                                         .collect(Collectors.toList());

        // Послідовний стрім
        long time1 = System.currentTimeMillis();
        long count1 = numbers.stream()
            .filter(n -> n % 2 == 0)
            .count();
        long time2 = System.currentTimeMillis();
        System.out.println("Послідовно: " + (time2 - time1) + " мс, парних: " + count1);

        // Паралельний стрім
        long time3 = System.currentTimeMillis();
        long count2 = numbers.parallelStream()
            .filter(n -> n % 2 == 0)
            .count();
        long time4 = System.currentTimeMillis();
        System.out.println("Паралельно: " + (time4 - time3) + " мс, парних: " + count2);
    }
}

Спробуйте цей код на своєму комп’ютері — швидше за все, паралельний стрім обробить колекцію швидше (особливо якщо у вас багатоядерний процесор). Але не завжди! Про нюанси — нижче.

3. Як це працює: ForkJoinPool і автоматичне розбиття

Паралельні стріми використовують під капотом ForkJoinPool.commonPool(), який автоматично керує кількістю потоків (зазвичай — за кількістю доступних процесорних ядер).

Схематично:

+-----------------------------+
|      Ваша колекція          |
+-----------------------------+
| 1  | 2  | 3  | ... | 10 млн |
+----+----+----+-----+--------+
   |    |    |           |
   v    v    v           v
[Потік1][Потік2]...[ПотікN]
   |    |    |           |
   +----+----+-----------+
        |
   [Об'єднання результату]

Кожен потік обробляє свою частину, а потім результати об’єднуються.

4. Обмеження та підводні камені

Паралельні стріми — це не чарівна кнопка «прискорити все». Іноді вони навіть сповільнюють виконання!

Коли запускати паралельно невигідно:

  • Колекція маленька (до ~1000 елементів).
  • Операція над кожним елементом дуже швидка (наприклад, просто n * 2).
  • Вам важливий суворий порядок обробки (наприклад, для послідовного запису у файл).

Чому? Створення та синхронізація потоків теж займає час. Якщо саме завдання «дрібне», накладні витрати можуть перевищувати вигоду від розпаралелювання.

Побічні ефекти — ворог паралелізму

Якщо ваші операції всередині стріму змінюють зовнішні змінні, будьте обережні!

Поганий приклад:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] sum = {0};

numbers.parallelStream().forEach(n -> sum[0] += n);

System.out.println(sum[0]); // ??? (очікуєте 15, а отримаєте що завгодно)

Чому? Тому що кілька потоків одночасно змінюють одну змінну — виникає race condition (стан гонки). Підсумкове значення може бути неправильним.

Правильний спосіб — використовувати методи стріму, що повертають результат:

int sum = numbers.parallelStream().mapToInt(n -> n).sum();

Не всі колекції однаково добре розпаралелюються

Деякі колекції (наприклад, звичайний ArrayList) добре розбиваються на частини. А от LinkedList або стрім з нескінченною кількістю елементів (наприклад, Stream.generate(...)) — не дуже.

5. Практика: порівняння продуктивності

Приклад: пошук максимального числа

import java.util.*;
import java.util.stream.*;

public class ParallelMaxDemo {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 30_000_000)
                                         .boxed()
                                         .collect(Collectors.toList());

        // Послідовно
        long t1 = System.currentTimeMillis();
        int max1 = numbers.stream().max(Integer::compareTo).get();
        long t2 = System.currentTimeMillis();
        System.out.println("Послідовно: " + (t2 - t1) + " мс, max = " + max1);

        // Паралельно
        long t3 = System.currentTimeMillis();
        int max2 = numbers.parallelStream().max(Integer::compareTo).get();
        long t4 = System.currentTimeMillis();
        System.out.println("Паралельно: " + (t4 - t3) + " мс, max = " + max2);
    }
}

Що побачимо? На сучасних багатоядерних процесорах паралельний стрім зазвичай швидший. Але якщо замінити 30_000_000 на 1000, різниці не буде — а інколи паралельний навіть повільніший!

6. Приклади використання: фільтрація, агрегація, сортування

Фільтрація та підрахунок

List<String> names = Arrays.asList("Аня", "Борис", "Вася", "Гриша", "Даша", "Єгор", "Женя");

long count = names.parallelStream()
    .filter(name -> name.length() == 4)
    .count();

System.out.println("Імен довжиною 4: " + count);

Групування

List<String> words = Arrays.asList("кіт", "кит", "кіт", "пес", "кит", "кіт");

Map<String, Long> freq = words.parallelStream()
    .collect(Collectors.groupingBy(
        w -> w,
        Collectors.counting()
    ));

System.out.println(freq); // {пес=1, кит=2, кіт=3}

Сортування (але тут паралелізм не завжди дає приріст!)

List<Integer> bigList = IntStream.rangeClosed(1, 5_000_000)
                                 .boxed()
                                 .collect(Collectors.toList());

long t1 = System.currentTimeMillis();
List<Integer> sorted = bigList.parallelStream()
    .sorted()
    .collect(Collectors.toList());
long t2 = System.currentTimeMillis();

System.out.println("Паралельне сортування: " + (t2 - t1) + " мс");

7. Важливі нюанси та рекомендації

Коли варто використовувати parallelStream()

  • Колекція велика (десятки тисяч елементів і більше).
  • Операція над елементом «важка» (складні обчислення, робота з файлами/мережею).
  • Немає залежності від порядку елементів.
  • Немає побічних ефектів (не змінюються зовнішні змінні).

Коли НЕ варто використовувати parallelStream()

  • Колекція маленька.
  • Операція швидка.
  • Потрібне суворе збереження порядку.
  • Є доступ до спільних змінних (розгляньте потокобезпечні колекції або інші підходи).

Як дізнатися, скільки потоків використовується?

За замовчуванням — за кількістю ядер процесора: Runtime.getRuntime().availableProcessors(). Можна змінити цю поведінку через системну властивість:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

Робіть це лише якщо розумієте наслідки — інакше можна «забити» процесор і отримати уповільнення.

8. Типові помилки під час роботи з паралельними стрімами

Помилка № 1: Побічні ефекти всередині forEach
Багато хто думає: «Зараз я паралельно заповню список!»

List<Integer> result = new ArrayList<>();
IntStream.range(0, 1_000)
    .parallel()
    .forEach(result::add); // НЕБЕЗПЕЧНО!
System.out.println(result.size()); // Результат — випадковий!

Чому це погано? ArrayList не потокобезпечний, і за одночасного додавання з різних потоків результат непередбачуваний: можуть бути пропуски, дублікати, винятки.

Рішення: Використовуйте методи збирання стріму (collect), які самі забезпечують безпеку, або спеціальні колекції.

List<Integer> result = IntStream.range(0, 1_000)
    .parallel()
    .boxed()
    .collect(Collectors.toList());

Помилка № 2: Очікування прискорення на невеликих завданнях
Паралелізм — не безкоштовний! Якщо колекція маленька, паралельний стрім може працювати повільніше через накладні витрати на планування та синхронізацію.

Помилка № 3: Порушення порядку
Якщо вам важливий порядок елементів (наприклад, під час запису у файл), не використовуйте паралельні стріми — порядок не гарантується (або працюватиме повільніше).

Помилка № 4: Використання «невдалих» колекцій
Деякі колекції (наприклад, LinkedList, нестандартні структури) погано розбиваються на частини — ефективність паралелізму знижується.

Помилка № 5: Ігнорування потокобезпеки під час збирання результатів
Якщо ви збираєте результати вручну (наприклад, додаєте в список), використовуйте потокобезпечні колекції (CopyOnWriteArrayList, ConcurrentLinkedQueue) або методи збирання стріму.

Коментарі
ЩОБ ПОДИВИТИСЯ ВСІ КОМЕНТАРІ АБО ЗАЛИШИТИ КОМЕНТАР,
ПЕРЕЙДІТЬ В ПОВНУ ВЕРСІЮ