Producer / Consumer in Kafka and Kotlin

This translation was prepared ahead of the start of the course "Backend Development in Kotlin".








In this article, we will look at how to create a simple Spring Boot application with Kafka and Kotlin.



Introduction



Start by visiting https://start.spring.io and add the following dependencies:



Groovy



implementation("org.springframework.boot:spring-boot-starter-data-rest")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.apache.kafka:kafka-streams")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")


This article uses Gradle. Maven works just as well if you prefer it.



Generate the project, unpack it, and open it in IntelliJ IDEA.



Apache Kafka



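Download Apache Kafka and unpack it. The examples here were run on Windows 10, where Kafka may fail to start with a «too many lines encountered» error. The commands below are run from Power Shell.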



To start Kafka, run:



Shell



.\zookeeper-server-start.bat ..\..\config\zookeeper.properties
.\kafka-server-start.bat ..\..\config\server.properties


Run both commands from the /bin/windows directory.



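Before starting Kafka, start Zookeeper. Zookeeper is another Apache project; it provides a distributed coordination and configuration service.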



Spring Boot



Open the project in the IDE and find KafkaDemoApplication.kt. Spring generated this file when the project was created.



It contains the following code:



Kotlin



import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class KafkaDemoApplication

fun main(args: Array<String>) {
    runApplication<KafkaDemoApplication>(*args)
}







To send messages, we need a controller. Create KafkaController.kt and add the following code:



Kotlin



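// Injected by Spring; lateinit avoids the nullable type and the !! operator.
@Autowired
lateinit var kafkaTemplate: KafkaTemplate<String, String>
val topic: String = "test_topic"

@GetMapping("/send")
fun sendMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    // send() is asynchronous. spring-kafka 2.x returns a ListenableFuture here;
    // 3.x returns a CompletableFuture instead.
    val lf: ListenableFuture<SendResult<String, String>> = kafkaTemplate.send(topic, message)
    // get() blocks until the send has completed.
    val sendResult: SendResult<String, String> = lf.get()
    return ResponseEntity.ok(sendResult.producerRecord.value() + " sent to topic")
}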


Here the message is sent to test_topic through KafkaTemplate. The call returns a ListenableFuture, from which the result of the send is obtained.





The second way to send a message to Kafka is to use KafkaProducer directly. The code looks like this:



Kotlin



@GetMapping("/produce")
fun produceMessage(@RequestParam("message") message: String): ResponseEntity<String> {
    val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)

    // Producer settings; values are typed as Any so no unchecked cast is needed.
    val map = mutableMapOf<String, Any>()
    map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
    map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
    map["bootstrap.servers"] = "localhost:9092"

    val producer = KafkaProducer<String, String>(map)
    // use {} closes the producer after the send, releasing its threads and sockets.
    return producer.use {
        val future: Future<RecordMetadata> = producer.send(producerRecord)
        ResponseEntity.ok(" message sent to " + future.get().topic())
    }
}


KafkaProducer is initialized with a Map that holds its configuration. In this example, both the key and the value use StringSerializer.



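A Serializer is the Kafka interface that converts a key or value into bytes before it is sent. Apache Kafka ships with several other implementations, such as ByteArraySerializer, BytesSerializer, and FloatSerializer.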
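To make the role of a serializer concrete, here is a minimal sketch that round-trips a string through StringSerializer and StringDeserializer without a broker. It assumes kafka-clients is on the classpath; the roundTrip helper is a hypothetical name used only for illustration.

```kotlin
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: serializes a string to bytes and reads it back.
// The topic argument is part of the Serializer API; StringSerializer does not use it.
fun roundTrip(text: String, topic: String = "test_topic"): String {
    val bytes: ByteArray = StringSerializer().serialize(topic, text)
    return StringDeserializer().deserialize(topic, bytes)
}

fun main() {
    val bytes = StringSerializer().serialize("test_topic", "hello")
    println(bytes.size)          // number of bytes produced for "hello"
    println(roundTrip("hello"))  // the original string, restored from bytes
}
```

The producer performs the serialization step itself for every record, which is why it only needs the serializer class names in its configuration.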



In the map, both the key serializer and the value serializer are set to StringSerializer.



Kotlin



map["key.serializer"]   = "org.apache.kafka.common.serialization.StringSerializer"
map["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"


The next entry is the bootstrap server, the address the client uses to connect to Kafka.



Kotlin



map["bootstrap.servers"] = "localhost:9092"


With the configuration in place, create the KafkaProducer.
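As an alternative to string keys, the same three settings can be written with the constants from ProducerConfig. This is only a sketch: producerConfig is a hypothetical helper name, and the default address assumes a local broker on localhost:9092, as in the article.

```kotlin
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: builds the producer settings used in this article.
// ProducerConfig constants resolve to the same keys ("bootstrap.servers",
// "key.serializer", "value.serializer"), so a mistyped key fails to compile.
fun producerConfig(bootstrapServers: String = "localhost:9092"): Map<String, Any> = mapOf(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)

fun main() {
    // Print the resulting settings; no broker is contacted here.
    producerConfig().forEach { (key, value) -> println("$key = $value") }
}
```

The resulting map can be passed to the producer as KafkaProducer<String, String>(producerConfig()).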



A ProducerRecord holds the topic and the message to send. Create it as follows:



Kotlin



val producerRecord: ProducerRecord<String, String> = ProducerRecord(topic, message)


Now send the record:



Kotlin



val future: Future<RecordMetadata> = producer.send(producerRecord)


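Calling get() on the future waits until the send has completed and yields the record metadata, which is used to build the response.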
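To see what the Future yields without a running broker, the sketch below swaps in MockProducer, a test double shipped with kafka-clients. This is not what the article's controller does; sendAndReportTopic is a hypothetical helper, and the example assumes kafka-clients is on the classpath.

```kotlin
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.StringSerializer
import java.util.concurrent.Future

// Hypothetical helper: sends one record and returns the topic reported in the
// metadata once the send has completed.
fun sendAndReportTopic(producer: Producer<String, String>, topic: String, message: String): String {
    val future: Future<RecordMetadata> = producer.send(ProducerRecord<String, String>(topic, message))
    return future.get().topic()
}

fun main() {
    // With autoComplete = true, MockProducer completes every send immediately,
    // so get() returns without any network call.
    val producer = MockProducer(true, StringSerializer(), StringSerializer())
    producer.use { println(sendAndReportTopic(it, "test_topic", "hello")) }
}
```

Because the helper accepts the Producer interface, the same function works with a real KafkaProducer once a broker is available.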





To receive the messages, a consumer is needed.

Create MessageConsumer.kt and annotate the class with @Service.



Kotlin



@Service
class MessageConsumer {
    // Called by Spring Kafka for each message received from test_topic.
    @KafkaListener(topics = ["test_topic"], groupId = "test_id")
    fun consume(message: String) {
        println(" message received from topic : $message")
    }
}


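The @KafkaListener annotation subscribes the method to the listed topics; the method is called for each message that arrives.

The source code is available on GitHub.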





