Primer toque de Kafka

Antes de comenzar, me gustaría señalar que este es solo un pequeño tutorial de inicio rápido para aquellos que, como yo, nunca han usado Kafka en la práctica.





¡Así que comencemos!





Ejecutaré en Docker un único broker de Kafka y el ZooKeeper necesario para su funcionamiento.





Primero, creemos una red separada llamada kafkanet:







# Create a Docker network named "kafkanet"; the containers started below join it via --network=kafkanet.
docker network create kafkanet
      
      



Ejecutando un contenedor con ZooKeeper





# Start a detached container named "zookeeper" on the kafkanet network and publish port 2181 to the host.
docker run -d --network=kafkanet --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 -e ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 confluentinc/cp-zookeeper
      
      



Ejecutando un contenedor con Kafka





# Start a detached container named "kafka" on the kafkanet network and publish port 9092 to the host.
# The broker is pointed at ZooKeeper through zookeeper:2181 and advertises itself to clients as localhost:9092.
docker run -d --network=kafkanet --name=kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -p 9092:9092 confluentinc/cp-kafka
      
      



Para asegurarse de que no haya errores, puede consultar el registro del contenedor con el comando docker logs kafka.







Kafka ya está en ejecución; ahora podemos crear un topic, enviarle mensajes y leerlos.





Abramos una consola bash dentro del contenedor kafka:







# Open an interactive bash session inside the running "kafka" container.
docker exec -it kafka bash
      
      



Creemos el topic demo-topic:







# Create the topic "demo-topic" on the broker reachable at kafka:9092.
/bin/kafka-topics --create --topic demo-topic --bootstrap-server kafka:9092
      
      







# List the topics known to the broker.
# Uses --bootstrap-server like the other commands in this walkthrough: the --zookeeper
# option of kafka-topics is deprecated and was removed in Kafka 3.0.
/bin/kafka-topics --list --bootstrap-server kafka:9092
      
      







# Print the description of the topic "demo-topic" as reported by the broker at kafka:9092.
/bin/kafka-topics --describe --topic demo-topic --bootstrap-server kafka:9092
      
      







# Start a console producer that writes to "demo-topic" through the broker at kafka:9092.
/bin/kafka-console-producer --topic demo-topic --bootstrap-server kafka:9092
      
      







# Start a console consumer that reads "demo-topic" from the beginning (--from-beginning).
/bin/kafka-console-consumer --topic demo-topic --from-beginning --bootstrap-server kafka:9092
      
      



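Ahora creemos dos aplicaciones de consola .NET: KafkaProducer, que enviará mensajes al topic, y KafkaConsumer, que los leerá. Ambos proyectos necesitan los paquetes NuGet Confluent.Kafka y Microsoft.Extensions.Hosting.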





KafkaProducer



KafkaProducerService







using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaProducer
{
    /// <summary>
    /// Hosted service that publishes five demo events to the <c>demo-topic</c> topic
    /// when the host starts, and disposes its producer when the host stops.
    /// </summary>
    public class KafkaProducerService : IHostedService
    {
        private readonly ILogger<KafkaProducerService> _logger;
        private readonly IProducer<Null, string> _producer;

        /// <summary>
        /// Builds a producer that targets the broker at <c>localhost:9092</c>.
        /// </summary>
        /// <param name="logger">Logger used to report each message sent and the shutdown.</param>
        public KafkaProducerService(ILogger<KafkaProducerService> logger)
        {
            _logger = logger;

            var producerConfig = new ProducerConfig();
            producerConfig.BootstrapServers = "localhost:9092";

            var builder = new ProducerBuilder<Null, string>(producerConfig);
            _producer = builder.Build();
        }

        /// <summary>
        /// Sends the events "Event N 0" through "Event N 4" one after another,
        /// awaiting each send before starting the next.
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to every produce call.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            var index = 0;
            while (index < 5)
            {
                var payload = $"Event N {index}";
                _logger.LogInformation($"Sending >> {payload}");

                var message = new Message<Null, string> { Value = payload };
                await _producer.ProduceAsync("demo-topic", message, cancellationToken);

                index++;
            }
        }

        /// <summary>
        /// Disposes the producer and logs that the service has stopped.
        /// </summary>
        /// <param name="cancellationToken">Not used.</param>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            if (_producer != null)
            {
                _producer.Dispose();
            }

            _logger.LogInformation($"{nameof(KafkaProducerService)} stopped");
            return Task.CompletedTask;
        }
    }
}
      
      



Program.cs







using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

namespace KafkaProducer
{
    /// <summary>
    /// Entry point of the producer sample: builds a generic host that runs
    /// <see cref="KafkaProducerService"/> and keeps the console open afterwards.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var hostBuilder = CreateHostBuilder(args);
            var host = hostBuilder.Build();
            host.Run();

            // Keep the console window open until a key is pressed once the host has shut down.
            Console.ReadKey();
        }

        /// <summary>
        /// Creates the default host builder and registers the producer as a hosted service.
        /// </summary>
        private static IHostBuilder CreateHostBuilder(string[] args)
        {
            return Host
                .CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<KafkaProducerService>();
                });
        }
    }
}
      
      



KafkaConsumer



KafkaConsumerService







using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace KafkaConsumer
{
    /// <summary>
    /// Hosted service that subscribes to <c>demo-topic</c> and logs the value of every
    /// message it receives.
    /// </summary>
    /// <remarks>
    /// <c>Consume</c> is a blocking call, so the consume loop runs on a dedicated background
    /// task. This lets <see cref="StartAsync"/> return immediately so host startup can
    /// complete, and lets <see cref="StopAsync"/> end the loop through the service's own
    /// cancellation source.
    /// </remarks>
    public class KafkaConsumerService : IHostedService
    {
        private readonly ILogger<KafkaConsumerService> _logger;
        private readonly IConsumer<Ignore, string> _consumer;

        // Cancelled from StopAsync to break the consume loop out of its blocking Consume call.
        private readonly CancellationTokenSource _stoppingCts = new CancellationTokenSource();

        // The running consume loop; null until StartAsync has been called.
        private Task _consumeTask;

        /// <summary>
        /// Builds a consumer for the broker at <c>localhost:9092</c> in group <c>demo-group</c>,
        /// configured with <c>AutoOffsetReset.Earliest</c>.
        /// </summary>
        /// <param name="logger">Logger used to report received messages, errors and shutdown.</param>
        public KafkaConsumerService(ILogger<KafkaConsumerService> logger)
        {
            _logger = logger;
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "demo-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            _consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        }

        /// <summary>
        /// Subscribes to <c>demo-topic</c> and starts the consume loop in the background.
        /// </summary>
        /// <param name="cancellationToken">
        /// Startup token supplied by the host. It is not used to stop the loop: the loop is
        /// stopped by <see cref="StopAsync"/>.
        /// </param>
        /// <returns>A completed task; the loop keeps running after this method returns.</returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            _consumer.Subscribe("demo-topic");

            // LongRunning: the loop blocks its thread for the lifetime of the service,
            // so it should not occupy a regular thread-pool worker.
            _consumeTask = Task.Factory.StartNew(
                () => ConsumeLoop(_stoppingCts.Token),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Signals the consume loop to stop, waits for it to exit (or for the host's
        /// shutdown token to fire), then closes and disposes the consumer.
        /// </summary>
        /// <param name="cancellationToken">Signals that shutdown should no longer be graceful.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _stoppingCts.Cancel();
            try
            {
                if (_consumeTask != null)
                {
                    // Wait for the loop to finish, but stop waiting if the host gives up on graceful shutdown.
                    await Task.WhenAny(_consumeTask, Task.Delay(Timeout.Infinite, cancellationToken));
                }
            }
            finally
            {
                // Close before Dispose so the consumer leaves its group cleanly.
                _consumer.Close();
                _consumer.Dispose();
                _stoppingCts.Dispose();
                _logger.LogInformation($"{nameof(KafkaConsumerService)} stopped");
            }
        }

        // Blocks on Consume and logs each received value until the token is cancelled or consuming fails.
        private void ConsumeLoop(CancellationToken stoppingToken)
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var consumeResult = _consumer.Consume(stoppingToken);
                    _logger.LogInformation($"Received >> {consumeResult.Message.Value}");
                }
            }
            catch (System.OperationCanceledException)
            {
                // Expected during shutdown: StopAsync cancelled the token while Consume was waiting.
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Consuming from demo-topic failed; the consume loop is exiting");
            }
        }
    }
}
      
      



Program.cs







using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;

namespace KafkaConsumer
{
    /// <summary>
    /// Entry point of the consumer sample: builds a generic host that runs
    /// <see cref="KafkaConsumerService"/> and keeps the console open afterwards.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var consumerHost = CreateHostBuilder(args).Build();
            consumerHost.Run();

            // Keep the console window open until a key is pressed once the host has shut down.
            Console.ReadKey();
        }

        /// <summary>
        /// Creates the default host builder and registers the consumer as a hosted service.
        /// </summary>
        private static IHostBuilder CreateHostBuilder(string[] args)
        {
            var builder = Host.CreateDefaultBuilder(args);
            return builder.ConfigureServices((hostContext, services) =>
            {
                services.AddHostedService<KafkaConsumerService>();
            });
        }
    }
}
      
      



( )








All Articles