본문 바로가기
오픈소스/RabbitMQ

[RMQ] Python으로 배우는 RabbitMQ 튜토리얼(4) - Routing

by sangyeon 2022. 1. 18.
728x90

Routing의 경우 Pub/Sub 구조와 함께 사용하는 기능으로,

기존 튜토리얼(3)에서 Pub/Sub의 경우에는 단순히 메세지를 모든 Subscriber에게 전송했다면, Routing 기능을 통해 특정 Subscriber에게 특정 메세지를 보내는 역할을 한다.

 

예를 들어, 앞선 예제의 로깅 시스템에서 Disk Logging 프로그램은 Disk의 용량 절약을 목적으로 Critical한 에러 메세지만 수신하여 파일로 떨구고 싶은 경우에 Routing 기능을 사용할 수 있다.

 

1. Binding

 

이때 사용하는 옵션이 routing_key 옵션으로 바인딩 키를 설정한다.

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

바인딩 키는 Exchange 타입의 영향을 받는다. 타입이 fanout인 경우에 바인딩 키가 무시된다.

따라서 아래 예제는 Exchange를 생성할 때, 타입을  direct 로 설정한다.

이미지 출처 - https://www.rabbitmq.com/tutorials/tutorial-four-python.html

위 그림에서 각각의 메세지는 바인딩 키(routing_key)가 등록된 Queue로 전송된다. 아래와 같이 동일한 바인딩 키로 여러 개의 Queue에 연결하는 것도 가능하다.

이미지 출처 - https://www.rabbitmq.com/tutorials/tutorial-four-python.html

 

2. Emitting Logs (로그 방출)

 

direct 타입의 exchange를 사용하여 로깅 시스템을 구현할 것이다. 로그 severity를 routing_key로 설정한다.

이를 위해 반드시 해야하는 2가지 설정이 있다.

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

그리고 publish 할 때 아래와 같이 routing_key에 값을 추가해서 해당 키워드일 때 메세지를 전송한다.

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

우리가 설정할 키워드는 severity이고 배열 값이다.

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

첫 번째 매개변수로 들어오는 값은 로그레벨이며 만약에 매개변수가 없다면 해당 값은 info로 세팅된다.

두 번째 매개변수로 들어오는 값은 메세지 내용이고 아무 내용이 없으면 Hello World!를 찍는다.

 

 

3. Python Source Code for Test

 

- emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
  • 첫 번째 매개변수로 들어오는 값은 로그레벨이며 만약에 매개변수가 없다면 해당 값은 info로 세팅된다.
  • 두 번째 매개변수로 들어오는 값은 메세지 내용이고 아무 내용이 없으면 Hello World!를 찍는다.
  • routing_key로 severity 값을 매개변수로 넣어 보낸다. ex) python emit_log_direct.py  error   "Error Detected."

 

 

 

- receive_logs_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
  • severities = sys.argv[1:] - 매개변수로 들어오는 값에 대해 배열에 넣는다.
  • for severity in severities: - 배열을 돌면서 severities에 있는 매개변수 error warning에 대해서 모두 바안딩을 한다.  ex) python receive_logs_direct.py warning error
    --> channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=warning)
    --> channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=error

 

결국에 receive_logs_direct.py에서 설정한 로그레벨에 따라 메세징을 보내게 되고,

> python receive_logs_direct.py warning error 로 설정하면 아래 그림과 같다.

 

- warning 으로 로그레벨이 넘어오면 Q1로 메세징을 큐잉하고

- error로 로그레벨이 넘어보면 Q2로 메세징을 큐잉한다.

 

 

4. Routing Test

테스트 케이스 1)
: 특정 로그레벨(warning/ error)로 들어올 때 리다이렉트로 로그 파일 빼서 쌓는 법


1-1) 로그 receive 실행
root@master:~/RabbitMQ/ex4-routing# python3 receive_logs_direct.py warning error > logs_from_rabbit.log


1-2) 총 4개의 메세지 전송, error 1번, info 2번, warning 1번
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py error "Run. Run. Error Message Detected."
 [x] Sent 'error':'Run. Run. Error Message Detected.'
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py warning "Be careful!"
 [x] Sent 'warning':'Be careful!'
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."


1-3) logs_from_rabbit.log 확인
> receive_logs_direct.py 파일이 종료되어야 파일이 쌓인 것을 볼 수 있다. 
> 정확히 error와 warning 메세지만 로그에 기록되었다.

root@master:~/RabbitMQ/ex4-routing# cat logs_from_rabbit.log 
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':b'Run. Run. Error Message Detected.'
 [x] 'warning':b'Be careful!'


테스트 케이스 2)
: 특정 로그레벨(warning/ error)로 들어올 때 화면에 찍는 법

2-1) 로그 receive 실행
root@master:~/RabbitMQ/ex4-routing# python3 receive_logs_direct.py warning error


2-2) 총 4개의 메세지 전송, error 1번, info 2번, warning 1번
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py error "Run. Run. Error Message Detected."
 [x] Sent 'error':'Run. Run. Error Message Detected.'
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py warning "Be careful!"
 [x] Sent 'warning':'Be careful!'
root@master:~/RabbitMQ/ex4-routing# python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."


2-3) 화면에서 확인
> 4개의 메세지를 보냈지만 info는 기록되지 않고 warning과 error 메세지만 찍힘

root@master:~/RabbitMQ/ex4-routing# python3 receive_logs_direct.py  warning error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':b'Run. Run. Error Message Detected.'
 [x] 'warning':b'Be careful!'

 

728x90