Giới thiệu về hàng đợi
Hàng đợi là một cấu trúc dữ liệu trong lập trình được sử dụng để truyền tải thông điệp giữa các thành phần của hệ thống. Hàng đợi hoạt động theo nguyên tắc FIFO (First In First Out), tức là nếu một thông điệp được đẩy vào hàng đợi trước, thông điệp đó sẽ được xử lý đầu tiên bởi các thành phần tiêu thụ.
Các thành phần trong hàng đợi:
- Nhà sản xuất (Producer): Nơi mà thông điệp được tạo ra và đẩy vào hàng đợi.
- Người tiêu dùng (Consumer): Nơi mà thông điệp được nhận và xử lý từ hàng đợi.
- Hàng đợi thông điệp (Message queue): Nơi thông điệp được lưu trữ.
- Thông điệp (Message): Là thông điệp được tạo ra bởi nhà sản xuất.
Tại sao nên sử dụng hàng đợi thông điệp?
Hàng đợi thông điệp thường tăng độ bền vững cho hệ thống và hỗ trợ xử lý bất đồng bộ. Ví dụ: Khi người dùng nhập vào một danh sách sản phẩm, danh sách này sẽ được đẩy vào hàng đợi và hệ thống sẽ xử lý nó. Người dùng sẽ nhận thông báo khi quá trình xử lý hoàn tất mà không cần chờ đợi.
Redis Lists là gì?
Redis Lists là danh sách liên kết các giá trị chuỗi. Redis Lists thường được sử dụng để:
- Thực hiện thư viện và hàng đợi
- Xây dựng quản lý hàng đợi cho các hệ thống Golang.
Các lệnh cơ bản của Redis Lists:
- LPUSH: Thêm một phần tử mới vào đầu danh sách; RPUSH: Thêm vào cuối danh sách.
- LPOP: Xóa và trả về một phần tử từ đầu danh sách; RPOP: Làm điều tương tự nhưng từ cuối danh sách.
- LLEN: Trả về độ dài của danh sách.
- LMOVE: Di chuyển atomically các phần tử từ danh sách này sang danh sách khác.
- LRANGE: Trích xuất một khoảng các phần tử từ danh sách.
- LTRIM: Giảm kích thước của danh sách về khoảng các phần tử xác định.
Các lệnh chặn của Redis Lists:
Danh sách hỗ trợ một số lệnh chặn. Ví dụ:
- BLPOP: Xóa và trả về một phần tử từ đầu danh sách. Nếu danh sách trống, lệnh sẽ chặn lại cho đến khi có phần tử.
- BRPOP: Là phiên bản chặn của RPOP, chặn kết nối khi không có phần tử nào để xóa từ danh sách.
Vì hàng đợi hoạt động theo nguyên tắc FIFO, tôi sẽ sử dụng LPUSH và BRPOP để triển khai.
Cài đặt Redis với Golang
Khởi tạo kết nối Redis
go
func initRedis() (*RedisClient, error) {
var redisClient *redis.Client
opts, err := redis.ParseURL("redis://default:@localhost:6379")
if err != nil {
log.Fatal("thất bại khi khởi tạo redis:", err)
return nil, err
}
opts.PoolSize = 30
opts.DialTimeout = 10 * time.Second
opts.ReadTimeout = 5 * time.Second
opts.WriteTimeout = 5 * time.Second
opts.IdleTimeout = 5 * time.Second
opts.Username = ""
redisClient = redis.NewClient(opts)
cmd := redisClient.Ping(context.Background())
if cmd.Err() != nil {
return nil, cmd.Err()
}
return &RedisClient{
Redis: redisClient,
}, nil
}
Worker tiêu thụ hàng đợi
go
go func() {
for {
// dùng BRPop sẽ chặn nếu hàng đợi trống.
message, err := redisClient.BRPop(context.Background(), queue, 0)
if err != nil {
fmt.Println(err)
continue
}
fmt.Println(fmt.Sprintf("thông điệp: %v", message))
}
}()
Worker đẩy thông điệp vào hàng đợi
go
go func() {
for i := 0; i < 10; i++ {
_, err := redisClient.LPush(ctx, queue, fmt.Sprintf("hello %v", i))
if err != nil {
return
}
}
}()
Hàm chính để thực hiện
go
func main() {
// khởi tạo redis
redisClient, err := initRedis()
if err != nil {
logrus.Warnf("khởi tạo redis client thất bại với lỗi: %v", err)
return
}
ctx, queue := context.Background(), "tuan"
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// worker tiêu thụ hàng đợi
go func() {
for {
message, err := redisClient.BRPop(context.Background(), queue, 0)
if err != nil {
fmt.Println(err)
continue
}
fmt.Println(fmt.Sprintf("thông điệp: %v", message))
}
}()
go func() {
for i := 0; i < 10; i++ {
_, err := redisClient.LPush(ctx, queue, fmt.Sprintf("hello %v", i))
if err != nil {
return
}
}
}()
<-quit
log.Println("đang tắt hệ thống")
}
Tài liệu tham khảo:
- Mã nguồn: GitHub
- Danh sách Redis
- Các lệnh Redis
- Bài viết của tôi trên blog
source: viblo