← Về danh sách bài họcBài 9/20
⚡ Bài 9: RabbitMQ Advanced Patterns
🎯 Sau bài học này, bạn sẽ:
- Triển khai retry với exponential backoff
- Sử dụng Priority Queue
- Tạo Delayed/Scheduled messages
- Xử lý Idempotency
1. Retry Với Exponential Backoff
Khi message xử lý lỗi, thay vì retry ngay (có thể gây overload), ta dùng delayed retry với thời gian tăng dần.
// retry-pattern.js
async function setupRetrySystem(ch) {
// Main exchange
await ch.assertExchange('main', 'direct', { durable: true });
// Retry exchanges (delay levels)
const delays = [5000, 15000, 60000]; // 5s, 15s, 60s
for (const delay of delays) {
await ch.assertQueue(`retry-${delay}ms`, {
durable: true,
arguments: {
'x-message-ttl': delay,
'x-dead-letter-exchange': 'main',
'x-dead-letter-routing-key': 'process'
}
});
}
// Dead Letter Queue (final failure)
await ch.assertQueue('dead-letters', { durable: true });
// Main processing queue
await ch.assertQueue('process-queue', { durable: true });
ch.bindQueue('process-queue', 'main', 'process');
}
// Consumer với retry logic
ch.consume('process-queue', async (msg) => {
const retryCount = msg.properties.headers?.['x-retry'] || 0;
const delays = [5000, 15000, 60000];
try {
await processMessage(msg);
ch.ack(msg);
} catch (err) {
ch.ack(msg); // ACK message gốc
if (retryCount < delays.length) {
const delay = delays[retryCount];
ch.sendToQueue(`retry-${delay}ms`, msg.content, {
persistent: true,
headers: { 'x-retry': retryCount + 1 }
});
console.log(`🔄 Retry ${retryCount + 1} after ${delay}ms`);
} else {
ch.sendToQueue('dead-letters', msg.content, {
persistent: true,
headers: { error: err.message }
});
console.log('💀 Max retries reached → DLQ');
}
}
});
2. Priority Queue
// Priority Queue: message ưu tiên cao được xử lý trước
await ch.assertQueue('tasks', {
durable: true,
arguments: { 'x-max-priority': 10 }
});
// Gửi messages với priority khác nhau
ch.sendToQueue('tasks', Buffer.from('Low priority task'), {
priority: 1
});
ch.sendToQueue('tasks', Buffer.from('HIGH priority task'), {
priority: 9
});
ch.sendToQueue('tasks', Buffer.from('Normal task'), {
priority: 5
});
// Consumer nhận: HIGH → Normal → Low
⚠️ Lưu ý Priority Queue:
• Ảnh hưởng performance (RabbitMQ phải sort)
• Max 255 levels, khuyến nghị ≤ 10
• Chỉ hiệu quả khi queue có messages backlog
• Ảnh hưởng performance (RabbitMQ phải sort)
• Max 255 levels, khuyến nghị ≤ 10
• Chỉ hiệu quả khi queue có messages backlog
3. Delayed Message (Scheduled)
// Phương pháp 1: TTL + DLX (không cần plugin)
async function scheduleMessage(ch, queue, data, delayMs) {
const delayQueue = `${queue}.delay.${delayMs}`;
await ch.assertQueue(delayQueue, {
durable: true,
arguments: {
'x-message-ttl': delayMs,
'x-dead-letter-exchange': '',
'x-dead-letter-routing-key': queue
}
});
ch.sendToQueue(delayQueue, Buffer.from(JSON.stringify(data)), {
persistent: true
});
console.log(`⏰ Scheduled for ${delayMs}ms later`);
}
// Sử dụng: Gửi reminder sau 24 giờ
scheduleMessage(ch, 'reminders', {
userId: 123,
message: 'Đơn hàng của bạn sắp hết hạn!'
}, 24 * 60 * 60 * 1000);
4. Idempotency (Xử lý message trùng lặp)
// Idempotent consumer: xử lý message trùng lặp an toàn
const processedIds = new Set(); // Production: dùng Redis
ch.consume('orders', async (msg) => {
const data = JSON.parse(msg.content.toString());
const messageId = msg.properties.messageId;
// Check đã xử lý chưa
if (processedIds.has(messageId)) {
console.log(`⏭️ Skipping duplicate: ${messageId}`);
ch.ack(msg);
return;
}
try {
await processOrder(data);
processedIds.add(messageId);
ch.ack(msg);
} catch (err) {
ch.nack(msg, false, true);
}
});
// Producer: luôn gửi kèm messageId unique
ch.sendToQueue('orders', Buffer.from(JSON.stringify(order)), {
messageId: require('crypto').randomUUID(),
persistent: true
});
💡 Production idempotency:
• Dùng Redis SET lưu processed message IDs
• Set TTL cho key (VD: 24h) để tránh memory leak
• Hoặc dùng database unique constraint trên messageId
• Dùng Redis SET lưu processed message IDs
• Set TTL cho key (VD: 24h) để tránh memory leak
• Hoặc dùng database unique constraint trên messageId
📝 Tóm Tắt
- Retry: Exponential backoff với TTL + DLX
- Priority Queue: Message ưu tiên cao xử lý trước
- Delayed Message: TTL + DLX để schedule
- Idempotency: Dùng messageId để tránh xử lý trùng