¡Oye! Mi nombre es Andrey Serebryansky, soy ingeniero de datos en el equipo de Operaciones de datos. Nuestro equipo es responsable de llenar nuestro repositorio de Snowflake, así como de asegurar que el resto de los equipos tengan datos en tiempo real. Por ejemplo, el feed de transacciones (estas son compras de clientes, sus transferencias, reembolsos recibidos por ellos) se completa en función de nuestros datos.
Para todas estas tareas, usamos Kafka y, lo más importante, Kafka Streams. Hoy hablaré sobre las tareas para las que se puede usar Kafka Streams y mostraré el código para nuestros ejemplos simples. Esto será útil para aquellos que usan Kafka pero aún no han probado Kafka Streams. Si desea preservar el estado mientras procesa temas de Kafka, o buscaba una sintaxis simple para enriquecer algunos temas con información de otros, hoy le mostraré cómo puede hacerlo de manera fácil y prácticamente inmediata.
Esquema del artículo
-
¿Por qué necesitamos Kafka Streams?
Caso No. 1. Enriqueciendo las compras de nuestros clientes con información de marca
Caso número 2. Llevamos los datos de los clientes del equipo de originación a nuestro almacenamiento
-
Un poco sobre la escalabilidad de Kafka Streams
-
Un poco sobre Kafka Streams
Kafka Streams - Java. Kafka Java/Scala.
exactly once processing kafka transactions.
Kafka Streams , stateful (, ).
Kafka Streams?
: , - , , , , .
, - . , , . , , , , , .
: , . , , , .
, .
, , , . . Kafka Streams. , ,
, .
№1.
, . (brand_id) ( ).
.
.
builder.streams("authorization-events")
.join(
builder.globalTable("brands"),
auth -> auth.get("brand_id"), // ,
(brand, auth) -> auth.set("brandName", brand.get("name")) //
);
builder? . :
import org.apache.kafka.streams.StreamsBuilder;
...
StreamsBuilder builder = new StreamsBuilder();
, Kafka Streams id ( , ).
id ?
Kafka Streams , , - . builder.globalTable(topicName)
.
. , , . , . , , .
, Kafka Streams .
№2. Origination
Vivid Money, , . Origination - Vivid.
Kafka Connect open-source dynamodb JSON.
, . , , . Apache AVRO. .
Avro
{
"type": "record",
"name": "OriginationClient",
"namespace": "datahub",
"fields": [
{
"name": "firstName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "lastName",
"type": [
"null",
"string"
],
"default": null
},
...
]
}
, :
Schema schema = new Schema.Parser().parse(new File("path/to/schema.avsc"));
AvroConverter avroConverter = new AvroConverter(schema);
builder.stream("origination-json-topic")
.mapValues(val -> avroConverter.convert(val))
.to("origination-avro-topic");
AvroConverter - , . open source https://github.com/allegro/json-avro-converter . .
, . , , , . (diff) . , .
, . . . , Kafka Streams . , , .
:
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
...
var changes = builder.stream(sourceTopic);
var stateStoreSupplier = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore**("state-store"**), //
Serdes.Bytes(), //
new GenericAvroSerde() //
);
builder.addStateStore(stateStoreSupplier);
changes.transform(() -> new ChangeTransformer(), "state-store") // ,
.to(outputTopic);
ChangeTransformer :
public class ChangeTransformer implements Transformer {
private KeyValueStore<Bytes, GenericRecord> stateStore;
@Override
public void init(ProcessorContext processorContext) {
this.stateStore = processorContext.getStateStore("state-store");
}
@Override
public KeyValue<String, GenericRecord> transform(String recordKey, GenericRecord record) {
GenericRecord prevState = stateStore.get(recordKey);
return extractDiff(prevState, record);
}
...
}
?
StreamsBuilder builder = new StreamsBuilder();builder.stream("my-input-topic")
.filter(...)
.map(...)
.to("my-output-topic");
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start(); //
...
kafkaStreams.stop();
Kafka Streams
Kafka Streams . . 16 , 16 . , .
, state-store ( ChangeTransformer-), , ! .
: https://docs.confluent.io/platform/current/streams/architecture.html#parallelism-model
Kafka Streams :
stateful (join, get previous state). , .
. map, filter, join DSL. ,
transform()
. ChangeTransformer-, .
. . .
P.S. ) , !