← Về danh sách bài học | Bài 17/20

🌊 Bài 17: Kafka Streams & Stream Processing

⏱️ Thời gian đọc: 25 phút | 📚 Độ khó: Nâng cao

🎯 Sau bài học này, bạn sẽ:
• Phân biệt được batch processing và stream processing
• Xây dựng real-time pipeline (filter → transform → aggregate) với Node.js và kafkajs
• Áp dụng event enrichment pattern để join stream với dữ liệu user
• Hiểu exactly-once semantics với Kafka transactions

1. Stream Processing Là Gì?

📌 Batch vs Stream:
Batch: Thu thập data → xử lý sau (hàng giờ/ngày). VD: báo cáo hàng ngày
Stream: Xử lý real-time khi data đến. VD: fraud detection, live dashboard
Stream Processing Pipeline:

Source (Kafka) → Transform → Aggregate → Sink (Kafka/DB)

VD: E-commerce real-time analytics
┌─────────┐    ┌───────────┐    ┌────────────┐    ┌──────────┐
│ orders  │───▶│ Filter    │───▶│ Aggregate  │───▶│ metrics  │
│ topic   │    │ (> 100K)  │    │ (per hour) │    │ topic    │
└─────────┘    └───────────┘    └────────────┘    └──────────┘

2. Real-time Pipeline với Node.js

// stream-processor.js
// Kafka client setup: one consumer (reads 'orders') and one producer
// (writes aggregated metrics) created from a single shared client.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'stream-processor', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'stream-processors' });
const producer = kafka.producer();

// Windowed aggregation state, shared by processStream below.
const windows = new Map(); // windowKey → { count, total }
const WINDOW_SIZE = 60000; // 1 minute, wall-clock based tumbling window

/**
 * Consumes the 'orders' topic, filters/enriches/aggregates orders in
 * tumbling one-minute windows, and publishes the running window metrics
 * to the 'order-metrics' topic. Runs until the consumer is stopped.
 */
async function processStream() {
    await consumer.connect();
    await producer.connect();
    await consumer.subscribe({ topic: 'orders' });

    await consumer.run({
        eachMessage: async ({ message }) => {
            // Guard against poison messages: one malformed payload must not
            // crash the whole consumer (previously JSON.parse was unguarded).
            let order;
            try {
                order = JSON.parse(message.value.toString());
            } catch (err) {
                console.error('Skipping malformed order message:', err);
                return;
            }

            // 1. Filter: only process orders > 100K
            if (order.total < 100000) return;

            // 2. Transform: enrich data
            // NOTE(review): `enriched` is built but never emitted below —
            // presumably it was meant to be sent somewhere; confirm intent.
            const enriched = {
                ...order,
                category: categorize(order.total),
                processedAt: new Date().toISOString()
            };

            // 3. Windowed aggregation, keyed by the current wall-clock minute.
            const windowKey = Math.floor(Date.now() / WINDOW_SIZE);
            if (!windows.has(windowKey)) {
                windows.set(windowKey, { count: 0, total: 0 });
            }
            const window = windows.get(windowKey);
            window.count++;
            window.total += order.total;

            // Evict closed windows so in-memory state stays bounded.
            // Previously the Map grew forever — a slow memory leak in a
            // long-running processor.
            for (const key of windows.keys()) {
                if (key < windowKey) windows.delete(key);
            }

            // 4. Emit the running aggregate for the current window.
            await producer.send({
                topic: 'order-metrics',
                messages: [{
                    key: `window-${windowKey}`,
                    value: JSON.stringify({
                        window: windowKey,
                        orderCount: window.count,
                        totalRevenue: window.total,
                        avgOrderValue: window.total / window.count
                    })
                }]
            });
        }
    });
}

/**
 * Maps an order total to a pricing tier.
 * @param {number} total - order total in VND
 * @returns {'premium'|'standard'|'basic'}
 */
function categorize(total) {
    const tiers = [
        [10000000, 'premium'],
        [1000000, 'standard'],
    ];
    for (const [threshold, label] of tiers) {
        if (total >= threshold) return label;
    }
    return 'basic';
}

// Run the pipeline; previously the returned promise was floating, so a
// connect/run failure surfaced only as an unhandled rejection.
processStream().catch((err) => {
    console.error('Stream processor failed:', err);
    process.exit(1);
});

3. Event Enrichment Pattern

// Enrich events by joining the order stream with user data
// (stream-table join pattern). Lookups are cached in memory so the DB
// is hit at most once per user.
const userCache = new Map();

/**
 * Joins an order event with its user record.
 * @param {Object} order - incoming order event; must carry `userId`
 * @returns {Promise<Object>} the order plus userName/userTier/userCity
 * @throws {Error} when no user record exists for order.userId
 */
async function enrichOrder(order) {
    let user = userCache.get(order.userId);
    if (!user) {
        user = await fetchUserFromDB(order.userId);
        // Fail loudly on a missing user instead of crashing later with a
        // cryptic TypeError at `user.name`; cache successful lookups only.
        if (user == null) {
            throw new Error(`User not found for order: ${order.userId}`);
        }
        userCache.set(order.userId, user);
    }

    return {
        ...order,
        userName: user.name,
        userTier: user.tier,
        userCity: user.city
    };
}

// Stream: orders → enriched-orders
// For every incoming order, enrich it with user data and forward the
// result (preserving the original message key) to 'enriched-orders'.
await consumer.run({
    eachMessage: async ({ message }) => {
        const rawOrder = JSON.parse(message.value.toString());
        const enrichedOrder = await enrichOrder(rawOrder);

        const outgoing = {
            key: message.key?.toString(),
            value: JSON.stringify(enrichedOrder)
        };
        await producer.send({ topic: 'enriched-orders', messages: [outgoing] });
    }
});

4. Exactly-Once Semantics

// Kafka transactions: guarantee exactly-once delivery.
// transactionalId identifies this producer across restarts;
// idempotent + maxInFlightRequests=1 prevent duplicates and reordering
// when sends are retried.
const producer = kafka.producer({
    transactionalId: 'my-transactional-producer',
    maxInFlightRequests: 1,
    idempotent: true
});

await producer.connect();

// Atomically produce the output message AND commit the consumer offset:
// either both become visible to downstream readers, or neither does.
const transaction = await producer.transaction();
try {
    // Send messages inside the transaction
    await transaction.send({
        topic: 'output-topic',
        messages: [{ value: 'processed data' }]
    });

    // Commit the consumer offset within the same transaction
    await transaction.sendOffsets({
        consumerGroupId: 'my-group',
        topics: [{
            topic: 'input-topic',
            partitions: [{ partition: 0, offset: '42' }]
        }]
    });

    await transaction.commit();
} catch (err) {
    // On any failure, abort so neither the message nor the offset lands,
    // then rethrow for the caller to handle.
    await transaction.abort();
    throw err;
}
💡 Exactly-once use cases:
• Financial transactions
• Inventory management
• Critical event processing mà không thể chấp nhận duplicate

📝 Tóm Tắt