¿Cómo utilizamos Kafka Streams en el equipo de almacenamiento de datos de Vivid Money?

¡Oye! Mi nombre es Andrey Serebryansky, soy ingeniero de datos en el equipo de Operaciones de datos. Nuestro equipo es responsable de llenar nuestro repositorio de Snowflake, así como de asegurar que el resto de los equipos tengan datos en tiempo real. Por ejemplo, el feed de transacciones (estas son compras de clientes, sus transferencias, reembolsos recibidos por ellos) se completa en función de nuestros datos.





Para todas estas tareas, usamos Kafka y, lo más importante, Kafka Streams. Hoy hablaré sobre las tareas para las que se puede usar Kafka Streams y mostraré el código para nuestros ejemplos simples. Esto será útil para aquellos que usan Kafka pero aún no han probado Kafka Streams. Si desea preservar el estado mientras procesa temas de Kafka, o buscaba una sintaxis simple para enriquecer algunos temas con información de otros, hoy le mostraré cómo puede hacerlo de manera fácil y prácticamente inmediata.





Esquema del artículo

  1. Un poco sobre Kafka Streams





  2. ¿Por qué necesitamos Kafka Streams?





  3. Caso No. 1. Enriqueciendo las compras de nuestros clientes con información de marca





  4. Caso número 2. Llevamos los datos de los clientes del equipo de originación a nuestro almacenamiento





  5. ¿Cómo empezar todo esto?





  6. Un poco sobre la escalabilidad de Kafka Streams





  7. conclusiones





Un poco sobre Kafka Streams

Kafka Streams - Java. Kafka Java/Scala.





exactly once processing kafka transactions.





Kafka Streams , stateful (, ).





Kafka Streams?

: , - , , , , .





, - . , , . , , , , , .





: , . , , , .





Obtenemos secuencialmente datos de diferentes fuentes, esperando si algo salió mal en la fuente.
, , -

, .





Demasiados amigos

, , , . . Kafka Streams. , ,





Kafka Streams obtiene los datos necesarios por adelantado
Kafka Streams

, .





№1.

, . (brand_id) ( ).





Mejores marcas

.





Tema de autorización

.









builder.streams("authorization-events")
    .join(
        builder.globalTable("brands"), 
        auth -> auth.get("brand_id"), // ,      
        (brand, auth) -> auth.set("brandName", brand.get("name")) //  
    );

      
      



builder? . :





import org.apache.kafka.streams.StreamsBuilder;
...

StreamsBuilder builder = new StreamsBuilder();

      
      



, Kafka Streams id ( , ).





id ?

Kafka Streams , , - . builder.globalTable(topicName)



.





. , , . , . , , .





https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality
https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality

, Kafka Streams .





№2. Origination

Vivid Money, , . Origination - Vivid.





La información sobre el nombre y apellido va a la base de datos del equipo de origen
Origination

Kafka Connect open-source dynamodb JSON.





Llevamos datos de dynamodb a nuestro kafka
dynamodb

, . , , . Apache AVRO. .





Avro
{
  "type": "record",
  "name": "OriginationClient",
  "namespace": "datahub",
  "fields": [
    {
      "name": "firstName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "lastName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    ...
  ]
}

      
      



, :





Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);

builder.stream("origination-json-topic")
    .mapValues(val -> avroConverter.convert(val))
    .to("origination-avro-topic");

      
      



AvroConverter - , . open source https://github.com/allegro/json-avro-converter . .





, . , , , . (diff) . , .





, . . . , Kafka Streams . , , .





:





import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...

var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore**("state-store"**), //                
  Serdes.Bytes(), //       
    new GenericAvroSerde() //       
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") //  ,  
    .to(outputTopic);

      
      



ChangeTransformer :





public class ChangeTransformer implements Transformer {
  private KeyValueStore<Bytes, GenericRecord> stateStore;

  @Override
  public void init(ProcessorContext processorContext) {
     this.stateStore = processorContext.getStateStore("state-store");
  }
  @Override
  public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
    GenericRecord prevState = stateStore.get(recordKey);
    return extractDiff(prevState, record);
  }
  ...
}

      
      



?

StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
        .filter(...)
        .map(...)
        .to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); // 
...
kafkaStreams.stop();

      
      



Kafka Streams

Kafka Streams . . 16 , 16 . , .





, state-store ( ChangeTransformer-), , ! .





: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model





Kafka Streams :





  • stateful (join, get previous state). , .





  • . map, filter, join DSL. , transform()



    . ChangeTransformer-, .





  • . . .





P.S. ) , !








All Articles