Giới thiệu Dự án
Trong bài viết này, chúng ta sẽ tìm hiểu cách sử dụng Spring Batch để xử lý nhiều file CSV trong một quy trình. Dự án này sẽ giúp bạn:
- Đọc dữ liệu từ nhiều file CSV sử dụng
ResourceAwareItemReaderItemStream
- Thực hiện các phép toán và ghi vào một file trung gian
- Đọc từ file trung gian và ghi vào cơ sở dữ liệu
Cấu trúc Dự án
Dự án của chúng ta sẽ bao gồm các thành phần chính sau:
- Maven Dependencies: Định nghĩa các thư viện cần thiết trong
pom.xml
- Mô hình Domain: Các lớp để đại diện cho dữ liệu.
- Reader tùy chỉnh: Xử lý từng dòng từ nhiều file.
- Cấu hình Batch: Định nghĩa các bước và quy trình xử lý.
- Lớp Listener: Theo dõi và thông báo khi công việc hoàn thành.
1. Dependencies Maven (pom.xml)
xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2. Mô hình Domain
java
// Employee.java - Mô hình đầu vào
public class Employee {
private String id;
private String firstName;
private String lastName;
private double salary;
// Constructors, getters, setters
}
// EmployeeProcessed.java - Mô hình trung gian
public class EmployeeProcessed {
private String employeeId;
private String fullName;
private double annualSalary;
private double taxAmount;
// Constructors, getters, setters
}
// EmployeeDB.java - Mô hình cơ sở dữ liệu
@Entity
@Table(name = "employees")
public class EmployeeDB {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String employeeId;
private String fullName;
private double annualSalary;
private double taxAmount;
private Date processedDate;
// Constructors, getters, setters
}
3. Reader Tùy Chỉnh ResourceAwareItemReaderItemStream
java
/**
* Reader tùy chỉnh xử lý từng dòng từ nhiều file
* Thực hiện ResourceAwareItemReaderItemStream để xử lý nhiều nguồn
*/
public class MultiFileEmployeeReader implements ResourceAwareItemReaderItemStream<Employee> {
private FlatFileItemReader<String> lineReader;
private Resource[] resources;
private int currentResourceIndex = -1;
public MultiFileEmployeeReader(FlatFileItemReader<String> lineReader) {
this.lineReader = lineReader;
}
@Override
public void setResource(Resource resource) {
// Phương thức này được gọi cho mỗi resource
}
@Override
public void setResources(Resource[] resources) {
this.resources = resources;
}
@Override
public Employee read() throws Exception {
String line = lineReader.read();
while (line == null) {
currentResourceIndex++;
if (currentResourceIndex >= resources.length) {
return null; // Tất cả các resource đã được xử lý
}
lineReader.setResource(resources[currentResourceIndex]);
lineReader.open(new ExecutionContext());
line = lineReader.read();
}
return parseEmployee(line);
}
private Employee parseEmployee(String line) {
String[] fields = line.split(",");
Employee employee = new Employee();
employee.setId(fields[0]);
employee.setFirstName(fields[1]);
employee.setLastName(fields[2]);
employee.setSalary(Double.parseDouble(fields[3]));
return employee;
}
@Override
public void open(ExecutionContext executionContext) {
currentResourceIndex = -1;
}
@Override
public void update(ExecutionContext executionContext) {
// Không thực hiện để đơn giản hóa
}
@Override
public void close() {
if (lineReader != null) {
lineReader.close();
}
}
}
4. Cấu Hình Batch
java
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Value("classpath:input/employees*.csv")
private Resource[] inputResources;
@Value("file:output/processed_employees.csv")
private Resource outputResource;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Employee, EmployeeProcessed>chunk(10)
.reader(multiResourceEmployeeReader())
.processor(employeeProcessor())
.writer(employeeFileWriter())
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.<EmployeeProcessed, EmployeeDB>chunk(10)
.reader(processedEmployeeReader())
.processor(employeeDBProcessor())
.writer(employeeDBWriter())
.build();
}
@Bean
public Job processEmployeeJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("processEmployeeJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1())
.next(step2())
.end()
.build();
}
@Bean
public MultiResourceItemReader<Employee> multiResourceEmployeeReader() {
FlatFileItemReader<String> lineReader = new FlatFileItemReaderBuilder<String>()
.name("lineReader")
.lineMapper((line, lineNumber) -> line)
.build();
MultiFileEmployeeReader reader = new MultiFileEmployeeReader(lineReader);
reader.setResources(inputResources);
return new MultiResourceItemReaderBuilder<Employee>()
.name("multiResourceEmployeeReader")
.resources(inputResources)
.delegate(reader)
.build();
}
@Bean
public ItemProcessor<Employee, EmployeeProcessed> employeeProcessor() {
return employee -> {
EmployeeProcessed processed = new EmployeeProcessed();
processed.setEmployeeId(employee.getId());
processed.setFullName(employee.getFirstName() + " " + employee.getLastName());
double annualSalary = employee.getSalary() * 12;
processed.setAnnualSalary(annualSalary);
processed.setTaxAmount(annualSalary * 0.2);
return processed;
};
}
@Bean
public FlatFileItemWriter<EmployeeProcessed> employeeFileWriter() {
return new FlatFileItemWriterBuilder<EmployeeProcessed>()
.name("employeeFileWriter")
.resource(outputResource)
.delimited()
.names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
.build();
}
@Bean
public FlatFileItemReader<EmployeeProcessed> processedEmployeeReader() {
return new FlatFileItemReaderBuilder<EmployeeProcessed>()
.name("processedEmployeeReader")
.resource(outputResource)
.delimited()
.names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
.targetType(EmployeeProcessed.class)
.build();
}
@Bean
public ItemProcessor<EmployeeProcessed, EmployeeDB> employeeDBProcessor() {
return processed -> {
EmployeeDB employeeDB = new EmployeeDB();
employeeDB.setEmployeeId(processed.getEmployeeId());
employeeDB.setFullName(processed.getFullName());
employeeDB.setAnnualSalary(processed.getAnnualSalary());
employeeDB.setTaxAmount(processed.getTaxAmount());
employeeDB.setProcessedDate(new Date());
return employeeDB;
};
}
@Bean
public JdbcBatchItemWriter<EmployeeDB> employeeDBWriter() {
return new JdbcBatchItemWriterBuilder<EmployeeDB>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO employees (employee_id, full_name, annual_salary, tax_amount, processed_date) " +
"VALUES (:employeeId, :fullName, :annualSalary, :taxAmount, :processedDate)")
.dataSource(dataSource)
.build();
}
@Bean
public DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
DataSourceInitializer initializer = new DataSourceInitializer();
initializer.setDataSource(dataSource);
ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
populator.addScript(new ClassPathResource("schema.sql"));
initializer.setDatabasePopulator(populator);
return initializer;
}
}
5. Schema Cơ Sở Dữ Liệu
sql
-- schema.sql
CREATE TABLE employees (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
employee_id VARCHAR(50) NOT NULL,
full_name VARCHAR(100) NOT NULL,
annual_salary DOUBLE NOT NULL,
tax_amount DOUBLE NOT NULL,
processed_date DATE NOT NULL
);
6. Listener Thông Báo Hoàn Thành Công Việc
java
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
@Override
public void afterJob(JobExecution jobExecution) {
if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
log.info("!!! JOB FINISHED! Thời gian để xác minh kết quả");
}
}
}
Giải Thích về ResourceAwareItemReaderItemStream
Giao diện ResourceAwareItemReaderItemStream
mở rộng cả hai giao diện ItemReader
và ItemStream
và thêm khả năng xử lý các nguồn. Đây là cách mà nó cho phép đọc từ nhiều file:
- Nhận biết Tài nguyên: Giao diện cung cấp phương thức
setResource(Resource resource)
cho phép reader nhận biết tài nguyên mà nó đang xử lý. - Tích hợp MultiResourceItemReader:
MultiResourceItemReader
sử dụng việc thực hiệnResourceAwareItemReaderItemStream
bằng cách:- Lặp qua tất cả các tài nguyên đã cung cấp
- Thiết lập mỗi tài nguyên trên reader ủy quyền bằng
setResource()
- Ủy quyền việc đọc cho reader tùy chỉnh cho mỗi file
- Quản lý Trạng thái: Việc thực hiện duy trì trạng thái (chỉ số tài nguyên hiện tại, vị trí trong file) thông qua các phương thức giao diện
ItemStream
(open()
,update()
,close()
). - Chuyển tiếp Liền mạch: Khi một tài nguyên đã hoàn thành,
MultiResourceItemReader
tự động chuyển sang tài nguyên tiếp theo và cập nhật reader ủy quyền.
Trong việc thực hiện của chúng ta:
MultiFileEmployeeReader
thực hiện logic xử lý tài nguyênMultiResourceItemReader
quản lý việc lặp qua tài nguyên- Reader tùy chỉnh xử lý từng dòng và tạo ra các đối tượng domain
Chạy Ứng Dụng
- Đặt các file CSV trong
src/main/resources/input/
với tên nhưemployees1.csv
,employees2.csv
, v.v. - Chạy ứng dụng Spring Boot
- Công việc batch sẽ:
- Đọc từ tất cả các file CSV của nhân viên
- Xử lý và ghi vào một file trung gian
- Đọc từ file trung gian và ghi vào cơ sở dữ liệu
Thực Tiễn Tốt Nhất
- Luôn kiểm tra dữ liệu đầu vào trước khi xử lý để tránh lỗi.
- Sử dụng các logging để theo dõi quy trình xử lý và dễ dàng gỡ lỗi nếu có vấn đề xảy ra.
Cạm Bẫy Thường Gặp
- Không thiết lập đúng tài nguyên trong
MultiFileEmployeeReader
có thể dẫn đến việc không đọc được dữ liệu. - Bỏ qua việc xử lý ngoại lệ có thể làm cho ứng dụng gặp lỗi không mong muốn.
Mẹo Hiệu Suất
- Thực hiện các phép toán tính toán một cách hiệu quả trong processor để giảm thời gian xử lý.
- Tối ưu hóa kích thước chunk trong batch job để đạt hiệu suất tối đa.
Hỏi Đáp
H: Làm thế nào để xử lý lỗi khi đọc file?
T: Bạn nên sử dụng các khối try-catch để xử lý ngoại lệ và ghi lại các lỗi đó để dễ dàng phân tích sau này.
H: Có thể sử dụng các file định dạng khác không?
T: Có thể, bạn chỉ cần thay đổi parser trong reader tùy chỉnh để phù hợp với định dạng dữ liệu mới.
Kết Luận
Dự án này là một minh họa mạnh mẽ cho cách mà Spring Batch có thể được sử dụng để xử lý nhiều file CSV một cách hiệu quả. Bằng cách sử dụng các công cụ và thư viện phù hợp, bạn có thể xây dựng các quy trình xử lý dữ liệu mạnh mẽ và linh hoạt. Hãy thử áp dụng kiến thức này vào các dự án thực tế của bạn để nâng cao kỹ năng lập trình và phát triển ứng dụng tốt hơn.
Hãy bắt đầu xây dựng ứng dụng của bạn ngay hôm nay!