«1. Обзор

В этом руководстве мы познакомимся с Apache Beam и рассмотрим его основные концепции.

Мы начнем с демонстрации варианта использования и преимуществ использования Apache Beam, а затем рассмотрим основные понятия и терминологию. После этого мы рассмотрим простой пример, иллюстрирующий все важные аспекты Apache Beam.

2. Что такое Apache Beam?

Apache Beam (Batch + strEAM) — это унифицированная модель программирования для задач пакетной и потоковой обработки данных. Он предоставляет комплект для разработки программного обеспечения для определения и построения конвейеров обработки данных, а также бегунов для их выполнения.

Apache Beam предназначен для обеспечения переносимого уровня программирования. Фактически Beam Pipeline Runners переводят конвейер обработки данных в API, совместимый с серверной частью по выбору пользователя. В настоящее время поддерживаются следующие серверные части распределенной обработки:

    Apache Apex Apache Flink Apache Gearpump (инкубационный) Apache Samza Apache Spark Google Cloud Dataflow Hazelcast Jet

3. Почему Apache Beam?

Apache Beam объединяет пакетную и потоковую обработку данных, в то время как другие часто делают это через отдельные API. Следовательно, очень легко изменить потоковый процесс на пакетный и наоборот, скажем, по мере изменения требований.

Apache Beam повышает мобильность и гибкость. Мы сосредотачиваемся на нашей логике, а не на основных деталях. Более того, мы можем изменить серверную часть обработки данных в любое время.

Для Apache Beam доступны пакеты SDK для Java, Python, Go и Scala. Действительно, каждый в команде может использовать его на своем языке.

4. Основные понятия

С помощью Apache Beam мы можем создавать графы рабочих процессов (конвейеры) и выполнять их. Ключевыми понятиями в модели программирования являются:

    PCollection — представляет набор данных, который может быть фиксированным пакетом или потоком данных PTransform — операция обработки данных, которая принимает одну или несколько PCollection и выводит ноль или более Конвейер PCollections — представляет собой ориентированный ациклический граф PCollection и PTransform и, следовательно, инкапсулирует все задание обработки данных. PipelineRunner — выполняет конвейер на указанном сервере распределенной обработки. Конвейер состоит из PCollection и PTransform.

5. Пример подсчета слов

Теперь, когда мы изучили основные концепции Apache Beam, давайте спроектируем и протестируем задачу подсчета слов.

5.1. Построение конвейера Beam

Проектирование графа рабочего процесса — это первый шаг в каждом задании Apache Beam. Давайте определим шаги задачи подсчета слов:

Для этого нам нужно преобразовать вышеуказанные шаги в один конвейер, используя абстракции PCollection и PTransform.

  1. Read the text from a source.
  2. Split the text into a list of words.
  3. Lowercase all words.
  4. Trim punctuations.
  5. Filter stopwords.
  6. Count each unique word.

5.2. Зависимости

Прежде чем мы сможем реализовать граф рабочего процесса, мы должны добавить в наш проект основную зависимость Apache Beam:

Beam Pipeline Runners полагаются на серверную часть распределенной обработки для выполнения задач. Давайте добавим DirectRunner в качестве зависимости времени выполнения:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>

В отличие от других Pipeline Runner, DirectRunner не требует дополнительной настройки, что делает его хорошим выбором для начинающих.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
</dependency>

5.3. Реализация

Apache Beam использует парадигму программирования Map-Reduce (такую ​​же, как Java Streams). На самом деле, прежде чем продолжить, полезно иметь базовое представление о функциях reduce(), filter(), count(), map() и flatMap().

Создание конвейера — это первое, что мы делаем:

Теперь мы применяем нашу шестиступенчатую задачу подсчета слов:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Первый (необязательный) аргумент apply() — это строка, которая только для лучшей читаемости кода. Вот что делает каждая функция apply() в приведенном выше коде:

PCollection<KV<String, Long>> wordCount = p
    .apply("(1) Read all lines", 
      TextIO.read().from(inputFilePath))
    .apply("(2) Flatmap to a list of words", 
      FlatMapElements.into(TypeDescriptors.strings())
      .via(line -> Arrays.asList(line.split("\\s"))))
    .apply("(3) Lowercase all", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> word.toLowerCase()))
    .apply("(4) Trim punctuations", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> trim(word)))
    .apply("(5) Filter stopwords", 
      Filter.by(word -> !isStopWord(word)))
    .apply("(6) Count words", 
      Count.perElement());

Как упоминалось ранее, конвейеры обрабатываются на распределенной серверной части. Невозможно выполнить итерацию по коллекции PCollection в памяти, поскольку она распределена между несколькими серверными частями. Вместо этого мы записываем результаты во внешнюю базу данных или файл.

  1. First, we read an input text file line by line using TextIO.
  2. Splitting each line by whitespaces, we flat-map it to a list of words.
  3. Word count is case-insensitive, so we lowercase all words.
  4. Earlier, we split lines by whitespace, ending up with words like “word!” and “word?”, so we remove punctuations.
  5. Stopwords such as “is” and “by” are frequent in almost every English text, so we remove them.
  6. Finally, we count unique words using the built-in function Count.perElement().

Сначала мы преобразуем нашу PCollection в String. Затем мы используем TextIO для записи вывода:

Теперь, когда наше определение конвейера завершено, мы можем запустить и протестировать его.

wordCount.apply(MapElements.into(TypeDescriptors.strings())
    .via(count -> count.getKey() + " --> " + count.getValue()))
    .apply(TextIO.write().to(outputFilePath));

«5.4. Запуск и тестирование

На данный момент мы определили конвейер для задачи подсчета слов. На этом этапе давайте запустим Pipeline:

В этой строке кода Apache Beam отправит нашу задачу нескольким экземплярам DirectRunner. Следовательно, в конце будет сгенерировано несколько выходных файлов. Они будут содержать такие вещи, как:

p.run().waitUntilFinish();

Определение и запуск распределенного задания в Apache Beam так же просто и выразительно, как это. Для сравнения, реализация подсчета слов также доступна в Apache Spark, Apache Flink и Hazelcast Jet.

...
apache --> 3
beam --> 5
rocks --> 2
...

6. Куда мы идем дальше?

Мы успешно подсчитали каждое слово из нашего входного файла, но у нас пока нет отчета о наиболее часто встречающихся словах. Конечно, сортировка PCollection — это хорошая задача, которую нужно решить в качестве следующего шага.

Позже мы сможем больше узнать об окнах, триггерах, метриках и более сложных преобразованиях. Документация Apache Beam содержит подробную информацию и справочные материалы.

7. Заключение

В этом руководстве мы узнали, что такое Apache Beam и почему он предпочтительнее альтернатив. Мы также продемонстрировали основные концепции Apache Beam на примере подсчета слов.

Код для этого руководства доступен на GitHub.

«