RabbitMQ实现消息持久化及广播模式

鳄鱼君

发表文章数:525

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python教程 » RabbitMQ实现消息持久化及广播模式

RabbitMQ消息队列的环境配置和简单使用这篇文章中,我们已经知道的RabbitMQ的简单使用。现在我想要看一下消息队列的个数该怎么办?

鳄鱼君Ba使用的是docker,启动后再浏览器访问:http://192.168.99.100:15672,输入用户名和密码就可以看到相关的信息,包括队列的个数

RabbitMQ实现消息持久化及广播模式

如果安装在本地,在RabbitMQ的sbin目录输入:rabbitmqctl.bat list_queues就可以了,这里不多说明!

图片上面的ready是准备接受的消息数量,unacked是已接收的消息数量,total是消息总数。按照前面的代码,我们重新创建个队列eyujun1

# send.py
import pika

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

# 在管道中声明queue
channel.queue_declare(queue='eyujun1')

# 通过管道
channel.basic_publish(exchange='',
                      routing_key='eyujun1',# queue名字
                      body='Hello eyujun!')
print('发送 "Hello eyujun!"')
connection.close()
# receive.py
import time
import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun1')

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息: %r" % body)

channel.basic_consume('eyujun1', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                      )
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

以上代码测试的效果:先启动一个receive和send,多次send,receive就会接收多次,这时候所有的消息都会保存,重新启一个receive就会接收全部的消息,也就是原来的receive已经接收到所有的消息了,重启的receive仍然可以接收到所有的消息。这就是receive端没有进行确认,消息收到需要确认,不然消息会一直存在。只需要修改receive端的callback函数即可:

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息: %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

再次测试,你会发现消息就会进行确认,Ready、Unacked、 Total一直都是0.

消息持久化

现在假设RabbitMQ服务被关掉,然后再重启,查看一下queues的数量,这时候你会发现消息都丢了!那么我现在想让RabbitMQ把这个队列永久的保存下来,怎么做呢?只需要在声明队列的时候做一些处理:

# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun2',durable=True)

send和receive都需要修改。这种情况只会持久化队列,队列里面的消息还是会丢失!这Tm有什么用,我们再次修改一下代码,在send端

# 通过管道
channel.basic_publish(exchange='',
                      routing_key='eyujun2',# queue名字
                      body='Hello eyujun!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # 持久化消息
                      ))

广播模式

RabbitMQ默认处理消息的方式是采用轮流接收,前面我们已经知道,一个send和多个receive端,receive是轮流接收消息的。RabbitMQ也可以按照处理器的性能,来发送消息,处理的慢就少发点消息,处理的快就多发点消息。

现在只需要求改receive端即可:

# receive.py
import time
import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun2',durable=True)

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息: %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)  # 最多只能处理1条消息
channel.basic_consume('eyujun2', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                      )
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

模拟方式也很简单,新建一个receive2.py,在接收消息的时候sleep一下,receive.py不错处理,然后send多次消息,观察两者接受的情况!

现在我想要实现一个广播的效果,send发送的消息,每个receive都可以接收到。这时候就需要使用exchange,可以理解为转发器。Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

fanout广播模式

fanout: 所有bind到此exchange的queue都可以接收消息,需要先启动receive端,再启动send端,跟收音机差不多,消息是实时发送的,没有启动,就收不到。

# send.py
import pika

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

channel.exchange_declare(exchange='logs', # exchange转发器名字
                         exchange_type='fanout')
message='Hello eyujun!'
# 通过管道
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message,
                      )
print('发送 "Hello eyujun!"')
connection.close()
# receive.py
import time
import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

# 不指定queue的名字,就会随机生成一个名字
# exclusive=True接收到消息断开后,就会将其删除
result=channel.queue_declare('',exclusive=True)
queue_name=result.method.queue

channel.queue_bind(exchange='logs', # 绑定到exchange转发器
                   queue=queue_name)

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息: %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


print(' 正在等待消息,要退出,请按CTRL+C:')

channel.basic_consume(queue_name,
                    callback,
                    )
# 开始接收消息
channel.start_consuming()

direct广播模式

direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息。RabbitMQ支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

# send.py
import pika,sys

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

channel.exchange_declare(exchange='eyujun', # exchange转发器名字
                         exchange_type='direct')

severity=sys.argv[1:] if len(sys.argv) > 1 else 'info'  # 指定名称

message=' '.join(sys.argv[2:]) or 'Hello eyujun!'
# 通过管道
channel.basic_publish(exchange='eyujun',
                      routing_key=severity, # 指定消息发送的名称
                      body=message,
                      )
print('发送 %r:%r"' % (severity, message))
connection.close()
# receive.py
import time
import pika,sys
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()

channel.exchange_declare(exchange='eyujun',
                         exchange_type='direct')


result=channel.queue_declare('',exclusive=True)
queue_name=result.method.queue

serverities =sys.argv[1:]  #sys.argv[1:] 代表为 Python 程序提供的第一个参数
if not serverities: # 如果没有指定名称,则退出
    sys.stderr.write('指定名称:【info】【warning】【error】') # 输入错误信息
    sys.exit(1) # 退出
print(serverities) # list

for serverity in serverities: # 绑定每一个名称
    channel.queue_bind(exchange='eyujun', # 绑定到exchange转发器
                        queue=queue_name,
                       routing_key=serverity)

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息: %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)


print(' 正在等待消息,要退出,请按CTRL+C:')

channel.basic_consume(queue_name,
                    callback,
                    )
# 开始接收消息
channel.start_consuming()
python send.py  warning from warning # 发到warning
python send.py error from error # 发到error

python receive.py info warning # 收info和warning
python receive.py error warning # 收error和warning

这时候我们呢运行代码就需要在命令行进行,并且需要指定关键字名称。send默认的关键字为info,输入命令:python receive.py info就可以收到info的消息。绑定哪个 哪个就收,具体自己尝试!

topic更细致的消息过滤

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

表达式符号说明:#代表一个或多个字符,*代表任何字符
 例:#.a会匹配a.a,aa.a,aaa.a等
     *.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
# send.py
import pika,sys

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

channel.exchange_declare(exchange='eyujun1', # exchange转发器名字
                         exchange_type='topic')

routing_key=sys.argv[1] if len(sys.argv) > 1 else 'a.info'  # 指定名称

message=' '.join(sys.argv[2:]) or 'Hello eyujun!'
# 通过管道
channel.basic_publish(exchange='eyujun1',
                      routing_key=routing_key, # 指定消息发送的名称
                      body=message,
                      )
print('发送 %r:%r"' % (routing_key, message))
connection.close()
# receive.py
import time
import pika,sys
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()

channel.exchange_declare(exchange='eyujun1',
                         exchange_type='topic')


result=channel.queue_declare('',exclusive=True)
queue_name=result.method.queue

serverities =sys.argv[1:]  #sys.argv[1:] 代表为 Python 程序提供的第一个参数
if not serverities: # 如果没有指定名称,则退出
    sys.stderr.write('指定名称:【info】【warning】【error】') # 输入错误信息
    sys.exit(1) # 退出
print(serverities) # list

for serverity in serverities: # 绑定每一个名称
    channel.queue_bind(exchange='eyujun1', # 绑定到exchange转发器
                        queue=queue_name,
                       routing_key=serverity)

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息 %r:%r" % (method.routing_key, body))
    ch.basic_ack(delivery_tag=method.delivery_tag)


print(' 正在等待消息,要退出,请按CTRL+C:')

channel.basic_consume(queue_name,
                    callback,
                    )
# 开始接收消息
channel.start_consuming()
# 启动receive.py
python receive.py *.error mysql.*  # 收所有error结尾的和mysql开头的
python receive.py *.info   #收所有info结尾的

# send.py
python send.py # 默认a.info
python send.py t.error #  *.error收到
python send.py mysql.info #  两者都收到 

python receive.py #   收所有的所有

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《RabbitMQ实现消息持久化及广播模式》 发布于2020-06-22

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

9 + 9 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册