«1. Обзор

В этом руководстве мы рассмотрим поддержку транзакций в среде Spring Integration.

2. Транзакции в потоках сообщений

Spring поддерживает синхронизацию ресурсов с транзакциями, начиная с самых ранних версий. Мы часто используем его для синхронизации транзакций, управляемых несколькими менеджерами транзакций.

Например, мы можем синхронизировать фиксацию JMS с фиксацией JDBC.

С другой стороны, у нас также есть более сложные варианты использования в потоках сообщений. Они включают синхронизацию нетранзакционных ресурсов, а также различных типов транзакционных ресурсов.

Обычно потоки обмена сообщениями могут инициироваться двумя различными типами механизмов.

2.1. Потоки сообщений, инициированные пользовательским процессом

Некоторые потоки сообщений зависят от инициации сторонних процессов, таких как запуск сообщения в каком-либо канале сообщений или вызов метода шлюза сообщений.

Мы настраиваем поддержку транзакций для этих потоков через стандартную поддержку транзакций Spring. Потоки не должны быть явно настроены Spring Integration для поддержки транзакций. Поток сообщений Spring Integration естественным образом учитывает транзакционную семантику компонентов Spring.

Например, мы можем аннотировать ServiceActivator или его метод с помощью @Transactional:

@Transactional
public class TxServiceActivator {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void storeTestResult(String testResult) {
        this.jdbcTemplate.update("insert into STUDENT values(?)", testResult);
        log.info("Test result is stored: {}", testResult);
    }
}

Мы можем запустить метод storeTestResult из любого компонента, и транзакционный контекст будет применяться как обычно. При таком подходе у нас есть полный контроль над конфигурацией транзакции.

2.2. Потоки сообщений, инициированные процессом демона

Мы часто используем этот тип потока сообщений для автоматизации. Например, опросчик опрашивает очередь сообщений, чтобы инициировать новый поток сообщений с опрошенным сообщением, или планировщик, планирующий процесс путем создания нового сообщения и инициирования потока сообщений в заранее определенное время.

По сути, это потоки на основе триггеров, инициированные триггерным процессом (процессом демона). Для этих потоков мы должны предоставить некоторую конфигурацию транзакции для создания контекста транзакции всякий раз, когда начинается новый поток сообщений.

Через конфигурацию мы делегируем потоки существующей поддержке транзакций Spring.

Мы сосредоточимся на поддержке транзакций для этого типа потока сообщений в оставшейся части статьи.

3. Поддержка транзакций опросника

Опросчик — это распространенный компонент в потоках интеграции. Он периодически извлекает данные из различных источников и передает их по цепочке интеграции.

Spring Integration обеспечивает поддержку транзакций для опросников из коробки. Каждый раз, когда мы настраиваем компонент Poller, мы можем предоставить конфигурацию транзакций:

@Bean
@InboundChannelAdapter(value = "someChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> someMessageSource() {
    ...
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

private TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder()
      .transactionManager(txManager)
      .build();
}

Мы должны предоставить ссылку на TransactionManager и пользовательскую TransactionSynchronizationFactory, или мы можем полагаться на значения по умолчанию. Внутри нативная транзакция Spring оборачивает процесс. В результате все потоки сообщений, инициированные этим средством опроса, являются транзакционными.

4. Границы транзакции

Когда транзакция запускается, ее контекст всегда привязывается к текущему потоку. Независимо от того, сколько конечных точек и каналов у нас может быть в нашем потоке сообщений, наш контекст транзакции всегда будет сохраняться, пока поток живет в том же потоке.

Если мы нарушим его, инициировав новый поток в каком-либо сервисе, мы также нарушим границу транзакции. По сути, транзакция завершится в этот момент.

Если между потоками произошла успешная передача обслуживания, поток будет считаться успешным. Это зафиксирует транзакцию в этот момент, но поток будет продолжаться, и это все равно может привести к исключению где-то ниже по течению.

Следовательно, это исключение может вернуться к инициатору потока, так что транзакция может закончиться откатом. Вот почему мы должны использовать транзакционные каналы в любой точке, где может быть нарушена граница потока.

Например, мы должны использовать JMS, JDBC или какой-либо другой транзакционный канал.

5. Синхронизация транзакций

«В некоторых случаях полезно синхронизировать определенные операции с транзакцией, охватывающей весь поток.

Например, мы покажем, как использовать Poller, который читает входящий файл и на основе его содержимого выполняет обновление базы данных. Когда операция базы данных завершается, она также переименовывает файл в зависимости от успеха операции.

Прежде чем мы перейдем к примеру, важно понять, что этот подход синхронизирует операции в файловой системе с транзакцией. Это не делает файловую систему, которая по своей сути не является транзакционной, на самом деле становится транзакционной.

Транзакция начинается до опроса и либо фиксируется, либо откатывается после завершения потока, за которым следует синхронизированная операция в файловой системе.

Сначала мы определяем InboundChannelAdapter с простым Poller:

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(value = "pollerMetadata"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedDelay(5000)
      .advice(transactionInterceptor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory)
      .get();
}

Poller содержит ссылку на TransactionManager, как объяснялось ранее. Кроме того, он также содержит ссылку на TransactionSynchronizationFactory. Этот компонент обеспечивает механизм синхронизации операций файловой системы с транзакцией:

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor =
      new ExpressionEvaluatingTransactionSynchronizationProcessor();

    SpelExpressionParser spelParser = new SpelExpressionParser();
 
    processor.setAfterCommitExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.PASSED'))"));
 
    processor.setAfterRollbackExpression(
      spelParser.parseExpression(
        "payload.renameTo(new java.io.File(payload.absolutePath + '.FAILED'))"));

    return new DefaultTransactionSynchronizationFactory(processor);
}

Если транзакция фиксируется, TransactionSynchronizationFactory переименует файл, добавив «.PASSED» к имени файла. Однако, если он откатится, он добавит «.FAILED».

InputChannel преобразует полезную нагрузку с помощью FileToStringTransformer и делегирует ее toServiceChannel. Этот канал привязан к ServiceActivator:

@Bean
public MessageChannel inputChannel() {
    return new DirectChannel();
}
    
@Bean
@Transformer(inputChannel = "inputChannel", outputChannel = "toServiceChannel")
public FileToStringTransformer fileToStringTransformer() {
    return new FileToStringTransformer();
}

ServiceActivator считывает входящий файл, содержащий результаты экзамена студента. Он записывает результат в базу данных. Если результат содержит строку «fail», генерируется исключение, которое вызывает откат базы данных:

@ServiceActivator(inputChannel = "toServiceChannel")
public void serviceActivator(String payload) {

    jdbcTemplate.update("insert into STUDENT values(?)", payload);

    if (payload.toLowerCase().startsWith("fail")) {
        log.error("Service failure. Test result: {} ", payload);
        throw new RuntimeException("Service failure.");
    }

    log.info("Service success. Test result: {}", payload);
}

После успешной фиксации или отката операции базы данных TransactionSynchronizationFactory синхронизирует операцию файловой системы с ее результатом.

6. Заключение

В этой статье мы объяснили поддержку транзакций в среде Spring Integration. Кроме того, мы продемонстрировали, как синхронизировать транзакцию с операциями над нетранзакционным ресурсом, таким как файловая система.

Полный исходный код примера доступен на GitHub.