Microservicios con comunicaci贸n v铆a Axon

En este sencillo tutorial, crearemos un par de microservicios en Spring Boot y organizaremos la comunicaci贸n entre ellos a trav茅s del marco Axon.










Digamos que tenemos esa tarea.



Existe una fuente de transacciones en el mercado de valores. Esta fuente nos env铆a transacciones a trav茅s de la interfaz Rest.



Necesitamos obtener estas transacciones, guardarlas en la base de datos y hacer un almacenamiento conveniente en memoria.



Este repositorio debe realizar las siguientes funciones:



  • devolver una lista de operaciones;
  • devolver la posici贸n completa, es decir cuadro "instrumento" - "n煤mero actual de valores";
  • posici贸n de retorno para un instrumento dado.


驴C贸mo abordamos esta tarea?



De acuerdo con los preceptos de la moda de microservicios, necesitamos dividir la tarea en componentes de microservicios:



  • recepci贸n de una transacci贸n por parte de Rest;
  • guardar la transacci贸n en la base de datos;
  • almacenamiento en memoria para presentar datos de posici贸n.


Hagamos el primer y tercer servicio en el marco de este tutorial, y dejemos el segundo para la segunda parte (escriba en los comentarios si es interesante).



Entonces, tenemos dos microservicios.



El primero recibe datos del exterior.



El segundo procesa estos datos y responde a las solicitudes entrantes.



Por supuesto, queremos obtener escalado horizontal, actualizaciones continuas y otros beneficios de los microservicios.



驴Cu谩l es una tarea muy dif铆cil ante nosotros?



En realidad, hay muchos de ellos, pero ahora hablemos de c贸mo fluir谩n los datos entre estos microservicios. Tambi茅n puedes hacer Descanso entre ellos, puedes poner alg煤n tipo de cola, puedes pensar en un mont贸n de cosas con sus pros y contras.



Veamos un enfoque posible: la comunicaci贸n asincr贸nica a trav茅s del marco Axon .



驴Cu谩les son las ventajas de esta soluci贸n?



En primer lugar, la comunicaci贸n asincr贸nica aumenta la flexibilidad (s铆, aqu铆 hay un inconveniente, pero hasta ahora solo estamos hablando de ventajas).



En segundo lugar, obtenemos Event Sourcing y CQRS de inmediato .

En tercer lugar, Axon proporciona una infraestructura lista para usar y solo debemos centrarnos en desarrollar la l贸gica empresarial.



Empecemos.



Tendremos el proyecto en gradle. Tendr谩 tres m贸dulos:



  • com煤n. m贸dulo con estructuras de datos comunes (no nos gusta copiar y pegar);
  • tradeCreator. m贸dulo con microservicio para aceptar transacciones en Rest;
  • tradeQueries. m贸dulo con microservicio para mostrar la posici贸n.


Tomemos Spring Boot como base y conectemos el arrancador Axon.



Axon funciona bien sin Spring, pero los usaremos juntos.



Tenemos que detenernos aqu铆 y decirles algunas palabras sobre Axon.



Es un sistema cliente-servidor. Hay un servidor: esta es una aplicaci贸n separada, la ejecutaremos en la ventana acoplable.



Y hay clientes que se integran en microservicios.

Esta es la foto. Primero, se lanza el servidor Axon (en la ventana acoplable), luego nuestros microservicios.



Al inicio, los microservicios buscan un servidor y comienzan a interactuar con 茅l. La interacci贸n se puede dividir condicionalmente en dos tipos: t茅cnica y comercial.



El t茅cnico es el intercambio de mensajes "Estoy vivo" (estos mensajes se pueden ver en el modo de registro de depuraci贸n).



El negocio se ve oscurecido por mensajes como "nuevo trato".



Una caracter铆stica importante, despu茅s de iniciar el microservicio, puede preguntarle al servidor Axon "qu茅 sucedi贸" y el servidor env铆a los eventos acumulados al microservicio. Por lo tanto, el microservicio puede reiniciarse con relativa seguridad sin p茅rdida de datos.

Con este esquema de intercambio, podemos ejecutar muy f谩cilmente muchas instancias de microservicios

y en diferentes hosts.



S铆, una instancia de Axon Server no es confiable, pero hasta ahora.



Trabajamos en los paradigmas Event Sourcing y CQRS. Esto significa que debemos tener "equipos", "eventos" y "muestras".



Tendremos un comando: "crear una oferta", un evento "oferta creada" y tres selecciones: "mostrar todas las ofertas", "mostrar posici贸n", "mostrar posici贸n para un instrumento".



El esquema de trabajo es el siguiente:



  1. El microservicio TradeCreator acepta una transacci贸n Rest.
  2. El microservicio tradeCreator crea un comando "crear comercio" y lo env铆a al servidor Axon.
  3. El servidor Axon recibe el comando y reenv铆a el comando al destinatario interesado, en nuestro caso es el microservicio tradeCreator.
  4. El microservicio tradeCreator recibe un comando, genera un evento de "acuerdo creado" y lo env铆a al servidor Axon.
  5. El servidor Axon recibe el evento y lo reenv铆a a los suscriptores interesados.
  6. Ahora solo tenemos un destinatario interesado: el microservicio tradeQueries.
  7. El microservicio tradeQueries recibe el evento y actualiza los datos internos.


(Es importante que en el momento en que se forme el evento, el microservicio tradeQueries puede no estar disponible, pero tan pronto como comience, recibir谩 inmediatamente el evento).



S铆, el servidor axon est谩 en el centro de comunicaciones, todos los mensajes pasan por 茅l.



Pasemos a la codificaci贸n.



Para no saturar la publicaci贸n con c贸digo, a continuaci贸n solo dar茅 fragmentos, el enlace al ejemplo completo estar谩 debajo.



Comencemos con el m贸dulo com煤n.



En 茅l, las partes comunes son el evento (clase CreatedTradeEvent). Presta atenci贸n al nombre, de hecho, este es el nombre del equipo que gener贸 este evento, pero en tiempo pasado. En el pasado, porque primero, aparece el comando, que conduce a la creaci贸n del evento.



Otras estructuras comunes incluyen clases para describir una posici贸n (posici贸n de clase), una operaci贸n (clase Trade) y un lado de una operaci贸n (enum Side), es decir, comprar o vender.



Pasemos al m贸dulo tradeCreator.



Este m贸dulo tiene una interfaz Rest (clase TradeController) para aceptar intercambios.

El comando "crear un trato" se forma a partir del trato recibido y se env铆a al servidor axon.



    @PostMapping("/trade")
    public ResponseEntity<String> create(@RequestBody Trade trade) {
        var createTradeCommand = CreateTradeCommand.builder()
                .tradeId(trade.getTradeId())
	...
                .build();
        var result = commandGateway.sendAndWait(createTradeCommand, 3, TimeUnit.SECONDS);
        return ResponseEntity.ok(result.get().toString());
    }


Para procesar el comando, se usa la clase TradeAggregate.

Para que Axon lo encuentre, agregamos la anotaci贸n @Aggregate.

El m茅todo para procesar el comando se ve as铆 (con una abreviatura):



    @CommandHandler
    public TradeAggregate(CreateTradeCommand command) {
        log.info("command: {}", command);
        var event = CreatedTradeEvent.builder()
                .tradeId(command.tradeId())
		....
                .build();
        AggregateLifecycle.apply(event);
    }


Se genera un evento a partir del comando y se env铆a al servidor.

El comando est谩 en la clase CreateTradeCommand.



Ahora echemos un vistazo al 煤ltimo m贸dulo de TradeQueries.



Las selecciones se describen en el paquete de consultas.

Este m贸dulo tambi茅n tiene una interfaz

TradeController Rest de clase p煤blica.



Por ejemplo, veamos el procesamiento de la solicitud: "mostrar todas las transacciones".



    @GetMapping("/trade/all")
    public List<Trade> findAllTrades() {
        return queryGateway.query(new FindAllTradesQuery(),
                ResponseTypes.multipleInstancesOf(Trade.class)).join();
    }


Se crea una solicitud de recuperaci贸n y se env铆a al servidor.



La clase TradesEventHandler se utiliza para procesar la solicitud de recuperaci贸n.

Tiene un m茅todo anotado



   @QueryHandler
    public List<Position> handleFindCurrentPositionQuery(FindCurrentPositionQuery query)


Es 茅l quien es responsable de obtener datos del almacenamiento en memoria.



Surge la pregunta de c贸mo se actualiza la informaci贸n en esta tienda.



Para empezar, esto es solo una colecci贸n de ConcurrentHashMaps dise帽ados para selecciones espec铆ficas.

Para actualizarlos se aplica el m茅todo:



    @EventHandler
    public void on(CreatedTradeEvent event) {
        log.info("event:{}", event);

        var trade = Trade.builder()
	...
                .build();
        trades.put(event.tradeId(), trade);
        position.merge(event.shortName(), event.size(),
                (oldValue, value) -> event.side() == Side.BUY ? oldValue + value : oldValue - value);
    }


Recibe el evento "acuerdo creado" y actualiza los mapas.



Estos son los puntos principales del desarrollo de microservicios.



驴Qu茅 pasa con las deficiencias de Axon?



En primer lugar, esta es la complicaci贸n de la infraestructura, ha aparecido un punto de falla: el servidor Axon, todas las comunicaciones pasan por 茅l.



En segundo lugar, la desventaja de tales sistemas distribuidos se manifiesta muy claramente: inconsistencia temporal de datos. En nuestro caso, puede pasar un tiempo inaceptablemente largo entre la recepci贸n de una nueva oferta y la actualizaci贸n de los datos de las muestras.



驴Qu茅 queda detr谩s de escena?



No se dice nada sobre Event Sourcing y CQRS, qu茅 es y para qu茅 sirve.

Sin revelar estos conceptos, algunos puntos podr铆an no estar claros.



Quiz谩s algunos fragmentos de c贸digo tambi茅n requieran aclaraci贸n.



Hablamos de esto en un seminario web abierto .



Ejemplo completo .



All Articles