«1. Введение
Spring Batch предоставляет два разных способа реализации задания: использование тасклетов и чанков.
В этой статье мы научимся настраивать и реализовывать оба метода на простом примере из реальной жизни.
2. Зависимости
Давайте начнем с добавления необходимых зависимостей:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>4.3.0</version>
<scope>test</scope>
</dependency>
Чтобы получить последнюю версию spring-batch-core и spring-batch-test, обратитесь к Maven Central.
3. Пример использования
Давайте рассмотрим файл CSV со следующим содержимым:
Mae Hodges,10/22/1972
Gary Potter,02/22/1953
Betty Wise,02/17/1968
Wayne Rose,04/06/1977
Adam Caldwell,09/27/1995
Lucille Phillips,05/14/1992
Первая позиция каждой строки представляет имя человека, а вторая позиция представляет его/ее дату рождения.
Наш вариант использования — сгенерировать еще один CSV-файл, содержащий имя и возраст каждого человека:
Mae Hodges,45
Gary Potter,64
Betty Wise,49
Wayne Rose,40
Adam Caldwell,22
Lucille Phillips,25
Теперь, когда наша предметная область ясна, давайте продолжим и создадим решение, используя оба подхода. Начнем с тасклетов.
4. Подход с тасклетами
4.1. Введение и дизайн
Тасклеты предназначены для выполнения одной задачи за один шаг. Наша работа будет состоять из нескольких шагов, которые выполняются один за другим. Каждый шаг должен выполнять только одну определенную задачу.
Наша работа будет состоять из трех шагов:
- Read lines from the input CSV file.
- Calculate age for every person in the input CSV file.
- Write name and age of each person to a new output CSV file.
Теперь, когда общая картина готова, давайте создадим по одному классу на каждый шаг.
LinesReader будет отвечать за чтение данных из входного файла:
public class LinesReader implements Tasklet {
// ...
}
LinesProcessor рассчитает возраст для каждого человека в файле:
public class LinesProcessor implements Tasklet {
// ...
}
Наконец, LinesWriter будет отвечать за запись имен и состаривается в выходной файл:
public class LinesWriter implements Tasklet {
// ...
}
На этом этапе все наши шаги реализуют интерфейс Tasklet. Это заставит нас реализовать его метод execute:
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
// ...
}
В этом методе мы будем добавлять логику для каждого шага. Прежде чем начать с этого кода, давайте настроим нашу работу.
4.2. Конфигурация
Нам нужно добавить некоторую конфигурацию в контекст приложения Spring. После добавления стандартного объявления bean-компонента для классов, созданных в предыдущем разделе, мы готовы создать определение нашей работы:
@Configuration
@EnableBatchProcessing
public class TaskletsConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
protected Step readLines() {
return steps
.get("readLines")
.tasklet(linesReader())
.build();
}
@Bean
protected Step processLines() {
return steps
.get("processLines")
.tasklet(linesProcessor())
.build();
}
@Bean
protected Step writeLines() {
return steps
.get("writeLines")
.tasklet(linesWriter())
.build();
}
@Bean
public Job job() {
return jobs
.get("taskletsJob")
.start(readLines())
.next(processLines())
.next(writeLines())
.build();
}
// ...
}
Это означает, что наша задача «taskletsJob» будет состоять из трех шагов. Первый (readLines) выполнит тасклет, определенный в bean-компоненте linesReader, и перейдет к следующему шагу: processLines. ProcessLines выполнит тасклет, определенный в bean-компоненте linesProcessor, и перейдет к последнему шагу: writeLines.
Наш рабочий поток определен, и мы готовы добавить немного логики!
4.3. Модель и утилиты
Поскольку мы будем манипулировать строками в файле CSV, мы собираемся создать класс Line:
public class Line implements Serializable {
private String name;
private LocalDate dob;
private Long age;
// standard constructor, getters, setters and toString implementation
}
Обратите внимание, что Line реализует Serializable. Это потому, что Line будет действовать как DTO для передачи данных между шагами. Согласно Spring Batch, объекты, которые передаются между шагами, должны быть сериализуемыми.
С другой стороны, мы можем начать думать о чтении и написании строк.
Для этого мы будем использовать OpenCSV:
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>4.1</version>
</dependency>
Найдите последнюю версию OpenCSV в Maven Central.
После включения OpenCSV мы также создадим класс FileUtils. Он предоставит методы для чтения и записи строк CSV:
public class FileUtils {
public Line readLine() throws Exception {
if (CSVReader == null)
initReader();
String[] line = CSVReader.readNext();
if (line == null)
return null;
return new Line(
line[0],
LocalDate.parse(
line[1],
DateTimeFormatter.ofPattern("MM/dd/yyyy")));
}
public void writeLine(Line line) throws Exception {
if (CSVWriter == null)
initWriter();
String[] lineStr = new String[2];
lineStr[0] = line.getName();
lineStr[1] = line
.getAge()
.toString();
CSVWriter.writeNext(lineStr);
}
// ...
}
Обратите внимание, что readLine действует как оболочка над методом openCSV readNext и возвращает объект Line.
Точно так же writeLine оборачивает writeNext OpenCSV, получая объект Line. Полную реализацию этого класса можно найти в проекте GitHub.
На данный момент мы готовы начать реализацию каждого шага.
4.4. LinesReader
Давайте продолжим и завершим наш класс LinesReader:
public class LinesReader implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesReader.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
lines = new ArrayList<>();
fu = new FileUtils(
"taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Lines Reader initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
Line line = fu.readLine();
while (line != null) {
lines.add(line);
logger.debug("Read line: " + line.toString());
line = fu.readLine();
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
logger.debug("Lines Reader ended.");
return ExitStatus.COMPLETED;
}
}
Метод execute LinesReader создает экземпляр FileUtils по пути к входному файлу. Затем добавляет строки в список, пока не останется строк для чтения.
Наш класс также реализует StepExecutionListener, который предоставляет два дополнительных метода: beforeStep и afterStep. Мы будем использовать эти методы для инициализации и закрытия вещей до и после запуска выполнения.
Если мы посмотрим на код afterStep, мы заметим строку, в которой список результатов (строк) помещается в контекст задания, чтобы сделать его доступным для следующего шага:
stepExecution
.getJobExecution()
.getExecutionContext()
.put("lines", this.lines);
«
«На данный момент наш первый шаг уже выполнил свою задачу: загрузить строки CSV в список в памяти. Давайте перейдем ко второму шагу и обработаем их.
4.5. LinesProcessor
public class LinesProcessor implements Tasklet, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(
LinesProcessor.class);
private List<Line> lines;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
logger.debug("Lines Processor initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
long age = ChronoUnit.YEARS.between(
line.getDob(),
LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Lines Processor ended.");
return ExitStatus.COMPLETED;
}
}
LinesProcessor также реализует StepExecutionListener и, конечно же, Tasklet. Это означает, что он также реализует методы beforeStep, execute и afterStep:
Нетрудно понять, что он загружает список строк из контекста задания и вычисляет возраст каждого человека.
Нет необходимости помещать в контекст еще один список результатов, так как изменения происходят в том же объекте, что и на предыдущем шаге.
И мы готовы к нашему последнему шагу.
4.6. LinesWriter
public class LinesWriter implements Tasklet, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private List<Line> lines;
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
ExecutionContext executionContext = stepExecution
.getJobExecution()
.getExecutionContext();
this.lines = (List<Line>) executionContext.get("lines");
fu = new FileUtils("output.csv");
logger.debug("Lines Writer initialized.");
}
@Override
public RepeatStatus execute(StepContribution stepContribution,
ChunkContext chunkContext) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
return RepeatStatus.FINISHED;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Lines Writer ended.");
return ExitStatus.COMPLETED;
}
}
Задача LinesWriter состоит в том, чтобы просмотреть список строк и записать имя и возраст в выходной файл:
Мы закончили реализацию нашей работы! Давайте создадим тест, чтобы запустить его и посмотреть результаты.
4.7. Запуск задания
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = TaskletsConfig.class)
public class TaskletsTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenTaskletsJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
Чтобы запустить задание, мы создадим тест:
Аннотация ContextConfiguration указывает на класс конфигурации контекста Spring, в котором находится наше определение задания.
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
@Bean
public JobRepository jobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(transactionManager());
return factory.getObject();
}
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}
@Bean
public PlatformTransactionManager transactionManager() {
return new ResourcelessTransactionManager();
}
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
return jobLauncher;
}
Перед запуском теста нам нужно добавить пару дополнительных бинов:
Все готово! Заходи и запускай тест!
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader initialized.
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesReader - Lines Reader ended.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor initialized.
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.tasklets.LinesProcessor - Lines Processor ended.
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer initialized.
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.tasklets.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.tasklets.LinesWriter - Lines Writer ended.
После завершения задания файл output.csv имеет ожидаемое содержимое, а журналы показывают поток выполнения:
Это все для тасклетов. Теперь мы можем перейти к подходу Chunks.
5. Подход фрагментов
5.1. Введение и дизайн
Как следует из названия, этот подход выполняет действия над блоками данных. То есть вместо одновременного чтения, обработки и записи всех строк он будет читать, обрабатывать и записывать фиксированное количество записей (фрагментов) за раз.
Затем цикл повторяется до тех пор, пока в файле не останется данных.
- While there’re lines:
- Do for X amount of lines:
- Read one line
- Process one line
- Write X amount of lines.
- Do for X amount of lines:
В результате поток будет немного отличаться:
public class LineReader {
// ...
}
public class LineProcessor {
// ...
}
public class LinesWriter {
// ...
}
Итак, нам также нужно создать три bean-компонента для подхода, ориентированного на фрагменты:
Прежде чем перейти к реализации, давайте настроим наша работа.
@Configuration
@EnableBatchProcessing
public class ChunksConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public ItemReader<Line> itemReader() {
return new LineReader();
}
@Bean
public ItemProcessor<Line, Line> itemProcessor() {
return new LineProcessor();
}
@Bean
public ItemWriter<Line> itemWriter() {
return new LinesWriter();
}
@Bean
protected Step processLines(ItemReader<Line> reader,
ItemProcessor<Line, Line> processor, ItemWriter<Line> writer) {
return steps.get("processLines").<Line, Line> chunk(2)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job job() {
return jobs
.get("chunksJob")
.start(processLines(itemReader(), itemProcessor(), itemWriter()))
.build();
}
}
5.2. Конфигурация
Определение задания также будет выглядеть иначе:
В этом случае есть только один шаг, выполняющий только один тасклет.
Однако этот тасклет определяет средства чтения, записи и процессора, которые будут обрабатывать фрагменты данных.
Обратите внимание, что интервал фиксации указывает количество данных, которые должны быть обработаны в одном фрагменте. Наша работа будет читать, обрабатывать и записывать по две строки за раз.
Теперь мы готовы добавить нашу логику блоков!
5.3. LineReader
public class LineReader implements ItemReader<Line> {
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null)
logger.debug("Read line: " + line.toString());
return line;
}
}
LineReader будет отвечать за чтение одной записи и возврат экземпляра Line с его содержимым.
public class LineReader implements
ItemReader<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LineReader.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("taskletsvschunks/input/tasklets-vs-chunks.csv");
logger.debug("Line Reader initialized.");
}
@Override
public Line read() throws Exception {
Line line = fu.readLine();
if (line != null) logger.debug("Read line: " + line.toString());
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeReader();
logger.debug("Line Reader ended.");
return ExitStatus.COMPLETED;
}
}
Чтобы стать читателем, наш класс должен реализовать интерфейс ItemReader:
Код прост, он просто читает одну строку и возвращает ее. Мы также реализуем StepExecutionListener для окончательной версии этого класса:
Следует отметить, что beforeStep и afterStep выполняются до и после всего шага соответственно.
5.4. LineProcessor
public class LineProcessor implements ItemProcessor<Line, Line> {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug("Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
}
LineProcessor следует почти той же логике, что и LineReader.
public class LineProcessor implements
ItemProcessor<Line, Line>, StepExecutionListener {
private Logger logger = LoggerFactory.getLogger(LineProcessor.class);
@Override
public void beforeStep(StepExecution stepExecution) {
logger.debug("Line Processor initialized.");
}
@Override
public Line process(Line line) throws Exception {
long age = ChronoUnit.YEARS
.between(line.getDob(), LocalDate.now());
logger.debug(
"Calculated age " + age + " for line " + line.toString());
line.setAge(age);
return line;
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
logger.debug("Line Processor ended.");
return ExitStatus.COMPLETED;
}
}
Однако в этом случае мы реализуем ItemProcessor и его метод process():
Метод process() принимает входную строку, обрабатывает ее и возвращает выходную строку. Опять же, мы также реализуем StepExecutionListener:
public class LinesWriter implements
ItemWriter<Line>, StepExecutionListener {
private final Logger logger = LoggerFactory
.getLogger(LinesWriter.class);
private FileUtils fu;
@Override
public void beforeStep(StepExecution stepExecution) {
fu = new FileUtils("output.csv");
logger.debug("Line Writer initialized.");
}
@Override
public void write(List<? extends Line> lines) throws Exception {
for (Line line : lines) {
fu.writeLine(line);
logger.debug("Wrote line " + line.toString());
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
fu.closeWriter();
logger.debug("Line Writer ended.");
return ExitStatus.COMPLETED;
}
}
5.5. LinesWriter
В отличие от считывателя и процессора, LinesWriter запишет целую порцию строк, чтобы получить список строк:
Код LinesWriter говорит сам за себя. И снова мы готовы проверить нашу работу.
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = ChunksConfig.class)
public class ChunksTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void givenChunksJob_whenJobEnds_thenStatusCompleted()
throws Exception {
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
assertEquals(ExitStatus.COMPLETED, jobExecution.getExitStatus());
}
}
5.6. Запуск задания
Мы создадим новый тест, такой же, как тот, который мы создали для подхода с тасклетами:
[main] DEBUG o.b.t.chunks.LineReader - Line Reader initialized.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer initialized.
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor initialized.
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 45 for line [Mae Hodges,10/22/1972]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 64 for line [Gary Potter,02/22/1953]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Mae Hodges,10/22/1972,45]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Gary Potter,02/22/1953,64]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 49 for line [Betty Wise,02/17/1968]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 40 for line [Wayne Rose,04/06/1977]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Betty Wise,02/17/1968,49]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Wayne Rose,04/06/1977,40]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineReader - Read line: [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 22 for line [Adam Caldwell,09/27/1995]
[main] DEBUG o.b.t.chunks.LineProcessor - Calculated age 25 for line [Lucille Phillips,05/14/1992]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Adam Caldwell,09/27/1995,22]
[main] DEBUG o.b.t.chunks.LinesWriter - Wrote line [Lucille Phillips,05/14/1992,25]
[main] DEBUG o.b.t.chunks.LineProcessor - Line Processor ended.
[main] DEBUG o.b.t.chunks.LinesWriter - Line Writer ended.
[main] DEBUG o.b.t.chunks.LineReader - Line Reader ended.
После настройки ChunksConfig, как описано выше для TaskletsConfig, все готово для запуска теста!
Когда задание выполнено, мы видим, что output.csv снова содержит ожидаемый результат, а журналы описывают поток:
У нас тот же результат, но другой поток. Журналы показывают, как задание выполняется при таком подходе.
6. Заключение