← Về danh sách bài họcBài 14/20

📤 Bài 14: Kafka Producer & Consumer API

⏱️ Thời gian đọc: 25 phút | 📚 Độ khó: Trung bình

🎯 Sau bài học này, bạn sẽ:

1. Producer Configuration

const { Kafka, CompressionTypes } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'order-service',
    brokers: ['localhost:9092'],
    retry: {
        initialRetryTime: 100,
        retries: 8
    }
});

const producer = kafka.producer({
    // Batching: gom nhiều messages gửi 1 lần
    allowAutoTopicCreation: false,
    transactionTimeout: 30000,
});

async function init() {
    await producer.connect();

    // Gửi với các options
    await producer.send({
        topic: 'orders',
        compression: CompressionTypes.GZIP,
        acks: -1,  // all replicas phải confirm (safest)
        timeout: 30000,
        messages: [
            {
                key: 'order-123',     // Partition key
                value: JSON.stringify({
                    orderId: 123,
                    items: ['laptop', 'mouse'],
                    total: 25000000
                }),
                headers: {
                    'correlation-id': 'abc-123',
                    'source': 'web-app'
                },
                timestamp: Date.now().toString()
            }
        ]
    });
}
📌 ACKs levels:
acks: 0 → Fire and forget (fastest, có thể mất message)
acks: 1 → Leader confirm (balanced)
acks: -1 → All in-sync replicas confirm (safest, chậm nhất)

2. Batch Sending

// sendBatch: gửi messages đến nhiều topics cùng lúc
await producer.sendBatch({
    topicMessages: [
        {
            topic: 'user-events',
            messages: [
                { key: 'user-1', value: JSON.stringify({ event: 'login' }) },
                { key: 'user-2', value: JSON.stringify({ event: 'signup' }) },
            ]
        },
        {
            topic: 'analytics',
            messages: [
                { value: JSON.stringify({ page: '/home', visits: 100 }) }
            ]
        }
    ]
});

3. Consumer API Chi Tiết

const consumer = kafka.consumer({
    groupId: 'order-processors',
    sessionTimeout: 30000,
    heartbeatInterval: 3000,
    maxBytesPerPartition: 1048576, // 1MB
    minBytes: 1,
    maxBytes: 10485760,  // 10MB
    maxWaitTimeInMs: 5000,
    retry: { retries: 5 }
});

async function startConsumer() {
    await consumer.connect();

    await consumer.subscribe({
        topics: ['orders', 'payments'],
        fromBeginning: false
    });

    await consumer.run({
        autoCommit: true,
        autoCommitInterval: 5000,
        autoCommitThreshold: 100,

        eachMessage: async ({ topic, partition, message, heartbeat }) => {
            const data = JSON.parse(message.value.toString());

            console.log(`[${topic}] P${partition} Offset ${message.offset}:`, data);

            // Long processing: gọi heartbeat để tránh rebalance
            await heartbeat();

            await processOrder(data);
        }
    });
}

4. Manual Commit Offset

// Manual commit: kiểm soát chính xác khi nào commit
await consumer.run({
    autoCommit: false,

    eachMessage: async ({ topic, partition, message }) => {
        try {
            await processMessage(message);

            // Commit sau khi xử lý thành công
            await consumer.commitOffsets([{
                topic,
                partition,
                offset: (parseInt(message.offset) + 1).toString()
            }]);

        } catch (err) {
            console.error('Processing failed:', err);
            // KHÔNG commit → message sẽ được đọc lại
        }
    }
});

// Hoặc commit theo batch
await consumer.run({
    autoCommit: false,
    eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
        for (const message of batch.messages) {
            await processMessage(message);
            resolveOffset(message.offset);
        }
        await commitOffsetsIfNecessary();
    }
});
⚠️ At-least-once vs At-most-once:
• Commit trước khi xử lý → At-most-once (có thể mất message)
• Commit sau khi xử lý → At-least-once (có thể trùng message)
Exactly-once: cần Kafka transactions (phức tạp hơn)

5. Graceful Shutdown

// Graceful shutdown: đóng connections sạch sẽ
const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];

errorTypes.forEach(type => {
    process.on(type, async (e) => {
        console.log(`Error ${type}:`, e);
        await consumer.disconnect();
        process.exit(1);
    });
});

signalTraps.forEach(signal => {
    process.once(signal, async () => {
        console.log(`Signal ${signal} received, shutting down...`);
        await consumer.disconnect();
        await producer.disconnect();
    });
});

📝 Tóm Tắt