«1. Обзор
В этом руководстве мы познакомимся с Apache Beam и рассмотрим его основные концепции.
Мы начнем с демонстрации варианта использования и преимуществ использования Apache Beam, а затем рассмотрим основные понятия и терминологию. После этого мы рассмотрим простой пример, иллюстрирующий все важные аспекты Apache Beam.
2. Что такое Apache Beam?
Apache Beam (Batch + strEAM) — это унифицированная модель программирования для задач пакетной и потоковой обработки данных. Он предоставляет комплект для разработки программного обеспечения для определения и построения конвейеров обработки данных, а также бегунов для их выполнения.
Apache Beam предназначен для обеспечения переносимого уровня программирования. Фактически Beam Pipeline Runners переводят конвейер обработки данных в API, совместимый с серверной частью по выбору пользователя. В настоящее время поддерживаются следующие серверные части распределенной обработки:
-
Apache Apex Apache Flink Apache Gearpump (инкубационный) Apache Samza Apache Spark Google Cloud Dataflow Hazelcast Jet
3. Почему Apache Beam?
Apache Beam объединяет пакетную и потоковую обработку данных, в то время как другие часто делают это через отдельные API. Следовательно, очень легко изменить потоковый процесс на пакетный и наоборот, скажем, по мере изменения требований.
Apache Beam повышает мобильность и гибкость. Мы сосредотачиваемся на нашей логике, а не на основных деталях. Более того, мы можем изменить серверную часть обработки данных в любое время.
Для Apache Beam доступны пакеты SDK для Java, Python, Go и Scala. Действительно, каждый в команде может использовать его на своем языке.
4. Основные понятия
С помощью Apache Beam мы можем создавать графы рабочих процессов (конвейеры) и выполнять их. Ключевыми понятиями в модели программирования являются:
-
PCollection — представляет набор данных, который может быть фиксированным пакетом или потоком данных PTransform — операция обработки данных, которая принимает одну или несколько PCollection и выводит ноль или более Конвейер PCollections — представляет собой ориентированный ациклический граф PCollection и PTransform и, следовательно, инкапсулирует все задание обработки данных. PipelineRunner — выполняет конвейер на указанном сервере распределенной обработки. Конвейер состоит из PCollection и PTransform.
5. Пример подсчета слов
Теперь, когда мы изучили основные концепции Apache Beam, давайте спроектируем и протестируем задачу подсчета слов.
5.1. Построение конвейера Beam
Проектирование графа рабочего процесса — это первый шаг в каждом задании Apache Beam. Давайте определим шаги задачи подсчета слов:
Для этого нам нужно преобразовать вышеуказанные шаги в один конвейер, используя абстракции PCollection и PTransform.
- Read the text from a source.
- Split the text into a list of words.
- Lowercase all words.
- Trim punctuations.
- Filter stopwords.
- Count each unique word.
5.2. Зависимости
Прежде чем мы сможем реализовать граф рабочего процесса, мы должны добавить в наш проект основную зависимость Apache Beam:
Beam Pipeline Runners полагаются на серверную часть распределенной обработки для выполнения задач. Давайте добавим DirectRunner в качестве зависимости времени выполнения:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
В отличие от других Pipeline Runner, DirectRunner не требует дополнительной настройки, что делает его хорошим выбором для начинающих.
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
5.3. Реализация
Apache Beam использует парадигму программирования Map-Reduce (такую же, как Java Streams). На самом деле, прежде чем продолжить, полезно иметь базовое представление о функциях reduce(), filter(), count(), map() и flatMap().
Создание конвейера — это первое, что мы делаем:
Теперь мы применяем нашу шестиступенчатую задачу подсчета слов:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
Первый (необязательный) аргумент apply() — это строка, которая только для лучшей читаемости кода. Вот что делает каждая функция apply() в приведенном выше коде:
PCollection<KV<String, Long>> wordCount = p
.apply("(1) Read all lines",
TextIO.read().from(inputFilePath))
.apply("(2) Flatmap to a list of words",
FlatMapElements.into(TypeDescriptors.strings())
.via(line -> Arrays.asList(line.split("\\s"))))
.apply("(3) Lowercase all",
MapElements.into(TypeDescriptors.strings())
.via(word -> word.toLowerCase()))
.apply("(4) Trim punctuations",
MapElements.into(TypeDescriptors.strings())
.via(word -> trim(word)))
.apply("(5) Filter stopwords",
Filter.by(word -> !isStopWord(word)))
.apply("(6) Count words",
Count.perElement());
Как упоминалось ранее, конвейеры обрабатываются на распределенной серверной части. Невозможно выполнить итерацию по коллекции PCollection в памяти, поскольку она распределена между несколькими серверными частями. Вместо этого мы записываем результаты во внешнюю базу данных или файл.
- First, we read an input text file line by line using TextIO.
- Splitting each line by whitespaces, we flat-map it to a list of words.
- Word count is case-insensitive, so we lowercase all words.
- Earlier, we split lines by whitespace, ending up with words like “word!” and “word?”, so we remove punctuations.
- Stopwords such as “is” and “by” are frequent in almost every English text, so we remove them.
- Finally, we count unique words using the built-in function Count.perElement().
Сначала мы преобразуем нашу PCollection в String. Затем мы используем TextIO для записи вывода:
Теперь, когда наше определение конвейера завершено, мы можем запустить и протестировать его.
wordCount.apply(MapElements.into(TypeDescriptors.strings())
.via(count -> count.getKey() + " --> " + count.getValue()))
.apply(TextIO.write().to(outputFilePath));
«5.4. Запуск и тестирование
На данный момент мы определили конвейер для задачи подсчета слов. На этом этапе давайте запустим Pipeline:
В этой строке кода Apache Beam отправит нашу задачу нескольким экземплярам DirectRunner. Следовательно, в конце будет сгенерировано несколько выходных файлов. Они будут содержать такие вещи, как:
p.run().waitUntilFinish();
Определение и запуск распределенного задания в Apache Beam так же просто и выразительно, как это. Для сравнения, реализация подсчета слов также доступна в Apache Spark, Apache Flink и Hazelcast Jet.
...
apache --> 3
beam --> 5
rocks --> 2
...
6. Куда мы идем дальше?
Мы успешно подсчитали каждое слово из нашего входного файла, но у нас пока нет отчета о наиболее часто встречающихся словах. Конечно, сортировка PCollection — это хорошая задача, которую нужно решить в качестве следующего шага.
Позже мы сможем больше узнать об окнах, триггерах, метриках и более сложных преобразованиях. Документация Apache Beam содержит подробную информацию и справочные материалы.
7. Заключение
В этом руководстве мы узнали, что такое Apache Beam и почему он предпочтительнее альтернатив. Мы также продемонстрировали основные концепции Apache Beam на примере подсчета слов.
Код для этого руководства доступен на GitHub.
«