«1. Обзор

Apache Spark — это быстрая распределенная система обработки данных. Он выполняет обработку данных в памяти и использует кэширование в памяти и оптимизированное выполнение, что приводит к высокой производительности. Он предоставляет API высокого уровня для популярных языков программирования, таких как Scala, Python, Java и R.

В этом кратком руководстве мы рассмотрим три основных понятия Spark: кадры данных, наборы данных и RDD.

2. DataFrame

Spark SQL представил абстракцию табличных данных, называемую DataFrame, начиная с Spark 1.3. С тех пор это стало одной из самых важных функций в Spark. Этот API полезен, когда мы хотим обрабатывать структурированные и полуструктурированные распределенные данные.

В разделе 3 мы обсудим отказоустойчивые распределенные наборы данных (RDD). DataFrames хранят данные более эффективно, чем RDD, потому что они используют неизменяемые, находящиеся в памяти, отказоустойчивые, распределенные и параллельные возможности RDD, но они также применяют схему к данным. DataFrames также переводят код SQL в оптимизированные низкоуровневые операции RDD.

Мы можем создавать DataFrames тремя способами:

    Преобразование существующих RDD Выполнение SQL-запросов Загрузка внешних данных

Команда Spark представила SparkSession в версии 2.0, он объединяет все различные контексты, гарантируя, что разработчикам не придется беспокоиться о создании разных contexts:

SparkSession session = SparkSession.builder()
  .appName("TouristDataFrameExample")
  .master("local[*]")
  .getOrCreate();

DataFrameReader dataFrameReader = session.read();

Мы будем анализировать файл Tourist.csv:

Dataset<Row> data = dataFrameReader.option("header", "true")
  .csv("data/Tourist.csv");

Поскольку Spark 2.0 DataFrame стал набором данных типа Row, мы можем использовать DataFrame в качестве псевдонима для Dataset\u003cRow \u003e.

Мы можем выбрать определенные столбцы, которые нас интересуют. Мы также можем фильтровать и группировать по заданному столбцу:

data.select(col("country"), col("year"), col("value"))
  .show();

data.filter(col("country").equalTo("Mexico"))
  .show();

data.groupBy(col("country"))
  .count()
  .show();

3. Наборы данных

Набор данных — это набор строго типизированных, структурированных данных. Они обеспечивают знакомый объектно-ориентированный стиль программирования, а также преимущества безопасности типов, поскольку наборы данных могут проверять синтаксис и выявлять ошибки во время компиляции.

Набор данных является расширением DataFrame, поэтому мы можем рассматривать DataFrame как нетипизированное представление набора данных.

Команда Spark выпустила API набора данных в Spark 1.6 и, как они упомянули: «Цель наборов данных Spark — предоставить API, который позволяет пользователям легко выражать преобразования в объектных доменах, а также обеспечивает преимущества производительности и надежности исполнительный механизм Spark SQL».

Во-первых, нам нужно создать класс типа TouristData:

public class TouristData {
    private String region;
    private String country;
    private String year;
    private String series;
    private Double value;
    private String footnotes;
    private String source;
    // ... getters and setters
}

Чтобы сопоставить каждую из наших записей с указанным типом, нам потребуется использовать Encoder. Кодировщики преобразуют объекты Java во внутренний двоичный формат Spark:

// SparkSession initialization and data load
Dataset<Row> responseWithSelectedColumns = data.select(col("region"), 
  col("country"), col("year"), col("series"), col("value").cast("double"), 
  col("footnotes"), col("source"));

Dataset<TouristData> typedDataset = responseWithSelectedColumns
  .as(Encoders.bean(TouristData.class));

Как и в случае с DataFrame, мы можем фильтровать и группировать по определенным столбцам:

typedDataset.filter((FilterFunction) record -> record.getCountry()
  .equals("Norway"))
  .show();

typedDataset.groupBy(typedDataset.col("country"))
  .count()
  .show();

Мы также можем выполнять такие операции, как фильтрация по столбцу, соответствующему определенному диапазону или вычисление суммы определенного столбца, чтобы получить его общее значение:

typedDataset.filter((FilterFunction) record -> record.getYear() != null 
  && (Long.valueOf(record.getYear()) > 2010 
  && Long.valueOf(record.getYear()) < 2017)).show();

typedDataset.filter((FilterFunction) record -> record.getValue() != null 
  && record.getSeries()
    .contains("expenditure"))
    .groupBy("country")
    .agg(sum("value"))
    .show();

4. RDD

Resilient Distributed Dataset или RDD — это основная программная абстракция Spark. Он представляет собой набор неизменяемых, устойчивых и распределенных элементов.

RDD инкапсулирует большой набор данных, Spark автоматически распределяет данные, содержащиеся в RDD, по нашему кластеру и распараллеливает операции, которые мы выполняем над ними.

Мы можем создавать RDD только посредством операций с данными в стабильном хранилище или операций с другими RDD.

Отказоустойчивость важна, когда мы имеем дело с большими наборами данных, и данные распределены по кластерным компьютерам. RDD устойчивы благодаря встроенной в Spark механике восстановления после сбоев. Spark полагается на тот факт, что RDD запоминают, как они были созданы, поэтому мы можем легко проследить происхождение для восстановления раздела.

Есть два типа операций, которые мы можем выполнять с RDD: преобразования и действия.

4.1. Преобразования

Мы можем применять преобразования к RDD для управления его данными. После выполнения этой манипуляции мы получим совершенно новый RDD, поскольку RDD являются неизменяемыми объектами.

Мы проверим, как реализовать Map и Filter, два наиболее распространенных преобразования.

Сначала нам нужно создать JavaSparkContext и загрузить данные в виде RDD из файла Tourist.csv:

SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
  .setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");

«

JavaRDD<String> upperCaseCountries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1].toUpperCase();
}).distinct();

upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

«Далее давайте применим функцию карты, чтобы получить название страны из каждой записи и преобразовать название в верхний регистр. Мы можем сохранить этот вновь сгенерированный набор данных в виде текстового файла на диске:

JavaRDD<String> touristsInMexico = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));

touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

Если мы хотим выбрать только конкретную страну, мы можем применить функцию фильтра к нашему исходному RDD туристов:

4.2. Действия

Действия вернут окончательное значение или сохранят результаты на диск после выполнения некоторых вычислений с данными.

В Spark регулярно используются два действия: Count и Reduce.

// Spark Context initialization and data load
JavaRDD<String> countries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1];
}).distinct();

Long numberOfCountries = countries.count();

Давайте подсчитаем общее количество стран в нашем CSV-файле:

Теперь мы посчитаем общие расходы по странам. Нам нужно будет отфильтровать записи, содержащие расходы в их описании.

JavaRDD<String> touristsExpenditure = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));

JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
  .mapToPair(line -> {
      String[] columns = line.split(COMMA_DELIMITER);
      return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});

List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
  .reduceByKey((x, y) -> x + y)
  .collect();

Вместо использования JavaRDD мы будем использовать JavaPairRDD. Пара RDD — это тип RDD, который может хранить пары ключ-значение. Давайте проверим это дальше:

5. Заключение

Подводя итог, мы должны использовать DataFrames или Datasets, когда нам нужны специфичные для предметной области API, нам нужны высокоуровневые выражения, такие как агрегация, сумма или SQL-запросы. . Или когда нам нужна безопасность типов во время компиляции.

С другой стороны, мы должны использовать RDD, когда данные неструктурированы и нам не нужно реализовывать определенную схему или когда нам нужны низкоуровневые преобразования и действия.