1. Вспоминаем Stream API
Вы уже знакомы со Stream API — это удобный способ работы с коллекциями, который позволяет писать компактный и понятный код для обработки данных: фильтрации, сортировки, подсчёта и т.д.
Вот классический пример:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.stream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
System.out.println(sum); // 6 (2 + 4)
В этом примере коллекция превращается в поток (stream()), из него отбираются только чётные числа, затем они преобразуются в int, и результат суммируется вызовом sum().
Stream API делает код короче и выразительнее: вместо того чтобы описывать пошагово, как именно всё происходит, вы просто говорите, что хотите получить. К тому же при необходимости можно легко переключиться на параллельную обработку — всего одной строкой.
2. Параллельные стримы: синтаксис и принцип работы
Как сделать стрим параллельным?
Всё просто: вместо stream() используйте parallelStream(). Или вызовите метод .parallel() для существующего стрима.
List<Integer> numbers = ...;
int sum = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.mapToInt(n -> n)
.sum();
Или так:
numbers.stream()
.parallel() // превращаем в параллельный стрим
.filter(...)
.map(...)
.sum();
Что происходит под капотом?
- Коллекция автоматически разбивается на части.
- Каждая часть обрабатывается в отдельном потоке (используется ForkJoinPool — специальный пул потоков).
- Результаты объединяются в итоговое значение.
То есть, если у вас многоядерный процессор, обработка реально идёт параллельно — например, фильтрация и подсчёт суммы могут выполняться одновременно на нескольких ядрах.
Где это особенно полезно?
- Обработка больших коллекций (десятки тысяч элементов и больше).
- Сложные вычисления для каждого элемента.
- Отсутствие необходимости сохранять строгий порядок обработки.
Пример: сравнение последовательного и параллельного стрима
Давайте посмотрим на простой пример с обработкой большого массива.
import java.util.*;
import java.util.stream.*;
public class ParallelStreamDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 10_000_000)
.boxed()
.collect(Collectors.toList());
// Последовательный стрим
long time1 = System.currentTimeMillis();
long count1 = numbers.stream()
.filter(n -> n % 2 == 0)
.count();
long time2 = System.currentTimeMillis();
System.out.println("Последовательно: " + (time2 - time1) + " мс, чётных: " + count1);
// Параллельный стрим
long time3 = System.currentTimeMillis();
long count2 = numbers.parallelStream()
.filter(n -> n % 2 == 0)
.count();
long time4 = System.currentTimeMillis();
System.out.println("Параллельно: " + (time4 - time3) + " мс, чётных: " + count2);
}
}
Попробуйте этот код на своём компьютере — скорее всего, параллельный стрим обработает коллекцию быстрее (особенно если у вас многоядерный процессор). Но не всегда! О нюансах — ниже.
3. Как это работает: ForkJoinPool и автоматическое разбиение
Параллельные стримы используют под капотом ForkJoinPool.commonPool(), который автоматически управляет количеством потоков (обычно — по количеству доступных процессорных ядер).
Схематично:
+-----------------------------+
| Ваша коллекция |
+-----------------------------+
| 1 | 2 | 3 | ... | 10 млн |
+----+----+----+-----+--------+
| | | |
v v v v
[Поток1][Поток2]...[ПотокN]
| | | |
+----+----+-----------+
|
[Объединение результата]
Каждый поток обрабатывает свою часть, потом результаты складываются.
4. Ограничения и подводные камни
Параллельные стримы — это не магическая кнопка «ускорить всё». Иногда они даже замедляют выполнение!
Когда параллелить невыгодно:
- Коллекция маленькая (до ~1000 элементов).
- Операция над каждым элементом очень быстрая (например, просто n * 2).
- Вам важен строгий порядок обработки (например, для последовательной записи в файл).
Почему? Создание и синхронизация потоков тоже занимает время. Если сама задача «мелкая», накладные расходы могут превышать выгоду от распараллеливания.
Побочные эффекты — враг параллелизма
Если ваши операции внутри стрима изменяют внешние переменные, будьте осторожны!
Плохой пример:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int[] sum = {0};
numbers.parallelStream().forEach(n -> sum[0] += n);
System.out.println(sum[0]); // ??? (ожидаете 15, а получите что угодно)
Почему? Потому что несколько потоков одновременно меняют одну переменную — возникает race condition (состояние гонки). Итоговое значение может быть неправильным.
Правильный способ — использовать методы стрима, возвращающие результат:
int sum = numbers.parallelStream().mapToInt(n -> n).sum();
Не все коллекции одинаково «параллелятся»
Некоторые коллекции (например, обычный ArrayList) хорошо разбиваются на части. А вот LinkedList или поток с бесконечным числом элементов (например, Stream.generate(...)) — не очень.
5. Практика: сравнение производительности
Пример: поиск максимального числа
import java.util.*;
import java.util.stream.*;
public class ParallelMaxDemo {
public static void main(String[] args) {
List<Integer> numbers = IntStream.rangeClosed(1, 30_000_000)
.boxed()
.collect(Collectors.toList());
// Последовательно
long t1 = System.currentTimeMillis();
int max1 = numbers.stream().max(Integer::compareTo).get();
long t2 = System.currentTimeMillis();
System.out.println("Последовательно: " + (t2 - t1) + " мс, max = " + max1);
// Параллельно
long t3 = System.currentTimeMillis();
int max2 = numbers.parallelStream().max(Integer::compareTo).get();
long t4 = System.currentTimeMillis();
System.out.println("Параллельно: " + (t4 - t3) + " мс, max = " + max2);
}
}
Что увидим? На современных многоядерных процессорах параллельный стрим обычно быстрее. Но если заменить 30_000_000 на 1000, разницы не будет — а иногда параллельный даже медленнее!
6. Примеры использования: фильтрация, агрегация, сортировка
Фильтрация и подсчёт
List<String> names = Arrays.asList("Аня", "Борис", "Вася", "Гриша", "Даша", "Егор", "Женя");
long count = names.parallelStream()
.filter(name -> name.length() == 4)
.count();
System.out.println("Имен длиной 4: " + count);
Группировка
List<String> words = Arrays.asList("кот", "кит", "кот", "пес", "кит", "кот");
Map<String, Long> freq = words.parallelStream()
.collect(Collectors.groupingBy(
w -> w,
Collectors.counting()
));
System.out.println(freq); // {пес=1, кит=2, кот=3}
Сортировка (но тут параллелизм не всегда даёт прирост!)
List<Integer> bigList = IntStream.rangeClosed(1, 5_000_000)
.boxed()
.collect(Collectors.toList());
long t1 = System.currentTimeMillis();
List<Integer> sorted = bigList.parallelStream()
.sorted()
.collect(Collectors.toList());
long t2 = System.currentTimeMillis();
System.out.println("Параллельная сортировка: " + (t2 - t1) + " мс");
7. Важные нюансы и рекомендации
Когда стоит использовать parallelStream()
- Коллекция большая (десятки тысяч элементов и больше).
- Операция над элементом «тяжёлая» (сложные вычисления, работа с файлами/сетью).
- Нет зависимости от порядка элементов.
- Нет побочных эффектов (не изменяются внешние переменные).
Когда НЕ стоит использовать parallelStream()
- Коллекция маленькая.
- Операция быстрая.
- Нужно строгое сохранение порядка.
- Есть доступ к общим переменным (рассмотрите thread-safe коллекции или другие подходы).
Как узнать, сколько потоков используется?
По умолчанию — по количеству ядер процессора: Runtime.getRuntime().availableProcessors(). Можно изменить это поведение через системное свойство:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
Делайте это только если понимаете последствия — иначе можно «забить» процессор и получить тормоза.
8. Типичные ошибки при работе с параллельными стримами
Ошибка №1: Побочные эффекты внутри forEach
Многие думают: «Сейчас я параллельно заполню список!»
List<Integer> result = new ArrayList<>();
IntStream.range(0, 1_000)
.parallel()
.forEach(result::add); // ОПАСНО!
System.out.println(result.size()); // Результат — случайный!
Почему это плохо? ArrayList не потокобезопасен, и при одновременном добавлении из разных потоков результат непредсказуем: могут быть пропуски, дубли, исключения.
Решение: Используйте методы сбора стрима (collect), которые сами обеспечивают безопасность, или специальные коллекции.
List<Integer> result = IntStream.range(0, 1_000)
.parallel()
.boxed()
.collect(Collectors.toList());
Ошибка №2: Ожидание ускорения на маленьких задачах
Параллелизм — не бесплатный! Если коллекция маленькая, параллельный стрим может работать медленнее из-за накладных расходов на планирование и синхронизацию.
Ошибка №3: Нарушение порядка
Если вам важен порядок элементов (например, при записи в файл), не используйте параллельные стримы — порядок не гарантируется (или будет работать медленнее).
Ошибка №4: Использование «неудачных» коллекций
Некоторые коллекции (например, LinkedList, нестандартные структуры) плохо разбиваются на части — эффективность параллелизма снижается.
Ошибка №5: Игнорирование thread-safety при сборе результатов
Если вы собираете результаты вручную (например, добавляете в список), используйте потокобезопасные коллекции (CopyOnWriteArrayList, ConcurrentLinkedQueue) или методы сбора стрима.
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ