«1. Введение
В этой статье мы сосредоточимся на практическом, ориентированном на код введении в Spring Batch. Spring Batch — это платформа обработки, предназначенная для надежного выполнения заданий.
Текущая версия 4.3 поддерживает Spring 5 и Java 8. Она также поддерживает JSR-352, новую спецификацию Java для пакетной обработки.
Вот несколько интересных и практических вариантов использования фреймворка.
2. Основы рабочего процесса
Пакет Spring следует традиционной пакетной архитектуре, в которой репозиторий заданий выполняет работу по планированию и взаимодействию с заданием.
Задание может состоять из нескольких шагов, и каждый шаг обычно следует последовательности чтения данных, их обработки и записи.
И, конечно же, фреймворк сделает за нас большую часть тяжелой работы — особенно когда речь идет о низкоуровневой персистентности работы с заданиями — используя sqlite для репозитория заданий.
2.1. Наш пример использования
Простой вариант использования, который мы здесь рассмотрим, — мы собираемся перенести некоторые данные финансовых транзакций из CSV в XML.
Входной файл имеет очень простую структуру — он содержит транзакцию в каждой строке, состоящую из: имени пользователя, идентификатора пользователя, даты транзакции и суммы:
username, userid, transaction_date, transaction_amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321
robin, 2134, 2/02/2015, 23411
3. Maven POM
Для этого проекта требуются зависимости Spring Core, Spring Batch и sqlite JDBC Connector:
<!-- SQLite database driver -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.15.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.0</version>
</dependency>
4. Spring Batch Config
Первое, что мы сделаем, это настроим Spring Batch с помощью XML:
<!-- connect to SQLite database -->
<bean id="dataSource"
class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="org.sqlite.JDBC" />
<property name="url" value="jdbc:sqlite:repository.sqlite" />
<property name="username" value="" />
<property name="password" value="" />
</bean>
<!-- create job-meta tables automatically -->
<jdbc:initialize-database data-source="dataSource">
<jdbc:script
location="org/springframework/batch/core/schema-drop-sqlite.sql" />
<jdbc:script location="org/springframework/batch/core/schema-sqlite.sql" />
</jdbc:initialize-database>
<!-- stored job-meta in memory -->
<!--
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
-->
<!-- stored job-meta in database -->
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="transactionManager" ref="transactionManager" />
<property name="databaseType" value="sqlite" />
</bean>
<bean id="transactionManager" class=
"org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
Конечно, конфигурация Java также доступна:
@Configuration
@EnableBatchProcessing
public class SpringConfig {
@Value("org/springframework/batch/core/schema-drop-sqlite.sql")
private Resource dropReopsitoryTables;
@Value("org/springframework/batch/core/schema-sqlite.sql")
private Resource dataReopsitorySchema;
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.sqlite.JDBC");
dataSource.setUrl("jdbc:sqlite:repository.sqlite");
return dataSource;
}
@Bean
public DataSourceInitializer dataSourceInitializer(DataSource dataSource)
throws MalformedURLException {
ResourceDatabasePopulator databasePopulator =
new ResourceDatabasePopulator();
databasePopulator.addScript(dropReopsitoryTables);
databasePopulator.addScript(dataReopsitorySchema);
databasePopulator.setIgnoreFailedDrops(true);
DataSourceInitializer initializer = new DataSourceInitializer();
initializer.setDataSource(dataSource);
initializer.setDatabasePopulator(databasePopulator);
return initializer;
}
private JobRepository getJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource());
factory.setTransactionManager(getTransactionManager());
factory.afterPropertiesSet();
return (JobRepository) factory.getObject();
}
private PlatformTransactionManager getTransactionManager() {
return new ResourcelessTransactionManager();
}
public JobLauncher getJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
5. Spring Batch Job Config
Давайте теперь напишем описание нашей работы для работы с CSV в XML:
<import resource="spring.xml" />
<bean id="record" class="com.baeldung.spring_batch_intro.model.Transaction"></bean>
<bean id="itemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="input/record.csv" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class=
"org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="username,userid,transactiondate,amount" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.baeldung.spring_batch_intro.service.RecordFieldSetMapper" />
</property>
</bean>
</property>
</bean>
<bean id="itemProcessor"
class="com.baeldung.spring_batch_intro.service.CustomItemProcessor" />
<bean id="itemWriter"
class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" />
<property name="marshaller" ref="recordMarshaller" />
<property name="rootTagName" value="transactionRecord" />
</bean>
<bean id="recordMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
<property name="classesToBeBound">
<list>
<value>com.baeldung.spring_batch_intro.model.Transaction</value>
</list>
</property>
</bean>
<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
И конечно, аналогичная конфигурация задания на основе Java:
public class SpringBatchConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Value("input/record.csv")
private Resource inputCsv;
@Value("file:xml/output.xml")
private Resource outputXml;
@Bean
public ItemReader<Transaction> itemReader()
throws UnexpectedInputException, ParseException {
FlatFileItemReader<Transaction> reader = new FlatFileItemReader<Transaction>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = { "username", "userid", "transactiondate", "amount" };
tokenizer.setNames(tokens);
reader.setResource(inputCsv);
DefaultLineMapper<Transaction> lineMapper =
new DefaultLineMapper<Transaction>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public ItemProcessor<Transaction, Transaction> itemProcessor() {
return new CustomItemProcessor();
}
@Bean
public ItemWriter<Transaction> itemWriter(Marshaller marshaller)
throws MalformedURLException {
StaxEventItemWriter<Transaction> itemWriter =
new StaxEventItemWriter<Transaction>();
itemWriter.setMarshaller(marshaller);
itemWriter.setRootTagName("transactionRecord");
itemWriter.setResource(outputXml);
return itemWriter;
}
@Bean
public Marshaller marshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(new Class[] { Transaction.class });
return marshaller;
}
@Bean
protected Step step1(ItemReader<Transaction> reader,
ItemProcessor<Transaction, Transaction> processor,
ItemWriter<Transaction> writer) {
return steps.get("step1").<Transaction, Transaction> chunk(10)
.reader(reader).processor(processor).writer(writer).build();
}
@Bean(name = "firstBatchJob")
public Job job(@Qualifier("step1") Step step1) {
return jobs.get("firstBatchJob").start(step1).build();
}
}
Итак, теперь, когда у нас есть вся конфигурация, давайте разберем ее и начнем обсуждать.
5.1. Чтение данных и создание объектов с помощью ItemReader
Сначала мы настроили cvsFileItemReader, который будет считывать данные из record.csv и преобразовывать их в объект Transaction:
@SuppressWarnings("restriction")
@XmlRootElement(name = "transactionRecord")
public class Transaction {
private String username;
private int userId;
private LocalDateTime transactionDate;
private double amount;
/* getters and setters for the attributes */
@Override
public String toString() {
return "Transaction [username=" + username + ", userId=" + userId
+ ", transactionDate=" + transactionDate + ", amount=" + amount
+ "]";
}
}
Для этого используется специальный преобразователь. :
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {
public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("d/M/yyy");
Transaction transaction = new Transaction();
transaction.setUsername(fieldSet.readString("username"));
transaction.setUserId(fieldSet.readInt(1));
transaction.setAmount(fieldSet.readDouble(3));
String dateString = fieldSet.readString(2);
transaction.setTransactionDate(LocalDate.parse(dateString, formatter).atStartOfDay());
return transaction;
}
}
5.2. Обработка данных с помощью ItemProcessor
Мы создали собственный обработчик элементов данных CustomItemProcessor. Это не обрабатывает ничего, связанного с объектом транзакции — все, что он делает, это передает исходный объект, поступающий от считывателя к писателю:
public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {
public Transaction process(Transaction item) {
return item;
}
}
5.3. Запись объектов в файловую систему с помощью ItemWriter
Наконец, мы сохраним эту транзакцию в файле xml, расположенном по адресу xml/output.xml:
<bean id="itemWriter"
class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" value="file:xml/output.xml" />
<property name="marshaller" ref="recordMarshaller" />
<property name="rootTagName" value="transactionRecord" />
</bean>
5.4. Настройка пакетного задания
Итак, все, что нам нужно сделать, это соединить точки с заданием, используя синтаксис batch:job.
Обратите внимание на интервал фиксации — это количество транзакций, которые должны храниться в памяти перед фиксацией пакета в itemWriter; он будет хранить транзакции в памяти до этого момента (или до тех пор, пока не встретится конец входных данных):
<batch:job id="firstBatchJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk reader="itemReader" writer="itemWriter"
processor="itemProcessor" commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
5.5. Running the Batch Job
That’s it – let’s now set up and run everything:
public class App {
public static void main(String[] args) {
// Spring Java config
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(SpringConfig.class);
context.register(SpringBatchConfig.class);
context.refresh();
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("firstBatchJob");
System.out.println("Starting the batch job");
try {
JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Job Status : " + execution.getStatus());
System.out.println("Job completed");
} catch (Exception e) {
e.printStackTrace();
System.out.println("Job failed");
}
}
}
6. Conclusion
This tutorial gives you a basic idea of how to work with Spring Batch and how to use it in a simple usecase.
It shows how you can easily develop your batch processing pipeline and how you can customize different stages in reading, processing and writing.
The full implementation of this tutorial can be found in the github project – this is an Eclipse based project, so it should be easy to import and run as it is.