What is the Pub/Sub pattern?
The publish/subscribe pattern is a messaging pattern in which senders of messages, called publishers, do not send messages directly to specific receivers, called subscribers. Instead, published messages are categorized into classes, without knowledge of which subscribers, if any, exist. Similarly, subscribers express interest in one or more classes and receive only the messages in those classes, without knowledge of which publishers, if any, exist.
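To make the decoupling concrete, here is a minimal in-memory sketch of the pattern. It is not how Kafka works internally; InMemoryBroker, PubSubDemo, and the topic names are hypothetical names chosen for illustration. The point is only that publishers and subscribers each talk to the broker and never to each other.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory broker: routes each message to the handlers
// registered for its topic. Publishers and subscribers only know the broker.
public class InMemoryBroker
{
    private readonly Dictionary<string, List<Action<string>>> _subscribers =
        new Dictionary<string, List<Action<string>>>();

    // A subscriber expresses interest in one topic (a "class" of messages).
    public void Subscribe(string topic, Action<string> handler)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers))
        {
            handlers = new List<Action<string>>();
            _subscribers[topic] = handlers;
        }
        handlers.Add(handler);
    }

    // A publisher sends to a topic without knowing who, if anyone, listens.
    // Returns the number of subscribers that received the message.
    public int Publish(string topic, string message)
    {
        if (!_subscribers.TryGetValue(topic, out var handlers))
        {
            return 0; // Nobody is interested; the message is simply dropped.
        }
        foreach (var handler in handlers)
        {
            handler(message);
        }
        return handlers.Count;
    }
}

public static class PubSubDemo
{
    // Wires two subscribers to "orders" and returns what they received.
    public static List<string> Run()
    {
        var broker = new InMemoryBroker();
        var received = new List<string>();

        broker.Subscribe("orders", m => received.Add($"billing got: {m}"));
        broker.Subscribe("orders", m => received.Add($"shipping got: {m}"));

        broker.Publish("orders", "order #1 created");   // Delivered to both subscribers.
        broker.Publish("payments", "payment received"); // No subscribers for this topic.

        return received;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Unlike this sketch, which drops a message when nobody is subscribed, a real broker such as Kafka stores messages so that subscribers can read them later.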
What is Kafka?
Kafka is an open-source distributed event streaming platform, originally developed at LinkedIn and now maintained by the Apache Software Foundation. It is designed to handle large volumes of data in real time from multiple sources.
Kafka is a publish-subscribe messaging system that allows producers to publish messages to a topic, and consumers to subscribe to a topic and receive the messages in real-time. It is designed to be scalable, fault-tolerant, and highly available.
Kafka’s architecture consists of producers, consumers, brokers, and topics. Producers are the applications that generate the data and publish it to the brokers, which are responsible for storing and replicating the data across a cluster of servers. Consumers subscribe to a topic and receive messages from the brokers in real-time.
Kafka is widely used in modern data architectures for streaming data processing, real-time analytics, and event-driven architectures. Some common use cases include real-time data pipelines, log aggregation, and data integration.
How to implement a Pub/Sub messaging system with Kafka and .NET Core?
Set up your Kafka environment with Docker
This docker-compose.yml file defines two services: zookeeper and kafka.
version: '3'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:3.4
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
The zookeeper service runs a ZooKeeper server, which Kafka requires as a coordination service in this setup.
The kafka service runs a Kafka broker and exposes port 9092, which is the default port for Kafka brokers.
Create a topic
Kafka stores messages in topics. It’s good practice to explicitly create them before using them, even if Kafka is configured to automagically create them when referenced.
Run this command to create a new topic into which we’ll write and read some test messages.
docker compose exec kafka \
  kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic demo-topic
How to create a .NET client?
First, add the Confluent.Kafka NuGet package to your project: https://www.nuget.org/packages/Confluent.Kafka/
Producer
using System;
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092" // Address of the Kafka broker
};

// Null key, string value: messages are sent without a key.
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
    try
    {
        var message = new Message<Null, string>
        {
            Value = "Hello, Kafka!" // Replace with the message you want to send
        };
        // Wait for the broker to acknowledge the message.
        var result = await producer.ProduceAsync("demo-topic", message);
        Console.WriteLine($"Delivered '{result.Value}' to '{result.TopicPartitionOffset}'");
    }
    catch (ProduceException<Null, string> e)
    {
        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
    }
}
In this example, we create a ProducerConfig object with the Kafka bootstrap server address. We then use a ProducerBuilder to create a producer. Finally, we create a Message object with the message we want to send, and use the ProduceAsync method to send it to the Kafka topic "demo-topic".
Consumer
using Confluent.Kafka;
using System;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // Replace with the appropriate bootstrap server(s)
            GroupId = "my-group", // Replace with the name of your consumer group
            AutoOffsetReset = AutoOffsetReset.Earliest // Replace with the desired offset reset policy
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("demo-topic"); // Replace with the name of the Kafka topic you want to consume messages from

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // Prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    var result = consumer.Consume(cts.Token);
                    Console.WriteLine($"Consumed message '{result.Message.Value}' at: '{result.TopicPartitionOffset}'.");
                }
            }
            catch (OperationCanceledException)
            {
                // The consumer was stopped.
            }
            finally
            {
                consumer.Close();
            }
        }
    }
}
In this example, we create a ConsumerConfig object with the Kafka bootstrap server address, consumer group name, and offset reset policy. We then use a ConsumerBuilder to create a consumer and subscribe to the Kafka topic "demo-topic". Finally, we enter a loop that reads messages from the topic with the Consume method and prints each message value to the console, until Ctrl+C cancels the token and the consumer is closed.
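The GroupId and AutoOffsetReset settings are easier to reason about with a small model. The sketch below is a simplified, hypothetical simulation (TopicLog and OffsetDemo are illustrative names, not part of Confluent.Kafka): it treats a topic as a single append-only list and assumes the position is committed on every read, whereas a real consumer commits offsets separately. It shows that the reset policy only matters for a group that has no committed offset yet.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a single-partition topic: an append-only list whose
// index plays the role of the offset.
public class TopicLog
{
    private readonly List<string> _messages = new List<string>();

    // Next offset to read, per consumer group.
    private readonly Dictionary<string, int> _committed = new Dictionary<string, int>();

    // Appends a message and returns its offset.
    public int Append(string message)
    {
        _messages.Add(message);
        return _messages.Count - 1;
    }

    // Returns everything the group has not read yet and commits the new position.
    // startFromEarliest is only consulted when the group has no committed offset,
    // which mirrors the role of AutoOffsetReset.
    public List<string> Poll(string groupId, bool startFromEarliest)
    {
        if (!_committed.TryGetValue(groupId, out var position))
        {
            position = startFromEarliest ? 0 : _messages.Count;
        }
        var batch = _messages.GetRange(position, _messages.Count - position);
        _committed[groupId] = _messages.Count;
        return batch;
    }
}

public static class OffsetDemo
{
    public static void Main()
    {
        var log = new TopicLog();
        log.Append("a");
        log.Append("b");

        // New group with the "earliest" policy: reads the existing messages.
        Console.WriteLine(log.Poll("my-group", true).Count);     // prints 2

        // New group with the "latest" policy: skips what is already there.
        Console.WriteLine(log.Poll("other-group", false).Count); // prints 0

        log.Append("c");

        // Known group: resumes after its committed offset, whatever the policy.
        Console.WriteLine(log.Poll("my-group", true).Count);     // prints 1
    }
}
```

In the same spirit, two consumers that use different group names each receive every message, which is the behavior pub/sub relies on.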
Conclusion
In this article, we learned how to build a Pub/Sub messaging system using Apache Kafka and .NET Core in a local Docker environment. We covered the necessary steps, from setting up the environment to creating the .NET client code: a Kafka producer that publishes messages to a topic, and a Kafka consumer that subscribes to the same topic and processes the messages.
Kafka’s Pub/Sub pattern allows for scalable, reliable, and high-performance messaging, making it a popular choice for modern applications. With .NET Core, we can easily build applications that integrate with Kafka and take advantage of its features.
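To take full advantage of pub/sub, the publisher and subscriber need to share a message model and agree on how it is serialized and deserialized. Kafka itself treats message payloads as opaque bytes; the Confluent client libraries and Schema Registry provide serializers for Avro, JSON Schema, and Protobuf.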
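As a minimal sketch of a shared model, the example below uses System.Text.Json to turn an object into the string that a producer like the one above would send, and back again on the consumer side. OrderCreated, OrderCodec, and SerializationDemo are hypothetical names used for illustration; this plain-JSON approach is an assumption for simplicity and does not involve Schema Registry.

```csharp
using System;
using System.Text.Json;

// Hypothetical shared model, referenced by both the producer and the consumer.
public class OrderCreated
{
    public int OrderId { get; set; }
    public string Customer { get; set; } = "";
    public decimal Total { get; set; }
}

public static class OrderCodec
{
    // Producer side: turn the model into the string used as the message value.
    public static string Serialize(OrderCreated order) =>
        JsonSerializer.Serialize(order);

    // Consumer side: rebuild the model from the received message value.
    public static OrderCreated Deserialize(string json) =>
        JsonSerializer.Deserialize<OrderCreated>(json)
        ?? throw new InvalidOperationException("Payload was JSON null.");
}

public static class SerializationDemo
{
    public static void Main()
    {
        var order = new OrderCreated { OrderId = 42, Customer = "Alice", Total = 19.99m };

        // This string is what would be assigned to Message<Null, string>.Value.
        string payload = OrderCodec.Serialize(order);
        Console.WriteLine(payload);

        // This is what the consumer would do with result.Message.Value.
        OrderCreated roundTripped = OrderCodec.Deserialize(payload);
        Console.WriteLine($"{roundTripped.OrderId} {roundTripped.Customer} {roundTripped.Total}");
    }
}
```

Keeping the model and codec in a small library referenced by both applications is one way to avoid the two sides drifting apart.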
Full code on GitHub: https://github.com/FiftyCloud/kafkademo