← Về danh sách bài học | Bài 20/20
🚀 Bài 20: Dự Án - Real-time E-commerce Analytics
🎯 Dự án cuối khóa - Bạn sẽ xây dựng:
- Real-time E-commerce Analytics Platform
- Order event streaming với Kafka
- Stream processing: revenue, fraud detection
- Live dashboard API với WebSocket
1. Kiến Trúc Hệ Thống
┌───────────────────────────────────────────────────────────────┐
│                 E-commerce Analytics Platform                 │
│                                                               │
│  ┌────────────┐    ┌───────────┐    ┌──────────────────────┐  │
│  │ Order      │───▶│ Kafka     │───▶│ Stream Processors    │  │
│  │ Simulator  │    │           │    │                      │  │
│  │ (Producer) │    │ Topics:   │    │ • Revenue Calculator │  │
│  └────────────┘    │ • orders  │    │ • Fraud Detector     │  │
│                    │ • metrics │    │ • Trend Analyzer     │  │
│  ┌────────────┐    │ • alerts  │    └──────────┬───────────┘  │
│  │ Dashboard  │◀───│           │               │              │
│  │ API        │    └───────────┘    ┌──────────▼───────────┐  │
│  │ (Express + │                     │ Redis (Real-time     │  │
│  │  WebSocket)│◀────────────────────│ State Store)         │  │
│  └────────────┘                     └──────────────────────┘  │
└───────────────────────────────────────────────────────────────┘
2. Docker Compose
# docker-compose.yml
# Local stack for the analytics project: a single-node KRaft Kafka broker,
# Redis as the real-time state store, and Kafka UI for inspecting topics.
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    # Fixed name so `docker exec kafka ...` works regardless of the compose project name.
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      # Two client listeners are required: PLAINTEXT is advertised as kafka:29092
      # for other containers, PLAINTEXT_HOST as localhost:9092 for apps on the host.
      # A single listener advertised as localhost:9092 sends containers such as
      # kafka-ui back to their own localhost after the bootstrap handshake.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      # Single broker, so internal topics cannot be replicated more than once.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      # Use the internal listener; it is the one advertised with a hostname
      # that resolves inside the compose network.
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
3. Order Simulator (Producer)
// order-simulator.js
const { Kafka } = require('kafkajs');
const crypto = require('crypto');
// Kafka client for the simulator process, pointed at the broker on localhost:9092.
const kafka = new Kafka({
  clientId: 'order-simulator',
  brokers: ['localhost:9092'],
});
// Producer that simulate() connects and uses to publish order events.
const producer = kafka.producer();
// Catalogue the simulator samples from: display name plus unit price.
const products = [
  { name: 'Laptop', price: 25_000_000 },
  { name: 'Điện thoại', price: 15_000_000 },
  { name: 'Tai nghe', price: 2_000_000 },
  { name: 'Chuột gaming', price: 800_000 },
  { name: 'Bàn phím', price: 1_500_000 },
  { name: 'Màn hình', price: 8_000_000 },
];
// Cities an order can be attributed to.
const cities = [
  'HCM',
  'Hà Nội',
  'Đà Nẵng',
  'Cần Thơ',
  'Hải Phòng',
];
/**
 * Builds one random order event.
 *
 * Picks a product and a city at random from the module-level `products` and
 * `cities` lists, a quantity of 1–3 and a user id of the form `user-0` …
 * `user-999`. `total` is always `unitPrice * quantity` here; about 5% of
 * orders are flagged `isSuspicious`, and it is the caller that decides what
 * to do with that flag.
 *
 * @returns {{orderId: string, userId: string, product: string, quantity: number,
 *   unitPrice: number, total: number, city: string, timestamp: string,
 *   isSuspicious: boolean}} The generated order; `timestamp` is an ISO-8601 string.
 */
function generateOrder() {
  // Uniformly pick one entry from a non-empty list.
  const pickRandom = (list) => list[Math.floor(Math.random() * list.length)];

  const { name, price } = pickRandom(products);
  const quantity = 1 + Math.floor(Math.random() * 3);

  const order = {
    orderId: crypto.randomUUID(),
    userId: `user-${Math.floor(Math.random() * 1000)}`,
    product: name,
    quantity,
    unitPrice: price,
    total: price * quantity,
    city: pickRandom(cities),
    timestamp: new Date().toISOString(),
    // Roughly 1 order in 20 is marked as a fraud candidate.
    isSuspicious: Math.random() < 0.05,
  };
  return order;
}
/**
 * Connects the producer and then publishes one random order to the `orders`
 * topic every 0.5–2 seconds, indefinitely.
 *
 * Each message is keyed by `userId`, carries the JSON-encoded order as its
 * value and a `source: 'web'` header. Orders flagged `isSuspicious` have
 * their `total` overwritten with a value in [100M, 150M) before publishing.
 *
 * @returns {Promise<void>} Resolves once the producer is connected and the
 *   first publish is scheduled; rejects if the producer cannot connect.
 */
async function simulate() {
  await producer.connect();
  console.log('🛒 Order Simulator started...');

  const publishOrder = async () => {
    try {
      const order = generateOrder();
      // Fraud: inflate the order to an extremely high value.
      if (order.isSuspicious) {
        order.total = 100000000 + Math.floor(Math.random() * 50000000);
      }
      await producer.send({
        topic: 'orders',
        messages: [{
          key: order.userId,
          value: JSON.stringify(order),
          headers: { source: 'web' },
        }],
      });
      console.log(`📦 Order: ${order.product} x${order.quantity} = ${(order.total/1000000).toFixed(1)}M - ${order.city}`);
    } catch (err) {
      // A failed send must neither stop the simulation nor become an
      // unhandled rejection inside a timer callback.
      console.error('❌ Failed to publish order:', err);
    } finally {
      scheduleNext();
    }
  };

  // setInterval would evaluate the random delay only once and then tick at
  // that fixed rate; re-arming a timeout draws a fresh 0.5–2s delay per order.
  const scheduleNext = () => {
    setTimeout(publishOrder, 500 + Math.random() * 1500);
  };

  scheduleNext();
}
// Do not leave the startup promise floating: report a failed broker
// connection explicitly and exit with a non-zero status.
simulate().catch((err) => {
  console.error('❌ Order Simulator failed to start:', err);
  process.exit(1);
});
4. Stream Processors
Revenue Calculator
// revenue-processor.js
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');
// Kafka client for the revenue processor, pointed at the broker on localhost:9092.
const kafka = new Kafka({
  clientId: 'revenue-processor',
  brokers: ['localhost:9092'],
});
// Reads `orders`; instances sharing this group id share the topic's partitions.
const consumer = kafka.consumer({ groupId: 'revenue-calculators' });
// Publishes the aggregated figures to the `metrics` topic.
const producer = kafka.producer();
// Redis connection with the client's default options; holds the running counters.
const redis = new Redis();
/**
 * Consumes the `orders` topic, maintains revenue/order counters in Redis and
 * publishes a `revenue_update` snapshot to the `metrics` topic per order.
 *
 * Counters updated for every order:
 *   revenue:total, revenue:hourly:<YYYY-MM-DDTHH>, revenue:city:<city>,
 *   orders:total, orders:hourly:<YYYY-MM-DDTHH>, orders:product:<product>.
 * The hour bucket is derived from the order's own `timestamp`.
 *
 * Messages that cannot be parsed, have an invalid timestamp or a non-integer
 * `total` are logged and skipped so that one bad record cannot block the
 * partition. Redis or Kafka failures are rethrown to the consumer.
 *
 * @returns {Promise<void>} Resolves once the consumer has been started.
 */
async function processRevenue() {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: 'orders' });
  console.log('💰 Revenue Processor started...');

  await consumer.run({
    eachMessage: async ({ message }) => {
      let order;
      let hour;
      try {
        order = JSON.parse(message.value.toString());
        // "YYYY-MM-DDTHH" — throws RangeError on an unparseable timestamp.
        hour = new Date(order.timestamp).toISOString().slice(0, 13);
        // INCRBY only accepts integers; reject anything else up front.
        if (!Number.isInteger(order.total)) {
          throw new TypeError(`order.total must be an integer, got ${order.total}`);
        }
      } catch (err) {
        console.error(`⚠️ Skipping malformed order message: ${err.message}`);
        return;
      }

      // One MULTI/EXEC round trip instead of nine sequential commands. The
      // INCR/INCRBY replies already carry the new values, so the metrics
      // snapshot needs no follow-up GETs that could interleave with writes
      // from other processor instances.
      const replies = await redis
        .multi()
        .incrby('revenue:total', order.total)
        .incrby(`revenue:hourly:${hour}`, order.total)
        .incr('orders:total')
        .incr(`orders:hourly:${hour}`)
        .incrby(`revenue:city:${order.city}`, order.total)
        .incr(`orders:product:${order.product}`)
        .exec();

      // exec() reports per-command failures as [error, result] pairs.
      const failure = replies.find(([err]) => err);
      if (failure) {
        throw failure[0];
      }
      const [[, totalRevenue], [, hourlyRevenue], [, totalOrders]] = replies;

      // Emit metrics to the metrics topic.
      const metrics = {
        type: 'revenue_update',
        totalRevenue,
        totalOrders,
        hourlyRevenue,
        timestamp: new Date().toISOString(),
      };
      await producer.send({
        topic: 'metrics',
        messages: [{ value: JSON.stringify(metrics) }],
      });
    },
  });
}
// Do not leave the startup promise floating: report a failed connection
// explicitly and exit with a non-zero status.
processRevenue().catch((err) => {
  console.error('❌ Revenue Processor failed to start:', err);
  process.exit(1);
});
Fraud Detector
// fraud-detector.js
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');
// Kafka client for the fraud detector, pointed at the broker on localhost:9092.
const kafka = new Kafka({
  clientId: 'fraud-detector',
  brokers: ['localhost:9092'],
});
// Own consumer group, so this processor reads `orders` independently of the others.
const consumer = kafka.consumer({ groupId: 'fraud-detectors' });
// Publishes detected alerts to the `alerts` topic.
const producer = kafka.producer();
// Redis connection with the client's default options; holds per-user order counters.
const redis = new Redis();
/** Order totals strictly above this value (50M) raise a HIGH_VALUE alert. */
const FRAUD_THRESHOLD = 50_000_000;
/** More than this many orders by one user within a minute raises a VELOCITY alert. */
const MAX_ORDERS_PER_MINUTE = 10;
/**
 * Consumes the `orders` topic and publishes fraud alerts to the `alerts` topic.
 *
 * Rules applied to every order:
 *   1. HIGH_VALUE (severity HIGH): `total` is above FRAUD_THRESHOLD.
 *   2. VELOCITY (severity CRITICAL): the user has placed more than
 *      MAX_ORDERS_PER_MINUTE orders in the current minute. The minute bucket
 *      is taken from the processing clock, not from the order's timestamp.
 *
 * Each alert is keyed by `userId` and carries the alert fields plus
 * `orderId`, `userId` and a `timestamp`. Messages that cannot be parsed are
 * logged and skipped so that one bad record cannot block the partition.
 * Redis or Kafka failures are rethrown to the consumer.
 *
 * @returns {Promise<void>} Resolves once the consumer has been started.
 */
async function detectFraud() {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: 'orders' });
  console.log('🔍 Fraud Detector started...');

  await consumer.run({
    eachMessage: async ({ message }) => {
      let order;
      try {
        order = JSON.parse(message.value.toString());
        if (order === null || typeof order !== 'object') {
          throw new TypeError('order payload is not an object');
        }
      } catch (err) {
        console.error(`⚠️ Skipping malformed order message: ${err.message}`);
        return;
      }

      const alerts = [];

      // Rule 1: High value order
      if (order.total > FRAUD_THRESHOLD) {
        alerts.push({
          type: 'HIGH_VALUE',
          message: `Order ${order.orderId}: ${(order.total/1000000).toFixed(1)}M exceeds threshold`,
          severity: 'HIGH',
        });
      }

      // Rule 2: Too many orders from same user
      const minute = new Date().toISOString().slice(0, 16);
      const userKey = `orders:${order.userId}:${minute}`;
      // INCR and EXPIRE go in one MULTI/EXEC so the counter can never be
      // left behind without a TTL if the process dies between the two calls.
      const replies = await redis
        .multi()
        .incr(userKey)
        .expire(userKey, 120)
        .exec();
      // exec() reports per-command failures as [error, result] pairs.
      const failure = replies.find(([err]) => err);
      if (failure) {
        throw failure[0];
      }
      const orderCount = replies[0][1];
      if (orderCount > MAX_ORDERS_PER_MINUTE) {
        alerts.push({
          type: 'VELOCITY',
          message: `User ${order.userId}: ${orderCount} orders in 1 minute`,
          severity: 'CRITICAL',
        });
      }

      if (alerts.length === 0) {
        return;
      }

      // Emit all alerts for this order in a single produce request.
      await producer.send({
        topic: 'alerts',
        messages: alerts.map((alert) => ({
          key: order.userId,
          value: JSON.stringify({
            ...alert,
            orderId: order.orderId,
            userId: order.userId,
            timestamp: new Date().toISOString(),
          }),
        })),
      });
      for (const alert of alerts) {
        console.log(`🚨 ALERT [${alert.severity}]: ${alert.message}`);
      }
    },
  });
}
// Do not leave the startup promise floating: report a failed connection
// explicitly and exit with a non-zero status.
detectFraud().catch((err) => {
  console.error('❌ Fraud Detector failed to start:', err);
  process.exit(1);
});
5. Dashboard API
// dashboard-api.js
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');
// HTTP layer: the Express app is wrapped in a plain http server so that
// Socket.IO can attach to the same port.
const app = express();
const httpServer = createServer(app);
// NOTE(review): CORS accepts every origin — confirm this is only meant for local development.
const io = new Server(httpServer, {
  cors: { origin: '*' },
});
// Redis connection with the client's default options; source of the REST statistics.
const redis = new Redis();
// Kafka consumer that feeds the WebSocket stream.
const kafka = new Kafka({
  clientId: 'dashboard',
  brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'dashboard-consumers' });
// REST API: Get current stats
/**
 * GET /api/stats — overall totals read from Redis.
 *
 * Responds with `{ totalRevenue, totalOrders, avgOrderValue }`. A counter
 * that does not exist yet counts as 0; `avgOrderValue` is rounded to the
 * nearest integer and is 0 while there are no orders.
 * Responds with HTTP 500 and `{ error }` when Redis cannot be read.
 */
app.get('/api/stats', async (req, res) => {
  try {
    // The two counters are independent, so read them concurrently.
    const [revenueRaw, ordersRaw] = await Promise.all([
      redis.get('revenue:total'),
      redis.get('orders:total'),
    ]);
    const totalRevenue = Number.parseInt(revenueRaw || '0', 10);
    const totalOrders = Number.parseInt(ordersRaw || '0', 10);
    const avgOrderValue = totalOrders > 0
      ? Math.round(totalRevenue / totalOrders)
      : 0;
    res.json({ totalRevenue, totalOrders, avgOrderValue });
  } catch (err) {
    // Express versions before 5 do not handle a rejected promise from an
    // async handler; without this the request would hang until it times out.
    console.error('Failed to load stats:', err);
    res.status(500).json({ error: 'Failed to load stats' });
  }
});
// REST API: Revenue by city
/**
 * GET /api/stats/cities — revenue per city, highest first.
 *
 * Responds with an array of `{ city, revenue }` for a fixed list of cities,
 * read from the `revenue:city:<city>` counters. A city without a counter
 * reports 0. Responds with HTTP 500 and `{ error }` when Redis cannot be read.
 */
app.get('/api/stats/cities', async (req, res) => {
  const cities = ['HCM', 'Hà Nội', 'Đà Nẵng', 'Cần Thơ', 'Hải Phòng'];
  try {
    // The per-city lookups are independent; issue them together instead of
    // awaiting one round trip per city in sequence.
    const rawRevenues = await Promise.all(
      cities.map((city) => redis.get(`revenue:city:${city}`)),
    );
    const data = cities.map((city, index) => ({
      city,
      revenue: Number.parseInt(rawRevenues[index] || '0', 10),
    }));
    res.json(data.sort((a, b) => b.revenue - a.revenue));
  } catch (err) {
    // Express versions before 5 do not handle a rejected promise from an
    // async handler; without this the request would hang until it times out.
    console.error('Failed to load city stats:', err);
    res.status(500).json({ error: 'Failed to load city stats' });
  }
});
// WebSocket: Real-time updates
/**
 * Subscribes to the `metrics` and `alerts` topics and broadcasts every
 * message to all connected Socket.IO clients, using the topic name as the
 * event name and the parsed JSON payload as the event data.
 *
 * A message whose value is missing or not valid JSON is logged and dropped,
 * so a single bad record cannot stall the stream.
 *
 * @returns {Promise<void>} Resolves once the consumer has been started;
 *   rejects if connecting or subscribing fails.
 */
async function startRealtimeStream() {
  await consumer.connect();
  await consumer.subscribe({ topics: ['metrics', 'alerts'] });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      let data;
      try {
        data = JSON.parse(message.value.toString());
      } catch (err) {
        console.error(`⚠️ Dropping malformed message on ${topic}: ${err.message}`);
        return;
      }
      io.emit(topic, data);
    },
  });
}
// Log client connects/disconnects; payloads are pushed by startRealtimeStream().
io.on('connection', (socket) => {
  console.log(`👤 Client connected: ${socket.id}`);
  socket.on('disconnect', () => console.log(`👋 Client disconnected`));
});
// Do not leave the startup promise floating: report a failed Kafka
// connection explicitly and exit with a non-zero status.
startRealtimeStream().catch((err) => {
  console.error('❌ Real-time stream failed to start:', err);
  process.exit(1);
});
httpServer.listen(3001, () => console.log('📊 Dashboard API on port 3001'));
6. Chạy Dự Án
# 1. Start the infrastructure
docker-compose up -d
# 2. Create the topics.
#    `docker-compose exec` addresses the service by name, so it works whatever
#    container name compose generated; --if-not-exists makes re-runs safe.
docker-compose exec kafka kafka-topics --create --if-not-exists --topic orders --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics --create --if-not-exists --topic metrics --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics --create --if-not-exists --topic alerts --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
# 3. Start the processors (each in its own terminal)
node revenue-processor.js
node fraud-detector.js
node dashboard-api.js
# 4. Start the order simulator
node order-simulator.js
# 5. Verify
curl http://localhost:3001/api/stats
curl http://localhost:3001/api/stats/cities
# Kafka UI: http://localhost:8080
7. Mở Rộng
💡 Bài tập nâng cao:
• Thêm React dashboard UI với real-time charts
• Implement Kafka Connect để sync data vào Elasticsearch
• Thêm email/SMS alerts khi phát hiện fraud
• Implement exactly-once processing với Kafka transactions
• Scale: Chạy multiple instances của mỗi processor
• Thêm A/B testing stream processor
🎉 Hoàn Thành Khóa Học!
Chúc mừng bạn đã hoàn thành khóa học Message Queue, RabbitMQ & Kafka! Bạn đã có kiến thức nền tảng vững chắc về:
- ✅ Event Loop và Message Queue concepts
- ✅ RabbitMQ: Exchange, Queue, Binding, Clustering, Monitoring
- ✅ Apache Kafka: Producer, Consumer, Partitions, Streams, Connect
- ✅ 2 dự án thực tế: Notification System & E-commerce Analytics
Tiếp tục thực hành và đừng ngại thử nghiệm trong các dự án thực tế! 🚀