RabbitMQ实现RPC(Remote procedure call)

鳄鱼君

发表文章数:615

热门标签

,

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python » RabbitMQ实现RPC(Remote procedure call)

RabbitMQ实现消息持久化及广播模式中,我们已经可以对收到的消息进行过滤,但是现在我想实现双向通信。send端发送receive端接收消息并恢复,这就叫RPC,如何实现?

确切的说,send端发送一条命令,receive端接收指令并返回结果。这就需要两端即是发送端也是接收端,现在我们不叫发送和接收端,而是通过客户端和服务端来介绍,这样更有助于理解!

# client.py
import pika,uuid,time

class RpcClient(object):
    def __init__(self):
        # 建立socket链接
        self.connection=pika.BlockingConnection(
                pika.ConnectionParameters(
                host='192.168.99.100',port=5672
                ))

        # 声明一个管道
        self.channel=self.connection.channel()
        result=self.channel.queue_declare('',exclusive=True)
        self.callback_queue=result.method.queue

        self.channel.basic_consume( self.callback_queue, # 队列名称
                                    self.on_response,  # 回调函数
                                   True) # 扔掉消息
    def on_response(self,ch,method,props,body):
        if self.corr_id == props.correlation_id:
            self.response = body.decode() # 队列的返回

    def call(self, cmd):
        self.response = None
        self.corr_id = str(uuid.uuid4()) # 生成一个随机的id

        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue', # queue名称
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue, # 服务器返回命令到的queue
                                       correlation_id=self.corr_id, # 将id传给服务器端
                                   ),
                                   body=cmd) #消息内容
        while self.response is None: # 如果response为None就一直收消息
            # 没有消息就往下走,有消息就触发on_response回调函数
            self.connection.process_data_events() # 非阻塞版的 start_consuming()
            print('正在等待返回消息.....')
            time.sleep(3)
        return self.response

ci_rpc = RpcClient() # 实例化
while True:
    cmd=input('准备接受指令:')
    response = ci_rpc.call(cmd) # 传入指令
    print(" 指令返回结果: %r" % response)
# server.py
import pika,os
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(cmd):
    res_cmd=os.popen(cmd).read()
    return res_cmd


def on_request(ch, method, props, body): # 收到消息,返回命令执行结果
    cmd = body.decode()

    print(" 接收指令:%s" % cmd)
    response = fib(cmd)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, # 获取客户端的随机生成的queue
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),

                     body=response)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume( 'rpc_queue',
                        on_request)

print(" 等待指令中....")
channel.start_consuming()

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《RabbitMQ实现RPC(Remote procedure call)》 发布于2020-06-23

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

7 + 6 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册