RxJS Error Handling và Conditional Operators trong RxJS

0 phút đọc

Trong ngày đầu tiên tìm hiểu RxJS chúng ta được biết rằng, mỗi Observable có thể gửi về các message cho Next, Error, Complete. Và nếu như có Error được phát sinh thì Observable sẽ dừng lại. Vậy làm thế nào để chúng ta có thể catch được Error đó?

image

Trong ngày hôm nay chúng ta sẽ cùng tìm hiểu một số operators để xử lý lỗi và một số khác để làm việc với các loại điều kiện (Error Handling and Conditional Operators).

Trong bài này, chúng ta sẽ tiếp tục sử dụng observer mặc định như sau:

const observer = {
  next: (val) => console.log(val),
  error: (err) => console.error(err),
  complete: () => console.log("complete"),
};

RxJS Error Handling Operators

catchError trong RxJS

Đối với trường hợp các bạn muốn bắt được lỗi và muốn xử lý lỗi đó, ví dụ: biến đổi Error thành một value thông thường, tránh bị terminate stream. Bạn có thể dùng catchError (.catch cho prototype chain).

Catches errors on the observable to be handled by returning a new observable or throwing an error. RxJS catchError

catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>

import { of } from "rxjs";
import { map, catchError } from "rxjs/operators";
const cached = [4, 5];
of(1, 2, 3, 4, 5)
  .pipe(
    map((n) => {
      if (cached.includes(n)) {
        throw new Error("Duplicated: " + n);
      }
      return n;
    }),
    catchError((err, caught) => of(err))
  )
  .subscribe(observer);

/**
 * Output:
 * --1--2--3--(next: Error)--|
 */

Trong trường hợp trên nếu chúng ta không bắt error thì observer.error sẽ là nơi đón Error, nhưng vì chúng ta trả về là một next: Error nên error này đã được handle bởi observer.next.

Một ví dụ trong ứng dụng là khi các bạn làm việc với forkJoin Day 23, lúc này nếu một stream nào đó emit error thì toàn bộ stream sẽ bị văng ra error. Trong trường hợp các bạn muốn nó vẫn tiếp tục chạy hết và chúng ta sẽ tách Error ra ở pipe tiếp theo thì chỉ cần catchError lại như trên là được.

forkJoin([of(1), of(2), throwError(new Error("401"))]).subscribe(observer);
/**
 * Output:
 * --(x: Error 401)--
 */

// with catchError

forkJoin([
  of(1),
  of(2),
  throwError(new Error("401")).pipe(catchError((err) => of(err))),
]).subscribe(observer);

/**
 * Output:
 * --(next: [1, 2, Error 401])|--
 */
image

Nếu bạn return về Observable trước đó (source Observable), chúng ta có thể tiến hành retry. Nhưng hãy thật cẩn thận vì có thể sinh ra infinite loop.
Nếu bạn muốn retry kèm theo giới hạn về số lần, chúng ta có thể kết hợp với take:

of(1, 2, 3, 4, 5)
  .pipe(
    map((n) => {
      if (cached.includes(n)) {
        throw new Error("Duplicated: " + n);
      }
      return n;
    }),
    catchError((err, caught) => caught),
    take(10)
  )
  .subscribe(observer);

/**
 * Output:
 * --1--2--3--1--2--3--1|
 */

Ngoài ra, trong catchError bạn hoàn toàn có thể throw về một error để pipe phía sau có thể handle tiếp.

retry trong RxJS

Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will resubscribe to the source Observable for a maximum of count resubscriptions (given as a number parameter) rather than propagating the error call.

retry<T>(count: number = -1): MonoTypeOperatorFunction<T>

Operator này sẽ resubscribe vào source Observable khi có error emit từ source. Nếu chúng ta không truyền gì vào cho param count, lúc này nó có thể retry không giới hạn số lần. Ngược lại, nó sẽ retry max số lần được truyền vào.

Nó khá hữu ích khi bạn muốn retry HTTP request chẳng hạn. Lưu ý chỉ nên dùng cho get data, không nên dùng cho Create, Update, Delete vì có thể sinh ra race condition.

const cached = [4, 5];
of(1, 2, 3, 4, 5)
  .pipe(
    map((n) => {
      if (cached.includes(n)) {
        throw new Error("Duplicated: " + n);
      }
      return n;
    }),
    retry(3)
  )
  .subscribe(observer);

/**
 * Output:
 * --1--2--3--1--2--3--1--2--3--1--2--3--(x: Error)
 */
image

Lưu ý, cách hoạt động của retry operator khác với catch kèm theo kỹ thuật retry ở trên.

Ngoài retry chúng ta có thể dùng retryWhen để có thể control vào quá trình retry (ví dụ: khi nào sẽ retry).

Một use-case khá hay là retryBackoff operator, nó sẽ tăng thời gian sau mỗi lần retry:

export function retryBackoff(
  config: number | RetryBackoffConfig
): <T>(source: Observable<T>) => Observable<T> {
  const {
    initialInterval,
    maxRetries = Infinity,
    maxInterval = Infinity,
    shouldRetry = () => true,
    resetOnSuccess = false,
    backoffDelay = exponentialBackoffDelay,
  } = typeof config === "number" ? { initialInterval: config } : config;
  return <T>(source: Observable<T>) =>
    defer(() => {
      let index = 0;
      return source.pipe(
        retryWhen<T>((errors) =>
          errors.pipe(
            concatMap((error) => {
              const attempt = index++;
              return iif(
                () => attempt < maxRetries && shouldRetry(error),
                timer(
                  getDelay(backoffDelay(attempt, initialInterval), maxInterval)
                ),
                throwError(error)
              );
            })
          )
        ),
        tap(() => {
          if (resetOnSuccess) {
            index = 0;
          }
        })
      );
    });
}

RxJS Error Conditional Operators

defaultIfEmpty/throwIfEmpty trong RxJS

defaultIfEmpty<T, R>(defaultValue: R = null): OperatorFunction<T, T | R>

throwIfEmpty<T>(errorFactory: () => any = defaultErrorFactory): MonoTypeOperatorFunction<T>

Hai operators này cho phép chúng ta trả về các giá trị tương ứng (default value hoặc Error) nếu source stream là empty (không emit value nào mà chỉ có complete).

Giả sử, chúng ta cần làm yêu cầu nếu người dùng không click vào sau 1s thì sẽ báo lỗi. Ví dụ tạo transaction sau 1s không confirm thì hủy và báo lỗi cho người dùng.

import { fromEvent, timer } from "rxjs";
import { throwIfEmpty, takeUntil } from "rxjs/operators";

const click$ = fromEvent(document, "click");

click$
  .pipe(
    takeUntil(timer(1000)),
    throwIfEmpty(
      () => new Error("the document was not clicked within 1 second")
    )
  )
  .subscribe(observer);
image

every trong RxJS

Returns an Observable that emits whether or not every item of the source satisfies the condition specified.

every<T>(predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any): OperatorFunction<T, boolean>

Opeator này sẽ trả về true nếu tất cả các value emit của source thỏa mãn hàm predicate.

Lưu ý nếu source không complete thì sẽ không có gì emit ra cả.

of(1, 2, 3, 4, 5, 6)
  .pipe(every((x) => x < 5))
  .subscribe(observer);

/**
 * Output:
 * ------false|
 */
image

Các method của Array trong JS, có cả everysome, nếu các bạn muốn có một operator giống some ở trong RxJS thì có thể dùng first kèm theo predicate function. Giống như trong phần Router của Angular

of(1, 2, 3, 14, 5, 6)
  .pipe(
    first((x) => x > 10, false),
    map((v) => Boolean(v))
  )
  .subscribe(observer);

/**
 * Output:
 * ------true|
 */

iif trong RxJS

Decides at subscription time which Observable will actually be subscribed. RxJS iff

iif<T = never, F = never>(condition: () => boolean, trueResult: SubscribableOrPromise<T> = EMPTY, falseResult: SubscribableOrPromise<F> = EMPTY): Observable<T | F>

Opeartor này cho phép chúng ta lựa chọn Observable tương ứng với hàm điều kiện khi thực hiện subscribe.

iif accepts a condition function and two Observables. When an Observable returned by the operator is subscribed, condition function will be called. Based on what boolean it returns at that moment, consumer will subscribe either to the first Observable (if condition was true) or to the second (if condition was false). Condition function may also not return anything - in that case condition will be evaluated as false and second Observable will be subscribed.

Note that Observables for both cases (true and false) are optional. If condition points to an Observable that was left undefined, resulting stream will simply complete immediately. That allows you to, rather than controlling which Observable will be subscribed, decide at runtime if consumer should have access to given Observable or not.

If you have more complex logic that requires decision between more than two Observables, defer will probably be a better choice. Actually iif can be easily implemented with defer and exists only for convenience and readability reasons.

import { iif, of } from "rxjs";

let subscribeToFirst;
const firstOrSecond = iif(() => subscribeToFirst, of("first"), of("second"));

subscribeToFirst = true;
firstOrSecond.subscribe((value) => console.log(value));

// Logs:
// "first"

subscribeToFirst = false;
firstOrSecond.subscribe((value) => console.log(value));

// Logs:
// "second"

Lời kết

Như vậy ngày hôm nay chúng ta đã tăng cường thêm nội lực (💪) về RxJS qua một số operators: Error Handling và Conditional. Hẹn gặp lại các bạn vào ngày mai.

Bình luận

Chưa có bình luận nào

Chưa có bình luận nào

Avatar TechMely Team
Được viết bởi

TechMely Team

Cuộc sống quan trọng nhất là sự lựa chọn.