메시지 큐는 현대 분산 시스템에서 매우 중요한 역할을 합니다. BullMQ는 Redis 기반의 큐 시스템으로 간단한 작업 큐를 구현하기에 좋은 도구입니다. 하지만 시스템이 커지고 처리해야 할 메시지의 양이 늘어나면서 다음과 같은 한계점들이 드러났습니다:
확장성 제한
메시지 영속성
분산 처리의 한계
이러한 문제들을 해결하기 위해 Kafka로의 전환을 결정했습니다.
Apache Kafka는 LinkedIn에서 개발한 분산 스트리밍 플랫폼입니다. 다음과 같은 특징을 가지고 있습니다:
높은 처리량
메시지 영속성
분산 시스템 지원
npm install kafkajs
// kafka.config.ts
import { Kafka } from 'kafkajs';
export const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
// kafka.producer.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { kafka } from './kafka.config';
@Injectable()
export class KafkaProducerService implements OnModuleInit {
private producer: Producer;
constructor() {
this.producer = kafka.producer();
}
async onModuleInit() {
await this.producer.connect();
}
async sendMessage(topic: string, message: any) {
await this.producer.send({
topic,
messages: [
{
value: JSON.stringify(message)
},
],
});
}
async onModuleDestroy() {
await this.producer.disconnect();
}
}
// kafka.consumer.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Consumer } from 'kafkajs';
import { kafka } from './kafka.config';
@Injectable()
export class KafkaConsumerService implements OnModuleInit {
private consumer: Consumer;
constructor() {
this.consumer = kafka.consumer({
groupId: 'my-consumer-group'
});
}
async onModuleInit() {
await this.consumer.connect();
}
async subscribe(topic: string, callback: (message: any) => Promise<void>) {
await this.consumer.subscribe({ topic, fromBeginning: false });
await this.consumer.run({
eachMessage: async ({ message }) => {
const value = message.value?.toString();
if (value) {
await callback(JSON.parse(value));
}
},
});
}
async onModuleDestroy() {
await this.consumer.disconnect();
}
}
// email.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { KafkaConsumerService } from './kafka.consumer.service';
@Injectable()
export class EmailService implements OnModuleInit {
constructor(private kafkaConsumerService: KafkaConsumerService) {}
async onModuleInit() {
// BullMQ의 기존 큐 이름을 Kafka 토픽으로 사용
await this.kafkaConsumerService.subscribe('email-notification',
async (message) => {
await this.processEmail(message);
}
);
}
private async processEmail(message: any) {
// 기존 BullMQ의 프로세스 로직을 그대로 활용
console.log('Processing email:', message);
// 이메일 전송 로직...
}
}
// kafka.module.ts
import { Module } from '@nestjs/common';
import { KafkaProducerService } from './kafka.producer.service';
import { KafkaConsumerService } from './kafka.consumer.service';
@Module({
providers: [KafkaProducerService, KafkaConsumerService],
exports: [KafkaProducerService, KafkaConsumerService],
})
export class KafkaModule {}
한 번에 모든 큐를 전환하는 것은 위험할 수 있습니다. 다음과 같은 단계별 접근을 추천합니다:
// message.interface.ts
interface KafkaMessage {
id: string;
type: string;
data: any;
timestamp: number;
metadata?: {
retry?: number;
originalQueue?: string;
};
}
// retry.strategy.ts
import { Injectable } from '@nestjs/common';
import { KafkaMessage } from './message.interface';
@Injectable()
export class RetryStrategy {
async handleError(message: KafkaMessage, error: Error) {
const maxRetries = 3;
const retryCount = message.metadata?.retry || 0;
if (retryCount < maxRetries) {
// 재시도 로직
return this.retryMessage({
...message,
metadata: {
...message.metadata,
retry: retryCount + 1
}
});
}
// 최대 재시도 횟수 초과 시 처리
await this.handleFailedMessage(message, error);
}
}
// health.controller.ts
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { KafkaHealthIndicator } from './kafka.health';
@Controller('health')
export class HealthController {
constructor(
private health: HealthCheckService,
private kafkaHealth: KafkaHealthIndicator,
) {}
@Get()
@HealthCheck()
check() {
return this.health.check([
() => this.kafkaHealth.isHealthy('kafka'),
]);
}
}
// metrics.service.ts
import { Injectable } from '@nestjs/common';
import { PrometheusService } from './prometheus.service';
@Injectable()
export class MetricsService {
private messageCounter;
private processingTimeHistogram;
constructor(private prometheusService: PrometheusService) {
this.messageCounter = this.prometheusService.createCounter({
name: 'kafka_messages_total',
help: 'Total number of Kafka messages processed'
});
this.processingTimeHistogram = this.prometheusService.createHistogram({
name: 'kafka_message_processing_time',
help: 'Time spent processing Kafka messages'
});
}
recordMessage(topic: string) {
this.messageCounter.inc({ topic });
}
recordProcessingTime(topic: string, time: number) {
this.processingTimeHistogram.observe({ topic }, time);
}
}
BullMQ에서 Kafka로의 전환은 시스템의 확장성과 안정성을 크게 향상시켰습니다. 주요 개선 사항은 다음과 같습니다:
처리량 향상
안정성 개선
운영 효율성
다만, Kafka는 BullMQ에 비해 초기 설정과 운영이 복잡하므로, 시스템 규모와 요구사항을 고려하여 전환을 결정하는 것이 중요합니다.