«1. Введение

В этом руководстве мы продемонстрируем Apache Crunch на примере приложения для обработки данных. Мы запустим это приложение, используя фреймворк MapReduce.

Мы начнем с краткого рассмотрения некоторых концепций Apache Crunch. Затем мы перейдем к примеру приложения. В этом приложении мы займемся обработкой текста:

    Прежде всего, мы прочитаем строки из текстового файла. Позже мы разобьем их на слова и удалим некоторые общие слова. Затем мы сгруппируем оставшиеся слова. чтобы получить список уникальных слов и их количество Наконец, мы запишем этот список в текстовый файл

2. Что такое Crunch?

MapReduce — это распределенная среда параллельного программирования для обработки больших объемов данных на кластере серверов. Программные платформы, такие как Hadoop и Spark, реализуют MapReduce.

Crunch предоставляет платформу для написания, тестирования и запуска конвейеров MapReduce на Java. Здесь мы не пишем задания MapReduce напрямую. Скорее, мы определяем конвейер данных (то есть операции для выполнения шагов ввода, обработки и вывода) с помощью API-интерфейсов Crunch. Crunch Planner сопоставляет их с заданиями MapReduce и выполняет их при необходимости.

Таким образом, каждый конвейер данных Crunch координируется экземпляром интерфейса Pipeline. Этот интерфейс также определяет методы для чтения данных в конвейер через исходные экземпляры и записи данных из конвейера в целевые экземпляры.

У нас есть 3 интерфейса для представления данных:

  1. PCollection – an immutable, distributed collection of elements
  2. PTable<K, V> – an immutable, distributed, unordered multi-map of keys and values
  3. PGroupedTable<K, V> – a distributed, sorted map of keys of type K to an Iterable V that may be iterated over exactly once

DoFn — базовый класс для всех функций обработки данных. Он соответствует классам Mapper, Reducer и Combiner в MapReduce. Мы тратим большую часть времени разработки на написание и тестирование логических вычислений с его использованием.

Теперь, когда мы лучше познакомились с Crunch, давайте воспользуемся им для создания примера приложения.

3. Настройка проекта Crunch

Прежде всего, давайте настроим проект Crunch с помощью Maven. Мы можем сделать это двумя способами:

  1. Add the required dependencies in the pom.xml file of an existing project
  2. Use an archetype to generate a starter project

Давайте кратко рассмотрим оба подхода.

3.1. Зависимости Maven

Чтобы добавить Crunch в существующий проект, добавим необходимые зависимости в файл pom.xml.

Сначала добавим библиотеку crunch-core:

<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>0.15.0</version>
</dependency>

Затем добавим библиотеку hadoop-client для взаимодействия с Hadoop. Мы используем версию, соответствующую установке Hadoop:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>

Мы можем проверить Maven Central на наличие последних версий библиотек crunch-core и hadoop-client.

3.2. Архетип Maven

Другой подход состоит в том, чтобы быстро сгенерировать начальный проект, используя архетип Maven, предоставленный Crunch:

mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype

При запросе приведенной выше команды мы предоставляем версию Crunch и сведения об артефакте проекта.

4. Crunch Pipeline Setup

После настройки проекта нам нужно создать объект Pipeline. В Crunch есть 3 реализации Pipeline:

    MRPipeline — выполняется в Hadoop MapReduce SparkPipeline — выполняется как серия конвейеров Spark MemPipeline — выполняется в памяти на клиенте и полезен для модульного тестирования

Обычно, мы разрабатываем и тестируем с использованием экземпляра MemPipeline. Позже мы используем экземпляр MRPipeline или SparkPipeline для фактического выполнения.

Если бы нам нужен конвейер в памяти, мы могли бы использовать статический метод getInstance для получения экземпляра MemPipeline:

Pipeline pipeline = MemPipeline.getInstance();

А пока давайте создадим экземпляр MRPipeline для выполнения приложения с помощью Hadoop: ~~ ~

Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

5. Чтение входных данных

После создания объекта конвейера мы хотим прочитать входные данные. Интерфейс Pipeline предоставляет удобный метод для чтения входных данных из текстового файла, readTextFile(pathName).

Давайте вызовем этот метод для чтения входного текстового файла:

PCollection<String> lines = pipeline.readTextFile(inputPath);

Приведенный выше код читает текстовый файл как набор строк.

В качестве следующего шага давайте напишем тестовый пример для чтения ввода:

@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);

    assertEquals(21, lines.asCollection()
      .getValue()
      .size());
}

В этом тесте мы проверяем, получаем ли мы ожидаемое количество строк при чтении текстового файла.

6. Этапы обработки данных

После чтения входных данных нам нужно их обработать. Crunch API содержит ряд подклассов DoFn для обработки распространенных сценариев обработки данных:

    «FilterFn — фильтрует элементы коллекции на основе логического условия. MapFn — сопоставляет каждую входную запись ровно с одной выходной записью. CombineFn — объединяет несколько значений в одно значение. JoinFn — выполняет соединения, такие как внутреннее соединение. , левое внешнее соединение, правое внешнее соединение и полное внешнее соединение

Давайте реализуем следующую логику обработки данных, используя эти классы:

  1. Split each line in the input file into words
  2. Remove the stop words
  3. Count the unique words

6.1. Разделить строку текста на слова

Прежде всего, давайте создадим класс Tokenizer для разделения строки на слова.

Мы расширим класс DoFn. Этот класс имеет абстрактный метод, называемый процессом. Этот метод обрабатывает входные записи из PCollection и отправляет выходные данные в Emitter.

Нам нужно реализовать логику разделения в этом методе:

public class Tokenizer extends DoFn<String, String> {
    private static final Splitter SPLITTER = Splitter
      .onPattern("\\s+")
      .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}

В приведенной выше реализации мы использовали класс Splitter из библиотеки Guava для извлечения слов из строки.

Далее давайте напишем модульный тест для класса Tokenizer:

@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {
 
    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
        Tokenizer splitter = new Tokenizer();
        splitter.process("  hello  world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}

Приведенный выше тест проверяет, что возвращаются правильные слова.

Наконец, давайте разделим строки, считанные из входного текстового файла, с помощью этого класса.

Метод parallelDo интерфейса PCollection применяет заданный DoFn ко всем элементам и возвращает новую коллекцию PCollection.

Вызовем этот метод на коллекции строк и передадим экземпляр Tokenizer:

PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

В результате мы получим список слов во входном текстовом файле. Мы удалим стоп-слова на следующем шаге.

6.2. Удалить стоп-слова

Аналогично предыдущему шагу создадим класс StopWordFilter для фильтрации стоп-слов.

Однако мы расширим FilterFn вместо DoFn. В FilterFn есть абстрактный метод accept. Нам нужно реализовать логику фильтрации в этом методе:

public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet
      .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
        "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
        "or", "s", "such", "t", "that", "the", "their", "then", "there",
        "these", "they", "this", "to", "was", "will", "with" });

    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}

Далее давайте напишем модульный тест для класса StopWordFilter:

public class StopWordFilterUnitTest {

    @Test
    public void givenFilter_whenStopWordPassed_thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();
 
        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
          .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
         Lists.newArrayList(noStopWords.materialize()));
    }
}

Этот тест проверяет правильность выполнения логики фильтрации.

Наконец, давайте воспользуемся StopWordFilter для фильтрации списка слов, сгенерированного на предыдущем шаге. Метод filter интерфейса PCollection применяет данный FilterFn ко всем элементам и возвращает новую коллекцию PCollection.

Вызовем этот метод для коллекции слов и передадим экземпляр StopWordFilter:

PCollection<String> noStopWords = words.filter(new StopWordFilter());

В результате мы получим отфильтрованную коллекцию слов.

6.3. Подсчет уникальных слов

Получив отфильтрованный набор слов, мы хотим подсчитать, как часто встречается каждое слово. Интерфейс PCollection имеет ряд методов для выполнения общих агрегаций:

    min — возвращает минимальный элемент коллекции max — возвращает максимальный элемент длины коллекции — возвращает количество элементов в коллекции count – возвращает PTable, содержащую количество каждого уникального элемента коллекции

Давайте воспользуемся методом count, чтобы получить уникальные слова вместе с их количеством:

// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();

7. Задайте вывод

Как В результате предыдущих шагов у нас есть таблица слов и их количество. Мы хотим записать этот результат в текстовый файл. Интерфейс Pipeline предоставляет удобные методы для записи вывода:

void write(PCollection<?> collection, Target target);

void write(PCollection<?> collection, Target target,
  Target.WriteMode writeMode);

<T> void writeTextFile(PCollection<T> collection, String pathName);

Поэтому давайте вызовем метод writeTextFile:

pipeline.writeTextFile(counts, outputPath);

8. Управление выполнением конвейера

Все шаги до сих пор только определяли конвейер данных. Никакой ввод не был прочитан или обработан. Это связано с тем, что Crunch использует модель ленивого выполнения.

Задания MapReduce не запускаются до тех пор, пока в интерфейсе Pipeline не будет вызван метод, управляющий планированием и выполнением заданий:

    run — подготавливает план выполнения для создания необходимых выходных данных, а затем выполняет его синхронно. – запускает все оставшиеся задания, необходимые для создания выходных данных, а затем очищает все созданные промежуточные файлы данных. runAsync – аналогично методу run, но выполняется неблокирующим образом как задания MapReduce:

Приведенный выше оператор запускает задания MapReduce для чтения входных данных, их обработки и записи результата в выходной каталог.

PipelineResult result = pipeline.done();

9. Собираем конвейер

На данный момент мы разработали и протестировали логику для чтения входных данных, их обработки и записи в выходной файл.

«Теперь давайте объединим их для создания всего конвейера данных:

10. Конфигурация запуска Hadoop

public int run(String[] args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as a MapReduce.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}

Таким образом, конвейер данных готов.

Однако нам нужен код для его запуска. Поэтому напишем основной метод для запуска приложения:

ToolRunner.run разбирает конфигурацию Hadoop из командной строки и выполняет задание MapReduce.

public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

11. Запустите приложение

Теперь все приложение готово. Давайте запустим следующую команду для его сборки:

В результате выполнения вышеуказанной команды мы получим упакованное приложение и специальный jar-файл с заданием в целевом каталоге.

mvn package

Давайте воспользуемся этим jar-файлом задания для выполнения приложения в Hadoop:

Приложение читает входной файл и записывает результат в выходной файл. Выходной файл содержит уникальные слова и их количество, подобное следующему:

hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>

Помимо Hadoop, мы можем запускать приложение в среде IDE, как отдельное приложение или как модульные тесты.

[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]

12. Заключение

В этом руководстве мы создали приложение для обработки данных, работающее на MapReduce. Apache Crunch упрощает написание, тестирование и выполнение конвейеров MapReduce на Java.

Как обычно, полный исходный код можно найти на Github.

«