📌

Message Queue Là Gì?

Tổng quan

Message Queue (MQ) là cơ chế giao tiếp bất đồng bộ giữa các thành phần trong hệ thống phân tán. Thay vì gọi trực tiếp (synchronous), các service gửi message vào hàng đợi và bên nhận xử lý khi sẵn sàng.

💡 Tại sao cần Message Queue?
Decouple - Tách biệt các service, giảm phụ thuộc
Scalability - Mở rộng consumer độc lập
Reliability - Message không bị mất khi service down
Async Processing - Xử lý tác vụ nặng ở background
Peak Handling - Buffer traffic đột biến

Event Loop

Event Loop là cơ chế xử lý bất đồng bộ trong JavaScript/Node.js. Hiểu Event Loop là nền tảng để hiểu cách Message Queue hoạt động.

// Minh họa Event Loop
console.log('1. Synchronous');

setTimeout(() => {
    console.log('3. Macrotask (setTimeout)');
}, 0);

Promise.resolve().then(() => {
    console.log('2. Microtask (Promise)');
});

// Output:
// 1. Synchronous
// 2. Microtask (Promise)
// 3. Macrotask (setTimeout)

Mô hình Producer - Consumer

┌──────────┐     ┌─────────────┐     ┌──────────┐
│ Producer │────▶│   Message   │────▶│ Consumer │
│ (Gửi)   │     │   Queue     │     │ (Nhận)   │
└──────────┘     └─────────────┘     └──────────┘
     │                                     │
  Gửi message                        Xử lý message
  vào queue                          từ queue

Producer gửi message → Queue lưu trữ → Consumer nhận và xử lý. Đơn giản nhưng rất mạnh mẽ!

⚖️

RabbitMQ vs Kafka

So sánh nhanh

Tiêu chí 🐰 RabbitMQ 🦅 Kafka
Mô hình Message Broker (AMQP) Distributed Log
Throughput ~50K msg/s ~1M msg/s
Lưu trữ Xóa sau khi consume Giữ lại theo retention
Routing Linh hoạt (Exchange) Đơn giản (Topic)
Use case Task queue, RPC Event streaming, Log
Ordering Per queue Per partition
Replay Không Có (offset reset)

Khi nào dùng gì?

🐰 Chọn RabbitMQ khi:
• Cần routing phức tạp (topic, headers, fanout)
• Cần priority queue hoặc delayed message
• Mô hình request-reply (RPC)
• Ứng dụng nhỏ-vừa, cần đơn giản
🦅 Chọn Kafka khi:
• Throughput cực cao (millions msg/s)
• Cần replay/reprocess message
• Event sourcing, stream processing
• Big data pipeline, analytics
📚

Khóa Học Message Queue Từ A Đến Z (20 Bài)

🎯 Khóa học toàn diện! Từ Event Loop cơ bản đến xây dựng hệ thống phân tán với RabbitMQ và Kafka. Bao gồm dự án thực tế cuối mỗi phần.
🐰

RabbitMQ Quick Start

Cài đặt với Docker

# Chạy RabbitMQ với Management UI
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

# Truy cập Management UI
# http://localhost:15672
# Username: guest | Password: guest

Producer (Node.js)

const amqp = require('amqplib');

async function sendMessage() {
    // Kết nối RabbitMQ
    const conn = await amqp.connect('amqp://localhost');
    const channel = await conn.createChannel();

    const queue = 'hello';
    const message = 'Xin chào từ RabbitMQ! 🐰';

    // Đảm bảo queue tồn tại
    await channel.assertQueue(queue, { durable: true });

    // Gửi message
    channel.sendToQueue(queue, Buffer.from(message), {
        persistent: true  // Message không mất khi restart
    });

    console.log(`[✓] Đã gửi: "${message}"`);

    setTimeout(() => {
        conn.close();
        process.exit(0);
    }, 500);
}

sendMessage();

Consumer (Node.js)

const amqp = require('amqplib');

async function receiveMessage() {
    const conn = await amqp.connect('amqp://localhost');
    const channel = await conn.createChannel();

    const queue = 'hello';
    await channel.assertQueue(queue, { durable: true });

    // Chỉ nhận 1 message tại 1 thời điểm
    channel.prefetch(1);

    console.log('[*] Đang chờ message...');

    channel.consume(queue, (msg) => {
        const content = msg.content.toString();
        console.log(`[✓] Nhận được: "${content}"`);

        // Xử lý xong → acknowledge
        channel.ack(msg);
    });
}

receiveMessage();
💡 Acknowledge (ACK):
Consumer phải gửi ACK sau khi xử lý xong message. Nếu consumer crash trước khi ACK, RabbitMQ sẽ gửi lại message cho consumer khác.
🦅

Kafka Quick Start

Cài đặt với Docker Compose

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Producer (Node.js - KafkaJS)

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function sendEvent() {
    await producer.connect();

    await producer.send({
        topic: 'user-events',
        messages: [
            {
                key: 'user-123',
                value: JSON.stringify({
                    event: 'page_view',
                    url: '/products/iphone',
                    timestamp: Date.now()
                })
            }
        ]
    });

    console.log('[✓] Event đã được gửi!');
    await producer.disconnect();
}

sendEvent();

Consumer (Node.js - KafkaJS)

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
});

const consumer = kafka.consumer({
    groupId: 'analytics-group'
});

async function consumeEvents() {
    await consumer.connect();
    await consumer.subscribe({
        topic: 'user-events',
        fromBeginning: true  // Đọc từ đầu
    });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            const event = JSON.parse(message.value.toString());
            console.log(`[Partition ${partition}]`, event);
        }
    });
}

consumeEvents();
⚠️ Kafka vs RabbitMQ: Kafka giữ message ngay cả sau khi consumer đọc (theo retention policy). Bạn có thể "replay" lại event bất kỳ lúc nào bằng cách reset offset.