NestJS BullMQ에서 Kafka로 전환하기
NestJS 애플리케이션에서 메시지 큐를 BullMQ에서 Kafka로 전환한 경험을 공유합니다
  • Development

BullMQ에서 Kafka로 전환하게 된 이유

메시지 큐는 현대 분산 시스템에서 매우 중요한 역할을 합니다. BullMQ는 Redis 기반의 큐 시스템으로 간단한 작업 큐를 구현하기에 좋은 도구입니다. 하지만 시스템이 커지고 처리해야 할 메시지의 양이 늘어나면서 다음과 같은 한계점들이 드러났습니다:

  1. 확장성 제한

    • Redis 기반 큐의 한계
    • 대용량 메시지 처리의 어려움
  2. 메시지 영속성

    • Redis의 메모리 기반 특성으로 인한 데이터 유실 가능성
    • 장애 복구 시나리오의 한계
  3. 분산 처리의 한계

    • 복잡한 파티셔닝 구현의 어려움
    • 수평적 확장의 제한

이러한 문제들을 해결하기 위해 Kafka로의 전환을 결정했습니다.

Kafka 소개

Apache Kafka는 LinkedIn에서 개발한 분산 스트리밍 플랫폼입니다. 다음과 같은 특징을 가지고 있습니다:

  1. 높은 처리량

    • 초당 수백만 건의 메시지 처리 가능
    • 수평적 확장이 용이
  2. 메시지 영속성

    • 디스크 기반 저장으로 안정적인 데이터 보관
    • 설정된 보관 기간 동안 메시지 유지
  3. 분산 시스템 지원

    • 파티셔닝을 통한 병렬 처리
    • 고가용성을 위한 리플리케이션

NestJS에서 Kafka 구현하기

1. 의존성 설치

Copy
npm install kafkajs

2. Kafka 설정 및 연결

Copy
// kafka.config.ts
import { Kafka } from 'kafkajs';

export const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

3. Producer 구현

Copy
// kafka.producer.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { kafka } from './kafka.config';

@Injectable()
export class KafkaProducerService implements OnModuleInit {
  private producer: Producer;

  constructor() {
    this.producer = kafka.producer();
  }

  async onModuleInit() {
    await this.producer.connect();
  }

  async sendMessage(topic: string, message: any) {
    await this.producer.send({
      topic,
      messages: [
        { 
          value: JSON.stringify(message) 
        },
      ],
    });
  }

  async onModuleDestroy() {
    await this.producer.disconnect();
  }
}

4. Consumer 구현

Copy
// kafka.consumer.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Consumer } from 'kafkajs';
import { kafka } from './kafka.config';

@Injectable()
export class KafkaConsumerService implements OnModuleInit {
  private consumer: Consumer;

  constructor() {
    this.consumer = kafka.consumer({ 
      groupId: 'my-consumer-group' 
    });
  }

  async onModuleInit() {
    await this.consumer.connect();
  }

  async subscribe(topic: string, callback: (message: any) => Promise<void>) {
    await this.consumer.subscribe({ topic, fromBeginning: false });
    
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        const value = message.value?.toString();
        if (value) {
          await callback(JSON.parse(value));
        }
      },
    });
  }

  async onModuleDestroy() {
    await this.consumer.disconnect();
  }
}

5. 실제 사용 예시

Copy
// email.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { KafkaConsumerService } from './kafka.consumer.service';

@Injectable()
export class EmailService implements OnModuleInit {
  constructor(private kafkaConsumerService: KafkaConsumerService) {}

  async onModuleInit() {
    // BullMQ의 기존 큐 이름을 Kafka 토픽으로 사용
    await this.kafkaConsumerService.subscribe('email-notification', 
      async (message) => {
        await this.processEmail(message);
      }
    );
  }

  private async processEmail(message: any) {
    // 기존 BullMQ의 프로세스 로직을 그대로 활용
    console.log('Processing email:', message);
    // 이메일 전송 로직...
  }
}

6. 모듈 설정

Copy
// kafka.module.ts
import { Module } from '@nestjs/common';
import { KafkaProducerService } from './kafka.producer.service';
import { KafkaConsumerService } from './kafka.consumer.service';

@Module({
  providers: [KafkaProducerService, KafkaConsumerService],
  exports: [KafkaProducerService, KafkaConsumerService],
})
export class KafkaModule {}

BullMQ에서 Kafka로 전환 전략

1. 점진적 전환

한 번에 모든 큐를 전환하는 것은 위험할 수 있습니다. 다음과 같은 단계별 접근을 추천합니다:

  1. 신규 기능은 Kafka로 구현
  2. 기존 BullMQ 큐를 우선순위에 따라 순차적으로 전환
  3. 전환 과정에서 두 시스템을 병행 운영

2. 메시지 포맷 통일

Copy
// message.interface.ts
interface KafkaMessage {
  id: string;
  type: string;
  data: any;
  timestamp: number;
  metadata?: {
    retry?: number;
    originalQueue?: string;
  };
}

3. 에러 처리와 재시도 전략

Copy
// retry.strategy.ts
import { Injectable } from '@nestjs/common';
import { KafkaMessage } from './message.interface';

@Injectable()
export class RetryStrategy {
  async handleError(message: KafkaMessage, error: Error) {
    const maxRetries = 3;
    const retryCount = message.metadata?.retry || 0;

    if (retryCount < maxRetries) {
      // 재시도 로직
      return this.retryMessage({
        ...message,
        metadata: {
          ...message.metadata,
          retry: retryCount + 1
        }
      });
    }

    // 최대 재시도 횟수 초과 시 처리
    await this.handleFailedMessage(message, error);
  }
}

모니터링 및 운영

1. 헬스체크 구현

Copy
// health.controller.ts
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { KafkaHealthIndicator } from './kafka.health';

@Controller('health')
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private kafkaHealth: KafkaHealthIndicator,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.kafkaHealth.isHealthy('kafka'),
    ]);
  }
}

2. 메트릭 수집

Copy
// metrics.service.ts
import { Injectable } from '@nestjs/common';
import { PrometheusService } from './prometheus.service';

@Injectable()
export class MetricsService {
  private messageCounter;
  private processingTimeHistogram;

  constructor(private prometheusService: PrometheusService) {
    this.messageCounter = this.prometheusService.createCounter({
      name: 'kafka_messages_total',
      help: 'Total number of Kafka messages processed'
    });

    this.processingTimeHistogram = this.prometheusService.createHistogram({
      name: 'kafka_message_processing_time',
      help: 'Time spent processing Kafka messages'
    });
  }

  recordMessage(topic: string) {
    this.messageCounter.inc({ topic });
  }

  recordProcessingTime(topic: string, time: number) {
    this.processingTimeHistogram.observe({ topic }, time);
  }
}

결론

BullMQ에서 Kafka로의 전환은 시스템의 확장성과 안정성을 크게 향상시켰습니다. 주요 개선 사항은 다음과 같습니다:

  1. 처리량 향상

    • 초당 처리 메시지 수 증가
    • 지연 시간 감소
  2. 안정성 개선

    • 메시지 유실 감소
    • 시스템 장애 복구 능력 향상
  3. 운영 효율성

    • 모니터링 및 디버깅 용이
    • 확장 작업 간소화

다만, Kafka는 BullMQ에 비해 초기 설정과 운영이 복잡하므로, 시스템 규모와 요구사항을 고려하여 전환을 결정하는 것이 중요합니다.

참고 자료

본 글은 AI를 통해 작성되었습니다.