config
@EnableBatchProcessing(modular = true)
@Configuration
public class BatchConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchConfig.class);
@Bean
public Job importUserJob(JobBuilderFactory jobs, Step s1, JobExecutionListener listener) {
return jobs.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(s1)
.end()
.build();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<RecordSO> reader,
ItemWriter<WriterSO> writer, ItemProcessor<RecordSO, WriterSO> processor) {
return stepBuilderFactory.get("step1")
.<RecordSO, WriterSO>chunk(5)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public ItemReader<RecordSO> reader(DataSource dataSource) {
JdbcCursorItemReader<RecordSO> reader = new JdbcCursorItemReader<RecordSO>();
reader.setSql("select id, first_name, last_name, random_num from reader");
reader.setDataSource(dataSource);
reader.setRowMapper(
(ResultSet resultSet, int rowNum) -> {
LOGGER.info("RowMapper resultset: {}", resultSet);
if (!(resultSet.isAfterLast()) && !(resultSet.isBeforeFirst())) {
RecordSO recordSO = new RecordSO();
recordSO.setFirstName(resultSet.getString("first_name"));
recordSO.setLastName(resultSet.getString("last_name"));
recordSO.setId(resultSet.getLong("id"));
recordSO.setRandomNum(resultSet.getString("random_num"));
LOGGER.info("RowMapper record : {}", recordSO);
return recordSO;
} else {
LOGGER.info("Returning null from rowMapper");
return null;
}
});
return reader;
}
@Bean
public ItemPreparedStatementSetter<WriterSO> setter() {
return (item, ps) -> {
ps.setLong(1, item.getId());
ps.setString(2, item.getFullName());
ps.setString(3, item.getRandomNum());
};
}
@Bean
public ItemProcessor<RecordSO, WriterSO> processor() {
return new RecordProcessor();
}
@Bean
public ItemWriter<WriterSO> writer(DataSource dataSource, ItemPreparedStatementSetter<WriterSO> setter) {
JdbcBatchItemWriter<WriterSO> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<WriterSO>());
writer.setItemPreparedStatementSetter(setter);
writer.setSql("insert into writer (id, full_name, random_num) values (?,?,?)");
writer.setDataSource(dataSource);
return writer;
}
}
model
@Entity
@Table(name = "reader")
public class RecordSO {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
private Long id;
private String firstName;
private String lastName;
private String randomNum;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getRandomNum() {
return randomNum;
}
public void setRandomNum(String randomNum) {
this.randomNum = randomNum;
}
}
@Entity
@Table(name = "writer")
public class WriterSO {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
private Long id;
private String fullName;
private String randomNum;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public String getRandomNum() {
return randomNum;
}
public void setRandomNum(String randomNum) {
this.randomNum = randomNum;
}
}
application.properties
spring.datasource.driverClassName=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://192.168.99.100:5432/batch
spring.datasource.username=postgres
spring.datasource.password=mypwd
##spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect
spring.jpa.show-sql=true
spring.jpa.generate-ddl=true
spring.jpa.hibernate.ddl-auto=create-drop
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
processor
public class RecordProcessor implements ItemProcessor<RecordSO, WriterSO> {
private static final Logger LOGGER = LoggerFactory.getLogger(RecordProcessor.class);
@Override
public WriterSO process(RecordSO item) throws Exception {
LOGGER.info("Processing Record: {}", item);
WriterSO writerSo = new WriterSO();
writerSo.setId(item.getId());
writerSo.setFullName(item.getFirstName() + " " + item.getLastName());
writerSo.setRandomNum(String.valueOf(Math.random()).substring(3, 8));
LOGGER.info("Processed Writer: {}", writerSo);
return writerSo;
}
}
docs