← Về danh sách bài họcBài 14/20
📤 Bài 14: Kafka Producer & Consumer API
🎯 Sau bài học này, bạn sẽ:
- Cấu hình Producer: batching, compression, acks
- Hiểu partitioning strategy và message ordering
- Consumer API: auto/manual commit offset
- Error handling và graceful shutdown
1. Producer Configuration
const { Kafka, CompressionTypes } = require('kafkajs');
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['localhost:9092'],
retry: {
initialRetryTime: 100,
retries: 8
}
});
const producer = kafka.producer({
// Batching: gom nhiều messages gửi 1 lần
allowAutoTopicCreation: false,
transactionTimeout: 30000,
});
async function init() {
await producer.connect();
// Gửi với các options
await producer.send({
topic: 'orders',
compression: CompressionTypes.GZIP,
acks: -1, // all replicas phải confirm (safest)
timeout: 30000,
messages: [
{
key: 'order-123', // Partition key
value: JSON.stringify({
orderId: 123,
items: ['laptop', 'mouse'],
total: 25000000
}),
headers: {
'correlation-id': 'abc-123',
'source': 'web-app'
},
timestamp: Date.now().toString()
}
]
});
}
📌 ACKs levels:
•
•
•
•
acks: 0 → Fire and forget (fastest, có thể mất message)•
acks: 1 → Leader confirm (balanced)•
acks: -1 → All in-sync replicas confirm (safest, chậm nhất)
2. Batch Sending
// sendBatch: gửi messages đến nhiều topics cùng lúc
await producer.sendBatch({
topicMessages: [
{
topic: 'user-events',
messages: [
{ key: 'user-1', value: JSON.stringify({ event: 'login' }) },
{ key: 'user-2', value: JSON.stringify({ event: 'signup' }) },
]
},
{
topic: 'analytics',
messages: [
{ value: JSON.stringify({ page: '/home', visits: 100 }) }
]
}
]
});
3. Consumer API Chi Tiết
const consumer = kafka.consumer({
groupId: 'order-processors',
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB
minBytes: 1,
maxBytes: 10485760, // 10MB
maxWaitTimeInMs: 5000,
retry: { retries: 5 }
});
async function startConsumer() {
await consumer.connect();
await consumer.subscribe({
topics: ['orders', 'payments'],
fromBeginning: false
});
await consumer.run({
autoCommit: true,
autoCommitInterval: 5000,
autoCommitThreshold: 100,
eachMessage: async ({ topic, partition, message, heartbeat }) => {
const data = JSON.parse(message.value.toString());
console.log(`[${topic}] P${partition} Offset ${message.offset}:`, data);
// Long processing: gọi heartbeat để tránh rebalance
await heartbeat();
await processOrder(data);
}
});
}
4. Manual Commit Offset
// Manual commit: kiểm soát chính xác khi nào commit
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
try {
await processMessage(message);
// Commit sau khi xử lý thành công
await consumer.commitOffsets([{
topic,
partition,
offset: (parseInt(message.offset) + 1).toString()
}]);
} catch (err) {
console.error('Processing failed:', err);
// KHÔNG commit → message sẽ được đọc lại
}
}
});
// Hoặc commit theo batch
await consumer.run({
autoCommit: false,
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
for (const message of batch.messages) {
await processMessage(message);
resolveOffset(message.offset);
}
await commitOffsetsIfNecessary();
}
});
⚠️ At-least-once vs At-most-once:
• Commit trước khi xử lý → At-most-once (có thể mất message)
• Commit sau khi xử lý → At-least-once (có thể trùng message)
• Exactly-once: cần Kafka transactions (phức tạp hơn)
• Commit trước khi xử lý → At-most-once (có thể mất message)
• Commit sau khi xử lý → At-least-once (có thể trùng message)
• Exactly-once: cần Kafka transactions (phức tạp hơn)
5. Graceful Shutdown
// Graceful shutdown: đóng connections sạch sẽ
const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];
errorTypes.forEach(type => {
process.on(type, async (e) => {
console.log(`Error ${type}:`, e);
await consumer.disconnect();
process.exit(1);
});
});
signalTraps.forEach(signal => {
process.once(signal, async () => {
console.log(`Signal ${signal} received, shutting down...`);
await consumer.disconnect();
await producer.disconnect();
});
});
📝 Tóm Tắt
- Producer: acks, compression, batching, partition key
- Consumer: groupId, subscribe, eachMessage
- Commit: auto commit vs manual commit offset
- Semantics: at-least-once (commit sau xử lý) là phổ biến nhất
- Graceful shutdown: disconnect trước khi exit