본문 바로가기
오픈소스/RabbitMQ

[RMQ] Python으로 배우는 RabbitMQ 튜토리얼(3) - Publish/Subscribe

by sangyeon 2022. 1. 12.
728x90

이전과 동일하게 Pika Python Client를 사용한다.

 

이전 글(Python으로 배우는 RabbitMQ 튜토리얼(2) - Work Queue)에서는 작업 대기열(Work Queue)를 실습했다.

Work Queue의 경우에는 정확히 한 작업자(Worker)에게 메세지를 전달하는 것이었다면

이번 글에서 배울 Publish/Subscribe의 경우에는 특정 토픽을 구독하는 여러 Consumer에게 메세지를 전달하는 방식이다.

 

여기에 동작 방식을 잘 설명하기 위해 간단한 로깅 시스템을 추가로 구축할 예정이다.

아래의 예시는 하나의 프로그램에서 Log 메세지를 전송하면, Disk에 Log를 기록하는 프로그램과 화면에 Log를 출력하는 프로그램이 각각 메세지를 수신한다.

 

1. 교환(Exchanges)

 

이전 튜토리얼(1,2)에서는 메세지를 큐에서 보내고 큐에서 꺼내 사용하는 내용이었다. 이제 Rabiit에서 전체 메세징 모델을 소개한다.

  • Producer(생산자)는 메세지를 전송하는 사용자 응용 프로그램이다.
  • Queue(큐)는 메세지를 저장하는 버퍼이다.
  • Consumer(소비자)는 메세지를 받는 사용자 응용 프로그램이다.

RabbitMQ 메세징 모델의 핵심은 Producer가 대기열(Queue)에 직접 메세지를 보내지 않는다는 것이다. Producer는 메세지가 대기열에 Delivery될 것인지조차 알지 못한다.

 

대신 아래 그림과 같이 Producer는 X(교환, exchange)에만 메세지를 보낼 수 있다. 

교환(X)은 Producer로부터 메세지를 수신하고 대기열로 메세지를 Push 한다.

 

1) 메세지를 특정 큐에 넣을지?

2) 메세지를 여러 큐들에 넣을지?

3) 메세지를 버릴지

에 대한 정의나 룰은 교환 Type으로 정의할 수 있다.

 

 

이미지 출처 - https://www.rabbitmq.com/tutorials/tutorial-three-python.html

교환 타입의 종류

- direct 

- topic 

- headers 

- fanout 

 

이번 글에서는 fanout에 대해 예제를 보려고 한다.

 

fanout 교환은 굉장히 심플하다.

이름에서 짐작할 수 있듯이 수신한 모든 메세지를 알고있는 모든 대기열로 브로드캐스트 한다. 

 

Exchange는 아래와 같이 선언할 수 있다.

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

> exchange=''은 교환 이름을 설정하는 것이고 ''은 기본 설정을 의미합니다. exchange 값이 입력되지 않은 경우 default exchange를 사용하게 되고 메세지는 routing_key 값의 이름을 갖는 Queue에 전달 된다.

 

우리는 아래와 같이 이름을 명시한 교환에 메세지를 Push할 것이다.

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

 

2. Temporary Queues

 

Pub/Sub 방식으로 동작할 때 여러 개의 Subscirber를 동작시키려면 여러 개의 Queue가 필요하다. 위의 예시에서 Disk Logging 프로그램과 Screen Logging 프로그램 각각의 Queue를 사용해야 하는데 각각의 Queue 이름을 queue-disk-logging, queue-screen-logging와 같이 정할 수 있지만 새로운 프로그램이 추가될 때마다 Queue의 이름을 계속 짓는 것은 불편할 수 있다.

 

이럴 때 Temporary Queues 기능을 사용하면 Queue의 이름을 랜덤으로 지을 수 있다. 또한, exclusive=True 옵션을 사용해서 Subscriber가 종료되면 해당 Queue를 자동으로 없어지도록 할 수 있다.

result = channel.queue_declare(queue='', exclusive=True)

이렇게 설정하면 랜덤으로 지어진 Queue의 이름은 result.method.queue로 리턴된다.

또한 exclusive=True 옵션에 의해 Subscriber가 종료되면 해당 Queue를 자동으로 지우도록 한다.

 

 

3. Binding

 

Exchange와 Queue를 연결하는 작업이다. 아래와 같이 코딩하면 logs라는 이름의 exchange와 랜덤으로 지어진 Queue를 연결한다. 

이미지 출처 - https://www.rabbitmq.com/tutorials/tutorial-three-python.html

 

 

4. Python Source Code for Test

 

- emit_log.py(Pusblish)

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

> 위 소스는 Log 메세지를 pusblish하는 예제이다.

 

위에서 눈여결볼 코드는 다음 라인이다.

channel.basic_publish(exchange='logs', routing_key='', body=message)

기존 예제에서는 exchange 값이 비어있고, routing_key 값에 Queue의 이름이 들어갔으나, 지금 예제에는 exchange 값이 채워지고 routing_key는 비워져 있다.

그 이유는 exchange의 routing_key는 queue의 이름이 랜덤으로 들어가기 때문에 지정할 수 없는 부분이고 exchange를 통해 해당 exchange를 구독하는 Consumer에게만 보낸다는 의미이다.

 

- receive_logs.py(Subscribe)

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

> 위 소스에서 눈여겨 볼 코드는 다음 라인이다.

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

랜덤한 큐 이름을 생성하고 그 Queue를 logs라는 이름의 exchange에 binding하는 코드 부분이다.

 

 

5. Pub/Sub 테스트 

 

이제 소스 코드는 준비가 끝났다.

 

5-1. 프로그램1 - 로그를 파일 디스크에 쌓기 위한 프로그램은 아래와 같이 실행한다.

root@master:~/python/ex3-pub_sub# python3 receive_logs.py > logs_from_rabbit.log

 

5-2. 프로그램2 - 로그를 화면에서 보기 위한 프로그램을 실행한다.

root@master:~/python/ex3-pub_sub# python3 receive_logs.py 
 [*] Waiting for logs. To exit press CTRL+C

 

5-3. 이제 로그를 보내본다.

root@master:~/python/ex3-pub_sub# python3 emit_log.py 
 [x] Sent 'info: Hello World!'

 

참고로 rabbitmqctl list_bindings를 사용하면 코드가 실제로 바인딩과 대기열을 원하는대로 생성하는지 확인할 수 있다.

위의 두개의 프로그램1,2는 아래와 같이 결과가 나올 것이다.

root@b014fe475894:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name source_kind destination_name destination_kind routing_key arguments
logs exchange amq.gen-N1AR-wk0Q0_bCW2oW8bmIA queue amq.gen-N1AR-wk0Q0_bCW2oW8bmIA []
logs exchange amq.gen-bz3lH2SBxl_Fz-z28v-PQQ queue amq.gen-bz3lH2SBxl_Fz-z28v-PQQ []

 

5-4.  결과 화면

프로그램 1
: 로그 파일 확인

root@master:~/python/ex3-pub_sub# cat logs_from_rabbit.log 
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'


프로그램 2
: Screen에서 찍히는 메세지 확인

root@master:~/python/ex3-pub_sub# python3 receive_logs.py 
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'
 [x] b'info: Hello World!'

728x90