Uso de Spring Cloud Stream Binding con Kafka Message Broker

¡Hola a todos! Mi nombre es Vitaly, soy desarrollador en Web3Tech. En esta publicación, presentaré los conceptos básicos y las construcciones del marco Spring Cloud Stream para admitir y trabajar con los agentes de mensajes de Kafka, con un ciclo completo de sus pruebas unitarias contextuales. Usamos un esquema de este tipo en nuestro proyecto de votación electrónica en toda Rusia en la plataforma blockchain de Waves Enterprise .





Como parte del equipo del proyecto Spring Cloud, Spring Cloud Stream se basa en Spring Boot y utiliza Spring Integration para proporcionar comunicación con los corredores de mensajes. Sin embargo, se integra fácilmente con varios agentes de mensajes y requiere una configuración mínima para crear microservicios controlados por eventos o mensajes.





Configuración y dependencias

Primero, necesitamos agregar la dependencia spring-cloud-starter-stream-kafka a build.gradle :





dependencies {
   implementation(kotlin("stdlib"))
   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
   implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

   implementation("org.springframework.boot:spring-boot-starter-web")
   implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

   testImplementation("org.springframework.boot:spring-boot-starter-test")
   testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
   testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
      
      



En la configuración del proyecto Spring Cloud Stream, debe incluir la URL del agente de Kafka, el nombre de la cola (tema) y otros parámetros de enlace. A continuación, se muestra un ejemplo de configuración de YAML para el servicio application.yaml :





spring:
 application:
   name: cloud-stream-binding-kafka-app
 cloud:
   stream:
     kafka:
       binder:
         brokers: 0.0.0.0:8080
         configuration:
           auto-offset-reset: latest
     bindings:
       customChannel:                   #Channel name
         destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)
         group: input-group-N
         contentType: application/json
         consumer:
           max-attempts: 1
           autoCommitOffset: true
           autoCommitOnError: false
      
      



Concepto y clases

, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):





@EnableBinding(ProducerBinding::class)

@SpringBootApplication
 
 class SpringCloudStreamBindingKafkaApp

 fun main(args: Array<String>) {

    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)

 }
      
      



@EnableBinding , .





.





Binding — , .

Binder — middleware .

Channel — middleware .

StreamListeners — (beans), , MessageConverter middleware “DTO”.

Message Schema — , . .





send/receive, producer consumer. , Spring Cloud Stream.





Producer Kafka, (ProducerBinding.kt):





interface ProducerBinding {

   @Output(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



onsumer Kafka .





ConsumerBinding.kt:





interface ConsumerBinding {

   companion object {
       const val BINDING_TARGET_NAME = "customChannel"
   }

   @Input(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



Consumer.kt:





@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {

   @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
   fun process(
       @Payload message: Map<String, Any?>,
       @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
   ) {
       messageService.consume(message)
   }
}
      
      



Kafka . Kafka, spring-kafka-test.





MessageCollector

, . ProducerBinding payload ProducerTest.kt:





@SpringBootTest
class ProducerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var messageCollector: MessageCollector

   @Test
   fun `should produce somePayload to channel`() {
       // ARRANGE
       val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)

       // ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
       val payload = messageCollector.forChannel(producerBinding.messageChannel())
           .poll()
           .payload

       // ASSERT
       val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
       assertTrue(request.entries.stream().allMatch { re ->
           re.value == payloadAsMap[re.key.toString()]
       })

       messageCollector.forChannel(producerBinding.messageChannel()).clear()
   }
}
      
      



Embedded Kafka

@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):





@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var objectMapper: ObjectMapper

   @MockBean
   lateinit var messageService: MessageService

   companion object {
       @ClassRule @JvmField
       var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
   }

   @Test
   fun `should consume via txConsumer process`() {
       // ACT
       val request = mapOf(1 to "foo", 2 to "bar")
       producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
           .setHeader("someHeaderName", "someHeaderValue")
           .build())

       // ASSERT
       val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
       runBlocking {
           delay(20)
           verify(messageService).consume(requestAsMap)
       }
   }
}
      
      



En esta publicación, demostré las capacidades de Spring Cloud Stream y cómo usarlo con Kafka. Spring Cloud Stream ofrece una interfaz fácil de usar con matices simplificados de configuración de broker, se implementa rápidamente, funciona de manera estable y es compatible con corredores populares modernos como Kafka. Como resultado, di varios ejemplos con pruebas unitarias basadas en EmbeddedKafkaRule usando MessageCollector.





Todas las fuentes se pueden encontrar en Github . ¡Gracias por leer!








All Articles