← Về danh sách bài họcBài 7/20
📦 Bài 7: Exchange, Queue & Binding
🎯 Sau bài học này, bạn sẽ:
- Hiểu 4 loại Exchange: Direct, Fanout, Topic, Headers
- Nắm Queue properties: durable, exclusive, auto-delete
- Hiểu Binding và Routing Key
- Thực hành code Node.js cho từng exchange type
1. Exchange Types
| Exchange | Routing Logic | Use Case |
|---|---|---|
| Direct | Exact match routing key | Task routing, logging levels |
| Fanout | Broadcast to ALL bound queues | Notifications, events |
| Topic | Pattern match (*.#) | Flexible event routing |
| Headers | Match message headers | Complex routing rules |
Direct Exchange
Producer → routing_key="error"
┌──── binding_key="error" ──── Error Queue ✓
Direct Exchange ──┤
└──── binding_key="info" ──── Info Queue ✗
// Direct Exchange - Log routing
const amqp = require('amqplib');
async function setupDirectExchange() {
const conn = await amqp.connect('amqp://admin:secret123@localhost');
const ch = await conn.createChannel();
// Tạo direct exchange
await ch.assertExchange('logs-direct', 'direct', { durable: true });
// Tạo queues
await ch.assertQueue('error-logs', { durable: true });
await ch.assertQueue('info-logs', { durable: true });
// Binding
ch.bindQueue('error-logs', 'logs-direct', 'error');
ch.bindQueue('info-logs', 'logs-direct', 'info');
// Publish
ch.publish('logs-direct', 'error', Buffer.from('Database connection failed!'));
ch.publish('logs-direct', 'info', Buffer.from('User logged in'));
}
Fanout Exchange
// Fanout - Broadcast to all queues
await ch.assertExchange('notifications', 'fanout', { durable: true });
await ch.assertQueue('email-queue');
await ch.assertQueue('sms-queue');
await ch.assertQueue('push-queue');
// Binding (routing key bị bỏ qua với fanout)
ch.bindQueue('email-queue', 'notifications', '');
ch.bindQueue('sms-queue', 'notifications', '');
ch.bindQueue('push-queue', 'notifications', '');
// Publish → TẤT CẢ 3 queues đều nhận
ch.publish('notifications', '', Buffer.from(
JSON.stringify({ msg: 'New order #123!' })
));
Topic Exchange
// Topic Exchange - Pattern matching
await ch.assertExchange('app-events', 'topic', { durable: true });
// Patterns:
// * = match exactly 1 word
// # = match 0 or more words
// Queue nhận tất cả events về order
ch.bindQueue('order-queue', 'app-events', 'order.*');
// Queue nhận TẤT CẢ events
ch.bindQueue('analytics-queue', 'app-events', '#');
// Queue chỉ nhận order.created
ch.bindQueue('billing-queue', 'app-events', 'order.created');
// Publish
ch.publish('app-events', 'order.created', Buffer.from('...'));
// → order-queue ✓, analytics-queue ✓, billing-queue ✓
ch.publish('app-events', 'order.shipped', Buffer.from('...'));
// → order-queue ✓, analytics-queue ✓, billing-queue ✗
ch.publish('app-events', 'user.registered', Buffer.from('...'));
// → order-queue ✗, analytics-queue ✓, billing-queue ✗
2. Queue Properties
await ch.assertQueue('my-queue', {
durable: true, // Queue tồn tại sau restart
exclusive: false, // Nhiều consumer có thể dùng
autoDelete: false, // Không tự xóa khi hết consumer
arguments: {
'x-message-ttl': 60000, // Message hết hạn sau 60s
'x-max-length': 10000, // Tối đa 10K messages
'x-dead-letter-exchange': 'dlx', // Dead letter exchange
'x-max-priority': 10 // Priority queue (0-10)
}
});
⚠️ durable vs persistent:
•
•
Cần CẢ HAI để message không bị mất khi restart!
•
durable: true → Queue tồn tại sau restart broker•
persistent: true (trong message properties) → Message được lưu diskCần CẢ HAI để message không bị mất khi restart!
3. Dead Letter Exchange (DLX)
Khi message bị reject, expired, hoặc queue đầy → message được chuyển đến Dead Letter Exchange.
// Setup Dead Letter Exchange
await ch.assertExchange('dlx', 'direct', { durable: true });
await ch.assertQueue('dead-letters', { durable: true });
ch.bindQueue('dead-letters', 'dlx', 'failed');
// Main queue với DLX
await ch.assertQueue('orders', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'failed',
'x-message-ttl': 30000 // 30s timeout
}
});
// Consumer reject message → vào DLX
ch.consume('orders', (msg) => {
try {
processOrder(JSON.parse(msg.content.toString()));
ch.ack(msg);
} catch (err) {
// Reject + no requeue → vào Dead Letter
ch.nack(msg, false, false);
}
});
📝 Tóm Tắt
- Direct: Exact match routing key
- Fanout: Broadcast tất cả queues
- Topic: Pattern matching (* và #)
- Queue: durable, exclusive, autoDelete, TTL, max-length
- DLX: Xử lý message lỗi/expired