Don't think! Just do it!

종합 IT 기술 정체성 카오스 블로그! 이... 이곳은 어디지?

Server services/nest.js

[Nest.js] Kafkajs

방피터 2023. 6. 26. 21:47

지난 글에서 이야기했듯이

Nest.js에서 제공하는 Kafka micro service는 조금 구려 ㅋ

그래서 kafkajs를 직접 써보려고 해.

https://kafka.js.org/docs/getting-started

 

KafkaJS · KafkaJS, a modern Apache Kafka client for Node.js

KafkaJS, a modern Apache Kafka client for Node.js

kafka.js.org

메뉴얼이 굉장히 잘 써져 있고

사용법도 매우 직관적이고 간단하더라구.

시간내서 한번 읽어보는 것을 추천해.

 

자 새로운 nestjs 프로젝트 만들고 kafkajs 설치하고~

nest new kafkajs
npm install kafkajs

귀찮게 새로 만들지 말고 app.service.js에다가 코딩 ㄱㄱ

👇👇👇

import { Injectable } from '@nestjs/common';
import { EachMessagePayload, Kafka } from 'kafkajs';

@Injectable()
export class AppService {
  private kafka = new Kafka({
    clientId: "peter's kafka",
    brokers: ['localhost:9094'],
  });
  private producer = this.kafka.producer();
  private consumer = this.kafka.consumer({ groupId: "peter's kafka group" });

  constructor() {
    this.consumer.connect();//접속
    this.consumer.subscribe({ topics: ['test_a', 'test_b'] });//구독
    this.consumer.run({//메세지 수신 뱅뱅이
      eachMessage: this.consumerCallback,//메세지 수신 콜백
    });
  }

  async consumerCallback(payload: EachMessagePayload) {//메세지 수신 콜백
    console.log('kafka message arrived');
    console.log(
      `topic: ${payload.topic}, Message:${payload.message.value.toString()}`,
    );
  }

  getHello(): string {
    return 'Hello World!';
  }
}

 

코드가 뭐 너무 짧아서 설명할게 없어.

초기화해주고 consumer 하나 선언하고

접속 -> 구독 -> 수신 이게 전부 다야.

메세지 들어오면 콜백 호출되서 메세지 뿌리고

그리고 kafka broker는 미리 준비되어 있어야겠지?

👇👇👇

2023.06.25 - [Server services/Docker] - [Docker] Docker desktop + kafka + zookeeper + kafka ui

 

[Docker] Docker desktop + kafka + zookeeper + kafka ui

도커에 대한 설명? 구글링 해보면 "경량화된 가상화 서버"라는 말을 어렵고 유식하게 설명해 놓은 글이 차고 넘치니 아무거나 읽어보는 척하면서 스크롤 죽죽 당기자. 왠지 이런 툴들은 리눅스

engschool.tistory.com

그리고 나서 실행

npm run start:dev

그리고 kafka-ui를 통해 토픽을 생성해보면!

아주우우~ 잘 동작하는 걸 볼수가 있지!

자! 이제 새로운 토픽을 생성하는 엔드포인트를 하나 만들어보자구 ㅋ

이것도 귀찮으니까 app.controller.ts에다가 만들어봅시다.

👇👇👇

import { Body, Controller, Get, Post, Query } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Get()
  getHello(): string {
    return this.appService.getHello();
  }
  //꼴랑 이거 하나 추가
  @Post('add-topic')
  async addSubscriptionTopic(@Body('topic') topic: string): Promise<string> {
    console.log(topic);
    if (topic == undefined) {
      return 'topic is undefined';
    } else {
      await this.appService.addSubscriptionTopic(topic);
      return `topic ${topic} added`;
    }
  }
}

그 다음에는 app.service.ts를 업데이트

import { Injectable } from '@nestjs/common';
import { EachMessagePayload, Kafka } from 'kafkajs';

@Injectable()
export class AppService {
  private kafka = new Kafka({
    clientId: "peter's kafka",
    brokers: ['localhost:9094'],
  });
  private producer = this.kafka.producer();
  private consumer = this.kafka.consumer({ groupId: "peter's kafka group" });

  constructor() {
    this.consumer.connect();
    this.consumer.subscribe({ topics: ['test_a', 'test_b'] });
    this.consumer.run({
      eachMessage: this.consumerCallback,
    });
  }

  async consumerCallback(payload: EachMessagePayload) {
    console.log('kafka message arrived');
    console.log(
      `topic: ${payload.topic}, Message:${payload.message.value.toString()}`,
    );
  }
  //아래 함수를 추가한 거임!!!!!
  async addSubscriptionTopic(topic: string) {
    await this.consumer.stop(); // 컨슈머 멈추고
    await this.consumer.subscribe({ topic }); // 구독하고
    await this.consumer.run({ // 다시 돌돌이!
      eachMessage: this.consumerCallback,
    });
  }
  
  getHello(): string {
    return 'Hello World!';
  }
}

별거 없어.

컨슈머 잠시 멈추고, 토픽 추가 구독하고 다시 돌돌이!

이런식으로 dynamic하게 토픽을 추가할 수 있지.

테스트를 해보면 👇👇

먼저 localhost:3000/app-topic에 post로 구독할 토픽을 보내.

그러면 addSubscriptionTopic이 호출되서

토픽이 추가될거야.

그리고 kafka-ui에서 추가된 토픽으로

메세지를 보내면 console창에 출력이 되겠지!.

dynamic topic subscription!

ㅎㅎㅎ 매우~ 잘 동작해.

🎉🎉🎉🎉

이런식으로 nestjs에다가 kafkajs 올려서 사용하면 좋을 듯!

 

매우 훌륭해! ㅎㅎㅎ

반응형

'Server services > nest.js' 카테고리의 다른 글

[Nest.js] Kafka microservice  (0) 2023.06.25