«1. Обзор

В этой статье мы рассмотрим платформу Mantis, разработанную Netflix.

Мы изучим основные концепции Mantis, создав, запустив и исследуя задание потоковой обработки.

2. Что такое богомол?

Mantis — это платформа для создания приложений потоковой обработки (заданий). Он обеспечивает простой способ управления развертыванием и жизненным циклом заданий. Кроме того, это облегчает распределение ресурсов, обнаружение и связь между этими заданиями.

Таким образом, разработчики могут сосредоточиться на реальной бизнес-логике, имея при этом поддержку надежной и масштабируемой платформы для запуска своих неблокирующих приложений с малыми задержками.

Задание Mantis состоит из трех отдельных частей:

    источник, отвечающий за получение данных из внешнего источника, один или несколько этапов, отвечающих за обработку входящих потоков событий, и приемник, который собирает обработанные данные

Давайте теперь исследуем каждый из них.

3. Настройка и зависимости

Давайте начнем с добавления зависимостей mantis-runtime и jackson-databind:

<dependency>
    <groupId>io.mantisrx</groupId>
    <artifactId>mantis-runtime</artifactId>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

Теперь, для настройки источника данных нашего задания, давайте реализуем интерфейс Mantis Source: ~~ ~

public class RandomLogSource implements Source<String> {

    @Override
    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(
          Observable
            .interval(250, TimeUnit.MILLISECONDS)
            .map(this::createRandomLogEvent));
    }

    private String createRandomLogEvent(Long tick) {
        // generate a random log entry string
        ...
    }

}

Как мы видим, он просто генерирует случайные записи журнала несколько раз в секунду.

4. Наше первое задание

Давайте теперь создадим задание Mantis, которое просто собирает события журнала из нашего RandomLogSource. Позже мы добавим групповые и агрегатные преобразования для более сложного и интересного результата.

Для начала создадим сущность LogEvent:

public class LogEvent implements JsonType {
    private Long index;
    private String level;
    private String message;

    // ...
}

Затем добавим наш TransformLogStage.

Это простой этап, который реализует интерфейс ScalarComputation и разбивает запись журнала для создания LogEvent. Кроме того, он отфильтровывает все неправильно отформатированные строки:

public class TransformLogStage implements ScalarComputation<String, LogEvent> {

    @Override
    public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
        return logEntry
          .map(log -> log.split("#"))
          .filter(parts -> parts.length == 3)
          .map(LogEvent::new);
    }

}

4.1. Запуск задания

На данный момент у нас достаточно строительных блоков для сборки нашего задания Mantis:

public class LogCollectingJob extends MantisJobProvider<LogEvent> {

    @Override
    public Job<LogEvent> getJobInstance() {
        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }

}

Давайте более подробно рассмотрим нашу работу.

Как мы видим, он расширяет MantisJobProvider. Сначала он извлекает данные из нашего RandomLogSource и применяет TransformLogStage к полученным данным. Наконец, он отправляет обработанные данные во встроенный приемник, который охотно подписывается и доставляет данные через SSE.

Теперь давайте настроим наше задание на локальное выполнение при запуске:

@SpringBootApplication
public class MantisApplication implements CommandLineRunner {

    // ...
 
    @Override
    public void run(String... args) {
        LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
    }
}

Запустим приложение. Мы увидим такое сообщение в журнале:

...
Serving modern HTTP SSE server sink on port: 86XX

Теперь подключимся к приемнику с помощью curl:

$ curl localhost:86XX
data: {"index":86,"level":"WARN","message":"login attempt"}
data: {"index":87,"level":"ERROR","message":"user created"}
data: {"index":88,"level":"INFO","message":"user created"}
data: {"index":89,"level":"INFO","message":"login attempt"}
data: {"index":90,"level":"INFO","message":"user created"}
data: {"index":91,"level":"ERROR","message":"user created"}
data: {"index":92,"level":"WARN","message":"login attempt"}
data: {"index":93,"level":"INFO","message":"user created"}
...

4.2. Настройка приемника

До сих пор мы использовали встроенный приемник для сбора обработанных данных. Давайте посмотрим, сможем ли мы добавить больше гибкости нашему сценарию, предоставив собственный приемник.

Что, если, например, мы хотим отфильтровать журналы по сообщениям?

Давайте создадим LogSink, который реализует интерфейс Sink\u003cLogEvent\u003e:

public class LogSink implements Sink<LogEvent> {
    @Override
    public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
        SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
          .withEncoder(LogEvent::toJsonString)
          .withPredicate(filterByLogMessage())
          .build();
        logEventObservable.subscribe();
        sink.call(context, portRequest, logEventObservable);
    }
    private Predicate<LogEvent> filterByLogMessage() {
        return new Predicate<>("filter by message",
          parameters -> {
            if (parameters != null && parameters.containsKey("filter")) {
                return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
            }
            return logEvent -> true;
        });
    }
}

В этой реализации приемника мы настроили предикат, который использует параметр filter для извлечения только тех журналов, которые содержат текст, заданный в параметре filter:

$ curl localhost:8874?filter=login
data: {"index":93,"level":"ERROR","message":"login attempt"}
data: {"index":95,"level":"INFO","message":"login attempt"}
data: {"index":97,"level":"ERROR","message":"login attempt"}
...

Примечание Mantis также предлагает мощный язык запросов MQL, который можно использовать для запроса, преобразования и анализа потоковых данных в стиле SQL.

5. Цепочка этапов

Теперь предположим, что нам интересно узнать, сколько записей в журнале ERROR, WARN или INFO имеется за заданный интервал времени. Для этого мы добавим в нашу работу еще два этапа и свяжем их вместе.

5.1. Группировка

Во-первых, давайте создадим GroupLogStage.

Этот этап представляет собой реализацию ToGroupComputation, которая получает данные потока LogEvent из существующего TransformLogStage. После этого он группирует записи по уровню ведения журнала и отправляет их на следующий этап:

public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {

    @Override
    public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
        return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
    }

    public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){
        return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
          .description("Group event data by level")
          .codec(JacksonCodecs.pojo(LogEvent.class))
          .concurrentInput();
    }
    
}

Мы также создали пользовательскую конфигурацию этапа, предоставив описание, кодек, который будет использоваться для сериализации вывода, и разрешили этот этап вызовите метод для одновременного запуска с помощью concurrentInput().

«Следует отметить, что этот этап является горизонтально масштабируемым. Это означает, что мы можем запускать столько экземпляров этого этапа, сколько необходимо. Также стоит упомянуть, что при развертывании в кластере Mantis этот этап отправляет данные на следующий этап, так что все события, принадлежащие определенной группе, попадут на один и тот же рабочий процесс следующего этапа.

5.2. Агрегирование

Прежде чем мы продолжим и создадим следующий этап, давайте сначала добавим объект LogAggregate:

public class LogAggregate implements JsonType {

    private final Integer count;
    private final String level;

}

Теперь давайте создадим последний этап в цепочке.

На этом этапе реализуется GroupToScalarComputation и преобразуется поток групп журналов в скалярный LogAggregate. Он делает это, подсчитывая, сколько раз каждый тип журнала появляется в потоке. Кроме того, у него также есть параметр LogAggregationDuration, который можно использовать для управления размером окна агрегации:

public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {

    private int duration;

    @Override
    public void init(Context context) {
        duration = (int)context.getParameters().get("LogAggregationDuration", 1000);
    }

    @Override
    public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
        return mantisGroup
          .window(duration, TimeUnit.MILLISECONDS)
          .flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
            .flatMap(group -> group.reduce(0, (count, value) ->  count = count + 1)
              .map((count) -> new LogAggregate(count, group.getKey()))
            ));
    }

    public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){
        return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
          .description("sum events for a log level")
          .codec(JacksonCodecs.pojo(LogAggregate.class))
          .withParameters(getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();

        params.add(new IntParameter()
          .name("LogAggregationDuration")
          .description("window size for aggregation in milliseconds")
          .validator(Validators.range(100, 10000))
          .defaultValue(5000)
          .build());

        return params;
    }
    
}

5.3. Настройка и запуск задания

Осталось только настроить наше задание:

public class LogAggregationJob extends MantisJobProvider<LogAggregate> {

    @Override
    public Job<LogAggregate> getJobInstance() {

        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), TransformLogStage.stageConfig())
          .stage(new GroupLogStage(), GroupLogStage.config())
          .stage(new CountLogStage(), CountLogStage.config())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }
}

Как только мы запустим приложение и выполним наше новое задание, мы увидим, что счетчики журналов извлекаются каждые несколько секунд:

$ curl localhost:8133
data: {"count":3,"level":"ERROR"}
data: {"count":13,"level":"INFO"}
data: {"count":4,"level":"WARN"}

data: {"count":8,"level":"ERROR"}
data: {"count":5,"level":"INFO"}
data: {"count":7,"level":"WARN"}
...

6. Заключение

Подводя итог, в этой статье мы увидели, что такое Netflix Mantis и для чего его можно использовать. Кроме того, мы рассмотрели основные концепции, использовали их для создания заданий и изучили настраиваемые конфигурации для различных сценариев.

Как всегда, полный код доступен на GitHub.