[Nest.js] Kafka microservice
Nest.js에서는 공식적으로
kafka microservice를 제공하고 있어.
근데
구려. ㅋ
해보면 알아~ 해보자구 ㅋ
nest new kafka_msa_test
위 명령으로 nest.js 새 프로젝트 시작
카프카랑 @nestjs/microservices도 미리 설치해주고~
npm i --save kafkajs
npm i --save @nestjs/microservices
그리고 main.ts에다가 microservice를 추가해 줄거야~
👇👇👇
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'peterKafkaClient',
brokers: ['localhost:9094'],
},
consumer: {
groupId: 'peterKafkaGroup',
},
},
});
await app.listen(3000);
await app.startAllMicroservices();
}
bootstrap();
여기에 명시된 broker는 내가 로컬 도커에 띄운 kafka 브로커임.
아래 글 참고
2023.06.25 - [Server services/Docker] - [Docker] Docker desktop + kafka + zookeeper + kafka ui
그리고 기본적으로 생성된 app.controller.ts에다가
아래와 같이 @MessagePattern('구독할 토픽') 을 넣어주면 끝!
import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
import { MessagePattern, Payload } from '@nestjs/microservices';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
//이부분 추가요~
@MessagePattern('peterKafka')
helloKafka(@Payload() payload) {
console.log(JSON.stringify(payload));
}
@Get()
getHello(): string {
return this.appService.getHello();
}
}
이제 테스트를 해보자.
npm run start
kafka-ui에서 확인해보면 자신이 설정한 카프가 그룹이 보일거야.
이제 구독중인 토픽으로 메세지를 날려보자구
그럼 콘솔창에서 정상적으로 메세지가 수신되는 걸 확인할 수가 있지!
👇👇👇🎉🎉🎉
엄청 쉽고 강력해! ㅋㅋ
그런데 좀 구린게 있어.
토픽을 dynamic하게 구독할 수가 없어.
구독할 토픽을 변경하려면
코드나 설정을 변경한 다음에
서버를 내렸다가 다시 올려야함 -_-;;
이게 메세지를 발행할 때도 문제가 돼 ㅋ
response 받아야 하는 경우가 있잖아?
그럴 땐 response 토픽을 수신해야 하는데;;;
이때 dynamic하게 추가할 수 없으면;;; 음.. 구려~
그래서 난 nest.js에서 제공하는 kafka micro service 대신
그냥 kafkajs를 써서 kafka agent를 구현할 거임 ㅋ
다음 시간에! ㅋ 안녕!