1. Обзор

Это руководство представляет собой введение в Apache Storm, распределенную систему вычислений в реальном времени.

Мы сосредоточимся и рассмотрим:

Что такое Apache Storm и какие проблемы он решает Его архитектура и как его использовать в проекте

    2. Что такое Apache Storm?

Apache Storm — бесплатная распределенная система с открытым исходным кодом для вычислений в реальном времени.

Он обеспечивает отказоустойчивость, масштабируемость и гарантирует обработку данных, особенно хорош при обработке неограниченных потоков данных.

Некоторыми хорошими вариантами использования Storm могут быть обработка операций с кредитными картами для обнаружения мошенничества или обработка данных из умных домов для обнаружения неисправных датчиков.

Storm позволяет интегрироваться с различными базами данных и системами очередей, доступными на рынке.

3. Зависимость от Maven

Прежде чем использовать Apache Storm, нам нужно включить в наш проект зависимость storm-core:

Мы должны использовать предоставленную область, только если мы собираемся запускать наше приложение на кластер Шторм.

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

Для локального запуска приложения мы можем использовать так называемый локальный режим, который будет имитировать кластер Storm в локальном процессе, в таком случае мы должны удалить предоставленный.

4. Модель данных

Модель данных Apache Storm состоит из двух элементов: кортежей и потоков.

4.1. Кортеж

Кортеж — это упорядоченный список именованных полей с динамическими типами. Это означает, что нам не нужно явно объявлять типы полей.

Storm должен знать, как сериализовать все значения, используемые в кортеже. По умолчанию он уже может сериализовать примитивные типы, строки и массивы байтов.

А поскольку Storm использует сериализацию Kryo, нам нужно зарегистрировать сериализатор с помощью Config, чтобы использовать пользовательские типы. Мы можем сделать это одним из двух способов:

Во-первых, мы можем зарегистрировать класс для сериализации, используя его полное имя:

В таком случае Kryo будет сериализовать класс, используя FieldSerializer. По умолчанию это сериализует все непереходные поля класса, как частные, так и общедоступные.

Config config = new Config();
config.registerSerialization(User.class);

Или вместо этого мы можем предоставить как класс для сериализации, так и сериализатор, который мы хотим, чтобы Storm использовал для этого класса:

Чтобы создать собственный сериализатор, нам нужно расширить универсальный класс Serializer, который имеет два метода. пиши и читай.

Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

4.2. Поток

Поток — это основная абстракция в экосистеме Storm. Поток — это неограниченная последовательность кортежей.

Storms позволяет обрабатывать несколько потоков параллельно.

Каждый поток имеет идентификатор, который предоставляется и назначается во время объявления.

5. Топология

Логика приложения Storm реального времени упакована в топологию. Топология состоит из носиков и болтов.

5.1. Носик

Носик — это источник потоков. Они испускают кортежи в топологию.

Кортежи можно читать из различных внешних систем, таких как Kafka, Kestrel или ActiveMQ.

Носик может быть надежным или ненадежным. Надежный означает, что источник может ответить, что кортеж не может быть обработан Storm. Ненадежный означает, что носик не отвечает, так как он собирается использовать механизм «выстрелил-забыл» для отправки кортежей.

Чтобы создать собственный носик, нам нужно реализовать интерфейс IRichSpout или расширить любой класс, который уже реализует интерфейс, например, абстрактный класс BaseRichSpout.

Давайте создадим ненадежный носик:

Наш пользовательский RandomIntSpout будет генерировать случайное целое число и метку времени каждую секунду.

public class RandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}

5.2. Bolt

Bolt обрабатывает кортежи в потоке. Они могут выполнять различные операции, такие как фильтрация, агрегирование или пользовательские функции.

Некоторые операции требуют нескольких шагов, поэтому в таких случаях нам нужно будет использовать несколько болтов.

Чтобы создать собственный Bolt, нам нужно реализовать интерфейс IRichBolt или для более простых операций IBasicBolt.

Для реализации Bolt также доступно несколько вспомогательных классов. В этом случае мы будем использовать BaseBasicBolt:

Этот пользовательский PrintingBolt просто выведет все кортежи на консоль.

public class PrintingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

6. Создание простой топологии

«Давайте объединим эти идеи в простую топологию. Наша топология будет иметь один носик и три болта.

6.1. RandomNumberSpout

Вначале мы создадим ненадежный носик. Он будет генерировать случайные целые числа из диапазона (0,100) каждую секунду:

6.2. FilteringBolt

public class RandomNumberSpout extends BaseRichSpout {
    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, 
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int operation = random.nextInt(101);
        long timestamp = System.currentTimeMillis();

        Values values = new Values(operation, timestamp);
        collector.emit(values);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

Далее мы создадим болт, который будет отфильтровывать все элементы с операцией, равной 0:

6.3. AggregatingBolt

public class FilteringBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int operation = tuple.getIntegerByField("operation");
        if (operation > 0) {
            basicOutputCollector.emit(tuple.getValues());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

Теперь давайте создадим более сложный Bolt, который будет агрегировать все положительные операции за каждый день.

Для этой цели мы будем использовать специальный класс, созданный специально для реализации болтов, которые работают с окнами, а не с отдельными кортежами: BaseWindowedBolt.

Окна — важная концепция потоковой обработки, разбивающая бесконечные потоки на конечные фрагменты. Затем мы можем применить вычисления к каждому фрагменту. Обычно существует два типа окон:

Временные окна используются для группировки элементов из заданного периода времени с использованием временных меток. Временные окна могут иметь разное количество элементов.

Окна Count используются для создания окон определенного размера. В таком случае все окна будут иметь одинаковый размер, и окно не будет сгенерировано, если элементов меньше определенного размера.

Наш AggregatingBolt будет генерировать сумму всех положительных операций из временного окна вместе с временными метками его начала и конца:

Обратите внимание, что в этом случае безопасно получить первый элемент списка напрямую. Это связано с тем, что каждое окно вычисляется с использованием поля временной метки кортежа, поэтому в каждом окне должен быть хотя бы один элемент.

public class AggregatingBolt extends BaseWindowedBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> tuples = tupleWindow.get();
        tuples.sort(Comparator.comparing(this::getTimestamp));

        int sumOfOperations = tuples.stream()
          .mapToInt(tuple -> tuple.getIntegerByField("operation"))
          .sum();
        Long beginningTimestamp = getTimestamp(tuples.get(0));
        Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

        Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
        outputCollector.emit(values);
    }

    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField("timestamp");
    }
}

6.4. FileWritingBolt

Наконец, мы создадим болт, который будет принимать все элементы с sumOfOfOperations больше 2000, сериализовать их и записывать в файл:

Обратите внимание, что нам не нужно объявлять вывод как это будет последний болт в нашей топологии

public class FileWritingBolt extends BaseRichBolt {
    public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
    private BufferedWriter writer;
    private String filePath;
    private ObjectMapper objectMapper;

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close writer!");
        }
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
      OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 2000) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(
                sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }
    
    // public constructor and other methods
}

6.5. Запуск топологии

Наконец, мы можем собрать все воедино и запустить нашу топологию:

Чтобы данные проходили через каждую часть топологии, нам нужно указать, как их соединить. shuffleGroup позволяет нам заявить, что данные для filteringBolt будут поступать из randomNumberSpout.

public static void runTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    Spout random = new RandomNumberSpout();
    builder.setSpout("randomNumberSpout");

    Bolt filtering = new FilteringBolt();
    builder.setBolt("filteringBolt", filtering)
      .shuffleGrouping("randomNumberSpout");

    Bolt aggregating = new AggregatingBolt()
      .withTimestampField("timestamp")
      .withLag(BaseWindowedBolt.Duration.seconds(1))
      .withWindow(BaseWindowedBolt.Duration.seconds(5));
    builder.setBolt("aggregatingBolt", aggregating)
      .shuffleGrouping("filteringBolt"); 
      
    String filePath = "./src/main/resources/data.txt";
    Bolt file = new FileWritingBolt(filePath);
    builder.setBolt("fileBolt", file)
      .shuffleGrouping("aggregatingBolt");

    Config config = new Config();
    config.setDebug(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Test", config, builder.createTopology());
}

Для каждого болта нам нужно добавить shuffleGroup, который определяет источник элементов для этого болта. Источником элементов может быть Носик или другой Болт. И если мы установим один и тот же источник для более чем одного болта, источник будет излучать все элементы для каждого из них.

В этом случае наша топология будет использовать LocalCluster для локального запуска задания.

7. Заключение

В этом руководстве мы представили Apache Storm, распределенную систему вычислений в реальном времени. Мы создали носик, несколько болтов и стянули их вместе в полную топологию.

И, как всегда, все примеры кода можно найти на GitHub.

«