Kafka
Kafka is a distributed streaming platform originally developed at LinkedIn in 2010. It was designed to handle real-time data pipelines at massive scale, making it well suited to applications that must process large volumes of events with low latency.
Core Concepts
In Kafka, producers send messages to topics, which are named categories that store those messages. Consumers read messages from topics and can belong to consumer groups, letting multiple consumers share the load of processing a topic. Within a group, each partition of the topic is assigned to exactly one consumer, so the work is balanced across the group.
digraph KafkaArchitecture {
    // Node definitions
    node [shape=box];
    Producer1 [label="Producer 1"];
    Producer2 [label="Producer 2"];
    TopicA [label="Topic A"];
    TopicB [label="Topic B"];

    subgraph cluster_ConsumerGroup1 {
        label="Consumer Group";
        Consumer1 [label="Consumer 1"];
        Consumer2 [label="Consumer 2"];
        Consumer3 [label="Consumer 3"];
    }

    // Connections
    Producer1 -> TopicA [label="Produces"];
    Producer1 -> TopicB [label="Produces"];
    Producer2 -> TopicB [label="Produces"];
    TopicA -> Consumer1 [label="Consumed by"];
    TopicA -> Consumer2 [label="Consumed by"];
    TopicB -> Consumer3 [label="Consumed by"];
}
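The partition count caps a group's parallelism: each partition is assigned to at most one consumer in a group, so extra consumers sit idle. As a sketch, once the broker from the next section is running and kafkajs (installed below) is added, the admin client could create test-topic with three partitions so up to three consumers in one group can read in parallel; the topic name and partition count are just examples:

import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9094'],
})

const admin = kafka.admin()
await admin.connect()

// Three partitions allow up to three consumers in a single
// group to each own one partition.
await admin.createTopics({
  topics: [{ topic: 'test-topic', numPartitions: 3 }],
})

await admin.disconnect()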
Run Kafka
To run Kafka in a development setup, add a docker-compose.yml. The configuration below runs a single-node broker in KRaft mode (the broker acts as its own controller), so no separate ZooKeeper container is needed:
services:
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9094:9094'
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
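A note on the three listeners: PLAINTEXT on 9092 carries traffic inside the Docker network, CONTROLLER on 9093 is used by the KRaft controller quorum, and EXTERNAL on 9094 is advertised as localhost:9094 so clients on the host machine can reach the broker.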
Then start the broker in the background:

docker-compose up -d
Interact with Kafka
Install the kafkajs client library:

bun add kafkajs
Add a producer.ts that connects to the broker and sends a single message:
import { Kafka, Partitioners } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9094'],
})
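// kafkajs v2 changed the default partitioner; opting into the legacy
// partitioner keeps the pre-v2 behavior and silences the startup warning.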
const producer = kafka.producer({
createPartitioner: Partitioners.LegacyPartitioner
})
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [{ value: 'Hello Kafka!' }],
})
await producer.disconnect()
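Messages that share a key always land on the same partition, which preserves ordering per key. As a small variation on the send call above (the key 'user-1' is just an example):

await producer.send({
  topic: 'test-topic',
  messages: [
    // Both messages hash to the same partition because they share a key,
    // so consumers see them in this order.
    { key: 'user-1', value: 'first' },
    { key: 'user-1', value: 'second' },
  ],
})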
Add a consumer.ts that joins a consumer group, subscribes to the topic, and logs every message it receives:
import { Kafka } from 'kafkajs'
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9094'],
})
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value!.toString(),
})
},
})
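Because the consumer joins test-group, starting a second copy of consumer.ts with the same groupId makes Kafka rebalance and split the topic's partitions between the two instances; a copy started with a different groupId would instead receive every message independently.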
Then run producer.ts and consumer.ts in separate terminals using bun:
bun producer.ts
bun consumer.ts
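The consumer terminal should print { value: 'Hello Kafka!' } once it picks up the producer's message.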
When done, stop and remove the container:
docker-compose down --remove-orphans