¿Cómo funciona realmente la confirmación automática en Kafka y podemos contar con ello?

En este artículo, quiero explicar con un poco más de detalle cómo funciona el mecanismo de confirmación automática para los oyentes en la biblioteca kafka-clients (considere la versión 2.6.0)





En la documentación, podemos encontrar la siguiente formulación que describe cómo funciona la confirmación automática:





La confirmación automática funciona básicamente como un cron con un período establecido a través de la propiedad de configuración auto.commit.interval.ms. Si el consumidor se bloquea, luego de un reinicio o un reequilibrio, la posición de todas las particiones propiedad del consumidor bloqueado se restablecerá al último desplazamiento comprometido.





Los documentos de Java para KafkaConsumer, a su vez, contienen la siguiente descripción:





El consumidor puede comprometer automáticamente compensaciones periódicamente; o puede elegir controlar esta posición comprometida manualmente llamando a una de las API de confirmación (por ejemplo, commitSync y commitAsync).





A partir de estas formulaciones, puede surgir una idea errónea de que se produce un compromiso de compensación automático sin bloqueo en segundo plano y no está del todo claro cómo se relaciona con el proceso de recepción de mensajes de un consumidor específico y, lo más importante, qué garantías de entrega tenemos. ?





Echemos un vistazo más de cerca al mecanismo para recibir mensajes por parte del oyente con la configuración enable.auto.commit = true usando el ejemplo de la implementación de la clase KafkaConsumer de la biblioteca org.apache.kafka: kafka-clients: 2.6.0 





Para hacer esto, considere el ejemplo dado en el java doc KafkaConsumer :





Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
      
      



¿Cómo ocurre la confirmación automática en este caso? La respuesta debe encontrarse en el propio método para recibir nuevos mensajes. 





consumer.poll(Duration.ofMillis(100));
      
      



. KafkaConsumer auto-commit enable.auto.commit auto.commit.interval.ms ConsumerCoordinator , auto-commit.





maybeAutoCommitOffsetsAsync





public void maybeAutoCommitOffsetsAsync(long now) {
  if (autoCommitEnabled) {
    nextAutoCommitTimer.update(now);
    if (nextAutoCommitTimer.isExpired()) {
      nextAutoCommitTimer.reset(autoCommitIntervalMs);
      doAutoCommitOffsetsAsync();
    }
  }
}
      
      



enable.auto.commit = true auto.commit.interval.ms , , ( doAutoCommitOffsetsAsync)





private void doAutoCommitOffsetsAsync() {
  Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
  log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);

  commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
    if (exception != null) {
      if (exception instanceof RetriableCommitFailedException) {
        log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
                  exception);
        nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
      } else {
        log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
      }
    } else {
      log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
    }
  });
}
      
      



poll KafkaConsumer. updateAssignmentMetadataIfNeeded, poll ConsumerCoordinator, , maybeAutoCommitOffsetsAsync





poll KafkaConsumer:





  1. offset





  2. .





KafkaConsumer , . 





.1 enable.auto.commit = true auto.commit.interval.ms. .. poll() 3 , auto.commit.interval.ms=6000, . 





? “at least once delivery”, . 








All Articles