0
0
Lập trình
Thaycacac
Thaycacac thaycacac

Xử lý nhiều file CSV với Spring Batch

Đăng vào 1 ngày trước

• 13 phút đọc

Giới thiệu Dự án

Trong bài viết này, chúng ta sẽ tìm hiểu cách sử dụng Spring Batch để xử lý nhiều file CSV trong một quy trình. Dự án này sẽ giúp bạn:

  1. Đọc dữ liệu từ nhiều file CSV sử dụng ResourceAwareItemReaderItemStream
  2. Thực hiện các phép toán và ghi vào một file trung gian
  3. Đọc từ file trung gian và ghi vào cơ sở dữ liệu

Cấu trúc Dự án

Dự án của chúng ta sẽ bao gồm các thành phần chính sau:

  • Maven Dependencies: Định nghĩa các thư viện cần thiết trong pom.xml
  • Mô hình Domain: Các lớp để đại diện cho dữ liệu.
  • Reader tùy chỉnh: Xử lý từng dòng từ nhiều file.
  • Cấu hình Batch: Định nghĩa các bước và quy trình xử lý.
  • Lớp Listener: Theo dõi và thông báo khi công việc hoàn thành.

1. Dependencies Maven (pom.xml)

xml Copy
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Mô hình Domain

java Copy
// Employee.java - Mô hình đầu vào
public class Employee {
    private String id;
    private String firstName;
    private String lastName;
    private double salary;

    // Constructors, getters, setters
}

// EmployeeProcessed.java - Mô hình trung gian
public class EmployeeProcessed {
    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;

    // Constructors, getters, setters
}

// EmployeeDB.java - Mô hình cơ sở dữ liệu
@Entity
@Table(name = "employees")
public class EmployeeDB {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String employeeId;
    private String fullName;
    private double annualSalary;
    private double taxAmount;
    private Date processedDate;

    // Constructors, getters, setters
}

3. Reader Tùy Chỉnh ResourceAwareItemReaderItemStream

java Copy
/**
 * Reader tùy chỉnh xử lý từng dòng từ nhiều file
 * Thực hiện ResourceAwareItemReaderItemStream để xử lý nhiều nguồn
 */
public class MultiFileEmployeeReader implements ResourceAwareItemReaderItemStream<Employee> {

    private FlatFileItemReader<String> lineReader;
    private Resource[] resources;
    private int currentResourceIndex = -1;

    public MultiFileEmployeeReader(FlatFileItemReader<String> lineReader) {
        this.lineReader = lineReader;
    }

    @Override
    public void setResource(Resource resource) {
        // Phương thức này được gọi cho mỗi resource
    }

    @Override
    public void setResources(Resource[] resources) {
        this.resources = resources;
    }

    @Override
    public Employee read() throws Exception {
        String line = lineReader.read();
        while (line == null) {
            currentResourceIndex++;
            if (currentResourceIndex >= resources.length) {
                return null; // Tất cả các resource đã được xử lý
            }
            lineReader.setResource(resources[currentResourceIndex]);
            lineReader.open(new ExecutionContext());
            line = lineReader.read();
        }
        return parseEmployee(line);
    }

    private Employee parseEmployee(String line) {
        String[] fields = line.split(",");
        Employee employee = new Employee();
        employee.setId(fields[0]);
        employee.setFirstName(fields[1]);
        employee.setLastName(fields[2]);
        employee.setSalary(Double.parseDouble(fields[3]));
        return employee;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        currentResourceIndex = -1;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Không thực hiện để đơn giản hóa
    }

    @Override
    public void close() {
        if (lineReader != null) {
            lineReader.close();
        }
    }
}

4. Cấu Hình Batch

java Copy
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Value("classpath:input/employees*.csv")
    private Resource[] inputResources;

    @Value("file:output/processed_employees.csv")
    private Resource outputResource;

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Employee, EmployeeProcessed>chunk(10)
                .reader(multiResourceEmployeeReader())
                .processor(employeeProcessor())
                .writer(employeeFileWriter())
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<EmployeeProcessed, EmployeeDB>chunk(10)
                .reader(processedEmployeeReader())
                .processor(employeeDBProcessor())
                .writer(employeeDBWriter())
                .build();
    }

    @Bean
    public Job processEmployeeJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("processEmployeeJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1())
                .next(step2())
                .end()
                .build();
    }

    @Bean
    public MultiResourceItemReader<Employee> multiResourceEmployeeReader() {
        FlatFileItemReader<String> lineReader = new FlatFileItemReaderBuilder<String>()
                .name("lineReader")
                .lineMapper((line, lineNumber) -> line)
                .build();

        MultiFileEmployeeReader reader = new MultiFileEmployeeReader(lineReader);
        reader.setResources(inputResources);

        return new MultiResourceItemReaderBuilder<Employee>()
                .name("multiResourceEmployeeReader")
                .resources(inputResources)
                .delegate(reader)
                .build();
    }

    @Bean
    public ItemProcessor<Employee, EmployeeProcessed> employeeProcessor() {
        return employee -> {
            EmployeeProcessed processed = new EmployeeProcessed();
            processed.setEmployeeId(employee.getId());
            processed.setFullName(employee.getFirstName() + " " + employee.getLastName());
            double annualSalary = employee.getSalary() * 12;
            processed.setAnnualSalary(annualSalary);
            processed.setTaxAmount(annualSalary * 0.2);
            return processed;
        };
    }

    @Bean
    public FlatFileItemWriter<EmployeeProcessed> employeeFileWriter() {
        return new FlatFileItemWriterBuilder<EmployeeProcessed>()
                .name("employeeFileWriter")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .build();
    }

    @Bean
    public FlatFileItemReader<EmployeeProcessed> processedEmployeeReader() {
        return new FlatFileItemReaderBuilder<EmployeeProcessed>()
                .name("processedEmployeeReader")
                .resource(outputResource)
                .delimited()
                .names(new String[]{"employeeId", "fullName", "annualSalary", "taxAmount"})
                .targetType(EmployeeProcessed.class)
                .build();
    }

    @Bean
    public ItemProcessor<EmployeeProcessed, EmployeeDB> employeeDBProcessor() {
        return processed -> {
            EmployeeDB employeeDB = new EmployeeDB();
            employeeDB.setEmployeeId(processed.getEmployeeId());
            employeeDB.setFullName(processed.getFullName());
            employeeDB.setAnnualSalary(processed.getAnnualSalary());
            employeeDB.setTaxAmount(processed.getTaxAmount());
            employeeDB.setProcessedDate(new Date());
            return employeeDB;
        };
    }

    @Bean
    public JdbcBatchItemWriter<EmployeeDB> employeeDBWriter() {
        return new JdbcBatchItemWriterBuilder<EmployeeDB>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO employees (employee_id, full_name, annual_salary, tax_amount, processed_date) " +
                     "VALUES (:employeeId, :fullName, :annualSalary, :taxAmount, :processedDate)")
                .dataSource(dataSource)
                .build();
    }

    @Bean
    public DataSourceInitializer dataSourceInitializer(DataSource dataSource) {
        DataSourceInitializer initializer = new DataSourceInitializer();
        initializer.setDataSource(dataSource);

        ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(new ClassPathResource("schema.sql"));
        initializer.setDatabasePopulator(populator);

        return initializer;
    }
}

5. Schema Cơ Sở Dữ Liệu

sql Copy
-- schema.sql
CREATE TABLE employees (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    employee_id VARCHAR(50) NOT NULL,
    full_name VARCHAR(100) NOT NULL,
    annual_salary DOUBLE NOT NULL,
    tax_amount DOUBLE NOT NULL,
    processed_date DATE NOT NULL
);

6. Listener Thông Báo Hoàn Thành Công Việc

java Copy
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Thời gian để xác minh kết quả");
        }
    }
}

Giải Thích về ResourceAwareItemReaderItemStream

Giao diện ResourceAwareItemReaderItemStream mở rộng cả hai giao diện ItemReaderItemStream và thêm khả năng xử lý các nguồn. Đây là cách mà nó cho phép đọc từ nhiều file:

  1. Nhận biết Tài nguyên: Giao diện cung cấp phương thức setResource(Resource resource) cho phép reader nhận biết tài nguyên mà nó đang xử lý.
  2. Tích hợp MultiResourceItemReader: MultiResourceItemReader sử dụng việc thực hiện ResourceAwareItemReaderItemStream bằng cách:
    • Lặp qua tất cả các tài nguyên đã cung cấp
    • Thiết lập mỗi tài nguyên trên reader ủy quyền bằng setResource()
    • Ủy quyền việc đọc cho reader tùy chỉnh cho mỗi file
  3. Quản lý Trạng thái: Việc thực hiện duy trì trạng thái (chỉ số tài nguyên hiện tại, vị trí trong file) thông qua các phương thức giao diện ItemStream (open(), update(), close()).
  4. Chuyển tiếp Liền mạch: Khi một tài nguyên đã hoàn thành, MultiResourceItemReader tự động chuyển sang tài nguyên tiếp theo và cập nhật reader ủy quyền.

Trong việc thực hiện của chúng ta:

  • MultiFileEmployeeReader thực hiện logic xử lý tài nguyên
  • MultiResourceItemReader quản lý việc lặp qua tài nguyên
  • Reader tùy chỉnh xử lý từng dòng và tạo ra các đối tượng domain

Chạy Ứng Dụng

  1. Đặt các file CSV trong src/main/resources/input/ với tên như employees1.csv, employees2.csv, v.v.
  2. Chạy ứng dụng Spring Boot
  3. Công việc batch sẽ:
    • Đọc từ tất cả các file CSV của nhân viên
    • Xử lý và ghi vào một file trung gian
    • Đọc từ file trung gian và ghi vào cơ sở dữ liệu

Thực Tiễn Tốt Nhất

  • Luôn kiểm tra dữ liệu đầu vào trước khi xử lý để tránh lỗi.
  • Sử dụng các logging để theo dõi quy trình xử lý và dễ dàng gỡ lỗi nếu có vấn đề xảy ra.

Cạm Bẫy Thường Gặp

  • Không thiết lập đúng tài nguyên trong MultiFileEmployeeReader có thể dẫn đến việc không đọc được dữ liệu.
  • Bỏ qua việc xử lý ngoại lệ có thể làm cho ứng dụng gặp lỗi không mong muốn.

Mẹo Hiệu Suất

  • Thực hiện các phép toán tính toán một cách hiệu quả trong processor để giảm thời gian xử lý.
  • Tối ưu hóa kích thước chunk trong batch job để đạt hiệu suất tối đa.

Hỏi Đáp

H: Làm thế nào để xử lý lỗi khi đọc file?
T: Bạn nên sử dụng các khối try-catch để xử lý ngoại lệ và ghi lại các lỗi đó để dễ dàng phân tích sau này.

H: Có thể sử dụng các file định dạng khác không?
T: Có thể, bạn chỉ cần thay đổi parser trong reader tùy chỉnh để phù hợp với định dạng dữ liệu mới.

Kết Luận

Dự án này là một minh họa mạnh mẽ cho cách mà Spring Batch có thể được sử dụng để xử lý nhiều file CSV một cách hiệu quả. Bằng cách sử dụng các công cụ và thư viện phù hợp, bạn có thể xây dựng các quy trình xử lý dữ liệu mạnh mẽ và linh hoạt. Hãy thử áp dụng kiến thức này vào các dự án thực tế của bạn để nâng cao kỹ năng lập trình và phát triển ứng dụng tốt hơn.

Hãy bắt đầu xây dựng ứng dụng của bạn ngay hôm nay!

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào