← Về danh sách bài họcBài 7/20

📦 Bài 7: Exchange, Queue & Binding

⏱️ Thời gian đọc: 25 phút | 📚 Độ khó: Trung bình

🎯 Sau bài học này, bạn sẽ:

1. Exchange Types

Exchange Routing Logic Use Case
Direct Exact match routing key Task routing, logging levels
Fanout Broadcast to ALL bound queues Notifications, events
Topic Pattern match (*.#) Flexible event routing
Headers Match message headers Complex routing rules

Direct Exchange

Producer → routing_key="error"
                                  ┌──── binding_key="error" ──── Error Queue ✓
Direct Exchange ──┤
                                  └──── binding_key="info"  ──── Info Queue  ✗
// Direct Exchange - Log routing
const amqp = require('amqplib');

async function setupDirectExchange() {
    const conn = await amqp.connect('amqp://admin:secret123@localhost');
    const ch = await conn.createChannel();

    // Tạo direct exchange
    await ch.assertExchange('logs-direct', 'direct', { durable: true });

    // Tạo queues
    await ch.assertQueue('error-logs', { durable: true });
    await ch.assertQueue('info-logs', { durable: true });

    // Binding
    ch.bindQueue('error-logs', 'logs-direct', 'error');
    ch.bindQueue('info-logs', 'logs-direct', 'info');

    // Publish
    ch.publish('logs-direct', 'error', Buffer.from('Database connection failed!'));
    ch.publish('logs-direct', 'info', Buffer.from('User logged in'));
}

Fanout Exchange

// Fanout - Broadcast to all queues
await ch.assertExchange('notifications', 'fanout', { durable: true });

await ch.assertQueue('email-queue');
await ch.assertQueue('sms-queue');
await ch.assertQueue('push-queue');

// Binding (routing key bị bỏ qua với fanout)
ch.bindQueue('email-queue', 'notifications', '');
ch.bindQueue('sms-queue', 'notifications', '');
ch.bindQueue('push-queue', 'notifications', '');

// Publish → TẤT CẢ 3 queues đều nhận
ch.publish('notifications', '', Buffer.from(
    JSON.stringify({ msg: 'New order #123!' })
));

Topic Exchange

// Topic Exchange - Pattern matching
await ch.assertExchange('app-events', 'topic', { durable: true });

// Patterns:
// * = match exactly 1 word
// # = match 0 or more words

// Queue nhận tất cả events về order
ch.bindQueue('order-queue', 'app-events', 'order.*');

// Queue nhận TẤT CẢ events
ch.bindQueue('analytics-queue', 'app-events', '#');

// Queue chỉ nhận order.created
ch.bindQueue('billing-queue', 'app-events', 'order.created');

// Publish
ch.publish('app-events', 'order.created', Buffer.from('...'));
// → order-queue ✓, analytics-queue ✓, billing-queue ✓

ch.publish('app-events', 'order.shipped', Buffer.from('...'));
// → order-queue ✓, analytics-queue ✓, billing-queue ✗

ch.publish('app-events', 'user.registered', Buffer.from('...'));
// → order-queue ✗, analytics-queue ✓, billing-queue ✗

2. Queue Properties

await ch.assertQueue('my-queue', {
    durable: true,       // Queue tồn tại sau restart
    exclusive: false,    // Nhiều consumer có thể dùng
    autoDelete: false,   // Không tự xóa khi hết consumer
    arguments: {
        'x-message-ttl': 60000,        // Message hết hạn sau 60s
        'x-max-length': 10000,         // Tối đa 10K messages
        'x-dead-letter-exchange': 'dlx', // Dead letter exchange
        'x-max-priority': 10           // Priority queue (0-10)
    }
});
⚠️ durable vs persistent:
durable: true → Queue tồn tại sau restart broker
persistent: true (trong message properties) → Message được lưu disk
Cần CẢ HAI để message không bị mất khi restart!

3. Dead Letter Exchange (DLX)

Khi message bị reject, expired, hoặc queue đầy → message được chuyển đến Dead Letter Exchange.

// Setup Dead Letter Exchange
await ch.assertExchange('dlx', 'direct', { durable: true });
await ch.assertQueue('dead-letters', { durable: true });
ch.bindQueue('dead-letters', 'dlx', 'failed');

// Main queue với DLX
await ch.assertQueue('orders', {
    durable: true,
    arguments: {
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed',
        'x-message-ttl': 30000 // 30s timeout
    }
});

// Consumer reject message → vào DLX
ch.consume('orders', (msg) => {
    try {
        processOrder(JSON.parse(msg.content.toString()));
        ch.ack(msg);
    } catch (err) {
        // Reject + no requeue → vào Dead Letter
        ch.nack(msg, false, false);
    }
});

📝 Tóm Tắt