Giới Thiệu
Trong các doanh nghiệp hiện đại dựa trên dữ liệu, hệ thống lập lịch công việc được xem như "hệ thần kinh trung ương" của chuỗi dữ liệu. Từ các tác vụ ETL đến đào tạo máy học, từ việc tạo báo cáo đến giám sát thời gian thực, gần như tất cả các quy trình kinh doanh quan trọng đều phụ thuộc vào một động cơ lập lịch ổn định, hiệu quả và có thể mở rộng.
Apache DolphinScheduler 3.1.9 là một phiên bản ổn định và được sử dụng rộng rãi, vì vậy bài viết này sẽ tập trung vào phiên bản này, phân tích các quy trình liên quan đến việc khởi động dịch vụ Master, khám phá sâu về mã nguồn, thiết kế kiến trúc, phân chia module và các cơ chế triển khai chủ chốt. Mục tiêu là giúp các nhà phát triển hiểu rõ cách thức hoạt động của Master và tạo cơ sở cho việc phát triển thứ cấp hoặc tối ưu hóa hiệu suất.
Bài viết này được chia thành ba phần: quy trình khởi động Server Master, quy trình khởi động Server Worker và các sơ đồ quy trình liên quan. Đây là phần đầu tiên.
1. Tổng Quan Về Khởi Động Server Master
1.1 Mã Nguồn Chính
- Mã nhập:
org.apache.dolphinscheduler.server.master.MasterServer#run
java
public void run() throws SchedulerException {
// 1. Khởi tạo rpc server
this.masterRPCServer.start();
// 2. Cài đặt plugin tác vụ
this.taskPluginManager.loadPlugin();
// 3. Tự chịu lỗi
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
// 4. Lập lịch Master
this.masterSchedulerBootstrap.init();
this.masterSchedulerBootstrap.start();
// 5. Dịch vụ thực thi sự kiện
this.eventExecuteService.start();
// 6. Cơ chế chịu lỗi
this.failoverExecuteThread.start();
// 7. Lập lịch Quartz
this.schedulerApi.start();
...
}
1.2 Khởi Động RPC
- Mô tả: Đăng ký các bộ xử lý cho các lệnh liên quan, chẳng hạn như thực thi tác vụ, kết quả thực thi tác vụ, kết thúc tác vụ, v.v.
- Mã nhập:
org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer#start
java
public void start() {
...
// Bộ xử lý yêu cầu thực thi tác vụ
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
// Bộ xử lý yêu cầu kết quả thực thi tác vụ
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT, taskExecuteResponseProcessor);
// Bộ xử lý yêu cầu kết thúc tác vụ
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
...
this.nettyRemotingServer.start();
logger.info("Đã khởi động Master RPC Server...");
}
1.3 Khởi Tạo Plugin Tác Vụ
- Mô tả: Thực hiện các thao tác mẫu liên quan đến tác vụ như tạo tác vụ, phân tích tham số tác vụ và lấy thông tin tài nguyên tác vụ. Plugin này cần được đăng ký trên API, Master và Worker. Vai trò trong Master là thiết lập thông tin nguồn dữ liệu và UDF.
1.4 Tự Chịu Lỗi (Đăng Ký Master)
- Mô tả: Đăng ký thông tin của master vào registry (sử dụng Zookeeper làm ví dụ), và lắng nghe các thay đổi trong việc đăng ký bản thân, các master khác và tất cả các nút worker để thực hiện xử lý chịu lỗi.
- Mã nhập:
org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient#start
java
public void start() {
try {
this.masterHeartBeatTask = new MasterHeartBeatTask(masterConfig, registryClient);
// 1. Đăng ký bản thân vào registry;
registry();
// 2. Lắng nghe trạng thái kết nối với registry;
registryClient.addConnectionStateListener(
new MasterConnectionStateListener(masterConfig, registryClient, masterConnectStrategy));
// 3. Lắng nghe trạng thái của các master và worker khác trong registry và thực hiện công việc chịu lỗi
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
} catch (Exception e) {
throw new RegistryException("Lỗi khởi động client đăng ký master", e);
}
}
1.5 Lập Lịch Master
- Mô tả: Một luồng quét kiểm tra định kỳ bảng
commandtrong cơ sở dữ liệu và thực hiện các thao tác khác nhau dựa trên các loại lệnh. Đây là logic cốt lõi cho việc khởi động workflow, xử lý lỗi phiên bản, v.v. - Mã nhập:
org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap#run
java
public void run() {
while (!ServerLifeCycleManager.isStopped()) {
try {
// Nếu server không ở trạng thái đang chạy, không thể tiêu thụ lệnh
if (!ServerLifeCycleManager.isRunning()) {
logger.warn("Server hiện tại {} không ở trạng thái đang chạy, không thể tiêu thụ lệnh.", this.masterAddress);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
// Xử lý quá tải khối lượng công việc (CPU/bộ nhớ)
boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
logger.warn("Server hiện tại {} đang quá tải, không thể tiêu thụ lệnh.", this.masterAddress);
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// Lấy lệnh từ cơ sở dữ liệu
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// Chuyển đổi lệnh thành các phiên bản xử lý và xử lý logic workflow
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// Xử lý thực thi phiên bản workflow
processInstances.forEach(processInstance -> {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance, ...);
processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
});
} catch (InterruptedException interruptedException) {
logger.warn("Khởi động lập lịch master bị gián đoạn, đóng vòng lặp", interruptedException);
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logger.error("Lỗi workflow lập lịch master", e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
}
1.6 Dịch Vụ Thực Thi Sự Kiện
- Mô tả: Chịu trách nhiệm lấy mẫu hàng đợi sự kiện của phiên bản workflow. Các sự kiện như lỗi khi gửi workflow hoặc thay đổi trạng thái tác vụ được xử lý tại đây.
- Mã nhập:
org.apache.dolphinscheduler.server.master.runner.EventExecuteService#run
java
public void run() {
while (!ServerLifeCycleManager.isStopped()) {
try {
// Xử lý các sự kiện thực thi workflow
workflowEventHandler();
// Xử lý các sự kiện thực thi tác vụ stream
streamTaskEventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} ...
}
}
1.7 Cơ Chế Chịu Lỗi
- Mô tả: Chịu trách nhiệm cho việc chịu lỗi của Master và Worker.
- Mã nhập:
org.apache.dolphinscheduler.server.master.service.MasterFailoverService#checkMasterFailover
java
public void checkMasterFailover() {
List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
.stream()
.filter(host -> localAddress.equals(host) || !registryClient.checkNodeExists(host, NodeType.MASTER))
.distinct()
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
return;
}
for (String needFailoverMasterHost : needFailoverMasterHosts) {
failoverMaster(needFailoverMasterHost);
}
}
2. Thực Hành Tốt Nhất (Best Practices)
- Kiểm Tra Định Kỳ: Đảm bảo rằng các tác vụ và lệnh được kiểm tra định kỳ để phát hiện sớm lỗi.
- Ghi Nhận Log Chi Tiết: Ghi nhận log chi tiết để dễ dàng theo dõi và điều tra sự cố.
- Sử Dụng Cơ Chế Chịu Lỗi: Đảm bảo rằng các cơ chế chịu lỗi được thiết lập đúng để giảm thiểu thời gian chết.
3. Câu Hỏi Thường Gặp (FAQ)
1. DolphinScheduler là gì?
DolphinScheduler là một hệ thống lập lịch workflow nguồn mở, cho phép quản lý các tác vụ và quy trình kinh doanh một cách hiệu quả.
2. Tại sao nên sử dụng phiên bản 3.1.9?
Phiên bản 3.1.9 được coi là ổn định và tích hợp nhiều tính năng cải tiến, giúp tối ưu hóa hiệu suất.
Kết Luận
Bài viết này cung cấp cái nhìn sâu sắc về quy trình khởi động dịch vụ Master của Apache DolphinScheduler 3.1.9, các cơ chế chịu lỗi và kiến trúc tổng thể. Các bài viết tiếp theo sẽ khám phá quy trình khởi động Worker và các tương tác giữa Master và Worker.
Hãy theo dõi để không bỏ lỡ những thông tin hữu ích tiếp theo!