«1. Обзор

Spring Integration упрощает использование некоторых шаблонов корпоративной интеграции. Один из этих способов — через DSL.

В этом руководстве мы рассмотрим поддержку подпотоков DSL для упрощения некоторых наших конфигураций.

2. Наша задача

Допустим, у нас есть последовательность целых чисел, которую мы хотим разделить на три разные корзины.

И если бы мы хотели использовать Spring Integration для этого, мы могли бы начать с создания трех выходных каналов:

    Числа, такие как 0, 3, 6 и 9, перейдут в MultipleOfThreeChannel Числа, такие как 1, 4, 7 и 10 отправится в resterIsOneChannel, а числа вроде 2, 5, 8 и 11 перейдут в resterIsTwoChannel

Чтобы увидеть, насколько полезными могут быть подпотоки, давайте начнем с того, как это будет выглядеть без подпотоков.

И затем мы будем использовать подпотоки, чтобы упростить нашу настройку с помощью:

    publishSubscribeChannel routeToRecipients Filters, чтобы настроить нашу логику if-then Маршрутизаторы, чтобы настроить нашу логику коммутатора

3. Предпосылки

Теперь перед настройкой наши подпотоки, давайте создадим эти выходные каналы.

Мы создадим эти QueueChannels, так как это немного легче продемонстрировать:

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {
 
    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }

    boolean isMultipleOfThree(Integer number) {
       return number % 3 == 0;
    }

    boolean isRemainderIOne(Integer number) {
        return number % 3 == 1;
    }

    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}

В конечном счете, это то, где наши сгруппированные числа окажутся в конечном итоге.

Обратите внимание, что Spring Integration может легко показаться сложным, поэтому мы добавим несколько вспомогательных методов для удобочитаемости.

4. Решение без подпотоков

Теперь нам нужно определить наши потоки.

Без подпотоков простая идея состоит в том, чтобы определить три отдельных потока интеграции, по одному для каждого типа числа.

Мы отправим одинаковую последовательность сообщений каждому компоненту IntegrationFlow, но выходные сообщения для каждого компонента будут разными.

4.1. Определение компонентов IntegrationFlow

Во-первых, давайте определим каждый bean-компонент IntegrationFlow в нашем классе SubflowConfiguration:

@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
      .<Integer> filter(this::isMultipleOfThree)
      .channel("multipleOfThreeChannel");
}

Наш поток содержит две конечные точки — разделитель, за которым следует фильтр.

Фильтр делает то, на что он похож. Но зачем нам еще и разветвитель? Мы увидим это через минуту, но в основном он разбивает входную коллекцию на отдельные сообщения.

И мы, конечно, можем таким же образом определить еще два bean-компонента IntegrationFlow.

4.2. Шлюзы обмена сообщениями

Для каждого потока нам также нужен шлюз сообщений.

Проще говоря, они абстрагируют Spring Integration Messages API от вызывающей стороны, аналогично тому, как служба REST может абстрагироваться от HTTP:

@MessagingGateway
public interface NumbersClassifier {

    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);

}

Для каждого из них нам нужно использовать аннотацию @Gateway и указать неявный имя для входного канала, которое является просто именем bean-компонента, за которым следует «.input». Обратите внимание, что мы можем использовать это соглашение, потому что мы используем потоки на основе лямбда.

Эти методы являются точками входа в наши потоки.

4.3. Отправка сообщений и проверка вывода

А теперь давайте проверим:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {
 
    @Autowired
    private QueueChannel multipleOfThreeChannel;

    @Autowired
    private NumbersClassifier numbersClassifier;
    @Test
    public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
        numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
        Message<?> outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 3);
        outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 6);
        outMessage = multipleOfThreeChannel.receive(0);
        assertNull(outMessage);
    }
}

Обратите внимание, что мы отправили сообщения в виде списка, поэтому нам нужен разделитель, чтобы взять один «список». сообщение» и преобразовать его в несколько «цифровых сообщений».

Мы вызываем receive с o, чтобы получить следующее доступное сообщение без ожидания. Поскольку в нашем списке есть два числа, кратные трем, мы ожидаем, что сможем вызвать его дважды. Третий вызов для получения возвращает null.

Receive, конечно же, возвращает сообщение, поэтому мы вызываем getPayload для извлечения числа.

Точно так же мы могли бы сделать то же самое для двух других.

Итак, это было решение без подпотоков. У нас есть три отдельных потока для обслуживания и три отдельных метода шлюза.

Сейчас мы заменим три bean-компонента IntegrationFlow одним bean-компонентом и три метода шлюза одним bean-компонентом.

5. Использование publishSubscribeChannel

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription -> 
           subscription
             .subscribe(subflow -> subflow
               .<Integer> filter(this::isMultipleOfThree)
               .channel("multipleOfThreeChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderOne)
                .channel("remainderIsOneChannel"))
             .subscribe(subflow -> subflow
                .<Integer> filter(this::isRemainderTwo)
                .channel("remainderIsTwoChannel")));
}

Метод publishSubscribeChannel() рассылает сообщения всем подписавшимся подпотокам. Таким образом, мы можем создать один поток вместо трех.

Таким образом, подпотоки являются анонимными, что означает, что к ним нельзя обращаться независимо друг от друга.

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

Теперь у нас есть только один поток, так что давайте также отредактируем наш NumbersClassifier:

@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));

    // same assertions as before
}

«

«Теперь, поскольку у нас есть только один bean-компонент IntegrationFlow и один метод шлюза, нам нужно отправить наш список только один раз:

Обратите внимание, что с этого момента изменится только определение потока интеграции, поэтому мы не будем показывать тест. снова.

6. Использование routeToRecipients

Еще один способ добиться того же — это routeToRecipients, который удобен тем, что имеет встроенную фильтрацию.

Используя этот метод, мы можем указать как каналы, так и подпотоки для трансляции.

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
          .<Integer> recipient("multipleOfThreeChannel", 
            this::isMultipleOfThree)       
          .<Integer> recipient("remainderIsOneChannel", 
            this::isRemainderOne)
          .<Integer> recipient("remainderIsTwoChannel", 
            this::isRemainderTwo));
}

6.1. получатель

В приведенном ниже коде мы укажем Multipleof3Channel, resterIs1Channel и resterIsTwoChannel в качестве получателей на основе наших условий:

Мы также можем вызвать получателя без условия, и routeToRecipients будет безоговорочно публиковать в этот пункт назначения.

6.2. получательFlow

.routeToRecipients(route -> route
  .recipientFlow(subflow -> subflow
      .<Integer> filter(this::isMultipleOfThree)
      .channel("mutipleOfThreeChannel"))
  ...);

Обратите внимание, что routeToRecipients позволяет нам определить полный поток, как и publishSubscribeChannel.

Давайте изменим приведенный выше код и укажем анонимный подпоток в качестве первого получателя:

Этот подпоток будет получать всю последовательность сообщений, поэтому нам нужно фильтровать, как раньше, чтобы получить такое же поведение.

Опять же, нам хватило одного bean-компонента IntegrationFlow.

Теперь давайте перейдем к компонентам if-else. Один из них — Фильтр.

7. Использование потоков if-then

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree, 
           notMultiple -> notMultiple
             .discardFlow(oneflow -> oneflow
               .<Integer> filter(this::isRemainderOne,
                 twoflow -> twoflow
                   .discardChannel("remainderIsTwoChannel"))
               .channel("remainderIsOneChannel"))
        .channel("multipleofThreeChannel");
}

Мы уже использовали фильтр во всех предыдущих примерах. Хорошая новость заключается в том, что мы можем указать не только условие дальнейшей обработки, но и канал или поток для отбрасываемых сообщений.

    Мы можем рассматривать потоки и каналы сброса как блок else:

В этом случае мы реализовали нашу логику маршрутизации if-else:

Если число не кратно трем, затем отбросить эти сообщения в поток сброса; мы используем здесь поток, так как требуется больше логики, чтобы узнать его канал назначения. В потоке сброса, если число не равно единице, отбрасывайте эти сообщения в канал сброса.

8. включение вычисляемого значения

И, наконец, давайте попробуем метод route, который дает нам немного больше контроля, чем routeToRecipients. Это хорошо, потому что Router может разделить поток на любое количество частей, тогда как Filter может сделать только две.

@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
      .<Integer, Integer> route(number -> number % 3, 
        mapping -> mapping
         .channelMapping(0, "multipleOfThreeChannel")
         .channelMapping(1, "remainderIsOneChannel")
         .channelMapping(2, "remainderIsTwoChannel"));
}

8.1. channelMapping

route(p -> p % 3,...

Давайте определим наш bean-компонент IntegrationFlow:

channelMapping(0, "multipleof3Channel")

В приведенном выше коде мы вычисляем ключ маршрутизации, выполняя деление:

На основе этого ключа мы маршрутизируем сообщения:

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

~ ~~ 8.2. subFlowMapping

.subFlowMapping(2, subflow -> subflow
  .<Integer> handle((payload, headers) -> {
      // do extra work on the payload
     return payload;
  }))).channel("remainderIsTwoChannel");

Теперь, как и в других случаях, мы можем получить больше контроля, указав подпоток, заменив channelMapping на subFlowMapping:

Или еще больше контроля, вызвав метод handle вместо метода канала:

~ ~~ В этом случае подпоток возвращался бы к основному потоку после метода route(), поэтому там нам нужно было бы указать канал resterIsTwoChannel.

9. Заключение