1. Собственные Collector'ы: когда и как писать свои
В Java Stream API для преобразования потока в коллекцию или агрегат используется интерфейс Collector. Обычно вы пользуетесь готовыми коллекторами из класса Collectors (toList(), toMap(), groupingBy() и др.), но иногда нужно что-то особенное — и тогда вы можете написать свой собственный Collector.
Collector — это объект, который описывает, как из элементов потока собрать итоговый результат. Он определяет четыре (на самом деле пять) ключевых компонента:
- supplier — создаёт новый контейнер для сбора элементов (например, новый список или карту).
- accumulator — добавляет очередной элемент в контейнер.
- combiner — объединяет два контейнера (важно для параллельных стримов!).
- finisher — превращает контейнер в итоговый результат (например, делает его неизменяемым или преобразует в другой тип).
- characteristics — набор флагов, описывающих свойства коллектора (например, поддерживает ли параллелизм, изменяет ли тип результата и т. д.).
Сигнатура:
Collector<T, A, R>
- T — тип элементов потока,
- A — тип промежуточного аккумулятора,
- R — тип результата.
2. Пример: Collector для MultiMap (Map<K, List<V>>)
Допустим, вы хотите собрать поток пар Pair<K, V> в Map<K, List<V>> (мульти-мапу), где каждому ключу соответствует список значений.
Пример реализации:
public static <K, V> Collector<Pair<K, V>, ?, Map<K, List<V>>> toMultiMap() {
return Collector.of(
HashMap::new, // supplier
(map, pair) -> map.computeIfAbsent(pair.key(), k -> new ArrayList<>()).add(pair.value()), // accumulator
(map1, map2) -> { // combiner
map2.forEach((k, vList) -> map1.merge(k, vList, (l1, l2) -> { l1.addAll(l2); return l1; }));
return map1;
},
Function.identity(), // finisher
Collector.Characteristics.UNORDERED
);
}
Использование:
List<Pair<String, Integer>> pairs = List.of(
new Pair<>("a", 1), new Pair<>("b", 2), new Pair<>("a", 3)
);
Map<String, List<Integer>> multiMap = pairs.stream().collect(toMultiMap());
// multiMap: {a=[1, 3], b=[2]}
3. Пример: Collector для топ-N элементов
Допустим, вы хотите собрать поток в список из N наибольших элементов (например, топ-5 по убыванию).
Реализация:
public static <T> Collector<T, ?, List<T>> topN(int n, Comparator<? super T> comparator) {
return Collector.of(
() -> new PriorityQueue<>(n, comparator), // supplier
(pq, t) -> {
pq.offer(t);
if (pq.size() > n) pq.poll(); // удаляем наименьший
},
(pq1, pq2) -> {
pq2.forEach(t -> {
pq1.offer(t);
if (pq1.size() > n) pq1.poll();
});
return pq1;
},
pq -> {
List<T> result = new ArrayList<>(pq);
result.sort(comparator.reversed()); // по убыванию
return result;
},
Collector.Characteristics.UNORDERED
);
}
Использование:
List<Integer> top3 = Stream.of(5, 1, 9, 3, 7, 2).collect(topN(3, Comparator.naturalOrder()));
// top3: [9, 7, 5]
4. Когда НЕ стоит писать свой Collector
- Если можно выразить задачу через комбинацию стандартных коллекторов и downstream-операций (groupingBy, mapping, flatMapping, collectingAndThen и др.), лучше использовать их.
- Свой Collector нужен только для действительно нестандартных сценариев (особая структура данных, сложная агрегация, топ-N, мульти-мапы и т. д.).
- Не стоит писать Collector ради Collector — это усложняет поддержку и тестирование.
Пример:
// Вместо своего Collector для Map<K, Set<V>>:
.collect(Collectors.groupingBy(
Pair::key,
Collectors.mapping(Pair::value, Collectors.toSet())
))
5. Собственный Spliterator: зачем и как
Spliterator — это специальный интерфейс для эффективного перебора и разбиения коллекций (или других источников данных) на части, особенно для параллельной обработки. В отличие от обычного итератора, Spliterator может «разделять» (split) коллекцию на независимые куски для параллельной обработки.
Ключевые методы:
- tryAdvance(Consumer<? super T> action) — обработать следующий элемент.
- trySplit() — попытаться разделить коллекцию на две части (возвращает новый Spliterator для одной из частей).
- estimateSize() — оценка оставшегося количества элементов.
- characteristics() — битовая маска характеристик (ORDERED, SIZED, SUBSIZED и др.).
trySplit: стратегии деления
Сбалансированное деление — важно для параллельных стримов: trySplit должен возвращать примерно равные по размеру части, чтобы потоки были загружены равномерно.
Если делить некуда (например, мало элементов) — возвращаем null.
Пример: Spliterator для чтения файла порционно
Допустим, у вас есть большой файл, и вы хотите обрабатывать его по 1000 строк за раз (порциями), чтобы не держать всё в памяти.
public class ChunkedLineSpliterator implements Spliterator<List<String>> {
private final BufferedReader reader;
private final int chunkSize;
public ChunkedLineSpliterator(BufferedReader reader, int chunkSize) {
this.reader = reader;
this.chunkSize = chunkSize;
}
@Override
public boolean tryAdvance(Consumer<? super List<String>> action) {
List<String> chunk = new ArrayList<>(chunkSize);
try {
String line;
for (int i = 0; i < chunkSize && (line = reader.readLine()) != null; i++) {
chunk.add(line);
}
if (chunk.isEmpty()) return false;
action.accept(chunk);
return true;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public Spliterator<List<String>> trySplit() {
// Для потокового чтения из файла деление не имеет смысла — возвращаем null
return null;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE; // неизвестно заранее
}
@Override
public int characteristics() {
return ORDERED | NONNULL;
}
}
Использование:
try (BufferedReader reader = Files.newBufferedReader(Path.of("big.txt"))) {
StreamSupport.stream(new ChunkedLineSpliterator(reader, 1000), false)
.forEach(chunk -> processChunk(chunk));
}
Характеристики Spliterator
- ORDERED — элементы идут в определённом порядке (например, список).
- SIZED — известно точное количество элементов.
- SUBSIZED — все Spliterator, полученные через trySplit, тоже SIZED.
- IMMUTABLE — источник не меняется во время обхода.
- CONCURRENT — источник поддерживает безопасную параллельную модификацию.
- DISTINCT, SORTED, NONNULL — дополнительные свойства.
Важно: правильно указывать характеристики — это влияет на оптимизацию стримов.
6. Примеры
- Чтение файла порциями (чанками) — позволяет обрабатывать большие файлы по частям, не загружая всё в память.
- Парсинг без лишних аллокаций — если вы парсите поток байтов/символов и хотите минимизировать создание временных объектов, можно реализовать Spliterator, который отдаёт «окна» или «срезы» исходного массива.
Пример: Spliterator для парсинга CSV по строкам
public class CsvLineSpliterator implements Spliterator<String[]> {
private final BufferedReader reader;
public CsvLineSpliterator(BufferedReader reader) {
this.reader = reader;
}
@Override
public boolean tryAdvance(Consumer<? super String[]> action) {
try {
String line = reader.readLine();
if (line == null) return false;
action.accept(line.split(","));
return true;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public Spliterator<String[]> trySplit() {
return null; // последовательный парсинг
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return ORDERED | NONNULL;
}
}
7. Интеграция с parallel() — как сделать безопасно
- Если ваш Spliterator поддерживает параллельное деление (trySplit возвращает не null), и характеристики включают SIZED/SUBSIZED, то Stream API сможет эффективно распараллелить обработку.
- Для потоковых источников (файлы, сокеты) обычно деление не поддерживается — используйте последовательные стримы.
- Для коллекций и массивов — реализуйте сбалансированное деление (например, делите массив пополам).
Пример: Spliterator для массива
public class ArraySpliterator<T> implements Spliterator<T> {
private final T[] array;
private int start, end;
public ArraySpliterator(T[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (start < end) {
action.accept(array[start++]);
return true;
}
return false;
}
@Override
public Spliterator<T> trySplit() {
int mid = (start + end) >>> 1;
if (mid == start) return null;
ArraySpliterator<T> split = new ArraySpliterator<>(array, start, mid);
start = mid;
return split;
}
@Override
public long estimateSize() {
return end - start;
}
@Override
public int characteristics() {
return ORDERED | SIZED | SUBSIZED | IMMUTABLE;
}
}
Использование:
String[] arr = {"a", "b", "c", "d"};
StreamSupport.stream(new ArraySpliterator<>(arr, 0, arr.length), true)
.forEach(System.out::println);
ПЕРЕЙДИТЕ В ПОЛНУЮ ВЕРСИЮ