RxJS Transformation Operators trong RxJS

0 phút đọc

Pipeable Operators trong angular

Một Pipeable Operator là một function nó nhận đầu vào là một Observable và returns một Observable khác. Chúng là pure operation: Observable truyền vào sẽ không bị thay đổi gì.

Cú pháp:

observableInstance.pipe(operator1(), operator2());

Với cú pháp trên thì observableInstancepipe bao nhiêu operator đi nữa thì nó vẫn không đổi, và cuối cùng chúng ta sẽ nhận lại một Observable nên để có thể sử dụng thì chúng ta cần gán lại, hoặc thực hiện subscribe ngay sau khi pipe:

const returnObservable = observableInstance.pipe(operator1(), operator2());

Nếu bạn dùng với RxJS version < 5.5 thì có thể các bạn sẽ thấy cú pháp sử dụng khác là prototype method chain, nhưng nếu bạn dùng từ version 5.5 trở lên thì nên dùng pipe operators, dựa theo một số giải thích ở đây: pipeable operators

Pipeable Operators có thể chia thành nhiều category khác nhau, trong ngày hôm nay chúng ta sẽ tìm hiểu về Transformation Operators.

Transformation Operators

Chắc hẳn các bạn đã quá quen với làm việc cùng Array trong JS, chúng ta có thể lặp qua từng phần tử trong mảng, sau đó apply một function lên mỗi phần tử, kết quả trả về sẽ được đưa vào một mảng mới có kích thước giống như mảng ban đầu như sau:

const users = [
  {
    id: "ddfe3653-1569-4f2f-b57f-bf9bae542662",
    username: "tiepphan",
    firstname: "tiep",
    lastname: "phan",
  },
  {
    id: "34784716-019b-4868-86cd-02287e49c2d3",
    username: "nartc",
    firstname: "chau",
    lastname: "tran",
  },
];

const usersVm = users.map((user) => {
  return {
    ...user,
    fullname: `${user.firstname} ${user.lastname}`,
  };
});

Kết quả có được sẽ có dạng như sau:

usersVm = [
  {
    id: "ddfe3653-1569-4f2f-b57f-bf9bae542662",
    username: "tiepphan",
    firstname: "tiep",
    lastname: "phan",
    fullname: "tiep phan",
  },
  {
    id: "34784716-019b-4868-86cd-02287e49c2d3",
    username: "nartc",
    firstname: "chau",
    lastname: "tran",
    fullname: "chau tran",
  },
];

Như vậy qua một lần biến đổi, chúng ta sẽ có được dữ liệu như ý muốn.

Vậy với Observable thì sao. Giả sử chúng ta đang có một hệ thống tracking xem những ai đăng nhập vào hệ thống. Do đó ở một số thời điểm sẽ có một/một vài người đăng nhập, và mỗi lần như thế hệ thống sẽ gửi cho chúng ta một event để biết. Bây giờ chúng ta cũng làm nhiệm vụ tương tự như map ở trên thì sao.

import { Observable } from "rxjs";
import { map } from "rxjs/operators";

interface User {
  id: string;
  username: string;
  firstname: string;
  lastname: string;
}

const source = new Observable<User>((observer) => {
  const users = [
    {
      id: "ddfe3653-1569-4f2f-b57f-bf9bae542662",
      username: "tiepphan",
      firstname: "tiep",
      lastname: "phan",
    },
    {
      id: "34784716-019b-4868-86cd-02287e49c2d3",
      username: "nartc",
      firstname: "chau",
      lastname: "tran",
    },
  ];

  setTimeout(() => {
    observer.next(users[0]);
  }, 1000);
  setTimeout(() => {
    observer.next(users[1]);
    observer.complete();
  }, 3000);
});

const observer = {
  next: (value) => console.log(value),
  error: (err) => console.error(err),
  complete: () => console.log("completed"),
};
source.subscribe(observer);

Khi chạy chương trình bạn sẽ thấy rằng, sau 1 giây thì sẽ emit ra user đầu tiên, và sau đó 2 giây thì sẽ emit ra user thứ hai kèm theo complete signal.

map trong RxJS

map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R>

Giả sử bạn cần hiển thị thông tin fullname của user trong next thì bạn sẽ có thể dùng cách nào.

Cách đơn giản nhất là bạn sẽ vào hàm next để thực hiện tính toán. Nhưng chúng ta có thể transform stream data trước khi nó đi đến với điểm cuối.

Đây chính là lúc bạn có thể sử dụng đến Operator như map của RxJS.

import { map } from "rxjs/operators";

source
  .pipe(
    map((user) => {
      return {
        ...user,
        fullname: `${user.firstname} ${user.lastname}`,
      };
    })
  )
  .subscribe(observer);

Hoặc giả sử yêu cầu của chúng ta giờ đây thay đổi, chỉ cần trả về id của user mỗi khi được emit.

source.pipe(map((user) => user.id)).subscribe(observer);

Cách dùng map này khá giống cách dùng map của array ở trên phải không???

RxJS map là gì

pluck trong RxJS

pluck<T, R>(...properties: string[]): OperatorFunction<T, R>

Đối với yêu cầu map ra một property trong một object như vừa rồi, bạn có thể sử dụng một cách khác là pluck:

import { pluck } from "rxjs/operators";

source.pipe(pluck("id")).subscribe(observer);
RxJS pluck

mapTo trong RxJS

mapTo<T, R>(value: R): OperatorFunction<T, R>

Sẽ thế nào nếu bạn muốn bất cứ khi nào stream emit một giá trị thì bạn luôn trả về một giá trị fixed không?

Giả sử bạn đang làm chức năng để lắng nghe mouse hover. Như bạn cũng có thể biết chúng ta sẽ cần kết hợp giữa mouseovermouseleave event chẳng hạn.

Khi mouseover chúng ta luôn trả về true, và khi mouseleave chúng ta luôn trả về false.

Trong đoạn code dưới đây các bạn tạm thời hiểu rằng merge sẽ gộp 2 streams lại thành một, chúng ta sẽ học về combine streams những ngày sau.

const element = document.querySelector("#hover");

const mouseover$ = fromEvent(element, "mouseover");
const mouseleave$ = fromEvent(element, "mouseleave");

const hover$ = merge(
  mouseover$.pipe(mapTo(true)),
  mouseleave$.pipe(mapTo(false))
);

hover$.subscribe(observer);

Giờ đây chúng ta đã có một stream hover$ để biết được khi nào chúng ta in/out ở một element.

RxJS mapTo

scan trong RxJS

scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): OperatorFunction<T, R>

Bây giờ mỗi lần stream emit một value, bạn muốn apply một function lên value đó nhưng có sử dụng kèm theo kết quả lưu trữ trước đó (accumulator). Các bạn có thể liên tưởng ngay đến hàm reduce của Array.

Ví dụ: Count số lần người dùng đã click vào một button (giống như bài đầu tiên về RxJS).

const button = document.querySelector("#add");

const click$ = fromEvent(button, "click");

click$.pipe(scan((acc, curr) => acc + 1, 0)).subscribe(observer);

Count số bài đăng của những người dùng đăng nhập theo thời gian:

const users$ = new Observable<User>((observer) => {
  const users = [
    {
      id: "ddfe3653-1569-4f2f-b57f-bf9bae542662",
      username: "tiepphan",
      firstname: "tiep",
      lastname: "phan",
      postCount: 5,
    },
    {
      id: "34784716-019b-4868-86cd-02287e49c2d3",
      username: "nartc",
      firstname: "chau",
      lastname: "tran",
      postCount: 22,
    },
  ];

  setTimeout(() => {
    observer.next(users[0]);
  }, 1000);
  setTimeout(() => {
    observer.next(users[1]);
    observer.complete();
  }, 3000);
});

users$.pipe(scan((acc, curr) => acc + curr.postCount, 0)).subscribe(observer);
RxJS scan

reduce trong RxJS

reduce<T, R>(accumulator: (acc: T | R, value: T, index?: number) => T | R, seed?: T | R): OperatorFunction<T, T | R>

Operator này khá giống scan là nó sẽ reduce value overtime, nhưng nó sẽ đợi đến khi source complete rồi thì nó mới emit một giá trị cuối cùng và gửi đi complete.

users$.pipe(reduce((acc, curr) => acc + curr.postCount, 0)).subscribe(observer);
RxJS reduce

toArray trong RxJS

toArray<T>(): OperatorFunction<T, T[]>

Giả sử bạn cần collect toàn bộ các value emit bởi stream rồi lưu trữ thành một array, sau đó đợi đến khi stream complete thì emit một array và complete. Lúc này bạn hoàn toàn có thể sử dụng reduce:

users$.pipe(reduce((acc, curr) => [...acc, curr], [])).subscribe(observer);

Nhưng có một cách viết khác ngắn gọn hơn đó là dùng toArray.

users$.pipe(toArray()).subscribe(observer);

buffer trong RxJS

buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]>

Lưu trữ giá trị được emit ra và đợi đến khi closingNotifier emit thì emit những giá trị đó thành 1 array.

const interval$ = interval(1000);

const click$ = fromEvent(document, "click");

const buffer$ = interval$.pipe(buffer(click$));

const subscribe = buffer$.subscribe((val) =>
  console.log("Buffered Values: ", val)
);

// output có dạng
"Buffered Values: "[(0, 1)];
"Buffered Values: "[(2, 3, 4, 5, 6)];
RxJS buffer

bufferTime trong RxJS

bufferTime<T>(bufferTimeSpan: number): OperatorFunction<T, T[]>

Tương tự như buffer, nhưng emit values mỗi khoảng thời gian bufferTimeSpan ms.

const source = interval(500);

const bufferTime = source.pipe(
  bufferTime(2000)
);

const bufferTimeSub = bufferTime.subscribe(
  val => console.log('Buffered with Time:', val)
);
// output
"Buffered with Time:"
[0, 1]
"Buffered with Time:"
[2, 3]
"Buffered with Time:"
[4, 5]
...
RxJS bufferTime

Lời kết

Như vậy trong bài này chúng ta đã tìm hiểu cơ bản về một số Transformation Operators hay dùng trong RxJS, các bạn có thể thực hành thêm thông qua các ví dụ từ trang rxjs.dev để hiểu thêm.

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào

Avatar TechMely Team
Được viết bởi

TechMely Team

Ta không thể bắt đầu lại nhưng ta có thể mở đầu bây giờ và làm nên một kết thúc mới.