0
0
Lập trình
Admin Team
Admin Teamtechmely

Giao tiếp Publish/Subscribe với Redis và Golang

Đăng vào 2 ngày trước

• 3 phút đọc

Chủ đề:

golangRedisPubSub

Giao thức Gửi Nhận Tin theo Kênh (Publish/Subscribe) sử dụng ba lệnh chính: SUBSCRIBE, UNSUBSCRIBEPUBLISH. Mô hình này cho phép các nhà phát hành (publishers) gửi tin nhắn mà không cần thiết phải biết ai là người nhận (subscribers). Thay vào đó, các tin nhắn được phân loại thành các kênh mà các người nhận có thể lựa chọn quan tâm. Điều này giúp tăng cường khả năng mở rộng (scalability) cũng như tạo nên một cấu trúc mạng linh hoạt hơn.

Theo quan điểm của tôi, Redis Pub/Sub thường được sử dụng khi hệ thống yêu cầu độ trễ thấp, vì Redis lưu trữ trong bộ nhớ, từ đó xử lý rất nhanh. Tuy nhiên, tin nhắn trong Redis có thể bị mất khi hệ thống hoặc máy chủ bị tắt.

Việc cài đặt và duy trì Pub/Sub trong Redis cũng rất đơn giản bởi vì các kỹ sư đã quen thuộc với Redis và có thể sao lưu nó một cách hiệu quả.

Các kênh Redis Pub/Sub có thể có nhiều người nhận, nhưng nếu có quá nhiều người, hiệu suất có thể bị ảnh hưởng. Dưới đây là một số trường hợp sử dụng hợp lý cho chế độ gửi tin “connected” của Redis Pub/Sub:

  1. Tin nhắn thực thời, độ trễ thấp và khẩn cấp: Những tin nhắn tồn tại trong thời gian ngắn và chỉ có giá trị trong một khoảng thời gian ngắn (gần như là “ngay lập tức”).
  2. Gửi nhận tin không đáng tin cậy/gửi nhận tin mất mát: Khi không quan trọng nếu một số tin nhắn bị bỏ qua (ví dụ: những tin nhắn dư thừa không quan trọng).
  3. Yêu cầu gửi nhận một lần tối đa cho mỗi người nhận, tức là các người nhận không thể phát hiện các tin nhắn trùng lặp và các hệ thống mục tiêu không có tính chất idempotent.
  4. Khi người nhận thường có mối quan tâm ngắn hạn, biến đổi hoặc động về các kênh, chỉ muốn nhận tin từ các kênh cụ thể trong khoảng thời gian nhất định.
  5. Nếu người nhận (các kênh và nhà phát hành cũng vậy!) có khả năng tồn tại ngắn hạn.
  6. Có chỉ một số lượng nhỏ người nhận và mẫu tin trên mỗi kênh.

Dưới đây là một mẫu Pub/Sub khi sử dụng client Redis.

Thí dụ về lệnh SUBSCRIBE kênh test

Copy
127.0.0.1:6379> SUBSCRIBE test
Reading messages... (bấm Ctrl-C để thoát)
1) "subscribe"
2) "test"
3) (integer) 1

Thí dụ về lệnh PUBLISH kênh test với tin nhắn "hihi"

Copy
127.0.0.1:6379> PUBLISH test hihi
(integer) 1
127.0.0.1:6379>

Kết quả khi người nhận nhận được tin nhắn từ nhà phát hành:

Copy
127.0.0.1:6379> SUBSCRIBE test
Reading messages... (bấm Ctrl-C để thoát)
1) "subscribe"
2) "test"
3) (integer) 1
1) "message"
2) "test"
3) "hihi"

Bây giờ, hãy cùng triển khai Redis Pub/Sub với Golang. Dưới đây là quy trình: tôi sẽ phát hành tin nhắn vào kênh test thông qua một goroutine và một goroutine khác sẽ đăng ký nhận tin từ kênh test, đồng thời đẩy nó vào một kênh khác. Tôi sử dụng 5 worker để đọc tin nhắn từ kênh và xử lý chúng.

Khởi tạo kết nối Redis

Copy
func initRedis() (*RedisClient, error) {
	var redisClient *redis.Client

	opts, err := redis.ParseURL("redis://default:@localhost:6279")
	if err != nil {
		log.Fatal("không thể khởi tạo redis:", err)
		return nil, err
	}

	opts.PoolSize = 30
	opts.DialTimeout = 10 * time.Second
	opts.ReadTimeout = 5 * time.Second
	opts.WriteTimeout = 5 * time.Second
	opts.IdleTimeout = 5 * time.Second
	opts.Username = ""

	redisClient = redis.NewClient(opts)

	cmd := redisClient.Ping(context.Background())
	if cmd.Err() != nil {
		return nil, cmd.Err()
	}

	return &RedisClient{
		Redis: redisClient,
	}, nil
}

Goroutine phát hành tin nhắn đến kênh test và xử lý chúng.

Copy
func main() {
	// khởi tạo redis
	redisClient, err := initRedis()
	if err != nil {
		logrus.Warnf("khởi tạo client redis thất bại với lỗi: %v", err)
		return
	}

	ctx, channel := context.Background(), "test"
	ch := make(chan string, 1)
	numberOfWorkers := 5
	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	// 5 worker
	for i := 0; i < numberOfWorkers; i++ {
		go func() {
			for msg := range ch {
				fmt.Println(msg)
			}
		}
	}

	subscriber := redisClient.Redis.Subscribe(ctx, channel)
	go func() {
		for {
			message, err := subscriber.ReceiveMessage(ctx)
			if err != nil {
				continue
			}

			// đẩy vào kênh
			ch <- message.Payload
		}
	}()

	time.Sleep(1 * time.Second)
	go func() {
		for i := 0; i < 1000; i++ {
			redisClient.Redis.Publish(ctx, channel, fmt.Sprintf("hello %v", i))
		}
	}()

	<-quit
	log.Println("đang tắt")
}

Tài liệu tham khảo:

Gợi ý câu hỏi phỏng vấn
Không có dữ liệu

Không có dữ liệu

Bài viết được đề xuất
Bài viết cùng tác giả

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào