EDA with Kafka and .NET Core

What is the Pub/Sub Pattern?

The Publish/Subscribe pattern is a messaging pattern in which senders of messages, called publishers, do not send messages directly to specific receivers, called subscribers. Instead, published messages are categorized into classes, without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and receive only messages that are of interest, without knowledge of which publishers, if any, there are.
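Before bringing Kafka in, the pattern itself can be shown with a minimal in-memory sketch. The `MessageBus` class and the "orders" topic below are purely illustrative; the point is that publishers and subscribers only know about topics, never about each other.

```csharp
using System;
using System.Collections.Generic;

// A minimal in-memory illustration of Pub/Sub: publishers and
// subscribers are decoupled and only share a topic name.
class MessageBus
{
    private readonly Dictionary<string, List<Action<string>>> _subscribers = new();

    public void Subscribe(string topic, Action<string> handler)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers))
            _subscribers[topic] = handlers = new List<Action<string>>();
        handlers.Add(handler);
    }

    public void Publish(string topic, string message)
    {
        // If nobody subscribed to the topic, the message is simply dropped.
        if (_subscribers.TryGetValue(topic, out var handlers))
            foreach (var handler in handlers)
                handler(message);
    }
}

class Demo
{
    static void Main()
    {
        var bus = new MessageBus();
        bus.Subscribe("orders", msg => Console.WriteLine($"Subscriber A got: {msg}"));
        bus.Subscribe("orders", msg => Console.WriteLine($"Subscriber B got: {msg}"));
        bus.Publish("orders", "order-42 created");
    }
}
```

Kafka plays the role of the `MessageBus` here, but as a distributed, persistent, fault-tolerant service.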

Publisher / Subscriber with Kafka
Producer - Broker (Topic) - Consumer Group

What is Kafka?

Kafka is an open-source distributed streaming platform developed by the Apache Software Foundation. It is designed to handle large amounts of data in real time from multiple sources.

Kafka is a publish-subscribe messaging system that allows producers to publish messages to a topic, and consumers to subscribe to a topic and receive the messages in real-time. It is designed to be scalable, fault-tolerant, and highly available.

Kafka’s architecture consists of producers, consumers, brokers, and topics. Producers are the applications that generate the data and publish it to the brokers, which are responsible for storing and replicating the data across a cluster of servers. Consumers subscribe to a topic and receive messages from the brokers in real-time.

Kafka is widely used in modern data architectures for streaming data processing, real-time analytics, and event-driven architectures. Some common use cases include real-time data pipelines, log aggregation, and data integration.

How to implement a Pub/Sub messaging system with Kafka and .NET Core?

Set up your Kafka environment with Docker

This docker-compose.yml file defines two services: zookeeper and kafka.

version: '3'

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  
  kafka:
    image: bitnami/kafka:3.4
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

The zookeeper service runs a ZooKeeper server, which is required by Kafka as a coordination service.

The kafka service runs a Kafka broker and exposes port 9092, which is the default port for Kafka brokers.

Create a topic

Kafka stores messages in topics. It’s good practice to explicitly create them before using them, even if Kafka is configured to automagically create them when referenced.

Run this command to create a new topic into which we'll write and read some test messages.

docker compose exec kafka \
  kafka-topics.sh --bootstrap-server localhost:9092 \
                  --create \
                  --topic demo-topic
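To confirm the topic was created, you can list the topics on the broker (this assumes the Compose services above are running and the Kafka service is named kafka):

```shell
# List all topics known to the broker; demo-topic should appear.
docker compose exec kafka \
  kafka-topics.sh --bootstrap-server localhost:9092 --list
```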

How to create a .NET client?

First, we need to add the Confluent.Kafka NuGet package to the project: https://www.nuget.org/packages/Confluent.Kafka/

Producer

using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092"
};

using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    try
    {
        var message = new Message<Null, string>
        {
            Value = "Hello, Kafka!" // Replace with the message you want to send
        };

        var result = await producer.ProduceAsync("demo-topic", message);

        Console.WriteLine($"Delivered '{result.Value}' to '{result.TopicPartitionOffset}'");
    }
    catch (ProduceException<Null, string> e)
    {
        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
    }
}

In this example, we create a ProducerConfig object with the appropriate Kafka bootstrap server address.

We then use a ProducerBuilder to create a Producer object.

Finally, we create a Message object with the message we want to send, and use the ProduceAsync method to send the message to the Kafka topic "demo-topic".
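The producer above sends messages with a Null key, so Kafka distributes them across partitions. When per-entity ordering matters, you can send keyed messages instead: all messages sharing a key land on the same partition and are therefore consumed in order. A sketch, assuming the broker from the Compose file is running (the "order-42" key is illustrative):

```csharp
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

using var producer = new ProducerBuilder<string, string>(config).Build();

// Messages sharing the same key are routed to the same partition,
// so consumers see them in the order they were produced.
foreach (var i in new[] { 1, 2, 3 })
{
    var result = await producer.ProduceAsync("demo-topic", new Message<string, string>
    {
        Key = "order-42",
        Value = $"event {i} for order-42"
    });
    Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
}
```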

Consumer

using Confluent.Kafka;
using System;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // Replace with the appropriate bootstrap server(s)
            GroupId = "my-group", // Replace with the name of your consumer group
            AutoOffsetReset = AutoOffsetReset.Earliest // Replace with the desired offset reset policy
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("demo-topic"); // Replace with the name of the Kafka topic you want to consume messages from

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // Prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    var result = consumer.Consume(cts.Token);

                    Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
                }
            }
            catch (OperationCanceledException)
            {
                // The consumer was stopped.
            }
            finally
            {
                consumer.Close();
            }
        }
    }
}

In this example, we create a ConsumerConfig object with the appropriate Kafka bootstrap server address, consumer group name, and offset reset policy. We then use a ConsumerBuilder to create a Consumer object, and subscribe to the Kafka topic "demo-topic". Finally, we enter a loop to consume messages from the topic using the Consume method, and print each message value to the console.

Conclusion

In this article, we have learned how to build a Pub/Sub messaging system using Apache Kafka and .NET Core in a local environment with Docker. We covered all the necessary steps, from setting up the environment to creating a .NET Core project. We also created a Kafka producer that publishes messages to a Kafka topic and a Kafka consumer that subscribes to the same topic and processes the messages.

Kafka’s Pub/Sub pattern allows for scalable, reliable, and high-performance messaging, making it a popular choice for modern applications. With .NET Core, we can easily build applications that integrate with Kafka and take advantage of its features.

To take full advantage of Pub/Sub, we need to share the message model between the publisher and the subscriber using serialization and deserialization. Kafka messages are just bytes, and Avro, JSON, and Protobuf are the formats most commonly used to serialize them.
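With the JSON approach, for example, a shared model type can be serialized to a string payload on the producer side and deserialized back on the consumer side using System.Text.Json. The OrderCreated type here is illustrative:

```csharp
using System;
using System.Text.Json;

// A model type shared between publisher and subscriber.
public record OrderCreated(int OrderId, decimal Amount);

class SerializationDemo
{
    static void Main()
    {
        // Producer side: serialize the model into a JSON string payload
        // suitable for a Message<Null, string> Value.
        var payload = JsonSerializer.Serialize(new OrderCreated(42, 19.99m));
        Console.WriteLine(payload);

        // Consumer side: deserialize the payload back into the shared model.
        var order = JsonSerializer.Deserialize<OrderCreated>(payload);
        Console.WriteLine($"Received order {order!.OrderId}");
    }
}
```

In a real system the model would live in a shared library referenced by both projects; for Avro and Protobuf, Confluent also provides schema-registry-aware serializers.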

Full code on GitHub: https://github.com/FiftyCloud/kafkademo
