← Về danh sách bài họcBài 16/20

👥 Bài 16: Consumer Groups & Rebalancing

⏱️ Thời gian đọc: 22 phút | 📚 Độ khó: Nâng cao

🎯 Sau bài học này, bạn sẽ:

1. Consumer Group Hoạt Động

Topic "orders" (6 partitions)

Consumer Group: "order-processors"
┌────────────────────────────────────────────┐
│ Consumer 1: P0, P1                         │
│ Consumer 2: P2, P3                         │
│ Consumer 3: P4, P5                         │
└────────────────────────────────────────────┘

Consumer Group: "analytics"
┌────────────────────────────────────────────┐
│ Consumer A: P0, P1, P2, P3, P4, P5        │
│ (1 consumer nhận TẤT CẢ partitions)       │
└────────────────────────────────────────────┘

Quy tắc:
• Mỗi partition chỉ thuộc 1 consumer trong group
• 1 consumer có thể đọc nhiều partitions
• Consumers > partitions → thừa consumer (idle)

2. Rebalancing

Khi consumer join/leave group, Kafka rebalance partitions giữa consumers:

Trước rebalance (3 consumers, 6 partitions):
  C1: P0, P1  |  C2: P2, P3  |  C3: P4, P5

C3 chết → Rebalance:
  C1: P0, P1, P4  |  C2: P2, P3, P5

C4 join → Rebalance:
  C1: P0, P1  |  C2: P2, P3  |  C4: P4, P5
// KafkaJS: Xử lý rebalance events
const consumer = kafka.consumer({
    groupId: 'my-group',
    sessionTimeout: 30000,
    rebalanceTimeout: 60000,
});

consumer.on('consumer.rebalancing', () => {
    console.log('🔄 Rebalancing started...');
});

consumer.on('consumer.group_join', ({ payload }) => {
    console.log('✅ Joined group, assigned:', payload.memberAssignment);
});
⚠️ Rebalancing gây stop-the-world:
• Mọi consumer trong group tạm dừng consume
• Có thể kéo dài vài giây đến vài phút
• Tránh consumer join/leave liên tục (flapping)

3. Consumer Lag

Partition 0:
  Latest offset:    1000  (newest message)
  Consumer offset:   850  (đang đọc đến đây)
  LAG = 1000 - 850 = 150 messages

Lag cao = consumer xử lý không kịp producer
# Check consumer lag
docker exec kafka kafka-consumer-groups \
  --describe \
  --group order-processors \
  --bootstrap-server localhost:9092

# Output:
# TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# orders   0          850             1000            150
# orders   1          900             950             50
# orders   2          800             1200            400
💡 Giảm lag:
Scale consumers (thêm instances, nhưng ≤ số partitions)
Tối ưu processing (batch, async, caching)
Tăng partitions (cho phép thêm consumers)
• Kiểm tra bottleneck (database, external API calls)

4. Multiple Consumer Groups

// Group 1: Order processing
const orderConsumer = kafka.consumer({ groupId: 'order-processors' });
await orderConsumer.subscribe({ topic: 'orders' });
await orderConsumer.run({
    eachMessage: async ({ message }) => {
        await processOrder(JSON.parse(message.value.toString()));
    }
});

// Group 2: Analytics (đọc cùng topic, nhận TẤT CẢ messages)
const analyticsConsumer = kafka.consumer({ groupId: 'analytics' });
await analyticsConsumer.subscribe({ topic: 'orders' });
await analyticsConsumer.run({
    eachMessage: async ({ message }) => {
        await trackAnalytics(JSON.parse(message.value.toString()));
    }
});

// Cả 2 groups đều nhận TẤT CẢ messages từ topic "orders"
// Nhưng trong cùng 1 group, partitions được chia đều

5. Seek & Reset Offset

// Seek to specific offset
consumer.seek({ topic: 'orders', partition: 0, offset: '100' });

// Seek to beginning (replay tất cả)
const admin = kafka.admin();
await admin.connect();
await admin.resetOffsets({
    groupId: 'my-group',
    topic: 'orders',
    earliest: true
});

// Seek to timestamp (replay từ thời điểm cụ thể)
const offsets = await admin.fetchTopicOffsetsByTimestamp('orders', 
    Date.now() - 3600000 // 1 giờ trước
);
for (const offset of offsets) {
    consumer.seek({ topic: 'orders', partition: offset.partition, offset: offset.offset });
}

📝 Tóm Tắt