RabbitMQ消息队列的环境配置和简单使用

鳄鱼君

发表文章数:642

Vieu四代商业主题

高扩展、安全、稳定、响应式布局多功能模板。

¥69 现在购买
首页 » Python » RabbitMQ消息队列的环境配置和简单使用

RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

RabbitMQ介绍

RabbitMQ特点:

  1. 开源、性能优秀,稳定性保障
  2. 提供可靠性消息投递模式、返回模式
  3. 与Spring AMQP完美整合,API丰富
  4. 集群模式丰富,表达式配置,HA模式,镜像队列模型
  5. 保证数据不丢失的前提做到高可靠性、可用性

MQ典型应用场景:

  1. 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
  2. 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
  3. 日志处理
  4. 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。

Docker安装

Docker提供一种安全、可重复的环境中自动部署软件的方式,鳄鱼君Ba这里使用Docker进行安装RabbitMQ。进入官方下载地址,选择使用Docker安装,跳转到dockerhub查看镜像。

选择3.8.0-beta.4-management进行安装,带有management是含有管理界面的。这里可以直接在docker中拉取镜像和启动,如果不知道如何使用docker和镜像加速的参考:Docker容器的安装以及常见错误 Docker如何进行镜像加速

查询rabbitmq镜像:docker search rabbitmq:management
拉取rabbitmq镜像:docker pull rabbitmq:management

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.8.0-beta.4-management

--hostname:指定容器主机名称
my-rabbit:指定容器名称
-p:将mq端口号映射到本地
15672:控制台端口号 5672:应用访问端口号

设置为自启
docker run -d --hostname my-rabbit -d --restart=unless-stopped

备选启动同时设置用户和密码

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

查看本地镜像:docker images

打开浏览器访问:http://192.168.99.100:15672/,进行填写账号密码:默认账号密码都是guest.到此,RabbitMQ已经安装并运行起来了。记得不要关闭docker服务器哟!

RabbitMQ消息队列的环境配置和简单使用

这样的链接的话,host就是192.168.99.100,port就是5672,需要注意!

本地安装

安装: http://www.rabbitmq.com/install-standalone-mac.html,3.8.5版本的已经包含erlang,不需要单独安装,速度比较慢,你还是使用docker吧,这里不过多介绍!

安装python rabbitMQ module 使用:pip install pika命令即可,源码:https://pypi.python.org/pypi/pika。目前pika版本为1.1.0,所以说代码可能有些变化。

实现最简单的队列通信

鳄鱼君Ba这里使用docker进行,所以host和port都需要注意,不是本地的。一个send.py用于发送,receive.py接收:

# send.py
import pika

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

# 在管道中声明queue
channel.queue_declare(queue='eyujun')

# 通过管道
channel.basic_publish(exchange='',
                      routing_key='eyujun',# queue名字
                      body='Hello eyujun!')
print('发送 "Hello eyujun!"')
connection.close()
# receive.py
import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')

def callback(ch, method, properties, body):
    print(ch,'\n',method,'\n',properties) # 可以自己打印看看是什么
    print(" 收到消息: %r" % body)
channel.basic_consume('eyujun', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                      )
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

在发送端send和接收端receive都需要声明管道并在管道中声明queue,我们无法确定哪一端先启动,如果没有声明管道的一段先启动就会报错。

消息队列

RabbitMQ会默认把p发的消息依次分发给各个receive端,跟负载均衡差不多。现在创建多个receive端接收send端发送的消息,启动3个receive端,1个send端,多次send消息,receive会一次接收到消息,也就是说一个receive只接收一个。代码还是上面的代码:

# send.py
import pika

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

# 在管道中声明queue
channel.queue_declare(queue='eyujun')

# 通过管道
channel.basic_publish(exchange='',
                      routing_key='eyujun',# queue名字
                      body='Hello eyujun!')
print('发送 "Hello eyujun!"')
connection.close()
# receive.py

import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')

def callback(ch, method, properties, body):
    print(ch,'\n',method,'\n',properties)
    print(" 收到消息: %r" % body)
channel.basic_consume('eyujun', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                     )
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

pycharm启动多个receive.py文件,需要在Run菜单中找到Edit Configurations并勾选Allow parallel run

现在假设receive端在接收消息的时候需要处理20s,可以理解为花费20s,在这个时间内宕机或者断电了,消息就没有收到。RabbitMQ默认的处理方式是会自动确认消息是否处理完。其中channel.basic_consume的第三个参数默认为False,如果修改为True,就会扔掉消息!

我们可以将channel.basic_consume的第三个参数修改为True模拟一下,只需要在receive.py文件的callback函数中sleep即可。启动send和receive,在receive等待的20内断掉,这个消息就没了!

# receive.py
import time
import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息: %r" % body)

channel.basic_consume('eyujun', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                      True)
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

send.py文件不变,这时候你就会发现消息会被扔掉!

未经允许不得转载:作者:鳄鱼君, 转载或复制请以 超链接形式 并注明出处 鳄鱼君
原文地址:《RabbitMQ消息队列的环境配置和简单使用》 发布于2020-06-19

分享到:
赞(0) 赏杯咖啡

评论 抢沙发

8 + 1 =


文章对你有帮助可赏作者一杯咖啡

支付宝扫一扫打赏

微信扫一扫打赏

Vieu4.6主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。
切换注册

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录
切换登录

注册