«1. Обзор
В нашем предыдущем введении в Spring Batch мы представили фреймворк как инструмент пакетной обработки. Мы также изучили детали конфигурации и реализацию однопоточного выполнения задания в одном процессе.
Для реализации задания с некоторой параллельной обработкой предоставляется ряд опций. На более высоком уровне есть два режима параллельной обработки:
- Single-Process, multi-threaded
- Multi-Process
В этой быстрой статье мы обсудим разбиение Step, которое может быть реализовано как для однопроцессных, так и для многопроцессорных заданий.
2. Разделение шага
Spring Batch с разделением предоставляет нам возможность разделить выполнение шага:
На рисунке выше показана реализация задания с разделенным шагом.
Существует шаг под названием «Master», выполнение которого разделено на несколько «Slave» шагов. Эти рабы могут занять место хозяина, и результат все равно не изменится. И ведущий, и подчиненный являются экземплярами Step. Подчиненные могут быть удаленными службами или просто локально исполняющими потоками.
При необходимости мы можем передать данные от мастера к слейву. Метаданные (т. е. JobRepository) гарантируют, что каждое ведомое устройство выполняется только один раз за одно выполнение задания.
Вот диаграмма последовательности, показывающая, как все это работает:
Как показано, PartitionStep управляет выполнением. PartitionHandler отвечает за разделение работы «Master» на «Slave». Самая правая Ступень — ведомая.
3. Maven POM
Зависимости Maven такие же, как упоминалось в нашей предыдущей статье. То есть Spring Core, Spring Batch и зависимость для базы данных (в нашем случае SQLite).
4. Конфигурация
В нашей вводной статье мы видели пример преобразования некоторых финансовых данных из файла CSV в файл XML. Давайте расширим тот же пример.
Здесь мы преобразуем финансовую информацию из 5 файлов CSV в соответствующие файлы XML, используя многопоточную реализацию.
Мы можем добиться этого, используя одно разделение Job и Step. У нас будет пять потоков, по одному для каждого файла CSV.
Прежде всего, давайте создадим задание:
@Bean(name = "partitionerJob")
public Job partitionerJob()
throws UnexpectedInputException, MalformedURLException, ParseException {
return jobs.get("partitioningJob")
.start(partitionStep())
.build();
}
Как мы видим, это задание начинается с шага разбиения. Это наш главный шаг, который будет разделен на несколько подчиненных шагов:
@Bean
public Step partitionStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("partitionStep")
.partitioner("slaveStep", partitioner())
.step(slaveStep())
.taskExecutor(taskExecutor())
.build();
}
Здесь мы создадим PartitioningStep, используя StepBuilderFactory. Для этого нам нужно предоставить информацию о SlaveSteps и Partitioner.
Partitioner — это интерфейс, предоставляющий возможность определять набор входных значений для каждого из ведомых устройств. Другими словами, здесь находится логика для разделения задач на соответствующие потоки.
Давайте создадим его реализацию, названную CustomMultiResourcePartitioner, где мы поместим имена входных и выходных файлов в ExecutionContext для передачи на каждый подчиненный шаг:
public class CustomMultiResourcePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
int i = 0, k = 1;
for (Resource resource : resources) {
ExecutionContext context = new ExecutionContext();
Assert.state(resource.exists(), "Resource does not exist: "
+ resource);
context.putString(keyName, resource.getFilename());
context.putString("opFileName", "output"+k+++".xml");
map.put(PARTITION_KEY + i, context);
i++;
}
return map;
}
}
Мы также создайте bean-компонент для этого класса, где мы укажем исходный каталог для входных файлов:
@Bean
public CustomMultiResourcePartitioner partitioner() {
CustomMultiResourcePartitioner partitioner
= new CustomMultiResourcePartitioner();
Resource[] resources;
try {
resources = resoursePatternResolver
.getResources("file:src/main/resources/input/*.csv");
} catch (IOException e) {
throw new RuntimeException("I/O problems when resolving"
+ " the input file pattern.", e);
}
partitioner.setResources(resources);
return partitioner;
}
Мы определим подчиненный шаг, как и любой другой шаг с считывателем и писателем. Читатель и писатель будут такими же, как мы видели в нашем вводном примере, за исключением того, что они получат параметр имени файла из StepExecutionContext.
Обратите внимание, что эти bean-компоненты должны иметь пошаговую область действия, чтобы они могли получать параметры stepExecutionContext на каждом этапе. Если они не будут иметь пошаговую область действия, их bean-компоненты будут созданы изначально и не будут принимать имена файлов на уровне шага: аргументы равны нулю, потому что эти имена файлов не будут использоваться, так как они получат имена файлов из stepExecutionContext:
@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
@Value("#{stepExecutionContext[fileName]}") String filename)
throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader
= new FlatFileItemReader<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens
= {"username", "userid", "transactiondate", "amount"};
tokenizer.setNames(tokens);
reader.setResource(new ClassPathResource("input/" + filename));
DefaultLineMapper<Transaction> lineMapper
= new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLinesToSkip(1);
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller,
@Value("#{stepExecutionContext[opFileName]}") String filename)
throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter
= new StaxEventItemWriter<Transaction>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(new ClassPathResource("xml/" + filename));
return itemWriter;
}
5. Заключение
@Bean
public Step slaveStep()
throws UnexpectedInputException, MalformedURLException, ParseException {
return steps.get("slaveStep").<Transaction, Transaction>chunk(1)
.reader(itemReader(null))
.writer(itemWriter(marshaller(), null))
.build();
}
В этом руководстве мы обсудили, как реализовать задание с параллельной обработкой с помощью Spring Batch. .
Как всегда, полная реализация этого примера доступна на GitHub.
«