본문 바로가기
오픈소스/RabbitMQ

[RMQ] Python으로 배우는 RabbitMQ 튜토리얼(2) - Work Queue

by sangyeon 2022. 1. 12.
728x90

- Pika Python Client 사용

- 필수조건 : pika RabbitMQ Client 버전 1.0.0.

- RabbitMQ 5672 port process running

 

첫번째 실습에서 명명된 큐에 메세지를 보내고 받는 프로그램을 만들었다면,

이번 글에서는 시간이 오래 걸리는 작업에 사용하거나 여러 작업자에게 분산하는데 사용할 작업 대기열(Work Queue)에 대해서 배워보려고 한다.

(시간이 걸리는 작업을 구현하기 위해 time.sleep()함수를 사용하여 처리할 예정이다.)

이미지 출처 - https://www.rabbitmq.com/tutorials/tutorial-two-python.html

작업대기열(Work Queue)의 장점은 작업을 병렬로 처리할 수 있다는 점이다.

먼저 두 개의 worker.py 스크립트를 실행하여 메세지를 처리할 Consumer(worker)를 2개 생성한다.

튜토리얼(1)의 python 파일에서 조금씩 수정이 필요하다.

 

1. Producer Message

- new_task.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"  # 메세지는 arg로 받거나 없으면 Hello World를 보낸다.
channel.basic_publish(
    exchange='',
    routing_key='task_queue', # 여기서 routing queue는 보낼 queue를 이름을 지정한다.
    body=message,              # body에 메세지를 담아 보낸다.
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
    ))
print(" [x] Sent %r" % message)
connection.close()

 

2. Consumer Message

- worker.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))  # body에 .의 갯수만큼 sleep을 준다
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

 

3. Work Queue Messaging 테스트

- worker.py을 2개 실행하여 2개의 작업자를 실행한다.

# shell 1
python3 worker.py
[*] Waiting for messages. To exit press CTRL+C


# shell 2
python3 worker.py
[*] Waiting for messages. To exit press CTRL+C

 

- new_task.py 파일을 실행하여 메세징 처리 방식을 확인한다.

root@master:~/python# python3 new_task.py "First message."
 [x] Sent 'First message.'
root@master:~/python# python3 new_task.py "Second message.."
 [x] Sent 'Second message..'
root@master:~/python# python3 new_task.py "Third message..."
 [x] Sent 'Third message...'
root@master:~/python# python3 new_task.py "Fourth message...."
 [x] Sent 'Fourth message....'
root@master:~/python# python3 new_task.py "Fifth message....."
 [x] Sent 'Fifth message.....'

 

- 작업자에게 전달되는 메세지를 확인해보면

# shell 1
root@master:~/python# python3 worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Done
 [x] Received 'Third message...'
 [x] Done
 [x] Received 'Fifth message.....'
 [x] Done


# shell 2
root@master:~/python# python3 worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Done
 [x] Received 'Fourth message....'
 [x] Done

기본적으로 RabbitMQ는 각 메세지를 다음 Consumer에게 순서대로 보낸다. (라운드로빈) 

그렇기 때문에 평균적으로 모든 Consumer가 동일한 수의 메세지를 받는다.

 

 

4. 메세지 승인(Acknowledgement)

 

만약 여러 Consumer 중 하나가 오래 걸리는 작업을 시작했고 그 작업을 부분적으로 수행하고 죽는다면 어떻게 될까?

위의 코드에서는 RabbitMQ가 메세지를 Consumer에게 전달하면 즉시 메모리로부터 그 메세지를 제거한다.

그렇게 되면 작업자가 죽으면 메세지를 잃게 된다.

하지만 작업자가 죽어도 그 작업을 다른 작업자에게 전달하고 싶다.

 

이러한 기능을 제공하기 위해 RabbitMQ는 Message Acknowledgement를 제공한다. Consumer는 RabbitMQ에게 ack를 되돌려 주는데 이는 특정 메세지가 처리되었음을 알리고 그 알림을 받은 메세지에 대해서 RabbitMQ가 지울수 있도록 해준다.

만약 Consumer가 ack 전송없이 죽는다면(Channel 혹은 터미널이 닫히거나 TCP Connection이 종료된다면..)

RabbitMQ는 그 메세지가 완전하게 처리되지 않았다고 판단하여 다시 큐잉한다.

 

만약 그때 다른 Consumer가 존재한다면 그 메세지를 다른 Consumer에게 재전송한다. (메세지 타임 아웃은 없다)

이러한 기능은 메세지 처리가 오래 걸리는 업무에서 유용한 기능이다.

 

Consumer delivery ack에는 시간 초과(default : 30분)가 적용된다. 이는 메세지 delivery를 확인하지 않는 버그가 있는 Consumer를 감지하는데 도움이 된다.

 

수동 메세지 승인(Manual Message Acknowledgements)는 기본적으로 활성화되어 있다. 

 

4-1. Work Queue 메세지 승인 테스트

- new_task.py

root@master:~/python# python3 new_task.py  "First message."
 [x] Sent 'First message.'
root@master:~/python# python3 new_task.py  "Second message.."
 [x] Sent 'Second message..'
root@master:~/python# python3 new_task.py  "Long Time message........................"
 [x] Sent 'Long Time message........................'

> 1초 걸리는 메세지와 2초 걸리는 메세지를 보내고 24초 걸리는 메세지를 보낸다.

예상 동작은 1초 메세지는 worker 1번에.  2초 메세지는 worker 2번에.

다시 24초 걸리는 메세지는 worker 1번에 보낸다. 

이때 worker 1번을 죽이면 해당 메세지는 즉시 취소되고 다시 큐잉되어 worker 2번에 할당한다.

 

실제 동작 과정은 아래에서 확인해보자 (예상대로 처리 되었다.)

# shell 1 
# worker 1번 Long Time message 중 kill

root@master:~/python# python3 worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Done
 [x] Received 'Long Time message........................'
^CTraceback (most recent call last):
  File "worker.py", line 23, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 833, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/usr/local/lib/python3.8/dist-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "worker.py", line 15, in callback
    time.sleep(body.count(b'.'))  # body에 .의 갯수만큼 sleep을 준다
KeyboardInterrupt
# shell 2
# worker 2번 - 1번의 메세지를 받아 다시 처리 한다.

root@master:~/python# clear
root@master:~/python# python3 worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Done
 [x] Received 'Long Time message........................'
 [x] Done

 

 

5. 메세지 내구성(Message durability)

 

위에서 Consumer가 죽더라도 작업이 손실되지 않는 법을 배웠다. 그러나 RabbitMQ 서버가 중지되면 작업은 손실될 수 밖에 없다.

RabbitMQ가 종료되거나 충돌하면 사용자가 설정하지 않는 한 큐와 메세지는 손실됩니다. 메세지가 손실되지 않도록 하기 위해서는 2가지가 필요하다.

 

5-1. 첫 번째

큐가 RabbitMQ 노드를 다시 시작해도 살아남을 수 있도록  durable 설정을 해주어야 한다.

channel.queue_declare(queue='task_queue', durable=True)

* 위 설정은 생산자(Producer)와 소비자(Consumer) 코드 두 곳 모두 설정이 되어야 한다.

 

5-2. 두 번째

channel.basic_publish(
    exchange='',
    routing_key='task_queue', 
    body=message,            
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
    ))

- pika.spec.PERSISTENT_DELIVERY_MODE 값으로 delivery_mode 속성을 제공하여 메세지 지속성을 표시해야 한다.

 

 

 

6. 공정한 분배(Fair dispatch)

 

작업자가 2개일 때 모든 홀수 번 작업은 heavy하고 모든 짝수 번 작업은 light 하다면?

RabbitMQ는 그것을 알아채지 못하고 여전히 균등하게 메세지를 발송한다.

이를 방지하기 위해  prefetch_count=1 설정과 함께  Channel#basic_qos 채널 방법을 사용할 수 있다.

basic.qos 프로토콜 방법을 사용하여 RabbitMQ가 작업자에게 한 번에 둘 이상의 메세지를 제공하지 않도록 지시한다.

 

예를 들어 A와 B 두 개의 worker 가 있고 4개의 메세지를 보낸다고 가정하면

- 1번 메세지(40초 소요)

- 2번 메세지(1초 소요)

- 3번 메세지(2초 소요)

- 4번 메세지(3초 소요)

 

일 때, A worker 1, 3번 메세지 처리 / B worker 2, 4번 메세지 처리 하는 것이 아니라

A Worker가 40초가 소요되는 메세지를 처리하는 동안 B Worker가 사용중이지 않은 상태일 때 나머지 메세지를 지속적으로 처리한다.

 

* 그렇기 때문에 worker 상태를 모니터링 하고 모두 busy한 상태일 경우 worker를 증설하는 것을 고려할 필요가 있다.

 

 

 

 

 

728x90