Giới Thiệu
Trong phần trước, chúng ta đã khám phá những kiến thức cơ bản về CompletableFuture: chuỗi, kết hợp, xử lý lỗi và các tình huống sử dụng thực tế. Bây giờ, hãy đi sâu hơn và tìm hiểu về các tính năng nâng cao giúp nó sẵn sàng cho môi trường sản xuất:
Mục Lục
- Thiết lập Thời gian chờ
- Hủy Tác vụ
- Sử dụng
ExecutorServiceTùy Chỉnh - Mẫu
anyOf - Chuỗi gọi bất đồng bộ phụ thuộc
- Thực Tiễn Tốt Nhất
- Cạm Bẫy Thường Gặp
- Mẹo Hiệu Suất
- Giải Quyết Vấn Đề
1. Thiết lập Thời gian chờ (orTimeout & completeOnTimeout)
Đôi khi, các tác vụ có thể bị treo do API chậm hoặc sự cố mạng. Thay vì chờ đợi mãi mãi, CompletableFuture cho phép bạn thiết lập thời gian chờ.
orTimeout() → thất bại sau thời gian chờ
java
import java.util.concurrent.*;
public class CompletableFutureTimeout {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(3000); } catch (InterruptedException e) {}
return "Hoàn thành!";
}).orTimeout(2, TimeUnit.SECONDS);
try {
System.out.println(future.get());
} catch (Exception e) {
System.out.println("Thời gian chờ! " + e.getMessage());
}
}
}
👉 Sau 2 giây, future sẽ ném ra một TimeoutException.
completeOnTimeout() → trả về giá trị mặc định nếu hết thời gian
java
public class CompletableFutureCompleteOnTimeout {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(3000); } catch (InterruptedException e) {}
return "Dữ liệu đã được tải!";
}).completeOnTimeout("Giá trị thay thế", 2, TimeUnit.SECONDS);
System.out.println(future.get()); // in ra "Giá trị thay thế"
}
}
2. Hủy Tác vụ
Nếu một tác vụ không còn cần thiết, bạn có thể hủy nó.
java
import java.util.concurrent.*;
public class CompletableFutureCancel {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
return "Hoàn thành!";
} catch (InterruptedException e) {
return "Bị gián đoạn!";
}
});
Thread.sleep(1000);
boolean cancelled = future.cancel(true); // cố gắng ngừng thực thi
System.out.println("Đã hủy: " + cancelled);
}
}
3. Sử dụng ExecutorService Tùy Chỉnh
Theo mặc định, CompletableFuture sử dụng ForkJoinPool.commonPool(). Để kiểm soát tốt hơn, đặc biệt trong môi trường sản xuất, bạn nên cung cấp một bể luồng tùy chỉnh.
java
import java.util.concurrent.*;
public class CompletableFutureCustomExecutor {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Chạy trong bể luồng tùy chỉnh: " + Thread.currentThread().getName();
}, executor);
System.out.println(future.get());
executor.shutdown();
}
}
📌 Tại sao sử dụng một executor tùy chỉnh?
- Ngăn chặn việc khóa bể chung với các tác vụ dài.
- Cho phép bạn điều chỉnh số lượng luồng cho các tác vụ CPU-bound và I/O-bound.
4. Mẫu anyOf
Khi bạn muốn có phản hồi nhanh nhất từ nhiều tác vụ.
java
import java.util.concurrent.*;
public class CompletableFutureAnyOf {
public static void main(String[] args) throws Exception {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "Kết quả từ dịch vụ 1";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "Kết quả từ dịch vụ 2";
});
CompletableFuture<Object> fastest = CompletableFuture.anyOf(f1, f2);
System.out.println("Người chiến thắng: " + fastest.get());
}
private static void sleep(int ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) {}
}
}
👉 Kết quả:
Người chiến thắng: Kết quả từ dịch vụ 2
5. Chuỗi gọi bất đồng bộ phụ thuộc
Hãy tưởng tượng việc lấy thông tin người dùng, sau đó lấy đơn hàng của họ.
java
public class CompletableFuturePipeline {
public static void main(String[] args) throws Exception {
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "User123";
});
CompletableFuture<String> ordersFuture = userFuture.thenCompose(user ->
CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "Đơn hàng cho " + user;
})
);
System.out.println(ordersFuture.get());
}
private static void sleep(int ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) {}
}
}
👉 thenCompose() được sử dụng khi một tác vụ bất đồng bộ phụ thuộc vào tác vụ khác.
6. Thực Tiễn Tốt Nhất
- Thời gian chờ giúp ngăn chặn các tác vụ bị khóa mãi mãi.
- Hủy bỏ giúp ngừng các tác vụ không cần thiết.
- Executors tùy chỉnh cung cấp kiểm soát hiệu suất tốt hơn.
- anyOf/allOf cho phép xử lý tác vụ song song mạnh mẽ.
- thenCompose là lựa chọn tốt cho các cuộc gọi bất đồng bộ phụ thuộc.
7. Cạm Bẫy Thường Gặp
- Không xử lý trường hợp
TimeoutExceptioncó thể dẫn đến các lỗi không mong muốn trong ứng dụng. - Hủy tác vụ không phải lúc nào cũng thành công, cần kiểm tra trạng thái.
- Sử dụng
CompletableFuturemà không có kế hoạch về tài nguyên có thể dẫn đến hiệu suất kém.
8. Mẹo Hiệu Suất
- Tối ưu hóa kích thước bể luồng: Đảm bảo rằng bể luồng của bạn đủ lớn để xử lý tải nhưng không quá lớn để gây ra việc khóa.
- Giới hạn số lượng tác vụ song song: Điều này giúp tránh tình trạng quá tải hệ thống.
- Sử dụng
CompletableFuture.allOf()để đồng bộ hóa nhiều tác vụ và xử lý kết quả sau khi tất cả hoàn thành.
9. Giải Quyết Vấn Đề
- Vấn đề với
TimeoutException: Hãy chắc chắn rằng bạn đã định nghĩa rõ ràng các kế hoạch dự phòng cho các tác vụ có thể bị timeout. - Hủy tác vụ không thành công: Kiểm tra trạng thái của tác vụ để đảm bảo không có tác vụ nào bị rời rạc.
- Quản lý tài nguyên: Đảm bảo rằng bể luồng của bạn không bị tắc nghẽn bởi các tác vụ lâu dài.
Kết Luận
CompletableFuture không chỉ là về thực thi bất đồng bộ—đó là một khung để tổ chức các tác vụ:
- Chạy các tác vụ song song
- Chuỗi các pipelines phụ thuộc
- Xử lý thời gian chờ, thử lại, lỗi
- Tùy chỉnh thực thi với executors
Khi được sử dụng đúng cách, nó giúp xây dựng các ứng dụng hiệu suất cao, không bị chặn, và có khả năng phục hồi trong Java.
Bạn có muốn tôi cũng bao gồm một ví dụ mini-project đầy đủ (như lấy dữ liệu từ 3 dịch vụ với thời gian chờ, giá trị thay thế và thực thi song song) để bạn có thể thấy cách tất cả các tính năng này hoạt động cùng nhau trong mã thực tế không?