«1. Введение

В этом руководстве мы узнаем о Spring Integration Java DSL для создания интеграции приложений.

Мы возьмем интеграцию с перемещением файлов, созданную нами в разделе «Введение в Spring Integration», и вместо нее будем использовать DSL.

2. Зависимости

Spring Integration Java DSL является частью Spring Integration Core.

Итак, мы можем добавить эту зависимость:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

И для работы с нашим приложением для перемещения файлов нам также понадобится файл Spring Integration:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

3. Spring Integration Java DSL ~~ ~ До Java DSL пользователи настраивали компоненты Spring Integration в XML.

DSL вводит несколько быстрых компоновщиков, из которых мы можем легко создать полный конвейер Spring Integration исключительно на Java.

Итак, допустим, мы хотим создать канал, в котором все данные, поступающие по каналу, записываются в верхнем регистре.

В прошлом мы могли бы сделать:

А теперь вместо этого мы можем сделать:

<int:channel id="input"/>

<int:transformer input-channel="input" expression="payload.toUpperCase()" />

4. Приложение для перемещения файлов

@Bean
public IntegrationFlow upcaseFlow() {
    return IntegrationFlows.from("input")
      .transform(String::toUpperCase)
      .get();
}

Чтобы начать нашу интеграцию с перемещением файлов, мы понадобятся простые строительные блоки.

4.1. Поток интеграции

Первый строительный блок, который нам нужен, — это поток интеграции, который мы можем получить из конструктора IntegrationFlows:

from может принимать несколько типов, но в этом руководстве мы рассмотрим только три: ~ ~~ MessageSources MessageChannels и Strings

IntegrationFlows.from(...)

Вскоре мы поговорим обо всех трех.

    После вызова from нам стали доступны некоторые методы настройки:

В конечном счете, IntegrationFlows всегда будет создавать экземпляр IntegrationFlow, который является конечным продуктом любого приложения Spring Integration.

Этот шаблон ввода, выполнения соответствующих преобразований и выдачи результатов является фундаментальным для всех приложений Spring Integration.

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory())
  // add more components
  .get();

4.2. Описание источника ввода

Во-первых, чтобы переместить файлы, нам нужно указать нашему потоку интеграции, где он должен их искать, и для этого нам нужен MessageSource:

Проще говоря, MessageSource это место, откуда могут приходить сообщения, которые являются внешними по отношению к приложению.

В частности, нам нужно что-то, что может адаптировать этот внешний источник к представлению обмена сообщениями Spring. И поскольку эта адаптация ориентирована на ввод, их часто называют адаптерами входных каналов.

@Bean
public MessageSource<File> sourceDirectory() {
  // .. create a message source
}

Зависимость spring-integration-file дает нам адаптер входного канала, который отлично подходит для нашего варианта использования: FileReadingMessageSource:

Здесь наш FileReadingMessageSource будет читать каталог, заданный INPUT_DIR, и создаст из него MessageSource .

Давайте укажем это как наш источник в вызове IntegrationFlows.from:

@Bean
public MessageSource<File> sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File(INPUT_DIR));
    return messageSource;
}

4.3. Настройка источника ввода

Теперь, если мы думаем об этом как о долгоживущем приложении, мы, вероятно, захотим иметь возможность замечать файлы по мере их поступления, а не просто перемещать файлы, которые уже существуют при запуске.

IntegrationFlows.from(sourceDirectory());

Чтобы облегчить это, from также может использовать дополнительные конфигураторы для дальнейшей настройки источника ввода:

В этом случае мы можем сделать наш источник ввода более устойчивым, сказав Spring Integration опросить этот источник — наш файловая система в этом случае — каждые 10 секунд.

И, конечно же, это относится не только к нашему файловому источнику ввода, мы можем добавить этот опросник к любому MessageSource.

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

4.4. Фильтрация сообщений из источника ввода

Далее, давайте предположим, что мы хотим, чтобы наше приложение для перемещения файлов перемещало только определенные файлы, скажем, файлы изображений с расширением jpg.

Для этого мы можем использовать GenericSelector:

Итак, давайте снова обновим наш поток интеграции:

Или, поскольку этот фильтр настолько прост, мы могли бы вместо этого определить его с помощью лямбда-выражения :

@Bean
public GenericSelector<File> onlyJpgs() {
    return new GenericSelector<File>() {

        @Override
        public boolean accept(File source) {
          return source.getName().endsWith(".jpg");
        }
    };
}

4.5. Обработка сообщений с помощью активаторов служб

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs());

Теперь, когда у нас есть отфильтрованный список файлов, нам нужно записать их в новое место.

IntegrationFlows.from(sourceDirectory())
  .filter(source -> ((File) source).getName().endsWith(".jpg"));

Активаторы служб — это то, к чему мы обращаемся, когда думаем о результатах в Spring Integration.

Давайте воспользуемся активатором сервиса FileWritingMessageHandler из файла spring-integration:

Здесь наш FileWritingMessageHandler будет записывать каждое полученное сообщение в OUTPUT_DIR.

«Снова обновим:

@Bean
public MessageHandler targetDirectory() {
    FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(false);
    return handler;
}

И, кстати, обратите внимание на использование setExpectReply. Поскольку потоки интеграции могут быть двунаправленными, этот вызов указывает, что этот конкретный канал является односторонним.

4.6. Активация нашего потока интеграции

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory());

Когда мы добавили все наши компоненты, нам нужно зарегистрировать наш IntegrationFlow как bean-компонент, чтобы активировать его:

Метод get извлекает экземпляр IntegrationFlow, который нам нужно зарегистрировать как Spring Bean.

Как только загружается контекст нашего приложения, активируются все наши компоненты, содержащиеся в нашем IntegrationFlow.

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
      .filter(onlyJpgs())
      .handle(targetDirectory())
      .get();
}

Теперь наше приложение начнет перемещать файлы из исходного каталога в целевой.

5. Дополнительные компоненты

В нашем приложении для перемещения файлов на основе DSL мы создали адаптер входящего канала, фильтр сообщений и активатор службы.

Давайте рассмотрим несколько других распространенных компонентов Spring Integration и посмотрим, как мы можем их использовать.

5.1. Каналы сообщений

Как упоминалось ранее, канал сообщений — это еще один способ инициализации потока:

Мы можем прочитать это как «пожалуйста, найдите или создайте компонент канала с именем anyChannel». Затем прочитайте любые данные, поступающие в anyChannel из других потоков».

Но на самом деле это более универсальное назначение.

IntegrationFlows.from("anyChannel")

Проще говоря, канал отделяет производителей от потребителей, и мы можем думать о нем как о очереди Java. Канал может быть вставлен в любой точке потока.

Предположим, например, что мы хотим установить приоритет файлов при их перемещении из одного каталога в другой:

Затем мы можем вставить вызов канала между нашим потоком:

Есть десятки каналов на выбор, некоторые из наиболее удобных предназначены для параллелизма, аудита или промежуточного сохранения (например, буферы Kafka или JMS).

@Bean
public PriorityChannel alphabetically() {
    return new PriorityChannel(1000, (left, right) -> 
      ((File)left.getPayload()).getName().compareTo(
        ((File)right.getPayload()).getName()));
}

Кроме того, каналы могут быть мощными в сочетании с мостами.

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("alphabetically")
      .handle(targetDirectory())
      .get();
}

5.2. Мост

Когда мы хотим объединить два канала, мы используем мост.

Давайте представим, что вместо прямой записи в выходной каталог наше приложение для перемещения файлов записывает данные в другой канал:

Теперь, поскольку мы просто записали его в канал, мы можем соединить оттуда в другие потоки.

Давайте создадим мост, который опрашивает наш резервуар для сообщений и записывает их в место назначения:

@Bean
public IntegrationFlow fileReader() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("holdingTank")
      .get();
}

Опять же, поскольку мы писали в промежуточный канал, теперь мы можем добавить еще один поток, который принимает те же самые файлы и записывает их. их с разной скоростью:

Как мы видим, отдельные мосты могут управлять конфигурацией опроса для разных обработчиков.

@Bean
public IntegrationFlow fileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
      .handle(targetDirectory())
      .get();
}

Как только наш контекст приложения загружен, у нас теперь есть более сложное приложение в действии, которое начнет перемещать файлы из исходного каталога в два целевых каталога.

@Bean
public IntegrationFlow anotherFileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
      .handle(anotherTargetDirectory())
      .get();
}

6. Заключение

В этой статье мы рассмотрели различные способы использования Spring Integration Java DSL для построения различных конвейеров интеграции.

По сути, мы смогли воссоздать приложение для перемещения файлов из предыдущего руководства, на этот раз с использованием чистой Java.

Кроме того, мы рассмотрели несколько других компонентов, таких как каналы и мосты.

Полный исходный код, используемый в этом руководстве, доступен на Github.

«

The complete source code used in this tutorial is available over on Github.