«1. Введение

Представьте, что нам нужно вручную выполнять такие задачи, как обработка платежных ведомостей, расчет процентов и выставление счетов. Это стало бы довольно скучным, подверженным ошибкам и бесконечным списком ручных задач!

В этом руководстве мы рассмотрим Java Batch Processing (JSR 352), часть платформы Jakarta EE, а также прекрасную спецификацию для автоматизации подобных задач. Он предлагает разработчикам приложений модель разработки надежных систем пакетной обработки, чтобы они могли сосредоточиться на бизнес-логике.

2. Зависимости Maven

Поскольку JSR 352 — это всего лишь спецификация, нам нужно будет включить его API и реализацию, например jberet:

<dependency>
    <groupId>javax.batch</groupId>
    <artifactId>javax.batch-api</artifactId>
    <version>1.0.1</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-core</artifactId>
    <version>1.0.2.Final</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-support</artifactId>
    <version>1.0.2.Final</version>
</dependency>
<dependency>
    <groupId>org.jberet</groupId>
    <artifactId>jberet-se</artifactId>
    <version>1.0.2.Final</version>
</dependency>

Мы также добавим базу данных в памяти, поэтому мы можем рассмотреть некоторые более реалистичные сценарии.

3. Ключевые концепции

JSR 352 вводит несколько концепций, которые мы можем рассмотреть следующим образом:

Давайте сначала определим каждую часть:

    Начиная слева, у нас есть JobOperator. Он управляет всеми аспектами обработки заданий, такими как запуск, остановка и перезапуск. Далее у нас есть задание. Задание — это логический набор шагов; он инкапсулирует весь пакетный процесс. Задание может содержать от 1 до n шагов. Каждый шаг представляет собой независимую последовательную единицу работы. Шаг состоит из чтения ввода, обработки этого ввода и записи вывода. И последнее, но не менее важное: у нас есть JobRepository, в котором хранится информация о выполняемых заданиях. Это помогает отслеживать задания, их состояние и результаты их выполнения

Шаги содержат немного больше деталей, поэтому давайте рассмотрим их далее. Сначала мы рассмотрим шаги Chunk, а затем Batchlets.

4. Создание чанка

Как было сказано ранее, чанк — это своего рода шаг. Мы будем часто использовать кусок для выражения операции, которая выполняется снова и снова, скажем, над набором элементов. Это что-то вроде промежуточных операций из Java Streams.

При описании чанка нам нужно указать, откуда брать элементы, как их обрабатывать и куда их потом отправлять.

4.1. Чтение элементов

Чтобы читать элементы, нам нужно реализовать ItemReader.

В этом случае мы создадим считыватель, который будет просто выдавать числа от 1 до 10:

@Named
public class SimpleChunkItemReader extends AbstractItemReader {
    private Integer[] tokens;
    private Integer count;
    
    @Inject
    JobContext jobContext;

    @Override
    public Integer readItem() throws Exception {
        if (count >= tokens.length) { 
            return null;
        }

        jobContext.setTransientUserData(count);
        return tokens[count++];
    }

    @Override
    public void open(Serializable checkpoint) throws Exception {
        tokens = new Integer[] { 1,2,3,4,5,6,7,8,9,10 };
        count = 0;
    }
}

Здесь мы просто читаем из внутреннего состояния класса. Но, конечно, readItem может извлекать данные из базы данных, из файловой системы или какого-либо другого внешнего источника.

Обратите внимание, что мы сохраняем часть этого внутреннего состояния, используя JobContext#setTransientUserData(), который пригодится позже.

Также обратите внимание на параметр контрольной точки. Мы тоже поднимем это снова.

4.2. Обработка элементов

Конечно, причина, по которой мы разбиваем элементы, заключается в том, что мы хотим выполнить какую-то операцию над нашими элементами!

Каждый раз, когда мы возвращаем null из обработчика элементов, мы удаляем этот элемент из пакета.

Итак, скажем здесь, что мы хотим оставить только четные числа. Мы можем использовать ItemProcessor, который отбрасывает нечетные, возвращая null:

@Named
public class SimpleChunkItemProcessor implements ItemProcessor {
    @Override
    public Integer processItem(Object t) {
        Integer item = (Integer) t;
        return item % 2 == 0 ? item : null;
    }
}

processItem будет вызываться один раз для каждого элемента, который генерирует наш ItemReader.

4.3. Запись элементов

Наконец, задание вызовет ItemWriter, чтобы мы могли записать наши преобразованные элементы:

@Named
public class SimpleChunkWriter extends AbstractItemWriter {
    List<Integer> processed = new ArrayList<>();
    @Override
    public void writeItems(List<Object> items) throws Exception {
        items.stream().map(Integer.class::cast).forEach(processed::add);
    }
}

Какова длина элементов? Через мгновение мы определим размер фрагмента, который будет определять размер списка, отправляемого в writeItems.

4.4. Определение фрагмента в задании

Теперь мы объединим все это в файл XML, используя JSL или язык спецификации заданий. Обратите внимание, что мы перечислим наш считыватель, процессор, блокировщик, а также размер блока:

<job id="simpleChunk">
    <step id="firstChunkStep" >
        <chunk item-count="3">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
        </chunk>    
    </step>
</job>

Размер блока показывает, как часто прогресс в блоке фиксируется в репозитории заданий, что важно для гарантии завершения, если часть системы выходит из строя.

Нам нужно поместить этот файл в META-INF/batch-jobs для файлов .jar и в WEB-INF/classes/META-INF/batch-jobs для файлов .war.

Мы дали нашей работе идентификатор «simpleChunk», так что давайте попробуем это в модульном тесте.

Теперь задания выполняются асинхронно, что затрудняет их тестирование. В примере обязательно проверьте наш BatchTestHelper, который опрашивает и ждет завершения задания:

@Test
public void givenChunk_thenBatch_completesWithSuccess() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleChunk", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

«

«Так вот что такое куски. Теперь давайте взглянем на пакеты.

5. Создание пакетной обработки

Не все четко укладывается в итеративную модель. Например, у нас может быть задача, которую нам просто нужно вызвать один раз, выполнить до завершения и вернуть статус выхода.

@Named
public class SimpleBatchLet extends AbstractBatchlet {
 
    @Override
    public String process() throws Exception {
        return BatchStatus.COMPLETED.toString();
    }
}

Контракт для пакета довольно прост:

<job id="simpleBatchLet">
    <step id="firstStep" >
        <batchlet ref="simpleBatchLet"/>
    </step>
</job>

Как и JSL:

@Test
public void givenBatchlet_thenBatch_completeWithSuccess() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleBatchLet", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobExecution = BatchTestHelper.keepTestAlive(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

И мы можем протестировать его, используя тот же подход, что и раньше:

Итак, мы рассмотрели несколько различных способов реализации шагов.

Теперь давайте рассмотрим механизмы маркировки и гарантии прогресса.

6. Пользовательская контрольная точка

Сбои обязательно случаются в середине работы. Должны ли мы просто начать все сначала, или мы можем как-то начать с того места, на котором остановились?

Как следует из названия, контрольные точки помогают нам периодически устанавливать закладки в случае сбоя.

По умолчанию конец обработки чанка является естественной контрольной точкой.

@Named
public class CustomCheckPoint extends AbstractCheckpointAlgorithm {
    
    @Inject
    JobContext jobContext;
    
    @Override
    public boolean isReadyToCheckpoint() throws Exception {
        int counterRead = (Integer) jobContext.getTransientUserData();
        return counterRead % 5 == 0;
    }
}

Однако мы можем настроить его с помощью нашего собственного алгоритма CheckpointAlgorithm:

Помните счетчик, который мы поместили в переходные данные ранее? Здесь мы можем вывести его с помощью JobContext#getTransientUserData, чтобы указать, что мы хотим фиксировать каждое 5-е обработанное число.

Без этого фиксация происходила бы в конце каждого фрагмента или, в нашем случае, каждого третьего числа.

<job id="customCheckPoint">
    <step id="firstChunkStep" >
        <chunk item-count="3" checkpoint-policy="custom">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
            <checkpoint-algorithm ref="customCheckPoint"/>
        </chunk>    
    </step>
</job>

И затем мы сопоставляем это с директивой checkout-algorithm в нашем XML под нашим фрагментом:

@Test
public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree() throws Exception {
    // ... start job and wait for completion

    jobOperator.getStepExecutions(executionId)
      .stream()
      .map(BatchTestHelper::getCommitCount)
      .forEach(count -> assertEquals(3L, count.longValue()));
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

Давайте протестируем код, снова отметив, что некоторые шаблонные шаги скрыты в BatchTestHelper: ~~ ~

Таким образом, мы могли бы ожидать, что количество коммитов будет равно 2, так как у нас есть десять элементов, и мы настроили коммиты для каждого 5-го элемента. Но в конце фреймворк выполняет еще одну финальную фиксацию чтения, чтобы убедиться, что все было обработано, что приводит нас к 3.

Далее давайте посмотрим, как обрабатывать ошибки.

7. Обработка исключений

По умолчанию оператор задания пометит наше задание как FAILED в случае исключения.

@Override
public Integer readItem() throws Exception {
    if (tokens.hasMoreTokens()) {
        String tempTokenize = tokens.nextToken();
        throw new RuntimeException();
    }
    return null;
}

Давайте изменим наш считыватель элементов, чтобы убедиться, что он не работает:

@Test
public void whenChunkError_thenBatch_CompletesWithFailed() throws Exception {
    // ... start job and wait for completion
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.FAILED);
}

А затем проверим:

    Но мы можем переопределить это поведение по умолчанию несколькими способами:

skip- limit указывает количество исключений, которые этот шаг будет игнорировать до сбоя повторной попытки-limit указывает количество раз, которое оператор задания должен повторить шаг до сбоя. skippable-exception-class указывает набор исключений, которые будут игнорироваться при обработке блоков

<job id="simpleErrorSkipChunk" >
    <step id="errorStep" >
        <chunk checkpoint-policy="item" item-count="3" skip-limit="3" retry-limit="3">
            <reader ref="myItemReader"/>
            <processor ref="myItemProcessor"/>
            <writer ref="myItemWriter"/>
            <skippable-exception-classes>
                <include class="java.lang.RuntimeException"/>
                <include class="java.lang.UnsupportedOperationException"/>
            </skippable-exception-classes>
            <retryable-exception-classes>
                <include class="java.lang.IllegalArgumentException"/>
                <include class="java.lang.UnsupportedOperationException"/>
            </retryable-exception-classes>
        </chunk>
    </step>
</job>

можно отредактировать нашу работу так, чтобы она игнорировала RuntimeException, а также некоторые другие, просто для иллюстрации:

@Test
public void givenChunkError_thenErrorSkipped_CompletesWithSuccess() throws Exception {
   // ... start job and wait for completion
   jobOperator.getStepExecutions(executionId).stream()
     .map(BatchTestHelper::getProcessSkipCount)
     .forEach(skipCount -> assertEquals(1L, skipCount.longValue()));
   assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

И теперь наш код будет проходить:

8. Выполнение нескольких шагов

Мы упоминали ранее, что задание может иметь любое количество шагов, так что давайте посмотрим на это сейчас.

8.1. Запуск следующего шага

По умолчанию каждый шаг является последним шагом в задании.

<job id="simpleJobSequence">
    <step id="firstChunkStepStep1" next="firstBatchStepStep2">
        <chunk item-count="3">
            <reader ref="simpleChunkItemReader"/>
            <processor ref="simpleChunkItemProcessor"/>
            <writer ref="simpleChunkWriter"/>
        </chunk>    
    </step>
    <step id="firstBatchStepStep2" >
        <batchlet ref="simpleBatchLet"/>
    </step>
</job>

Чтобы выполнить следующий шаг в пакетном задании, нам придется явно указать, используя следующий атрибут в определении шага:

Если мы забудем этот атрибут, то следующий шаг в последовательности не будет казнен.

@Test
public void givenTwoSteps_thenBatch_CompleteWithSuccess() throws Exception {
    // ... start job and wait for completion
    assertEquals(2 , jobOperator.getStepExecutions(executionId).size());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

И мы можем увидеть, как это выглядит в API:

8.2. Потоки

Последовательность шагов также может быть инкапсулирована в поток. Когда поток завершается, весь поток переходит к исполнительному элементу. Кроме того, элементы внутри потока не могут переходить к элементам вне потока.

<job id="flowJobSequence">
    <flow id="flow1" next="firstBatchStepStep3">
        <step id="firstChunkStepStep1" next="firstBatchStepStep2">
            <chunk item-count="3">
	        <reader ref="simpleChunkItemReader" />
		<processor ref="simpleChunkItemProcessor" />
		<writer ref="simpleChunkWriter" />
	    </chunk>
	</step>
	<step id="firstBatchStepStep2">
	    <batchlet ref="simpleBatchLet" />
	</step>
    </flow>
    <step id="firstBatchStepStep3">
	 <batchlet ref="simpleBatchLet" />
    </step>
</job>

Мы можем, скажем, выполнить два шага внутри потока, а затем этот поток перейдет к изолированному шагу:

@Test
public void givenFlow_thenBatch_CompleteWithSuccess() throws Exception {
    // ... start job and wait for completion
 
    assertEquals(3, jobOperator.getStepExecutions(executionId).size());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

И мы по-прежнему можем видеть выполнение каждого шага независимо:

8.3. Решения

У нас также есть поддержка if/else в виде решений. Решения обеспечивают настраиваемый способ определения последовательности шагов, потоков и разбиений.

Как и шаги, он работает с переходными элементами, такими как next, которые могут направлять или завершать выполнение задания.

<job id="decideJobSequence">
     <step id="firstBatchStepStep1" next="firstDecider">
	 <batchlet ref="simpleBatchLet" />
     </step>	
     <decision id="firstDecider" ref="deciderJobSequence">
        <next on="two" to="firstBatchStepStep2"/>
        <next on="three" to="firstBatchStepStep3"/>
     </decision>
     <step id="firstBatchStepStep2">
	<batchlet ref="simpleBatchLet" />
     </step>	
     <step id="firstBatchStepStep3">
	<batchlet ref="simpleBatchLet" />
     </step>		
</job>

Давайте посмотрим, как можно настроить задание:

Любой элемент решения должен быть настроен с помощью класса, реализующего Decider. Его задача — вернуть решение в виде строки.

Каждое следующее внутреннее решение похоже на case в операторе switch.

8.4. Разделяет

<job id="splitJobSequence">
   <split id="split1" next="splitJobSequenceStep3">
      <flow id="flow1">
	  <step id="splitJobSequenceStep1">
              <batchlet ref="simpleBatchLet" />
           </step>
      </flow>
      <flow id="flow2">
          <step id="splitJobSequenceStep2">
              <batchlet ref="simpleBatchLet" />
	  </step>
      </flow>
   </split>
   <step id="splitJobSequenceStep3">
      <batchlet ref="simpleBatchLet" />
   </step>
</job>

«Разделения удобны, так как они позволяют нам выполнять потоки одновременно:

Конечно, это означает, что порядок не гарантируется.

@Test
public void givenSplit_thenBatch_CompletesWithSuccess() throws Exception {
    // ... start job and wait for completion
    List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId);

    assertEquals(3, stepExecutions.size());
    assertEquals("splitJobSequenceStep3", stepExecutions.get(2).getStepName());
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

Давайте подтвердим, что они все еще запускаются. Шаги потока будут выполняться в произвольном порядке, но изолированный шаг всегда будет последним:

9. Разделение задания

Мы также можем использовать свойства пакета в нашем коде Java, которые были определены в наша работа.

Их можно разделить на три уровня: задание, шаг и пакетный артефакт.

Давайте посмотрим на несколько примеров того, как они потребляются.

@Inject
JobContext jobContext;
...
jobProperties = jobContext.getProperties();
...

Когда мы хотим использовать свойства на уровне задания:

@Inject
StepContext stepContext;
...
stepProperties = stepContext.getProperties();
...

Это можно использовать и на уровне шага:

@Inject
@BatchProperty(name = "name")
private String nameString;

Когда мы хотим использовать свойства на уровне пакетного артефакта:

Это удобно с разделами.

Видите, с разделением мы можем запускать потоки одновременно. Но мы также можем разделить шаг на n наборов элементов или задать отдельные входные данные, что дает нам еще один способ разделить работу на несколько потоков.

<job id="injectSimpleBatchLet">
    <properties>
        <property name="jobProp1" value="job-value1"/>
    </properties>
    <step id="firstStep">
        <properties>
            <property name="stepProp1" value="value1" />
        </properties>
	<batchlet ref="injectSimpleBatchLet">
	    <properties>
		<property name="name" value="#{partitionPlan['name']}" />
	    </properties>
	</batchlet>
	<partition>
	    <plan partitions="2">
		<properties partition="0">
		    <property name="name" value="firstPartition" />
		</properties>
		<properties partition="1">
		    <property name="name" value="secondPartition" />
		</properties>
	    </plan>
	</partition>
    </step>
</job>

Чтобы понять, какую часть работы должен выполнять каждый раздел, мы можем объединить свойства с разделами:

10. Остановить и перезапустить

Вот и все, что нужно для определения заданий. Теперь давайте поговорим об управлении ими.

JobOperator jobOperator = BatchRuntime.getJobOperator();

В наших модульных тестах мы уже видели, что можем получить экземпляр JobOperator из BatchRuntime:

Long executionId = jobOperator.start("simpleBatchlet", new Properties());

И затем мы можем запустить задание:

jobOperator.stop(executionId);

Однако мы также можем остановить задание:

executionId = jobOperator.restart(executionId, new Properties());

И, наконец, мы можем перезапустить задание:

@Test
public void givenBatchLetStarted_whenStopped_thenBatchStopped() throws Exception {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    Long executionId = jobOperator.start("simpleBatchLet", new Properties());
    JobExecution jobExecution = jobOperator.getJobExecution(executionId);
    jobOperator.stop(executionId);
    jobExecution = BatchTestHelper.keepTestStopped(jobExecution);
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
}

Давайте посмотрим, как мы можем остановить запущенное задание:

@Test
public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess() {
    // ... start and stop the job
 
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.STOPPED);
    executionId = jobOperator.restart(jobExecution.getExecutionId(), new Properties());
    jobExecution = BatchTestHelper.keepTestAlive(jobOperator.getJobExecution(executionId));
 
    assertEquals(jobExecution.getBatchStatus(), BatchStatus.COMPLETED);
}

И если пакет ОСТАНОВЛЕН, то мы можем перезапустите его:

11. Получение заданий

Когда отправляется пакетное задание, среда выполнения пакета создает экземпляр JobExecution для его отслеживания.

Чтобы получить JobExecution для идентификатора выполнения, мы можем использовать метод JobOperator#getJobExecution(executionId).

Кроме того, StepExecution предоставляет полезную информацию для отслеживания выполнения шага.

Чтобы получить StepExecution для идентификатора выполнения, мы можем использовать метод JobOperator#getStepExecutions(executionId).

@Test
public void givenChunk_whenJobStarts_thenStepsHaveMetrics() throws Exception {
    // ... start job and wait for completion
    assertTrue(jobOperator.getJobNames().contains("simpleChunk"));
    assertTrue(jobOperator.getParameters(executionId).isEmpty());
    StepExecution stepExecution = jobOperator.getStepExecutions(executionId).get(0);
    Map<Metric.MetricType, Long> metricTest = BatchTestHelper.getMetricsMap(stepExecution.getMetrics());
    assertEquals(10L, metricTest.get(Metric.MetricType.READ_COUNT).longValue());
    assertEquals(5L, metricTest.get(Metric.MetricType.FILTER_COUNT).longValue());
    assertEquals(4L, metricTest.get(Metric.MetricType.COMMIT_COUNT).longValue());
    assertEquals(5L, metricTest.get(Metric.MetricType.WRITE_COUNT).longValue());
    // ... and many more!
}

Отсюда мы можем получить несколько метрик об этом шаге с помощью StepExecution#getMetrics:

12. Недостатки

    JSR 352 мощен, хотя и уступает в ряде областей:

Кажется, не хватает программ чтения и записи, которые могут обрабатывать другие форматы, такие как JSON. Нет поддержки дженериков. Разделение поддерживает только один шаг. асинхронный характер, тестирование может быть проблемой. API довольно многословный

13. Заключение

В этой статье мы рассмотрели JSR 352 и узнали о чанках, пакетах, разбиениях, потоках и многом другом. Тем не менее, мы едва поцарапали поверхность.