«1. Обзор

В этой статье будут рассмотрены некоторые важные шаблоны корпоративной интеграции (EIP), поддерживаемые Apache Camel. Шаблоны интеграции помогают, предоставляя решения для стандартизированных способов интеграции систем.

Если вам нужно сначала ознакомиться с основами Apache Camel, обязательно посетите эту статью, чтобы освежить в памяти основы.

2. О EIP

Шаблоны корпоративной интеграции — это шаблоны проектирования, цель которых — предоставить решения проблем интеграции. Camel предоставляет реализации для многих из этих шаблонов. Чтобы увидеть полный список поддерживаемых шаблонов, перейдите по этой ссылке.

В этой статье мы рассмотрим шаблоны интеграции Маршрутизатор на основе содержимого, Переводчик сообщений, Многоадресную рассылку, Разделитель и Канал недоставленных сообщений.

2. Content Based Router

Content Based Router is a message router which routes a message to its destination based on a message header, part of payload or basically anything from message exchange which we consider as content.

It starts with choice() DSL statement followed by one or more when() DSL statements. Each when() contains a predicate expression which, if satisfied, will result in the execution of contained processing steps.

Let’s illustrate this EIP by defining a route which consumes files from one folder and moves them into two different folders depending on file extension. Our route is referenced in Spring XML file using custom XML syntax for Camel:

<bean id="contentBasedFileRouter" 
  class="com.baeldung.camel.file.ContentBasedFileRouter" />

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <routeBuilder ref="contentBasedFileRouter" />
</camelContext>

Определение маршрута содержится в классе ContentBasedFileRouter, где файлы маршрутизируются из исходной папки в две разные папки назначения в зависимости от их расширения.

В качестве альтернативы мы могли бы использовать здесь подход конфигурации Spring Java вместо использования XML-файла Spring. Для этого нам нужно добавить в наш проект дополнительную зависимость:

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-javaconfig</artifactId>
    <version>2.18.1</version>
</dependency>

Последнюю версию артефакта можно найти здесь.

После этого нам нужно расширить класс CamelConfiguration и переопределить метод route(), который будет ссылаться на ContentBasedFileRouter: оценка выражений и предикатов:

@Configuration
public class ContentBasedFileRouterConfig extends CamelConfiguration {

    @Bean
    ContentBasedFileRouter getContentBasedFileRouter() {
        return new ContentBasedFileRouter();
    }

    @Override
    public List<RouteBuilder> routes() {
        return Arrays.asList(getContentBasedFileRouter());
    }
}

Здесь мы дополнительно используем оператор DSL else() для маршрутизации всех сообщений, которые не удовлетворяют предикатам, заданным операторами when().

public class ContentBasedFileRouter extends RouteBuilder {

    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER_TXT 
      = "src/test/destination-folder-txt";
    private static final String DESTINATION_FOLDER_OTHER 
      = "src/test/destination-folder-other";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true").choice()
          .when(simple("${file:ext} == 'txt'"))
          .to("file://" + DESTINATION_FOLDER_TXT).otherwise()
          .to("file://" + DESTINATION_FOLDER_OTHER);
    }
}

3. Переводчик сообщений

Поскольку каждая система использует свой собственный формат данных, часто требуется перевести сообщение, приходящее из другой системы, в формат данных, поддерживаемый целевой системой.

Camel поддерживает маршрутизатор MessageTranslator, который позволяет нам преобразовывать сообщения, используя либо пользовательский процессор в логике маршрутизации, либо специальный bean-компонент для выполнения преобразования, либо оператор DSL transform().

Пример использования пользовательского процессора можно найти в предыдущей статье, где мы определили процессор, который добавляет временную метку к имени каждого входящего файла.

Давайте теперь продемонстрируем, как использовать переводчик сообщений, используя оператор transform():

В этом примере мы добавляем имя файла к содержимому файла с помощью оператора transform() для каждого файла из исходной папки и перемещаем преобразованный файлы в папку назначения.

public class MessageTranslatorFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER 
      = "src/test/destination-folder";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .transform(body().append(header(Exchange.FILE_NAME)))
          .to("file://" + DESTINATION_FOLDER);
    }
}

4. Многоадресная рассылка

Многоадресная рассылка позволяет нам направлять одно и то же сообщение в набор различных конечных точек и обрабатывать их по-разному.

Это возможно с помощью оператора multicast() DSL, а затем путем перечисления конечных точек и шагов обработки внутри них.

По умолчанию обработка на разных конечных точках не выполняется параллельно, но это можно изменить с помощью оператора DSL parallelProcessing().

По умолчанию Camel будет использовать последний ответ в качестве исходящего сообщения после многоадресной рассылки. Однако можно определить другую стратегию агрегирования, которая будет использоваться для сборки ответов от многоадресных рассылок.

Давайте посмотрим, как выглядит Multicast EIP на примере. Мы будем многоадресно передавать файлы из исходной папки по двум разным маршрутам, где мы будем преобразовывать их содержимое и отправлять их в разные папки назначения. Здесь мы используем компонент direct:, который позволяет нам связать два маршрута вместе:

5. Разделитель

public class MulticastFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER_WORLD 
      = "src/test/destination-folder-world";
    private static final String DESTINATION_FOLDER_HELLO 
      = "src/test/destination-folder-hello";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .multicast()
          .to("direct:append", "direct:prepend").end();

        from("direct:append")
          .transform(body().append("World"))
          .to("file://" + DESTINATION_FOLDER_WORLD);

        from("direct:prepend")
           .transform(body().prepend("Hello"))
           .to("file://" + DESTINATION_FOLDER_HELLO);
    }
}

Разделитель позволяет нам разделить входящее сообщение на несколько частей и обрабатывать каждую из них по отдельности. Это возможно с помощью оператора DSL split().

В отличие от Multicast, Splitter изменит входящее сообщение, а Multicast оставит его как есть.

Чтобы продемонстрировать это на примере, мы определим маршрут, в котором каждая строка из файла разделяется и преобразуется в отдельный файл, который затем перемещается в другую папку назначения. Каждый новый файл будет создаваться с именем файла, равным содержимому файла:

6. Канал недоставленных писем

public class SplitterFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";
    private static final String DESTINATION_FOLDER  
      = "src/test/destination-folder";

    @Override
    public void configure() throws Exception {
        from("file://" + SOURCE_FOLDER + "?delete=true")
          .split(body().convertToString().tokenize("\n"))
          .setHeader(Exchange.FILE_NAME, body())
          .to("file://" + DESTINATION_FOLDER);
    }
}

«Это распространено, и следует ожидать, что иногда могут возникать проблемы, например взаимоблокировки базы данных, которые могут привести к тому, что сообщение не будет доставлено должным образом. Однако в некоторых случаях повторная попытка с определенной задержкой может помочь, и сообщение будет обработано.

Канал недоставленных писем позволяет нам контролировать, что происходит с сообщением, когда оно не может быть доставлено. Используя канал недоставленных сообщений, мы можем указать, следует ли распространять выброшенное исключение вызывающей стороне и куда направить неудавшийся обмен.

Когда сообщение не может быть доставлено, канал недоставленных сообщений (если используется) переместит сообщение в конечную точку недоставленных сообщений.

Давайте продемонстрируем это на примере, создав исключение на маршруте:

Здесь мы определили errorHandler, который регистрирует неудачные доставки и определяет стратегию повторной доставки. При установке retryAttemptedLogLevel() каждая попытка повторной доставки будет регистрироваться с указанным уровнем журнала.

public class DeadLetterChannelFileRouter extends RouteBuilder {
    private static final String SOURCE_FOLDER 
      = "src/test/source-folder";

    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("log:dead?level=ERROR")
          .maximumRedeliveries(3).redeliveryDelay(1000)
          .retryAttemptedLogLevel(LoggingLevel.ERROR));

        from("file://" + SOURCE_FOLDER + "?delete=true")
          .process(exchange -> {
            throw new IllegalArgumentException("Exception thrown!");
        });
    }
}

Для того, чтобы это было полнофункционально, нам дополнительно нужно настроить логгер.

После выполнения этого теста в консоли отображаются следующие операторы журнала:

Как вы можете заметить, каждая попытка повторной доставки записывается в журнал с отображением Exchange, для которого доставка не удалась.

ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 0 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 1 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 2 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR DeadLetterChannel:156 - Failed delivery for 
(MessageId: ID-ZAG0025-50922-1481340325657-0-1 on 
ExchangeId: ID-ZAG0025-50922-1481340325657-0-2). 
On delivery attempt: 3 caught: java.lang.IllegalArgumentException: 
Exception thrown!
ERROR dead:156 - Exchange[ExchangePattern: InOnly, 
BodyType: org.apache.camel.component.file.GenericFile, 
Body: [Body is file based: GenericFile[File.txt]]]

7. Заключение

В этой статье мы представили введение в шаблоны интеграции с использованием Apache Camel и продемонстрировали их на нескольких примерах.

Мы продемонстрировали, как использовать эти шаблоны интеграции и почему они полезны для решения задач интеграции.

Код из этой статьи можно найти на GitHub.

«