BullMQ에서 Kafka로 전환하게 된 이유
메시지 큐는 현대 분산 시스템에서 매우 중요한 역할을 합니다. BullMQ는 Redis 기반의 큐 시스템으로 간단한 작업 큐를 구현하기에 좋은 도구입니다. 하지만 시스템이 커지고 처리해야 할 메시지의 양이 늘어나면서 다음과 같은 한계점들이 드러났습니다:
-
확장성 제한
- Redis 기반 큐의 한계
- 대용량 메시지 처리의 어려움
-
메시지 영속성
- Redis의 메모리 기반 특성으로 인한 데이터 유실 가능성
- 장애 복구 시나리오의 한계
-
분산 처리의 한계
- 복잡한 파티셔닝 구현의 어려움
- 수평적 확장의 제한
이러한 문제들을 해결하기 위해 Kafka로의 전환을 결정했습니다.
Kafka 소개
Apache Kafka는 LinkedIn에서 개발한 분산 스트리밍 플랫폼입니다. 다음과 같은 특징을 가지고 있습니다:
-
높은 처리량
- 초당 수백만 건의 메시지 처리 가능
- 수평적 확장이 용이
-
메시지 영속성
- 디스크 기반 저장으로 안정적인 데이터 보관
- 설정된 보관 기간 동안 메시지 유지
-
분산 시스템 지원
- 파티셔닝을 통한 병렬 처리
- 고가용성을 위한 리플리케이션
NestJS에서 Kafka 구현하기
1. 의존성 설치
npm install kafkajs
2. Kafka 설정 및 연결
// kafka.config.ts
import { Kafka } from 'kafkajs';
export const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
3. Producer 구현
// kafka.producer.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { kafka } from './kafka.config';
@Injectable()
export class KafkaProducerService implements OnModuleInit {
private producer: Producer;
constructor() {
this.producer = kafka.producer();
}
async onModuleInit() {
await this.producer.connect();
}
async sendMessage(topic: string, message: any) {
await this.producer.send({
topic,
messages: [
{
value: JSON.stringify(message)
},
],
});
}
async onModuleDestroy() {
await this.producer.disconnect();
}
}
4. Consumer 구현
// kafka.consumer.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Consumer } from 'kafkajs';
import { kafka } from './kafka.config';
@Injectable()
export class KafkaConsumerService implements OnModuleInit {
private consumer: Consumer;
constructor() {
this.consumer = kafka.consumer({
groupId: 'my-consumer-group'
});
}
async onModuleInit() {
await this.consumer.connect();
}
async subscribe(topic: string, callback: (message: any) => Promise<void>) {
await this.consumer.subscribe({ topic, fromBeginning: false });
await this.consumer.run({
eachMessage: async ({ message }) => {
const value = message.value?.toString();
if (value) {
await callback(JSON.parse(value));
}
},
});
}
async onModuleDestroy() {
await this.consumer.disconnect();
}
}
5. 실제 사용 예시
// email.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { KafkaConsumerService } from './kafka.consumer.service';
@Injectable()
export class EmailService implements OnModuleInit {
constructor(private kafkaConsumerService: KafkaConsumerService) {}
async onModuleInit() {
// BullMQ의 기존 큐 이름을 Kafka 토픽으로 사용
await this.kafkaConsumerService.subscribe('email-notification',
async (message) => {
await this.processEmail(message);
}
);
}
private async processEmail(message: any) {
// 기존 BullMQ의 프로세스 로직을 그대로 활용
console.log('Processing email:', message);
// 이메일 전송 로직...
}
}
6. 모듈 설정
// kafka.module.ts
import { Module } from '@nestjs/common';
import { KafkaProducerService } from './kafka.producer.service';
import { KafkaConsumerService } from './kafka.consumer.service';
@Module({
providers: [KafkaProducerService, KafkaConsumerService],
exports: [KafkaProducerService, KafkaConsumerService],
})
export class KafkaModule {}
BullMQ에서 Kafka로 전환 전략
1. 점진적 전환
한 번에 모든 큐를 전환하는 것은 위험할 수 있습니다. 다음과 같은 단계별 접근을 추천합니다:
- 신규 기능은 Kafka로 구현
- 기존 BullMQ 큐를 우선순위에 따라 순차적으로 전환
- 전환 과정에서 두 시스템을 병행 운영
2. 메시지 포맷 통일
// message.interface.ts
interface KafkaMessage {
id: string;
type: string;
data: any;
timestamp: number;
metadata?: {
retry?: number;
originalQueue?: string;
};
}
3. 에러 처리와 재시도 전략
// retry.strategy.ts
import { Injectable } from '@nestjs/common';
import { KafkaMessage } from './message.interface';
@Injectable()
export class RetryStrategy {
async handleError(message: KafkaMessage, error: Error) {
const maxRetries = 3;
const retryCount = message.metadata?.retry || 0;
if (retryCount < maxRetries) {
// 재시도 로직
return this.retryMessage({
...message,
metadata: {
...message.metadata,
retry: retryCount + 1
}
});
}
// 최대 재시도 횟수 초과 시 처리
await this.handleFailedMessage(message, error);
}
}
모니터링 및 운영
1. 헬스체크 구현
// health.controller.ts
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { KafkaHealthIndicator } from './kafka.health';
@Controller('health')
export class HealthController {
constructor(
private health: HealthCheckService,
private kafkaHealth: KafkaHealthIndicator,
) {}
@Get()
@HealthCheck()
check() {
return this.health.check([
() => this.kafkaHealth.isHealthy('kafka'),
]);
}
}
2. 메트릭 수집
// metrics.service.ts
import { Injectable } from '@nestjs/common';
import { PrometheusService } from './prometheus.service';
@Injectable()
export class MetricsService {
private messageCounter;
private processingTimeHistogram;
constructor(private prometheusService: PrometheusService) {
this.messageCounter = this.prometheusService.createCounter({
name: 'kafka_messages_total',
help: 'Total number of Kafka messages processed'
});
this.processingTimeHistogram = this.prometheusService.createHistogram({
name: 'kafka_message_processing_time',
help: 'Time spent processing Kafka messages'
});
}
recordMessage(topic: string) {
this.messageCounter.inc({ topic });
}
recordProcessingTime(topic: string, time: number) {
this.processingTimeHistogram.observe({ topic }, time);
}
}
결론
BullMQ에서 Kafka로의 전환은 시스템의 확장성과 안정성을 크게 향상시켰습니다. 주요 개선 사항은 다음과 같습니다:
-
처리량 향상
- 초당 처리 메시지 수 증가
- 지연 시간 감소
-
안정성 개선
- 메시지 유실 감소
- 시스템 장애 복구 능력 향상
-
운영 효율성
- 모니터링 및 디버깅 용이
- 확장 작업 간소화
다만, Kafka는 BullMQ에 비해 초기 설정과 운영이 복잡하므로, 시스템 규모와 요구사항을 고려하여 전환을 결정하는 것이 중요합니다.
참고 자료
- NestJS 공식 문서: https://docs.nestjs.com/microservices/kafka
- Kafka 공식 문서: https://kafka.apache.org/documentation/
- KafkaJS 문서: https://kafka.js.org/docs/getting-started