Operadores personalizados RxJS

La biblioteca RxJS, debido a su amplia variedad de operadores, se considera legítimamente una herramienta extremadamente poderosa en el arsenal del desarrollador. En esta publicación, quiero presentarles el concepto de operadores personalizados RxJS con ejemplos de implementación.





RxJS . , . RxJS .





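Un operador de RxJS es una función que recibe un flujo (observable) de origen y devuelve un flujo resultante. Por lo tanto, crear un operador personalizado de RxJS se reduce a escribir una función corriente de JavaScript (TypeScript). Empecemos por el operador identidad (identity), que simplemente devuelve el flujo de origen: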





import { interval, Observable } from "rxjs";
import { take } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

/**
 * Identity operator: hands the source observable back untouched, so piping a
 * stream through it yields that very same stream.
 *
 * @param source$ Observable to pass through.
 * @returns The same observable instance that was received.
 */
function identity<T>(source$: Observable<T>): Observable<T> {
  return source$;
}

const results$ = source$.pipe(identity);

results$.subscribe(console.log);
  
// console output: 0, 1, 2
      
      



- .





( ) :





<Copy>
import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

/**
 * Custom operator that prints every value of the stream to the console,
 * prefixed with `log: `, and forwards the values unchanged.
 *
 * @param source$ Observable whose emissions should be logged.
 * @returns An observable that mirrors `source$` with the logging side effect attached.
 */
function log<T>(source$: Observable<T>): Observable<T> {
  const printValue = (value: T): void => {
    console.log(`log: ${value}`);
  };

  return source$.pipe(tap(printValue));
}

const results$ = source$.pipe(log);

results$.subscribe(console.log);
  
// console output: log: 0, 0, log: 1, 1, log: 2, 2
      
      



Como se ve, el operador recibe el flujo source$ y le aplica otros operadores mediante su método pipe.





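Los operadores personalizados también pueden recibir parámetros. Para ello se escribe una fábrica: una función que acepta los parámetros y devuelve el operador. Por ejemplo, un operador de registro con etiqueta: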





import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

/**
 * Operator factory: builds a logging operator whose console lines are
 * prefixed with `logWithTag(<tag>): `.
 *
 * @param tag Label inserted into every logged line.
 * @returns A function that takes a source observable and returns an observable
 *          emitting the same values, logging each one as it passes through.
 */
function logWithTag<T>(tag: string): (source$: Observable<T>) => Observable<T> {
  const printTagged = (value: T): void => {
    console.log(`logWithTag(${tag}): ${value}`);
  };

  const operator = (source$: Observable<T>): Observable<T> =>
    source$.pipe(tap(printTagged));

  return operator;
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
      
      



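Para simplificar la firma, podemos usar el tipo MonoTypeOperatorFunction que proporciona RxJS. Además, con la función pipe ya no hace falta referirse explícitamente al flujo de origen: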





import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

/**
 * Operator factory: builds a logging operator whose console lines are
 * prefixed with `logWithTag(<tag>): `. The operator is composed with the
 * standalone `pipe` helper, so the source observable is never named.
 *
 * @param tag Label inserted into every logged line.
 * @returns An operator that forwards values unchanged while logging each one.
 */
function logWithTag<T>(tag: string): MonoTypeOperatorFunction<T> {
  const printTagged = (value: T): void => {
    console.log(`logWithTag(${tag}): ${value}`);
  };

  return pipe(tap(printTagged));
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
      
      



RxJS .





. :





import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

/**
 * Operator factory that runs `job` for the first value passing through the
 * returned operator and ignores every later value.
 *
 * The "already fired" flag is created once per `tapOnce()` call, so it is
 * shared by every subscription to an observable built with the returned
 * operator: only the first value seen by any subscriber triggers `job`.
 *
 * @param job Callback invoked with the first value that passes through.
 * @returns An operator that forwards all values unchanged.
 */
function tapOnce<T>(job: (value: T) => void): MonoTypeOperatorFunction<T> {
  // Closure state of the factory call, not of an individual subscription.
  let isFirst = true;

  return pipe(
    tap((value: T) => {
      if (!isFirst) {
        return;
      }

      job(value);
      // Cleared only after `job` returns, so a throwing job leaves the flag set.
      isFirst = false;
    })
  );
}

const results$ = source$.pipe(tapOnce(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, 0, 1, 1, 2, 2
      
      



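Como se ve, el estado se comparte entre todas las suscripciones. Para que cada suscripción tenga su propio estado, envolvemos la creación del flujo en defer: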





import { defer, interval, MonoTypeOperatorFunction } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

/**
 * Operator factory that runs `job` for the first value of each subscription
 * and ignores that subscription's later values.
 *
 * The flag is created inside the `defer` factory callback, so it is not
 * shared across the factory's invocations the way a flag declared directly
 * in this function's body would be.
 *
 * @param job Callback invoked with the first value of each subscription.
 * @returns An operator that forwards all values unchanged.
 */
function tapOnceUnique<T>(job: (value: T) => void): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      // Fresh flag for every invocation of this factory callback.
      let isFirst = true;

      return source$.pipe(
        tap((value: T) => {
          if (!isFirst) {
            return;
          }

          job(value);
          // Cleared only after `job` returns, so a throwing job leaves the flag set.
          isFirst = false;
        })
      );
    });
}

const results$ = source$.pipe(tapOnceUnique(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, First value emitted, 0, 1, 1, 2, 2
      
      



tapOnce



.





firstTruthy



:





import { MonoTypeOperatorFunction, of, pipe } from "rxjs";
import { first } from "rxjs/operators";

const source1$ = of(0, "", "foo", 69);

/**
 * Builds an operator that passes the stream through `first` with a
 * truthiness predicate, i.e. it selects the first value that coerces to `true`.
 *
 * @returns An operator wrapping `first` with the truthiness check.
 */
function firstTruthy<T>(): MonoTypeOperatorFunction<T> {
  const isTruthy = (value: T): boolean => !!value;

  return pipe(first(isTruthy));
}

const result1$ = source1$.pipe(firstTruthy());

result1$.subscribe(console.log);

// console output: foo
      
      



evenMultiplied



:





import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { filter, map, take } from "rxjs/operators";

const source2$ = interval(10).pipe(take(3));

/**
 * Builds an operator that keeps only even numbers and multiplies each of
 * them by `multiplier`.
 *
 * @param multiplier Factor applied to every even value.
 * @returns An operator emitting the scaled even values.
 */
function evenMultiplied(multiplier: number): MonoTypeOperatorFunction<number> {
  const isEven = (value: number): boolean => value % 2 === 0;
  const scale = (value: number): number => value * multiplier;

  return pipe(filter(isEven), map(scale));
}

const result2$ = source2$.pipe(evenMultiplied(3));

result2$.subscribe(console.log);
  
// console output: 0, 6
      
      



liveSearch



:





import { ObservableInput, of, OperatorFunction, pipe  } from "rxjs";
import { debounceTime, delay, distinctUntilChanged, switchMap } from "rxjs/operators";

const source3$ = of("politics", "sport");

type DataProducer<T> = (q: string) => ObservableInput<T>;

/**
 * Builds a "live search" operator for a stream of query strings: the stream
 * is passed through `debounceTime(time)`, then `distinctUntilChanged()`, and
 * finally `switchMap(dataProducer)`.
 *
 * @param time Value handed to `debounceTime`.
 * @param dataProducer Function mapping a query string to its data source;
 *                     passed directly to `switchMap`.
 * @returns An operator turning a stream of queries into a stream of results.
 */
function liveSearch<R>(
  time: number,
  dataProducer: DataProducer<R>
): OperatorFunction<string, R> {
  const debounced = debounceTime<string>(time);
  const deduplicated = distinctUntilChanged<string>();
  const latestResult = switchMap(dataProducer);

  return pipe(debounced, deduplicated, latestResult);
}

const newsProducer = (q: string) =>
  of(`Data fetched for ${q}`).pipe(delay(2000));

const result3$ = source3$.pipe(liveSearch(500, newsProducer));

result3$.subscribe(console.log);
  
// console output: Data fetched for sport
      
      



RxJS . , pipe-.





: [ ]





, - .






"JavaScript Developer. Professional". , , .








All Articles