«1. Введение

Parallel-collectors — это небольшая библиотека, предоставляющая набор сборщиков Java Stream API, которые обеспечивают параллельную обработку, в то же время обходя основные недостатки стандартных Parallel Streams.

2. Зависимости Maven

Если мы хотим начать использовать библиотеку, нам нужно добавить одну запись в файл pom.xml Maven:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>1.1.0</version>
</dependency>

Или одну строку в файл сборки Gradle: ~~ ~

compile 'com.pivovarit:parallel-collectors:1.1.0'

Новейшую версию можно найти на Maven Central.

3. Предостережения относительно параллельных потоков

Параллельные потоки были одним из основных моментов Java 8, но оказалось, что они применимы исключительно к интенсивной обработке ЦП.

Причиной этого был тот факт, что параллельные потоки внутренне поддерживались общим для всей JVM общим ForkJoinPool, который обеспечивал ограниченный параллелизм и использовался всеми параллельными потоками, работающими на одном экземпляре JVM.

Например, представьте, что у нас есть список идентификаторов, и мы хотим использовать их для получения списка пользователей, и эта операция требует больших затрат.

Мы могли бы использовать для этого параллельные потоки:

List<Integer> ids = Arrays.asList(1, 2, 3); 
List<String> results = ids.parallelStream() 
  .map(i -> fetchById(i)) // each operation takes one second
  .collect(Collectors.toList()); 

System.out.println(results); // [user-1, user-2, user-3]

И действительно, мы видим заметное ускорение. Но это становится проблематичным, если мы начинаем выполнять несколько параллельных блокирующих операций… параллельно. Это может быстро насытить пул и привести к огромным задержкам. Вот почему важно создавать перегородки, создавая отдельные пулы потоков, чтобы предотвратить влияние несвязанных задач на выполнение друг друга.

Чтобы предоставить собственный экземпляр ForkJoinPool, мы могли бы использовать описанный здесь трюк, но этот подход основывался на недокументированном хаке и был ошибочным до JDK10. Мы можем прочитать больше в самом выпуске — [JDK8190974].

4. Параллельные коллекторы в действии

Параллельные коллекторы, как следует из названия, являются просто стандартными коллекторами Stream API, которые позволяют выполнять дополнительные операции параллельно на этапе collect(). Класс

ParallelCollectors (отражающий класс Collectors) — это фасад, обеспечивающий доступ ко всей функциональности библиотеки.

Если бы мы хотели переделать приведенный выше пример, мы могли бы просто написать:

ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]

Результат тот же, однако мы смогли предоставить наш собственный пул потоков, указать наш собственный уровень параллелизма и результат прибыл завернутый в экземпляр CompletableFuture без блокировки текущего потока.

Стандартные параллельные потоки, с другой стороны, не могли достичь ни одного из них.

4.1. ParallelCollectors.parallelToList/ToSet()

Как это ни интуитивно понятно, если мы хотим параллельно обрабатывать поток и собирать результаты в список или набор, мы можем просто использовать ParallelCollectors.parallelToList или parallelToSet:

List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.stream()
  .collect(parallelToList(i -> fetchById(i), executor, 4))
  .join();

4.2. ParallelCollectors.parallelToMap()

Если мы хотим собрать элементы Stream в экземпляр Map, как и в случае с Stream API, нам нужно предоставить два преобразователя:

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
  .join(); // {1=user-1, 2=user-2, 3=user-3}

Мы также можем предоставить пользовательский экземпляр Map Supplier: ~ ~~

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
  .join();

И собственная стратегия разрешения конфликтов:

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
  .join();

4.3. ParallelCollectors.parallelToCollection()

Аналогично предыдущему, мы можем передать нашего пользовательского поставщика коллекций, если мы хотим получить результаты, упакованные в наш пользовательский контейнер:

List<String> results = ids.stream()
  .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
  .join();

4.4. ParallelCollectors.parallelToStream()

Если вышеперечисленного недостаточно, мы можем получить экземпляр Stream и продолжить обработку там:

Map<Integer, List<String>> results = ids.stream()
  .collect(parallelToStream(i -> fetchById(i), executor, 4))
  .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
  .join();

4.5. ParallelCollectors.parallel()

Это позволяет нам передавать результаты в порядке завершения:

ids.stream()
  .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-3
// user-2

В этом случае мы можем ожидать, что сборщик каждый раз будет возвращать разные результаты, поскольку мы ввели случайную задержку обработки.

4.6. ParallelCollectors.parallelOrdered()

Эта функция позволяет выполнять потоковую передачу результатов, как описано выше, но сохраняет первоначальный порядок:

ids.stream()
  .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-2 
// user-3 

В этом случае сборщик всегда будет поддерживать порядок, но может работать медленнее, чем указано выше.

5. Ограничения

«На момент написания параллельные сборщики не работают с бесконечными потоками, даже если используются операции короткого замыкания — это ограничение дизайна, наложенное внутренними компонентами Stream API. Проще говоря, потоки рассматривают коллекторы как операции без короткого замыкания, поэтому потоку необходимо обработать все восходящие элементы, прежде чем он будет завершен.

Другим ограничением является то, что операции короткого замыкания не прерывают оставшиеся задачи после короткого замыкания.

6. Заключение

Мы увидели, как библиотека параллельных сборщиков позволяет нам выполнять параллельную обработку с использованием пользовательских коллекторов Java Stream API и CompletableFutures для использования пользовательских пулов потоков, параллелизма и неблокирующего стиля CompletableFutures.

Как всегда, фрагменты кода доступны на GitHub.

Дополнительные сведения см. в библиотеке параллельных сборщиков на GitHub, в блоге автора и в его учетной записи Twitter.