Cola DelayedQueue

Hace un par de a帽os, en uno de nuestros proyectos, nos enfrentamos a la necesidad de posponer la ejecuci贸n de una acci贸n durante un tiempo determinado. Por ejemplo, puede averiguar el estado del pago en tres horas o volver a enviar la notificaci贸n despu茅s de 45 minutos. Sin embargo, en ese momento no encontramos bibliotecas adecuadas que pudieran "posponerse" y no requirieran tiempo adicional para la configuraci贸n y operaci贸n. Analizamos las posibles opciones y escribimos nuestra propia biblioteca de cola retrasada en Java usando Redis como repositorio. En este art铆culo hablar茅 sobre las capacidades de la biblioteca, sus alternativas y esos "rastrillos" que encontramos en el proceso.



Funcionalidad



Entonces, 驴qu茅 hace la cola retrasada? Un evento agregado a la cola pendiente se entrega al controlador en el intervalo de tiempo especificado. Si el procesamiento falla, el evento se entregar谩 nuevamente m谩s tarde. Adem谩s, el n煤mero m谩ximo de intentos es limitado. Redis no garantiza la seguridad y debe estar preparado para la p茅rdida de eventos . Sin embargo, en la versi贸n de cl煤ster, Redis muestra una confiabilidad bastante alta y nunca hemos encontrado esto en un a帽o y medio de operaci贸n.



API



Agregar un evento a la cola



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();


Tenga en cuenta que el m茅todo regresa Mono, por lo que para ejecutarlo debe realizar una de las siguientes acciones:



  • subscribe(...)
  • block()


Se proporcionan explicaciones m谩s detalladas en la documentaci贸n de Project Reactor. El contexto se agrega al evento de esta manera:



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();


Registrar controlador de eventos



eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);


, :



eventService.addHandler(
        DummyEvent.class,
        e -> Mono
            .subscriberContext()
            .doOnNext(ctx -> {
                Map<String, String> eventContext = ctx.get("eventContext");
                log.info("context key {}", eventContext.get("key"));
            })
            .thenReturn(true),
        1
);




eventService.removeHandler(DummyEvent.class);




"-":



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();


:



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();


( Redis) eventService.close() , @javax.annotation.PreDestroy.





- , . :



  • , Redis;
  • , ( "delayed.queue.ready.for.handling.count" )




, delayed queue. 2018

Amazon Web Services.

, . : " , Amazon-, ".





:





- , JMS . SQS , 15 .





" " . , Redis :





, Netflix dyno-queues

. , , .



, " " sorted set list, ( ):



var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
  var payload = extractPayload(key);
  var listName = extractType(key);
  redis.lpush(listName, payload);
  redis.zrem("delayed_events", key);
});


Spring Integration, :



redis.brpop(listName)


.





"list" (, ), list . Redis , 2 .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




list-a . , . "sorted_set" .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zadd("delayed_events", nextAttempt(key))
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




, , " " "delayed queue" . "sorted set"

metadata;payload, payload , metadata - . . , metadata payload Redis hset "sorted set" .



var envelope = metadata + SEPARATOR + payload;
redis.zadd(envelope, scheduledAt);




var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;

redis.multi();
redis.zadd(key, scheduledAt);
redis.hset("metadata", key, envelope)
redis.exec();




, . , list . TTL :



redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());




Spring, . " " :





Lettuce , . Project Reactor , " ".

, Subscriber



redis
  .reactive()
  .brpop(timeout, queue)
  .map(e -> deserialize(e))
  .subscribe(new InnerSubscriber<>(handler, ... params ..))




class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {

    @Override
    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
        Mono<Boolean> promise = handler.apply(envelope.getPayload());
        promise.subscribe(r -> request(1));
    }
}


, ( Netflix dyno queue, poll- ).



?



  • Kotlin DSL. Kotlin suspend fun API Project Reactor


Enlaces






All Articles