지난 글에서 이야기했듯이
Nest.js에서 제공하는 Kafka micro service는 조금 구려 ㅋ
그래서 kafkajs를 직접 써보려고 해.
https://kafka.js.org/docs/getting-started
메뉴얼이 굉장히 잘 써져 있고
사용법도 매우 직관적이고 간단하더라구.
시간내서 한번 읽어보는 것을 추천해.
자 새로운 nestjs 프로젝트 만들고 kafkajs 설치하고~
nest new kafkajs
npm install kafkajs
귀찮게 새로 만들지 말고 app.service.js에다가 코딩 ㄱㄱ
👇👇👇
import { Injectable } from '@nestjs/common';
import { EachMessagePayload, Kafka } from 'kafkajs';
@Injectable()
export class AppService {
private kafka = new Kafka({
clientId: "peter's kafka",
brokers: ['localhost:9094'],
});
private producer = this.kafka.producer();
private consumer = this.kafka.consumer({ groupId: "peter's kafka group" });
constructor() {
this.consumer.connect();//접속
this.consumer.subscribe({ topics: ['test_a', 'test_b'] });//구독
this.consumer.run({//메세지 수신 뱅뱅이
eachMessage: this.consumerCallback,//메세지 수신 콜백
});
}
async consumerCallback(payload: EachMessagePayload) {//메세지 수신 콜백
console.log('kafka message arrived');
console.log(
`topic: ${payload.topic}, Message:${payload.message.value.toString()}`,
);
}
getHello(): string {
return 'Hello World!';
}
}
코드가 뭐 너무 짧아서 설명할게 없어.
초기화해주고 consumer 하나 선언하고
접속 -> 구독 -> 수신 이게 전부 다야.
메세지 들어오면 콜백 호출되서 메세지 뿌리고
그리고 kafka broker는 미리 준비되어 있어야겠지?
👇👇👇
2023.06.25 - [Server services/Docker] - [Docker] Docker desktop + kafka + zookeeper + kafka ui
그리고 나서 실행
npm run start:dev
그리고 kafka-ui를 통해 토픽을 생성해보면!
아주우우~ 잘 동작하는 걸 볼수가 있지!
자! 이제 새로운 토픽을 생성하는 엔드포인트를 하나 만들어보자구 ㅋ
이것도 귀찮으니까 app.controller.ts에다가 만들어봅시다.
👇👇👇
import { Body, Controller, Get, Post, Query } from '@nestjs/common';
import { AppService } from './app.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) {}
@Get()
getHello(): string {
return this.appService.getHello();
}
//꼴랑 이거 하나 추가
@Post('add-topic')
async addSubscriptionTopic(@Body('topic') topic: string): Promise<string> {
console.log(topic);
if (topic == undefined) {
return 'topic is undefined';
} else {
await this.appService.addSubscriptionTopic(topic);
return `topic ${topic} added`;
}
}
}
그 다음에는 app.service.ts를 업데이트
import { Injectable } from '@nestjs/common';
import { EachMessagePayload, Kafka } from 'kafkajs';
@Injectable()
export class AppService {
private kafka = new Kafka({
clientId: "peter's kafka",
brokers: ['localhost:9094'],
});
private producer = this.kafka.producer();
private consumer = this.kafka.consumer({ groupId: "peter's kafka group" });
constructor() {
this.consumer.connect();
this.consumer.subscribe({ topics: ['test_a', 'test_b'] });
this.consumer.run({
eachMessage: this.consumerCallback,
});
}
async consumerCallback(payload: EachMessagePayload) {
console.log('kafka message arrived');
console.log(
`topic: ${payload.topic}, Message:${payload.message.value.toString()}`,
);
}
//아래 함수를 추가한 거임!!!!!
async addSubscriptionTopic(topic: string) {
await this.consumer.stop(); // 컨슈머 멈추고
await this.consumer.subscribe({ topic }); // 구독하고
await this.consumer.run({ // 다시 돌돌이!
eachMessage: this.consumerCallback,
});
}
getHello(): string {
return 'Hello World!';
}
}
별거 없어.
컨슈머 잠시 멈추고, 토픽 추가 구독하고 다시 돌돌이!
이런식으로 dynamic하게 토픽을 추가할 수 있지.
테스트를 해보면 👇👇
먼저 localhost:3000/app-topic에 post로 구독할 토픽을 보내.
그러면 addSubscriptionTopic이 호출되서
토픽이 추가될거야.
그리고 kafka-ui에서 추가된 토픽으로
메세지를 보내면 console창에 출력이 되겠지!.
ㅎㅎㅎ 매우~ 잘 동작해.
🎉🎉🎉🎉
이런식으로 nestjs에다가 kafkajs 올려서 사용하면 좋을 듯!
매우 훌륭해! ㅎㅎㅎ
'Server services > nest.js' 카테고리의 다른 글
[Nest.js] Kafka microservice (0) | 2023.06.25 |
---|