본문 바로가기
오픈소스/RabbitMQ

[RMQ] Python으로 배우는 RabbitMQ 튜토리얼(6) - RPC

by sangyeon 2022. 1. 20.
728x90

1. RPC란?

RPC는 Remote Procedure Call의 약자로 

간단하게 Client는 Request를 Server에 전달하고 Server는 해당 Request를 처리하여 알맞은 결과 값을 다시 Client에 Response해주는 방법을 뜻한다.

 

즉, RPC는 원격지에 메세지를 전달해서 결과를 다시 전달 받는 것을 뜻한다.

 

1-1. Message Properties

  • DeliveryMode : 메세지 속성이 persistent인지 transient인지 표시(휘발성 or 비휘발성)
  • ContentType : 내용물의 mime-type
  • ReplyTo : 일반적으로 callback queue의 이름을 지정하는데 사용
  • CorrelationID: 요청을 구분할 수 잇는 유일한 값

 

1-2. 아래는 RPC의 데이터 처리 흐름을 나타낸다.

1) 클라이언트는 CorrelationID, ReplyTo를 지정하여 RabbitMQ의 RPC Queue에 데이터를 Push한다.

2) 해당 Queue에 데이터를 Server에서 요청을 처리한다.

3) 요청 처리 후 Request에서 받은 CorrelationID와 ReplyTo를 추출하여 요청 ID를 속성으로 갖는 Response를 ReplyToQueue에 Push 한다.

4) 클라이언트는 ReplyToQueue를 구독하고 있다가 Response가 오면 CorrelationID를 보고 어떤 요청에 대한 응답인지를 구분하여 처리한다.

 

이미지 출처 - https://www.rabbitmq.com/tutorials/tutorial-six-python.html

 

1-3. RPC의 이점

  • RPC서버가 너무 느린 경우 rpc 서버를 하나 더 띄워 확장시킬 수 있다. (새 콘솔에서 rpc_server.py를 더 실행한다.)
  • 클라이언트 측에서 RPC는 하나의 메세지만 보내고 받기 때문에 queue_declare와 같이 동기 호출이 필요하지 않다. 결과적으로 RPC 클라이언트는 단일 RPC 요청에 대해 단 한번의 네트워크 왕복만 있으며 된다.

 

아래 예제는 피보나치 수열에 대하여 값을 구해주는 예제이다.

 

 

2. 소스코드(예제)

 

- rpc_server.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

- rpc_client.py

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

 

root@master:~/RabbitMQ/ex6-rpc# python3 rpc_client.py 
 [x] Requesting fib(30)
 [.] Got 832040
root@master:~/RabbitMQ/ex6-rpc# python3 rpc_client.py 
 [x] Requesting fib(10)
 [.] Got 55


-------------------------------------------------------------------------------------
root@master:~/RabbitMQ/ex6-rpc# python3 rpc_server.py 
 [x] Awaiting RPC requests
 [.] fib(30)
 [.] fib(10)
728x90