Antes de comenzar, me gustaría señalar que este es solo un pequeño tutorial de inicio rápido para aquellos que, como yo, nunca han usado Kafka en la práctica.
¡Así que comencemos!
El único broker de Kafka y el ZooKeeper necesario para su funcionamiento los ejecutaré en Docker.
Primero creemos una red separada kafkanet
docker network create kafkanet
Ejecutando un contenedor con ZooKeeper
docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
Ejecutando un contenedor con Kafka
docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
Para asegurarse de que no haya errores, puede mostrar el registro docker logs kafka
Kafka incluye utilidades de línea de comandos con las que se puede crear un tema, enviar mensajes y leerlos.
Para usarlas, entremos en el contenedor kafka:
docker exec -it kafka bash
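A continuación creamos el tema demo-topic, listamos los temas, mostramos su descripción y probamos el productor y el consumidor de consola: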
/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-topics --list --zookeeper zookeeper:2181
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
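Ahora creemos dos aplicaciones de consola .NET: KafkaProducer, que envía mensajes al tema,
y KafkaConsumer, que los recibe.
En ambos proyectos se usan los paquetes NuGet Confluent.Kafka y Microsoft.Extensions.Hosting.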
KafkaProducer
KafkaProducerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaProducer
{
    /// <summary>
    /// Hosted service that publishes a fixed number of demo events to a Kafka
    /// topic while the host is starting, and disposes the producer when the
    /// host stops.
    /// </summary>
    public class KafkaProducerService : IHostedService
    {
        // Topic the demo events are written to.
        private const string TopicName = "demo-topic";

        // Number of demo events sent on startup.
        private const int EventCount = 5;

        private readonly ILogger<KafkaProducerService> _logger;
        private readonly IProducer<Null, string> _producer;

        /// <summary>
        /// Builds a key-less string producer connected to the broker at
        /// <c>localhost:9092</c>.
        /// </summary>
        /// <param name="logger">Logger used to trace sent events and shutdown.</param>
        public KafkaProducerService(ILogger<KafkaProducerService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092"
            };
            _producer = new ProducerBuilder<Null, string>(config).Build();
        }

        /// <summary>
        /// Sends the demo events one at a time, awaiting each
        /// <c>ProduceAsync</c> call before sending the next one.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to every produce call.</param>
        /// <returns>A task that completes once all events have been sent.</returns>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            for (var i = 0; i < EventCount; i++)
            {
                var value = $"Event N {i}";

                // Message template instead of an interpolated string: the rendered
                // text is the same, but the value is kept as a structured property.
                _logger.LogInformation("Sending >> {Value}", value);

                await _producer.ProduceAsync(
                    TopicName,
                    new Message<Null, string> { Value = value },
                    cancellationToken);
            }
        }

        /// <summary>
        /// Disposes the producer and logs that the service has stopped.
        /// </summary>
        /// <param name="cancellationToken">Not used.</param>
        /// <returns>An already completed task.</returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            // The field is readonly and always assigned in the constructor,
            // so no null-conditional access is needed.
            _producer.Dispose();
            _logger.LogInformation("{ServiceName} stopped", nameof(KafkaProducerService));
            return Task.CompletedTask;
        }
    }
}
Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaProducer
{
    /// <summary>
    /// Console entry point for the producer sample.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the generic host, runs it, and then waits for a key press
        /// once <c>Run</c> has returned.
        /// </summary>
        /// <param name="args">Command-line arguments passed on to the host builder.</param>
        static void Main(string[] args)
        {
            IHost host = CreateHostBuilder(args).Build();
            host.Run();
            Console.ReadKey();
        }

        /// <summary>
        /// Creates the default host builder and registers
        /// <see cref="KafkaProducerService"/> as a hosted service.
        /// </summary>
        /// <param name="args">Command-line arguments for the default builder.</param>
        /// <returns>The configured host builder.</returns>
        private static IHostBuilder CreateHostBuilder(string[] args)
        {
            IHostBuilder builder = Host.CreateDefaultBuilder(args);
            return builder.ConfigureServices((hostContext, services) =>
            {
                services.AddHostedService<KafkaProducerService>();
            });
        }
    }
}
KafkaConsumer
KafkaConsumerService
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;
namespace KafkaConsumer
{
    /// <summary>
    /// Hosted service that subscribes to a Kafka topic and logs every message
    /// it receives until the host stops.
    /// </summary>
    /// <remarks>
    /// The blocking consume loop runs on its own long-running task, so
    /// <see cref="StartAsync"/> returns immediately and host startup can
    /// complete. <see cref="StopAsync"/> signals the loop to end and waits
    /// for it before disposing the consumer.
    /// </remarks>
    public class KafkaConsumerService : IHostedService
    {
        // Topic the service subscribes to.
        private const string TopicName = "demo-topic";

        private readonly ILogger<KafkaConsumerService> _logger;
        private readonly IConsumer<Ignore, string> _consumer;

        // Signals the background consume loop to stop.
        private readonly CancellationTokenSource _stoppingCts = new CancellationTokenSource();

        // The running consume loop; null until StartAsync has been called.
        private Task _consumeTask;

        /// <summary>
        /// Builds a string consumer for group <c>demo-group</c> connected to the
        /// broker at <c>localhost:9092</c>, with
        /// <c>AutoOffsetReset.Earliest</c> configured.
        /// </summary>
        /// <param name="logger">Logger used to trace received messages and shutdown.</param>
        public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "demo-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        }

        /// <summary>
        /// Subscribes to the topic and starts the consume loop in the background.
        /// </summary>
        /// <param name="cancellationToken">
        /// Startup token; not used to control the loop, which is stopped by
        /// <see cref="StopAsync"/> instead.
        /// </param>
        /// <returns>An already completed task.</returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe(TopicName);

            // Consume() blocks, so give the loop a dedicated thread instead of
            // running it inline (which would keep StartAsync from ever returning)
            // or tying up a thread-pool worker.
            _consumeTask = Task.Factory.StartNew(
                () => ConsumeLoop(_stoppingCts.Token),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Stops the consume loop, waits for it to finish (or for the shutdown
        /// token to fire), then disposes the consumer.
        /// </summary>
        /// <param name="cancellationToken">Token that bounds how long to wait for the loop.</param>
        /// <returns>A task that completes when shutdown work is done.</returns>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _stoppingCts.Cancel();

            if (_consumeTask != null)
            {
                // Wait for the loop to exit, but give up if the host's shutdown
                // token is cancelled first.
                await Task.WhenAny(
                    _consumeTask,
                    Task.Delay(Timeout.Infinite, cancellationToken)).ConfigureAwait(false);
            }

            _consumer.Dispose();
            _stoppingCts.Dispose();
            _logger.LogInformation("{ServiceName} stopped", nameof(KafkaConsumerService));
        }

        // Polls the topic until stoppingToken is cancelled, logging each message value.
        private void ConsumeLoop(CancellationToken stoppingToken)
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var consumeResult = _consumer.Consume(stoppingToken);
                    _logger.LogInformation("Received >> {Value}", consumeResult.Message.Value);
                }
            }
            catch (System.OperationCanceledException)
            {
                // Expected: Consume() was interrupted by the stop signal.
            }
            catch (System.Exception ex)
            {
                // Do not let a failure disappear inside the background task.
                _logger.LogError(ex, "Kafka consume loop terminated unexpectedly");
            }
            finally
            {
                // Close on the consuming thread so the consumer leaves the group
                // cleanly instead of waiting for a session timeout.
                _consumer.Close();
            }
        }
    }
}
Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
namespace KafkaConsumer
{
    /// <summary>
    /// Console entry point for the consumer sample.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the generic host, runs it, and then waits for a key press
        /// once <c>Run</c> has returned.
        /// </summary>
        /// <param name="args">Command-line arguments passed on to the host builder.</param>
        static void Main(string[] args)
        {
            IHost host = CreateHostBuilder(args).Build();
            host.Run();
            Console.ReadKey();
        }

        /// <summary>
        /// Creates the default host builder and registers
        /// <see cref="KafkaConsumerService"/> as a hosted service.
        /// </summary>
        /// <param name="args">Command-line arguments for the default builder.</param>
        /// <returns>The configured host builder.</returns>
        private static IHostBuilder CreateHostBuilder(string[] args)
        {
            IHostBuilder builder = Host.CreateDefaultBuilder(args);
            return builder.ConfigureServices((hostContext, services) =>
            {
                services.AddHostedService<KafkaConsumerService>();
            });
        }
    }
}