Anatomía de una contrapresión en corrientes en chorro

Al leer numerosos artículos sobre el tema de los flujos reactivos, el lector puede llegar a la conclusión de que:



  • la contrapresión es genial
  • la contrapresión solo está disponible en bibliotecas que implementan la especificación de flujos reactivos
  • esta especificación es tan compleja que ni siquiera debería intentar implementarla usted mismo


En este artículo intentaré demostrar que:



  • la contrapresión es muy simple
  • para implementar contrapresión asíncrona, basta con hacer una versión asíncrona del semáforo
  • si hay una implementación de semáforo asincrónica, la interfaz org.reactivestreams.Publisher se implementa en unas pocas docenas de líneas de código


La contrapresión es una retroalimentación que ajusta la velocidad del productor de datos para que coincida con la velocidad del consumidor. En ausencia de dicha conexión, el productor más rápido puede desbordar el búfer del consumidor o, si el búfer es adimensional, agotar toda la RAM.



En la programación multiproceso, este problema fue resuelto por Dijkstroy, quien propuso un nuevo mecanismo de sincronización: el semáforo. Un semáforo se puede considerar como un contador de permisos. Se asume que el productor solicita permiso al semáforo antes de realizar una acción intensiva en recursos. Si el semáforo está vacío, entonces el hilo del productor está bloqueado.



Los programas asincrónicos no pueden bloquear subprocesos, por lo que no pueden acceder a un semáforo vacío para obtener permiso (pero pueden realizar todas las demás operaciones de semáforo). Deben bloquear su ejecución de otra manera. Esta otra forma es que simplemente dejan el hilo de trabajo en el que se estaban ejecutando, pero antes de eso, se arreglan para regresar al trabajo tan pronto como el semáforo esté lleno.



La forma más elegante de pausar y reanudar un programa asincrónico es estructurarlo como un actor de flujo de datos con puertos :







un modelo de flujo de datos: actores con puertos, las conexiones dirigidas entre sus puertos y tokens iniciales. Tomado de: Una descripción estructurada de los actores del flujo de datos y su aplicación



Hay puertos de entrada y salida. Los puertos de entrada reciben tokens (mensajes y señales) de los puertos de salida de otros actores. Si el puerto de entrada contiene tokens y el puerto de salida tiene un lugar para colocar tokens, entonces se considera activo. Si todos los puertos del actor están activos, se envía para su ejecución. Por lo tanto, al reanudar su trabajo, el programa actor puede leer de forma segura tokens de los puertos de entrada y escribir en el fin de semana. Toda la sabiduría de la programación asincrónica reside en este simple mecanismo. La asignación de puertos como subobjetos de actor separados simplifica enormemente la codificación de programas asincrónicos y le permite aumentar su diversidad combinando puertos de diferentes tipos.



El actor clásico de Hewitt contiene 2 puertos: uno es visible, con un búfer para los mensajes entrantes, el otro es un binario oculto que se bloquea cuando el actor se envía para su ejecución y, por lo tanto, evita que el actor se reinicie hasta el final del lanzamiento inicial. El semáforo asíncrono deseado es un cruce entre estos dos puertos. Como un búfer de mensajes, puede almacenar muchos tokens y, como un puerto oculto, estos tokens son negros, es decir, indistinguibles, como en las redes de Petri, y un contador de tokens es suficiente para almacenarlos.



En el primer nivel de la jerarquía, tenemos una clase AbstractActorcon tres clases anidadas: base Porty derivadas AsyncSemaPorty InPort, así como con un mecanismo para lanzar un actor para su ejecución en ausencia de puertos bloqueados. En resumen, se ve así:



public abstract class AbstractActor {
    /**    */
    private int blocked = 0;

    protected synchronized void restart() {
            controlPort.unBlock();
    }

    private synchronized void incBlockCount() {
        blocked++;
    }

    private synchronized void decBlockCount() {
        blocked--;
        if (blocked == 0) {
            controlPort.block();
            excecutor.execute(this::run);
        }
    }

    protected abstract void turn() throws Throwable;

    /**   */
    private void run() {
        try {
            turn();
            restart();
        } catch (Throwable throwable) {
            whenError(throwable);
        }
    }
}


Contiene un conjunto mínimo de clases de puertos:



Port- clase base de todos los puertos



    protected  class Port {
        private boolean isBlocked = true;

        public Port() {
            incBlockCount();
        }

        protected synchronized void block() {
            if (isBlocked) {
                return;
            }
            isBlocked = true;
            incBlockCount();
        }

        protected synchronized void unBlock() {
            if (!isBlocked) {
                return;
            }
            isBlocked = false;
            decBlockCount();
        }
    }


Semáforo asincrónico:



    public class AsyncSemaPort extends Port {
        private long permissions = 0;

        public synchronized void release(long n) {
            permissions += n;
            if (permissions > 0) {
                unBlock();
            }
        }

        public synchronized void aquire(long delta) {
            permissions -= delta;
            if (permissions <= 0) { 
                //    
                //        ,
                //       
                block();
            }
        }
    }


InPort - búfer mínimo para un mensaje entrante:



    public class InPort<T> extends Port implements OutMessagePort<T> {
        private T item;

        @Override
        public void onNext(T item) {
            this.item = item;
            unBlock();
        }

        public synchronized T poll() {
            T res = item;
            item = null;
            return res;
        }
    }


La versión completa de la clase AbstractActorse puede ver aquí.



En el siguiente nivel de la jerarquía, tenemos tres actores abstractos con puertos específicos, pero con rutinas de procesamiento indefinidas:



  • una clase AbstractProduceres un actor con un puerto del tipo de semáforo asíncrono (y un puerto de control interno, presente en todos los actores por defecto).
  • la clase AbstractTransformeres un actor de Hewitt normal, con una referencia al puerto de entrada del siguiente actor de la cadena, donde envía los tokens convertidos.
  • la clase AbstractConsumertambién es un actor ordinario, pero no envía los tokens convertidos a ninguna parte, mientras que tiene un enlace al semáforo del productor y abre este semáforo después de absorber el token de entrada. De esta forma, el número de tokens en proceso se mantiene constante y no se produce un desbordamiento del búfer.


En el último nivel, ya en el directorio de pruebas, se definen los actores específicos utilizados en las pruebas :



  • la clase ProducerActorgenera un flujo finito de enteros.
  • la clase TransformerActortoma el siguiente número de la secuencia y lo envía por la cadena.
  • clase ConsumerActor: acepta e imprime los números resultantes


Ahora podemos construir una cadena de manipuladores de trabajo paralelos y asíncronos de la siguiente manera: productor - cualquier número de transformadores - consumidor







Por lo tanto, hemos implementado la contrapresión, e incluso en una forma más general que en la especificación de flujos reactivos : la retroalimentación puede abarcar un número arbitrario de cascadas de procesamiento, y no sólo adyacentes, como en la especificación.



Para implementar la especificación, debe definir un puerto de salida que sea sensible a la cantidad de permisos que se le pasaron utilizando el método request (); esto será Publisher, y complementar el existente con una InPortllamada a este método, esto será Subscriber. Es decir, asumimos que las interfaces PublisherySubscriberdescribir el comportamiento de los puertos, no de los actores. Pero a juzgar por el hecho de que en la lista de interfaces también hay Processor, que de ninguna manera puede ser una interfaz de puerto, los autores de la especificación consideran que sus interfaces son interfaces de actor. Bueno, podemos hacer actores que implementen todas estas interfaces delegando la ejecución de las funciones de la interfaz a los puertos correspondientes.



Para simplificar, dejemos que el nuestro Publisherno tenga su propio búfer y escribirá directamente en el búfer Subscriber. Para hacer esto, necesita que alguien se Subscribersuscriba y cumpla request(), es decir, tenemos 2 condiciones y, en consecuencia, necesitamos 2 puertos, InPort<Subscriber>y AsyncSemaPort. Ninguno de ellos es adecuado como base para la implementación.Publisher'a, ya que contiene métodos innecesarios, por lo que haremos estos puertos variables internas:



public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
    protected AbstractActor.AsyncSemaPort sema;

    public ReactiveOutPort(AbstractActor actor) {
        subscriber = actor.new InPort<>();
        sema = actor.new AsyncSemaPort();
    }
}


Esta vez, ReactiveOutPortno definimos la clase como anidada, por lo que se necesitaba un parámetro de constructor, una referencia al actor adjunto, para instanciar los puertos definidos como clases anidadas.



El método se subscribe(Subscriber subscriber)reduce a salvar al suscriptor y llamar subscriber.onSubscribe():



    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException();
        }
        if (this.subscriber.isFull()) {
            subscriber.onError(new IllegalStateException());
            return;
        }
        this.subscriber.onNext(subscriber);
        subscriber.onSubscribe(this);
    }


que generalmente resulta en una llamada Publisher.request()que se reduce a subir el semáforo con una llamada AsyncSemaPort.release():



    public synchronized void request(long n) {
        if (subscriber.isEmpty()) {
            return; // this spec requirement
        }
        if (n <= 0) {
            subscriber.current().onError(new IllegalArgumentException());
            return;
        }
        sema.release(n);
    }


Y ahora nos queda no olvidar bajar el semáforo usando una llamada AsyncSemaPort.aquire()en el momento del uso del recurso:



    public synchronized void onNext(T item) {
        Subscriber<? super T> subscriber = this.subscriber.current();
        if (subscriber == null) {
            throw  new IllegalStateException();
        }
        sema.aquire();
        subscriber.onNext(item);
    }


El proyecto AsyncSemaphore fue diseñado especialmente para este artículo. Se hace deliberadamente lo más compacto posible para no cansar al lector. Como resultado, contiene importantes limitaciones:



  • Publisher' Subscriber'
  • Subscriber' 1


Además, AsyncSemaPortno es un análogo completo de un semáforo sincrónico : solo un cliente puede realizar la operación aquire()y AsyncSemaPort(es decir, el actor envolvente). Pero esto no es una desventaja, AsyncSemaPortcumple bien su función. En principio, puede hacerlo de manera diferente: tómelo java.util.concurrent.Semaphorey complételo con una interfaz de suscripción asincrónica (consulte AsyncSemaphore.java del proyecto DF4J ). Tal semáforo puede unir actores e hilos de ejecución en cualquier orden.



En general, cada tipo de interacción sincrónica (bloqueo) tiene su propia contraparte asincrónica (no bloqueante). Entonces, en el mismo proyecto DF4J hay una implementaciónBlockingQueue, complementado por una interfaz asincrónica. Esto abre la posibilidad de una transformación paso a paso de un programa multiproceso en uno asincrónico, reemplazando parte por parte los hilos con actores.



All Articles