Uso de Azure Service Bus desde Java

¡Hola colegas! Sucedió que nuestra aplicación está escrita en la pila de Java, pero está alojada en Azure. Y estamos tratando de aprovechar al máximo los servicios de administración del proveedor de la nube.



Uno de ellos es Azure Service Bus y hoy quiero hablar sobre las características de usarlo en una aplicación Spring Boot normal.



Si desea leer sobre las funciones de rake , desplácese hasta el final del artículo.



¿Qué es Azure Service Bus?



Algunas palabras sobre Azure Service Bus es un agente de mensajes en la nube (reemplazo de la nube para RabbitMQ, ActiveMQ). Admite colas (el mensaje se entrega a un destinatario) y temas (el mecanismo de publicación / suscripción); aquí



se declara soporte con más detalle :



  1. Mensajes ordenados : la documentación dice que se trata de un FIFO, PERO se implementa utilizando el concepto de sesiones de mensajes: un grupo de mensajes, no la cola completa. Si necesita garantizar el orden de los mensajes, combine los mensajes en un grupo y ahora los mensajes del grupo se entregarán como FIFO. Por lo tanto, Azure Service Bus Queue no es un FIFO: entrega sus mensajes de la forma más aleatoria posible.
  2. Cola de mensajes fallidos: aquí todo es simple, no pudieron entregar el mensaje con éxito después de N intentos o un período de tiempo: se movió a DLQ
  3. Entrega programada : puede establecer un retraso antes de la entrega
  4. Aplazamiento de mensajes : oculta los mensajes en la cola, el mensaje no se entregará automáticamente, pero se puede recuperar por ID. Necesitamos almacenar esta identificación en algún lugar


Cómo integrarse con Azure Service Bus



Azure Service Bus admite AMQP 1.0, lo que significa que no es compatible con clientes RabbitMQ. rabbit utiliza AMQP 0.9.1.



El único cliente "estándar" que puede trabajar con Service Bus es Apache Qpid .



Hay 3 formas de emparejar su aplicación Spring Boot con Service Bus:



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



Me detendré en este método con más detalle y le contaré sobre las características de usar la

aplicación de ejemplo en el repositorio oficial, por lo que no tiene sentido duplicar el código; el repositorio con un ejemplo está aquí .



Porque es Spring Integration Messaging, todo se reduce a Channel, MessageHandler, MessagingGateway, ServiceActivator.



Y luego está ServiceBusQueueTemplate .



Enviando mensajes



Debemos tener un Canal en el que escribamos el mensaje que queremos enviar, en el otro extremo hay un MessageHandler que lo envía al Service Bus.



El MessagHandler es com.microsoft.azure.spring.integration.core.DefaultMessageHandler - este es el conector para el servicio externo.



¿Cómo vincularlo a un canal? - agregue la anotación - @ServiceActivator (inputChannel = OUTPUT_CHANNEL) y ahora nuestro MessagHandler está escuchando el canal OUTPUT_CHANNEL .



A continuación, necesitamos escribir de alguna manera nuestro mensaje en el canal , aquí nuevamente la magia de la primavera, anunciamos MessagingGateway y lo vinculamos al canal por su nombre.



Un fragmento del ejemplo :



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


Eso es todo: Gateway -> Channel -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter .



En el código, queda inyectar nuestra puerta de enlace y llamar al método de envío .



Mencioné ServiceBusMessageConverter en la cadena de llamadas por una razón : si desea agregar encabezados personalizados (por ejemplo, CORRELATION_ID) al mensaje, este es el lugar donde deben moverse de org.springframework.messaging.MessageHeaders al mensaje azul.

El método especial setCustomHeaders .



En este caso, su puerta de enlace se verá así:



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


Recibir mensajes



De acuerdo, sabemos cómo enviar mensajes, ¿cómo recibirlos ahora?



Aquí todo es igual - MessageProducer -> Channel -> Handler



El MessageProducer es com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter - este es nuestro conector a un servicio externo. En el interior, hay el mismo ServiceBusQueueTemplate con ServiceBusMessageConverter donde puede leer encabezados personalizados y ponerlos en el mensaje de integración de primavera.



El canal ya está instalado en él a mano:



@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


Pero el propio Handler está adjunto al canal a través de @ServiceActivator .



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


Puede obtener la línea de inmediato:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


Es posible que haya notado el extraño parámetro Checkpointer checkpointer , que se usa para reconocer manualmente el procesamiento de mensajes.

Si establece CheckpointMode.MANUAL al crear el ServiceBusQueueInboundChannelAdapter , debe enviar el reconocimiento del mensaje usted mismo. Si usa CheckpointMode.RECORD , la confirmación se enviará automáticamente: los detalles en el código ServiceBusQueueTemplate .







Características de uso



Entonces, la lista de "rakes" y "chips" a la que ya hemos ido.



ReceiveMode.PEEKLOCK



Azure Service Bus admite el modo PEEKLOCK : el consumidor toma un mensaje, se bloquea en el bus de servicio, es inaccesible para cualquier persona durante un tiempo determinado (duración del bloqueo), pero no se elimina de él. Si dentro del tiempo asignado el consumidor no ha enviado una confirmación de procesamiento - éxito / abandono o no ha extendido el bloqueo - el mensaje se considera disponible nuevamente y se realizará un nuevo intento de entrega.



Curiosamente, abandonar simplemente restablece el bloqueo y el mensaje está disponible instantáneamente para ser reenviado.



ServiceBusQueueTemplate defecto crea QueueClient modo ReceiveMode.PEEKLOCK .



Si una excepción no manejada vuela en nuestro controlador- no se enviará ningún acuse de recibo al servidor y el mensaje permanecerá bloqueado y se volverá a enviar antes del tiempo de espera.

En este caso, el contador de entregas aumentará, lo cual es lógico.



No sé si esto es un error o una característica, pero es muy conveniente hacer un retraso entre reintentos para situaciones en las que sea necesario.



Si el mensaje no se puede procesar incluso con reintento, es necesario capturar excepciones y marcar el mensaje como procesado y agregar lógica adicional a la aplicación; de lo contrario, se entregará una y otra vez hasta que alcance el límite de número de reenvío (configurado al crear una cola en el bus de servicio )



Recuento de mensajes de concurrencia y captación previa



Como habrás adivinado, la configuración de concurrencia es responsable de la cantidad de controladores de mensajes paralelos, y el recuento de mensajes de captación previa es la cantidad de mensajes que ingresaremos en el búfer desde el servidor.



De forma predeterminada, ServiceBusQueueTemplate está autoconfigurado (AzureServiceBusQueueAutoConfiguration) con un valor de 1 para ambos parámetros, es decir, por defecto, cada cola tendrá un hilo de procesamiento, aunque el concepto de un bus de servicio con reconocimiento para cada mensaje individual implica muchos procesadores concurrentes. Esto es aún más importante si tiene un procesamiento de solicitud largo.



Desafortunadamente, esta configuración no se puede establecer a través de la configuración de la aplicación (application.yml / application.properties) y solo se puede establecer en el código. Pero incluso a través del código, no podrá establecer diferentes configuraciones para diferentes colas.



Por lo tanto, si necesita realizar configuraciones diferentes, deberá crear varios beans ServiceBusQueueTemplate para cada ServiceBusQueueInboundChannelAdapter



CompletableFuture dentro del bus de servicio azure java sdk



El sdk java del bus de servicio azure se implementa alrededor del ejecutor CompletableFuture y CachedThreadPool - MessagingFactory.INTERNAL_THREAD_POOL, así que tenga cuidado con todo tipo de beans locales de subprocesos



Mensajes ordenados



Usamos el bus de servicio como una cola de trabajos; algunos trabajos dependen unos de otros y, por lo tanto, deben ejecutarse en el orden en que se crearon.



Como mencioné anteriormente, las camisetas utilizan el concepto de sesiones de mensajes: cuando los mensajes se agrupan en una sesión por clave (transmitidos en el encabezado), la sesión existe siempre que haya al menos un mensaje con la clave de sesión; en detalle en la documentación

El bus de servicio garantiza la entrega de mensajes dentro de dicho grupo en el orden de agregar a servidor (es decir, en el orden en que el servidor de bus de servicio los escribió en el repositorio).



También vale la pena mencionar si ha creado una cola de sesiones habilitadas; esto significa que todos los mensajes deben tener un encabezado con una clave de sesión.



Inmediatamente quedamos muy satisfechos con la posibilidad del bus de servicio de alinear mensajes en una cola FIFO, aunque sea para un grupo de mensajes.



Pero después de un tiempo, comenzamos a notar problemas:



  • algunos mensajes empezaron a llegar infinidad de veces
  • el procesamiento de la cola se ralentizó
  • En las estadísticas del bus de servicio, la mitad de las solicitudes se marcan como fallidas y las solicitudes fallidas aparecen incluso en una cola vacía cuando están inactivas.


Analizando el código sdk, descubrimos la peculiaridad de trabajar con sesiones:



  1. el consumidor captura la sesión y comienza a leer todos los mensajes disponibles en ella
  2. simultáneamente, el número de sesiones se procesa igual al parámetro de concurrencia
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


Como resultado, abandonamos esta función de autobús de servicio y escribimos una bicicleta, y el autobús de servicio actúa como un disparador.



Tan pronto como se canceló la cola de sesiones habilitadas, desaparecieron los errores en las estadísticas, la solicitud al bus de servicio.



En el paquete JMS + Qpid, esta funcionalidad no está disponible.



Posibles problemas con tamaños de cola superiores a 1G



Aún no me he encontrado, pero escuché que comienza a funcionar de manera inestable si el tamaño de la cola es superior a 1G.



Si encuentra esto o viceversa, todo funciona, escriba en los comentarios.



Problemas con las solicitudes de seguimiento



El agente estándar de Azure Application Insights no puede rastrear el envío de mensajes como dependencia y los mensajes entrantes como solicitudes.



Tuve que agregar un código.



Salir



Si necesita una cola de trabajos con un tiempo de procesamiento de mensajes largo y no necesita una cola, puede usar.



Si el procesamiento de mensajes es rápido, use Azure Event Hub; el cliente estándar de Kafka normal funciona bien.



All Articles