Giới thiệu
Swoole Rx Events là một thư viện giúp xây dựng hệ thống sự kiện phản ứng cho PHP, kết hợp giữa RxPHP và Swoole. Thư viện này cho phép bạn dễ dàng thực hiện việc phát và đăng ký các sự kiện liên quan đến miền và cơ sở hạ tầng, tạo ra các pipeline bằng các toán tử Rx và chạy các toán tử theo thời gian trong vòng lặp sự kiện của Swoole.
Các thành phần chính
- EventBus: Một bus đơn giản hỗ trợ Rx với các phương thức như
on()
,onMany()
,payloads()
,once()
,request()
. - SwooleScheduler: Giao diện
AsyncSchedulerInterface
sử dụngSwoole\Timer
, tương thích với các toán tử thời gian của RxPHP. - Mô hình sự kiện:
BasicEvent
bao gồm các thuộc tính như tên, payload, metadata và rid, cùng vớiEventInterface
chứa id tương quan.
Yêu cầu hệ thống
- PHP 8.3+
ext-swoole
4.8+ / 5.xreactivex/rxphp
(2.x)
Cài đặt
Để cài đặt thư viện, bạn có thể sử dụng Composer:
bash
composer require small/swoole-rx-events
Bắt đầu nhanh
Dưới đây là cách sử dụng cơ bản của thư viện:
php
use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Small\SwooleRxEvents\Event\BasicEvent;
// Tạo bus với Swoole async scheduler
$bus = new EventBus(new SwooleScheduler());
// Đăng ký sự kiện theo tên
$bus->on('order.created')->subscribe(function ($e) {
echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL;
});
// Phát sự kiện
$bus->emitName('order.created', ['id' => 123]);
// Nếu bạn đang trong một script CLI thông thường, giữ vòng lặp sống trong một khoảng thời gian ngắn:
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();
Khái niệm
Sự kiện (Event)
Tất cả các sự kiện phải triển khai EventInterface
:
php
namespace Small\SwooleRxEvents\Contract;
interface EventInterface
{
public function getName(): string;
public function getRid(): string;
public function setRid(string $rid): self;
}
BasicEvent
mang theo các thuộc tính:
name
(string)payload
(array)meta
(array, ví dụ: tracing, user)rid
(string, id tương quan tự động sinh ra)
Bus
stream()
— tất cả các sự kiệnon($name)
/onMany([...])
— luồng được lọcpayloads($name)
— luồng chỉ chứa payloadonce($name, ?map, ?timeoutMs)
— giải quyết sự kiện khớp đầu tiên (có thể được ánh xạ)request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs)
— phát một yêu cầu với mộtrid
mới, chờ phản hồi đầu tiên với cùngrid
.
Các yêu cầu timeout cần một scheduler async. Thư viện này cung cấp
SwooleScheduler
thực hiệnAsyncSchedulerInterface
.
Ví dụ API
1) Nghe và phát sự kiện
php
$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload()));
$bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);
2) Yêu cầu/Phản hồi với id tương quan
php
// Responder: sao chép rid từ 'REQ' đến 'RESP'
$bus->on('REQ')->subscribe(function ($e) use ($bus) {
$bus->emit(
(new BasicEvent('RESP', ['ok' => true], $e->getMeta()))
->setRid($e->getRid()) // tương quan
);
});
// Caller: request() đăng ký TRƯỚC, sau đó phát; không có điều kiện đua
$bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100)
->subscribe(
fn($resp) => var_dump($resp->getPayload()), // ['ok' => true]
fn($err) => error_log($err->getMessage())
);
3) once()
với ánh xạ và timeout
php
$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50)
->subscribe(
fn($node) => echo "node=$node\n",
fn($err) => echo "timeout\n"
);
$bus->emitName('health.ok', [], ['node' => 'api-1']);
4) Backpressure / batching (Rx composition)
php
$bus->on('order.created')
->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // mỗi 0.5s hoặc 100 mục
->filter(fn($batch) => !empty($batch))
->subscribe(fn(array $batch) => persist_batch($batch));
Mẹo tích hợp Swoole
- Máy chủ HTTP: trong
on('request')
, phát một sự kiện với meta chứa callablerespond
hoặc đối tượngResponse
. Các subscriber downstream có thể tạo mộtResponseEvent
. - Coroutines cho từng subscriber: sử dụng coroutines Swoole trong các subscriber của bạn nếu bạn thực hiện IO; các toán tử Rx sẽ tổ chức trình tự.
- Vòng lặp sự kiện trong CLI: bên ngoài một
Server
của Swoole, bắt đầu/dừng reactor vớiSwoole\Event::wait()
/Event::exit()
để các bộ hẹn giờ có thể chạy.
Thực tiễn tốt nhất
- Đảm bảo luôn kiểm tra các lỗi khi phát và đăng ký sự kiện.
- Sử dụng các phương thức
once()
để tránh việc lặp lại không cần thiết cho các sự kiện không quan trọng. - Tối ưu hóa việc sử dụng tài nguyên bằng cách sử dụng batching cho các sự kiện có thể xử lý hàng loạt.
Những cạm bẫy thường gặp
- Không xử lý đúng các lỗi trong các callback có thể dẫn đến ứng dụng bị treo hoặc không phản hồi.
- Bỏ qua việc quản lý timeout có thể dẫn đến các yêu cầu không được xử lý đúng cách.
Mẹo hiệu suất
- Sử dụng
SwooleScheduler
để tối ưu hóa việc xử lý thời gian và sự kiện. - Kiểm tra và tối ưu hóa mã của bạn để đảm bảo rằng tất cả các callback được thực thi một cách hiệu quả.
Kết luận
Swoole Rx Events cung cấp một hệ thống sự kiện mạnh mẽ và linh hoạt cho PHP, cho phép các nhà phát triển xây dựng các ứng dụng có khả năng mở rộng và phản ứng nhanh chóng với các sự kiện. Hãy bắt đầu tích hợp thư viện này vào dự án của bạn ngay hôm nay để trải nghiệm những lợi ích mà nó mang lại!
Câu hỏi thường gặp (FAQ)
1. Swoole Rx Events có yêu cầu gì về môi trường?
Yêu cầu PHP 8.3 trở lên và phiên bản Swoole tương ứng.
2. Làm thế nào để cài đặt thư viện này?
Bạn có thể cài đặt bằng cách sử dụng Composer với lệnh composer require small/swoole-rx-events
.
3. Có thể sử dụng Swoole Rx Events với các framework PHP khác không?
Có, bạn có thể tích hợp thư viện này vào bất kỳ ứng dụng PHP nào hỗ trợ Swoole.