Server services/nest.js

[Nest.js] Kafka microservice

방피터 2023. 6. 25. 23:36

Nest.js에서는 공식적으로

kafka microservice를 제공하고 있어.

kafka micro service

근데

구려. ㅋ

해보면 알아~ 해보자구 ㅋ

nest new kafka_msa_test

위 명령으로 nest.js 새 프로젝트 시작

새 프로젝트 설치 완료

카프카랑 @nestjs/microservices도 미리 설치해주고~

npm i --save kafkajs
npm i --save @nestjs/microservices

그리고 main.ts에다가 microservice를 추가해 줄거야~

👇👇👇

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.connectMicroservice({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'peterKafkaClient',
        brokers: ['localhost:9094'],
      },
      consumer: {
        groupId: 'peterKafkaGroup',
      },
    },
  });

  await app.listen(3000);
  await app.startAllMicroservices();
}
bootstrap();

여기에 명시된 broker는 내가 로컬 도커에 띄운 kafka 브로커임.

아래 글 참고

2023.06.25 - [Server services/Docker] - [Docker] Docker desktop + kafka + zookeeper + kafka ui

 

[Docker] Docker desktop + kafka + zookeeper + kafka ui

도커에 대한 설명? 구글링 해보면 "경량화된 가상화 서버"라는 말을 어렵고 유식하게 설명해 놓은 글이 차고 넘치니 아무거나 읽어보는 척하면서 스크롤 죽죽 당기자. 왠지 이런 툴들은 리눅스

engschool.tistory.com

그리고 기본적으로 생성된 app.controller.ts에다가

아래와 같이 @MessagePattern('구독할 토픽') 을 넣어주면 끝!

import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
import { MessagePattern, Payload } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}
  
  //이부분 추가요~
  @MessagePattern('peterKafka')
  helloKafka(@Payload() payload) {
    console.log(JSON.stringify(payload));
  }

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }
}

이제 테스트를 해보자.

npm run start

kafka-ui에서 확인해보면 자신이 설정한 카프가 그룹이 보일거야. 

이제 구독중인 토픽으로 메세지를 날려보자구

localhost:8080 (kafka-ui)

그럼 콘솔창에서 정상적으로 메세지가 수신되는 걸 확인할 수가 있지!

👇👇👇🎉🎉🎉

정상적으로 메세지가 수신된다.

엄청 쉽고 강력해! ㅋㅋ

그런데 좀 구린게 있어.

토픽을 dynamic하게 구독할 수가 없어.

구독할 토픽을 변경하려면

코드나 설정을 변경한 다음에

서버를 내렸다가 다시 올려야함 -_-;;

이게 메세지를 발행할 때도 문제가 돼 ㅋ

response 받아야 하는 경우가 있잖아?

그럴 땐 response 토픽을 수신해야 하는데;;;

이때 dynamic하게 추가할 수 없으면;;; 음.. 구려~

 

그래서 난 nest.js에서 제공하는 kafka micro service 대신

그냥 kafkajs를 써서 kafka agent를 구현할 거임 ㅋ

다음 시간에! ㅋ 안녕!

반응형