Inicialización de la cadena Rx

Hola a todos, mi nombre es Ivan, soy un desarrollador de Android. Hoy quiero compartir mi experiencia con RxJava2 y contarte cómo se lleva a cabo la inicialización de la cadena. ¿Por qué decidí mencionar esto? Después de hablar con otros desarrolladores, me di cuenta de que no todos los que usan esta herramienta entienden cómo funciona. Y luego decidí averiguar cómo se organizan las suscripciones en RxJava2 y en qué secuencia se inicializa todo el trabajo. No he encontrado un solo artículo que explique esto. A la luz de esto, entré en el código fuente para ver cómo funciona todo y esbocé una pequeña hoja de trucos, que se convirtió en este artículo.





En este artículo, no voy a describir lo que es Observable



, Observer



y todas las demás entidades que se utilizan en RxJava2. Si decide leer este artículo, supongo que ya está familiarizado con esta información. Y si aún no está familiarizado con estos conceptos, le recomiendo que se familiarice con ellos antes de leerlos.





He aquí cómo empezar:





Grock * RxJava





Explorando RxJava 2 para Android





Veamos cómo funciona la cadena más simple:





Observable.just (1, 2, 3, 4, 5)
.map {…}
.filter {…}
.subscribe();
      
      



Encima

Primero, describiré brevemente cada paso que atravesamos en esta cadena (los pasos comienzan de arriba a abajo):





  • Se crea un objeto en la declaración justa ObservableFromArray



    .





  • Se crea un objeto en la declaración del mapa ObservableMap



    , que toma en el constructor una referencia al objeto creado previamente en la declaración justa.





  • filter ObservableFilter



    , map, just.





  • Observable



    Observable



    subscribe()



    ( ObservableFilter



    filter) Observer



    , .





  • ObservableFilter.subscribe()



    ObservableFilter.subscribeActual()



    , Observer



    , filter, FilterObserver



    . Observer



    Observer



    ObservableFilter.subscribe()



    .





  • ObservableMap.subscribe()



    ObservableMap.subscribeActual()



    Observer,



    map, MapObserver



    , FilterObserver



    .





  • ObservableFromArray.subscribe()



    ObservableFromArray.subscribeActual()



    , onSubscribe()



    ObservableFromArray.subscribeActual()



    Observer



    ’.





  • onSubscribe()



    Observer



    ’ .





  • ObservableFromArray



    onNext()



    Observer



    ’.





Una representación visual del diagrama anterior.
.

, just()



null, fromArray(),



Observable



.





public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) {
   ObjectHelper.requireNonNull(item1, "item1 is null");
   ObjectHelper.requireNonNull(item2, "item2 is null");
   ObjectHelper.requireNonNull(item3, "item3 is null");
   ObjectHelper.requireNonNull(item4, "item4 is null");
   ObjectHelper.requireNonNull(item5, "item5 is null");

   return fromArray(item1, item2, item3, item4, item5);
}
      
      



fromArray()



, .





public static <T> Observable<T> fromArray(T... items) {
   ObjectHelper.requireNonNull(items, "items is null");
   if (items.length == 0) {
       return empty();
   }
   if (items.length == 1) {
       return just(items[0]);
   }
   return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
      
      



ObservableFromArray



, .





onAssembly()



, - Observable



, , .





public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
       return apply(f, source);
   }
   return source;
}
      
      



onAssembly()



Observable



- , :





RxJavaPlugins.setOnObservableAssembly(o -> {
	if (o instanceof ObservableFromArray) {
    	return new ObservableFromArray<>(new Integer[] { 4, 5, 6 });
	}
	return o;
});
 
Observable.just(1, 2, 3)
.filter(v -> v > 3)
.test()
.assertResult(4, 5, 6);
      
      



El ObservableFromArray recién creado
ObservableFromArray

map()



. , . null, ObservableMap



.





public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
      
      



, ObservableMap



mapper, , this (source). this ObservableFromArray



. ObservableMap



AbstractObservableWithUpstream



, source.





AbstractObservableWithUpstream



, Observable



.





onAssembly()



Observable







Esquema actualizado con ObservableMap generado
ObservableMap

filter()



. , , ObservableFilter



this ObservableMap



( ObservableFromArray



, ) .





public final Observable<T> filter(Predicate<? super T> predicate) {
   ObjectHelper.requireNonNull(predicate, "predicate is null");
   return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
      
      



Esquema actualizado con ObservableFilter generado
ObservableFilter

subscribe()



, . onNext()



. subscribe()



ObservableFilter



, Observable



.





public final Disposable subscribe(Consumer<? super T> onNext) {
   return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
      
      



null, LambdaObserver



.





public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
       Action onComplete, Consumer<? super Disposable> onSubscribe) {
   ObjectHelper.requireNonNull(onNext, "onNext is null");
   ObjectHelper.requireNonNull(onError, "onError is null");
   ObjectHelper.requireNonNull(onComplete, "onComplete is null");
   ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

   LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

   subscribe(ls);

   return ls;
}
      
      



, .





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



subscribeActual()



LambdaObserver



. subscribeActual()



ObservableFilter



. .





public void subscribeActual(Observer<? super T> observer) {
   source.subscribe(new FilterObserver<T>(observer, predicate));
}
      
      



FilterObserver



, LambdaObserver



, ObservableFilter



.





FilterObserver



BasicFuseableObserver



, onSubscribe()



. BasicFuseableObserver



, Observer



’. , 6 , FilterObserver



MapObserver



. BasicFuseableObserver.onSubscribe()



onSubscribe()



Observer



’, . :





public final void onSubscribe(Disposable d) {
   if (DisposableHelper.validate(this.upstream, d)) {
       this.upstream = d;
       if (d instanceof QueueDisposable) {
           this.qd = (QueueDisposable<T>)d;
       }
       if (beforeDownstream()) {

           downstream.onSubscribe(this);

           afterDownstream();
       }
   }
}
      
      



, ObservableFilter



FilterObserver



, source.subscribe()



. , source ObservableMap



, . ObservableMap



subscribe()



.





public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

       subscribeActual(observer);
   } catch (NullPointerException e) { 
     ......
 }
}
      
      



, subscribe()



subscribeActual()



, ObservableMap



. subscribeActual()



MapObserver



FilterObserver



mapper



’. 





public void subscribeActual(Observer<? super U> t) {
   source.subscribe(new MapObserver<T, U>(t, function));
}
      
      



public void subscribeActual(Observer<? super T> observer) {
   FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

   observer.onSubscribe(d);

   if (d.fusionMode) {
       return;
   }
   d.run();
}
      
      



Observer



BasicFuseableObserver



, onSubscribe()



, Observer



, onSubscribe()



.





subscribeActual()



run()



, Observer



.





void run() {
   T[] a = array;
   int n = a.length;

   for (int i = 0; i < n && !isDisposed(); i++) {
       T value = a[i];
       if (value == null) {
           downstream.onError(new NullPointerException("The element at index " + i + " is null"));
           return;
       }
       downstream.onNext(value);
   }
   if (!isDisposed()) {
       downstream.onComplete();
   }
}
      
      



onNext()



Observer



’, onComplete()



onError()



, .





Representación visual del proceso de creación y suscripción





Observable



callback’ Observer



, .





onSubscribe()



, doOnSubscribe()



.





3 :









  • Observable







  • Observer







Por lo tanto, al usar operadores, se debe tener en cuenta que cada operador asigna memoria para varios objetos y no se deben agregar operadores a la cadena, solo porque es “posible”.





RxJava es una herramienta poderosa, pero debe comprender cómo funciona y para qué usarla. Si solo necesita ejecutar una solicitud de red en un hilo en segundo plano y luego ejecutar el resultado en el hilo principal, entonces es como "disparar gorriones con un cañón", puede ser atrapado, pero las consecuencias pueden ser graves.








All Articles