Skip to content

๐Ÿค  RabbitMQ๋กœ ๋ถ„์‚ฐ ์„œ๋ฒ„์—๊ฒŒ ๋ฉ”์‹œ์ง€๋ฅผ ๋ถ„๋ฐฐํ•˜๊ธฐ

baegyeong edited this page Nov 29, 2024 · 4 revisions
๋ถ„์•ผ ์ž‘์„ฑ์ž ์ž‘์„ฑ์ผ
BE ๊น€๋ฏผ์ˆ˜ 24๋…„ 11์›” 17์ผ

RabbitMQ๋กœ ๋ถ„์‚ฐ ์„œ๋ฒ„์—๊ฒŒ ๋ฉ”์‹œ์ง€๋ฅผ ๋ถ„๋ฐฐํ•˜๊ธฐ

๊ฐœ์š”

ํ˜„์žฌ ํ”„๋กœ์ ํŠธ์—์„œ ์‹ค์‹œ๊ฐ„์œผ๋กœ ํ•œ๊ตญํˆฌ์ž์ฆ๊ถŒ API๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›๊ณ  ์žˆ์ง€๋งŒ, ๋‹จ์ผ ์ธ์Šคํ„ด์Šค๋กœ๋Š” ํ•œ๊ณ„๋ฅผ ๋Š๋ผ๊ฒŒ ๋˜์—ˆ๊ณ  ์ด๋ฅผ ๊ทน๋ณตํ•˜๊ธฐ ์œ„ํ•ด ์˜คํ†  ์Šค์ผ€์ผ๋ง์„ ๊ณ ๋ คํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค. ์ด๋•Œ ๊ฐ ์„œ๋ฒ„๋งˆ๋‹ค ๋ฐ›์•„์•ผํ•˜๋Š” ์ฃผ์‹์„ ํ• ๋‹นํ•˜๊ธฐ ์œ„ํ•œ ๋ฐฉ๋ฒ•์„ ๊ณ ๋ฏผํ•˜๋‹ค๊ฐ€ ๋ฉ”์‹œ์ง€ ํ๋ฅผ ํ™œ์šฉํ•˜์—ฌ ๋ถ„๋ฐฐํ•˜๊ธฐ๋กœ ํ–ˆ๋‹ค.

RabbitMQ ์ด๋ž€?

์˜คํ”ˆ ์†Œ์Šค ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋กœ, ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ฐ„ ๋ฉ”์‹œ์ง€๋ฅผ ์†ก์ˆ˜์‹ ํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋œ๋‹ค. ์ฃผ๋กœ ๋ฉ”์‹œ์ง€ ํ ์—ญํ• ์„ ์ˆ˜ํ–‰ํ•œ๋‹ค.

ํ•ต์‹ฌ ๊ฐœ๋…

  • Publisher: ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ์ฃผ์ฒด
  • Consumer: Publisher ๋กœ๋ถ€ํ„ฐ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•˜๋Š” ์ฃผ์ฒด
  • Exchange: Publisher ๋กœ๋ถ€ํ„ฐ ์ „๋‹ฌ ๋ฐ›์€ ๋ฉ”์‹œ์ง€๋ฅผ ํ๋กœ ์ „๋‹ฌํ•˜๋Š” ๊ณณ
  • Queue: Consumer ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•˜๊ธฐ ์ „๊นŒ์ง€ ๋ณด๊ด€ํ•˜๋Š” ์žฅ์†Œ
  • Binding: Exchange ์™€ Queue ์˜ ๊ด€๊ณ„, ๋ณดํ†ต ์‚ฌ์šฉ์ž๊ฐ€ ํŠน์ • exchange ๊ฐ€ ํŠน์ • queue ๋ฅผ binding ํ•˜๋„๋ก ์ •์˜ํ•œ๋‹ค.

RabbitMQ ์„ค์น˜ ํ›„ ์‹คํ–‰

์šฐ์„  rabbitMQ๋ฅผ docket์— ์„ค์น˜ํ•˜๊ธฐ ์œ„ํ•ด ์•„๋ž˜์˜ ๋ช…๋ น์–ด๋ฅผ ํ†ตํ•ด์„œ ์ด๋ฏธ์ง€๋ฅผ ๋‹ค์šด๋ฐ›์€ ํ›„ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์‹คํ–‰ํ–ˆ๋‹ค.

// rabbitmq ์ด๋ฏธ์ง€ ์„ค์น˜
docker pull rabbitmq

// ์ปจํ…Œ์ด๋„ˆ ์‹คํ–‰
docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq

// rabbitmq_management ์„ค์น˜
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management

์ดํ›„ localhost:15672 ์— ์ ‘์†ํ•˜๋ฉด ๊ด€๋ฆฌ์ž ํŽ˜์ด์ง€์— ์ ‘์†ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ด๋•Œ ์•„์ด๋””์™€ ๋น„๋ฐ€๋ฒˆํ˜ธ๋Š” ๋ชจ๋‘ guest์ด๋‹ค.

์ƒ์‚ฐ์ž ๋งŒ๋“ค๊ธฐ

์šฐ์„  ์ƒ์‚ฐ์ž๋Š” ๊ฐ„๋‹จํ•˜๊ฒŒ node๋งŒ์œผ๋กœ ๋งŒ๋“ค์—ˆ๋‹ค. amqplib ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ํ†ตํ•ด์„œ ํ๋ฅผ ์ •ํ•˜์—ฌ ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌํ•˜๋„๋ก ํ–ˆ๋‹ค.

๋จผ์ € RabbitMQ ์„œ๋ฒ„์— ์—ฐ๊ฒฐํ•˜๊ณ  ์ฑ„๋„์„ ์ƒ์„ฑํ•œ๋‹ค. ์ด๋•Œ amqp://localhost ์€ localhost์˜ rabbitMQ์˜ ๊ธฐ๋ณธ ํฌํŠธ(5672)๋กœ ์ ‘์†ํ•˜๊ฒŒ ๋˜๋ฏ€๋กœ ํฌํŠธ๊ฐ€ ๋‹ค๋ฅด๋ฉด ์ˆ˜์ •ํ•ด์•ผํ•œ๋‹ค.

const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();

์ฑ„๋„์„ ์ƒ์„ฑํ•œ ํ›„ assertQueue๋กœ ํ๋ฅผ ํ• ๋‹นํ•œ๋‹ค. ์ด๋•Œ ํ๊ฐ€ ์—†์œผ๋ฉด ์ƒ์„ฑํ•œ๋‹ค.

await channel.assertQueue(queue, {
    durable: true,  // ํ๊ฐ€ ์„œ๋ฒ„๊ฐ€ ์žฌ์‹œ์ž‘ํ•ด๋„ ์œ ์ง€๋˜๋„๋ก ์„ค์ •
});

์ดํ›„ ์›ํ•˜๋Š” ๋กœ์ง์„ ์‹คํ–‰ํ•œ๋‹ค. ์ด๋ฒˆ์— ์ง„ํ–‰ํ•  ๋กœ์ง์€ 100๊ฐœ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ํ์— ๋„ฃ๋Š” ๊ฒƒ์ด๋‹ค.

// ๋ฉ”์‹œ์ง€ ์ „์†ก
for (let i = 0; i< 100; i++) {
    const message = `Hello World! ${i}`;
    channel.sendToQueue(queue, Buffer.from(Buffer.from(
        JSON.stringify({
            pattern: 'hello',
            data: {source: message},
        })
    )), {
        persistent: true
    });
    console.log(`sent: ${message}`);
}

๋ฉ”์‹œ์ง€ ๋‚ด์šฉ์€ pattern๊ณผ data๋กœ ๊ตฌ์„ฑ๋˜์–ด ์žˆ๋‹ค.

  • pattern: ์ด๋ฒคํŠธ ํŒจํ„ด์„ ๋‚˜ํƒ€๋‚ด๋ฉฐ nestjs์—์„œ ํŠน์ • ์ด๋ฒคํŠธ ํŒจํ„ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๊ฒŒ๋œ๋‹ค.
  • data: ์ „๋‹ฌํ•˜๋Š” ๋ฉ”์‹œ์ง€

๋˜ํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ํ์— ์ „๋‹ฌํ•  ๋•Œ persistent๋ผ๋Š” ์˜ต์…˜์„ ์„ค์ •ํ–ˆ๋Š”๋ฐ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๊ฐ€ ์žฌ์‹œ์ž‘ํ•ด๋„ ์œ ์ง€ํ•˜๋Š” ์˜ต์…˜์ด๋‹ค.

์ „์ฒด ์ฝ”๋“œ

const amqp = require('amqplib');

async function sendTask() {
    const queue = 'kospi';

    try {
        // RabbitMQ ์„œ๋ฒ„์— ์—ฐ๊ฒฐ
        const connection = await amqp.connect('amqp://localhost');
        const channel = await connection.createChannel();

        // ํ ์ƒ์„ฑ (์—†์œผ๋ฉด ์ƒ์„ฑ)
        await channel.assertQueue(queue, {
            durable: true,  // ํ๊ฐ€ ์„œ๋ฒ„๊ฐ€ ์žฌ์‹œ์ž‘ํ•ด๋„ ์œ ์ง€๋˜๋„๋ก ์„ค์ •
        });

        // ๋ฉ”์‹œ์ง€ ์ „์†ก
        for (let i = 0; i< 100; i++) {
            const message = `Hello World! ${i}`;
            channel.sendToQueue(queue, Buffer.from(Buffer.from(
                JSON.stringify({
                    pattern: 'hello',
                    data: {source: message},
                })
            )), {
                persistent: true,
            });
            console.log(`sent: ${message}`);
        }

        // ์—ฐ๊ฒฐ ์ข…๋ฃŒ
        await channel.close();
        await connection.close();
    } catch (error) {
        console.error('Error sending task:', error);
    }
}
sendTask();

์†Œ๋น„์ž ๋งŒ๋“ค๊ธฐ

์†Œ๋น„์ž๋Š” ๊ธฐ์กด ์„œ๋ฒ„๊ฐ€ ์‚ฌ์šฉํ•˜๋Š” nestjs๋กœ ๊ตฌ์„ฑํ•˜๋„๋ก ํ–ˆ๋‹ค. ๋จผ์ € ์—ฐ๊ด€๋œ ํŒจํ‚ค์ง€๋ฅผ ์„ค์น˜ํ•œ๋‹ค.

npm i --save amqplib amqp-connection-manager @nestjs/microservices

์ดํ›„ main ํŒŒ์ผ์—์„œ ๋งˆ์ดํฌ๋กœ์„œ๋น„์Šค๋ฅผ ์„ค์ •ํ•œ ํ›„ ์‹œ์ž‘ํ•˜๋„๋ก ํ•œ๋‹ค.

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.RMQ,
    options: {
      urls: ['amqp://localhost'],
      queue: 'kospi',
      noAck: false,
      queueOptions: {
        durable: true,
      },
    },
  });
  await app.startAllMicroservices();
  await app.listen(process.env.PORT ?? 3000);
}

bootstrap();

์ด๋•Œ rabbitmq์™€ ๊ด€๋ จ๋œ ์˜ต์…˜์„ ์„ค์ •ํ–ˆ๋Š”๋ฐ ๊ฐ ์˜ต์…˜์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

  • urls - rabbitmq ์ฃผ์†Œ
  • queue - ํ ์ด๋ฆ„
  • noAck - ๋กœ์ง์ด ์™„๋ฃŒ๋œ ํ›„ ack ํ˜ธ์ถœ ์—†์ด ์ž๋™์œผ๋กœ ๋ณด๋‚ด๋Š” ์˜ต์…˜์ด๋‹ค.
  • queueOptions - ํ์™€ ์—ฐ๊ด€๋œ ์˜ต์…˜

์ดํ›„ ์ƒ์‚ฐ์ž์— ์ „๋‹ฌํ•˜๋Š” ์ด๋ฒคํŠธ ํŒจํ„ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š” ์ปจํŠธ๋กค๋Ÿฌ๋ฅผ ๊ตฌํ˜„ํ•œ๋‹ค.

@Controller()
export class AppController {
  @EventPattern('hello')
  async handleMessage(@Payload() data: number[], @Ctx() context: RmqContext) {
    const channel = context.getChannelRef();
    const originalMsg = context.getMessage();
    console.log('data:', data);
    channel.ack(originalMsg);
  }
}

์ดํ›„ 2๊ฐœ์˜ nestjs ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‹คํ–‰ ํ›„ 100๊ฐœ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์„ ๋•Œ ์•„๋ž˜์™€ ๊ฐ™์ด ์ž˜ ๋ถ„๋ฐฐ๊ฐ€ ๋˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

๋งŒ์•ฝ ACK๋ฅผ ์‹คํ–‰ํ•˜์ง€ ์•Š์œผ๋ฉด?

์ด์ „์— noAck๋ฅผ false๋กœ ์„ค์ •ํ–ˆ๋‹ค. ์ด๋ ‡๊ฒŒ ํ•œ ์ด์œ ๋Š” ๋กœ์ง์ด ์‹คํŒจํ–ˆ์„ ๋•Œ ๋ฉ”์‹œ์ง€๊ฐ€ ์‚ฌ๋ผ์ง€์ง€ ์•Š๊ณ  ๋‹ค์‹œ ๋ฉ”์‹œ์ง€ ํ๋กœ ์ „๋‹ฌํ•˜๊ธฐ ์œ„ํ•จ์ด๋‹ค.

noAck๋ฅผ false๋กœ ํ•œ ์ƒํƒœ๋กœ ack๋ฅผ ํ˜ธ์ถœํ•˜์ง€ ์•Š์œผ๋ฉด ์ž๋™์œผ๋กœ rabbitMQ๋กœ ๋Œ์•„๊ฐˆ ๊ฒƒ์ด๋ผ ์˜ˆ์ƒ๋˜์ง€๋งŒ ๊ทธ๋ ‡์ง€ ์•Š๋‹ค. ์šฐ์„  ์ด์ „์˜ ์ฝ”๋“œ์—์„œ 0.5ํ™•๋ฅ ๋กœ ack๋ฅผ ํ˜ธ์ถœํ•˜์ง€ ์•Š๋„๋ก ํ•˜๊ณ , 1๊ฐœ์˜ ์„œ๋ฒ„๋งŒ ์‹คํ–‰ํ•œ๋‹ค

@EventPattern('hello')
async handleMessage(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  if (Math.random() > 0.5) {
    return;
  }
  console.log('data:', data);
  channel.ack(originalMsg);
}

์ดํ›„ ์‹คํ–‰ํ–ˆ์„ ๋•Œ ์•„๋ž˜์˜ ๊ฒฐ๊ณผ์ฒ˜๋Ÿผ ์ผ๋ถ€ ๋ฉ”์‹œ์ง€๊ฐ€ ack ๊ฐ€ ๋˜์ง€ ์•Š๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ทธ๋ ‡๋‹ค๋ฉด RabbitMQ์— ๋ฉ”์‹œ์ง€๊ฐ€ ๋Œ์•„์™”๋Š”์ง€ ํ™•์ธํ•ด๋ณด์ž

RabbitMQ ์—์„œ๋Š” ๋ชจ๋“  ๋ฉ”์‹œ์ง€๊ฐ€ ์ „๋‹ฌ๋˜์–ด ๋น„์–ด์žˆ๋Š” ์ƒํƒœ์ด๋‹ค. ์ด๋ฒˆ์—๋Š” ์„œ๋ฒ„๋ฅผ ๊ฐ•์ œ๋กœ ์ค‘์ง€ํ•˜๊ณ  ๋‹ค์‹œ ํ™•์ธํ•ด๋ณด์ž

๋†€๋ž๊ฒŒ๋„ ์„œ๋ฒ„๊ฐ€ ์ข…๋ฃŒ๋  ๋•Œ ๋‹ค์‹œ ๋ฉ”์‹œ์ง€๊ฐ€ ์ฑ„์›Œ์ง„ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. ํ•ด๋‹น ๋ฉ”์‹œ์ง€๋Š” ์•„๋งˆ๋„ ack๊ฐ€ ํ˜ธ์ถœ๋˜์ง€ ์•Š์€ ๋ฉ”์‹œ์ง€๋กœ ๋ณด์ธ๋‹ค. ์ด๋Ÿฌํ•œ ๊ฒฐ๊ณผ๋ฅผ ํ†ตํ•ด์„œ ack ๋ฅผ ํ˜ธ์ถœํ•˜์ง€ ์•Š์œผ๋ฉด ์„œ๋ฒ„์— ๊ณ„์† ์Œ“์ด๊ฒŒ ๋˜์–ด ๋ฉ”๋ชจ๋ฆฌ๊ฐ€ ๋‚ญ๋น„๋˜๊ฒŒ ๋œ๋‹ค.

๊ทธ๋ ‡๋‹ค๋ฉด ๋ฉ”์‹œ์ง€์— ๋Œ€ํ•œ ๋กœ์ง์ด ์‹คํŒจํ•˜์—ฌ ๋ฉ”์‹œ์ง€ ํ๋กœ ๋‹ค์‹œ ๋˜๋Œ๋ฆด ๋•Œ ์–ด๋–ป๊ฒŒ ํ•ด์•ผํ•˜๋Š”๊ฐ€? ๋ฐ”๋กœ nack ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ๋œ๋‹ค. ํ•ด๋‹น ๋ฉ”์„œ๋“œ์˜ ์‹œ๊ทธ๋‹ˆ์ฒ˜๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

channel.nack(message, allUpTo, requeue);
  • message: ๋ฉ”์‹œ์ง€ ๊ฐ์ฒด์ด๋‹ค. ์ฑ„๋„์— ์ „์†ก๋œ ๋ฉ”์‹œ์ง€๋ฅผ ์ฐธ์กฐํ•œ๋‹ค.
  • allUpTo (boolean): true๋กœ ์„ค์ •ํ•˜๋ฉด, ์ฃผ์–ด์ง„ ๋ฉ”์‹œ์ง€ ๊นŒ์ง€์˜ ๋ชจ๋“  ์ด์ „ ๋ฉ”์‹œ์ง€๋“ค๋„ ํ•จ๊ป˜ ๋ถ€์ • ํ™•์ธ๋œ๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ false
  • requeue (boolean): true๋กœ ์„ค์ •ํ•˜๋ฉด, ๋ฉ”์‹œ์ง€๋ฅผ ํ์˜ ์ฒ˜์Œ์œผ๋กœ ๋‹ค์‹œ ๋„ฃ์–ด ์žฌ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค. false๋กœ ์„ค์ •ํ•˜๋ฉด ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฒ„๋ ค์ง„๋‹ค. ๊ธฐ๋ณธ๊ฐ’์€ true

์ด๋ฅผ ์ฐธ๊ณ ํ•˜์—ฌ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ฝ”๋“œ๋ฅผ ์ˆ˜์ •ํ–ˆ๋‹ค.

@EventPattern('hello')
async handleMessage(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  if (Math.random() > 0.5) {
    return channel.nack(originalMsg, false, true);
  }
  console.log('data:', data);
  channel.ack(originalMsg);
}

์ดํ›„ ์•„๋ž˜์˜ ๊ฒฐ๊ณผ๋ฅผ ํ†ตํ•ด ack๊ฐ€ ๋˜์ง€ ์•Š๋Š” ๋ฉ”์‹œ์ง€๋Š” ํ์— ๋‹ค์‹œ ์ฑ„์›Œ์ง€๊ณ  ์žฌ์ฒ˜๋ฆฌ๋˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

๐Ÿœ ํŒ€ ๊ฐœ๋ฏธ

๐Ÿ›๏ธ ํŒ€ ๋ฌธํ™”

๊ฐœ๋ฐœ ์œ„ํ‚ค

FE

BE

Infra

๐Ÿ—ฃ๏ธ ๋ฐœํ‘œ

๐Ÿ“š ํšŒ์˜๋ก

๐Ÿ”ด ์ธํ„ฐ๋ฏธ์…˜
๐ŸŸ  1์ฃผ์ฐจ
๐ŸŸก 2์ฃผ์ฐจ
๐ŸŸข 3์ฃผ์ฐจ
๐Ÿ”ต 4์ฃผ์ฐจ
๐ŸŸฃ 5์ฃผ์ฐจ
๐ŸŸค 6์ฃผ์ฐจ

๐Ÿ’ญ ํšŒ๊ณ 

๐Ÿง‘โ€๐Ÿคโ€๐Ÿง‘ ๋ฉ˜ํ† ๋ง

Clone this wiki locally