1.什么是MQ
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。
为什么会产生消息队列?有几个原因:
不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;
MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。
2.RabbitMQ
2.1.RabbitMQ的简介
2.1.1.AMQP
AMQP是消息队列的一个协议。
2.2.官网
2.3.MQ的其他产品
2.4.学习5种队列
2.5.安装文档
3.搭建RabbitMQ环境
3.1.下载
下载地址:http://www.rabbitmq.com/download.html
3.2.windows下安装
3.2.1.安装Erlang
下载:http://www.erlang.org/download/otp_win64_17.3.exe
安装:
3.2.2.安装RabbitMQ
安装完成。
开始菜单里出现如下选项:
3.2.3.启用管理工具
1、双击这里写图片描述
2、进入C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.4.1\sbin输入命令:
rabbitmq-plugins enable rabbitmq_management
这样就启动了管理工具,可以试一下命令:
停止:net stop RabbitMQ
启动:net start RabbitMQ
3、在浏览器中输入地址查看:http://127.0.0.1:15672/
4、使用默认账号登录:guest/ guest
3.3.Linux下安装
3.3.1.安装Erlang
3.3.2.添加yum支持
cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
使用yum安装:
sudo yum install erlang
3.3.3.安装RabbitMQ
上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
3.3.4.启动、停止
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
3.3.5.设置开机启动
chkconfig rabbitmq-server on
3.3.6.设置配置文件
cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config
3.3.7.开启用户远程访问
vi /etc/rabbitmq/rabbitmq.config
注意要去掉后面的逗号。
3.3.8.开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
3.3.9.防火墙开放15672端口
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
3.4.安装的注意事项
1、推荐使用默认的安装路径
2、系统用户名必须是英文
Win10改名字非常麻烦,具体方法百度
如果安装失败应该如何解决:
1、重装系统 – 不推荐
2、将RabbitMQ安装到linux虚拟机中
a)推荐
3、使用别人安装好的RabbitMQ服务
a)只要给你开通一个账户即可。
b)使用公用的RabbitMQ服务,在192.168.50.22
c)推荐
常见错误:
3.5.安装完成后操作
1、系统服务中有RabbitMQ服务,停止、启动、重启
输入命令rabbitmq-plugins enable rabbitmq_management启用管理插件
查看管理页面
通过默认账户 guest/guest 登录
如果能够登录,说明安装成功。
4.添加用户
4.1.添加admin用户
4.2.用户角色
1、超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
4.3.创建Virtual Hosts
看到权限已加:
4.4.管理界面中的功能
5.学习五种队列
5.1.导入my-rabbitmq项目
项目下载地址:
https://download.csdn.net/download/zpcandzhj/10585077
5.2.简单队列
5.2.1.图示
C:消息的消费者
红色:队列
生产者将消息发送到队列,消费者从队列中获取消息。
5.2.2.导入RabbitMQ的客户端依赖
comrabbitmqgroupId
amqpclientartifactId
version
dependency
5.2.3.获取MQ的连接
factory
factory
factory
factory
factory
factory
connection factory
connection
5.2.4.生产者发送消息到队列
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
5.2.5.管理工具中查看消息
点击上面的队列名称,查询具体的队列中的信息:
5.2.6.消费者从队列中获取消息
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
5.3.Work模式
5.3.1.图示
一个生产者、2个消费者。
一个消息只能被一个消费者获取。
5.3.2.消费者1
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
5.3.3.消费者2
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
5.3.4.生产者
向队列中发送100条消息。
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
i i i
message i
channel QUEUE_NAME message
out message
i
channel
connection
5.3.5.测试
测试结果:
1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。
其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。
怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。
个概念
轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。
为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。
5.4.Work模式的“能者多劳”
打开上述代码的注释:
channel
channeldelivery
同时改为手动确认:
factory
factory
factory
factory
factory
factory
connection factory
connection
0
测试:
消费者1比消费者2获取的消息更多。
5.5.消息的确认模式
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
模式1:自动确认
只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
模式2:手动确认
消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
手动模式:
自动模式:
5.6.订阅模式
5.6.1.图示
解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
5.6.2.消息的生产者(看作是后台系统)
向交换机中发送消息。
factory
factory
factory
factory
factory
factory
connection factory
connection
1
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
5.6.3.消费者1(看作是前台系统)
factory
factory
factory
factory
factory
factory
connection factory
connection
2
5.6.4.消费者2(看作是搜索系统)
factory
factory
factory
factory
factory
factory
connection factory
connection
2
5.6.5.测试
测试结果:
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
在管理工具中查看队列和交换机的绑定关系:
5.7.路由模式
5.7.1.图示
5.7.2.生产者
5.7.3.消费者1(假设是前台系统)
5.7.4.消费2(假设是搜索系统)
5.8.主题模式(通配符模式)
5.8.1.图示
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
5.8.2.生产者
factory
factory
factory
factory
factory
factory
connection factory
connection
1
5.8.3.消费者1(前台系统)
factory
factory
factory
factory
factory
factory
connection factory
connection
2
5.8.4.消费者2(搜索系统)
factory
factory
factory
factory
factory
factory
connection factory
connection
2
6.Spring-Rabbit
6.1.Spring项目
http://spring.io/projects
6.2.简介
6.3.使用
6.3.1.消费者
factory
factory
factory
factory
factory
factory
connection factory
connection
7
6.3.2.生产者
factory
factory
factory
factory
factory
factory
connection factory
connection
8
6.3.3.配置文件
1、定义连接工厂
factory
factory
factory
factory
factory
factory
connection factory
connection
9
2、定义模板(可以指定交换机或队列)
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
0
3、定义队列、交换机、以及完成队列和交换机的绑定
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
1
4、定义监听
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
2
5、定义管理,用于管理队列、交换机等:
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
3
完整配置文件rabbitmq-context.xml
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
4
6.4.持久化交换机和队列
持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。
非持久化的性能高于持久化。
如何选择持久化?非持久化? – 看需求。
7.Spring集成RabbitMQ一个完整案例
创建三个系统A,B,C
A作为生产者,B、C作为消费者(B,C作为web项目启动)
项目下载地址:https://download.csdn.net/download/zpcandzhj/10585077
7.1.在A系统中发送消息到交换机
7.1.1.导入依赖
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
5
7.1.2.队列和交换机的绑定关系
实现:
1、在配置文件中将队列和交换机完成绑定
2、可以在管理界面中完成绑定
a)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
b)管理更加灵活
c)更容易对绑定关系的权限管理,流程管理
本例选择第2种方式
7.1.3.配置
rabbitmq-context.xml
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
6
7.1.4.消息内容
方案:
1、消息内容使用对象做json序列化发送
a)数据大
b)有些数据其他人是可能用不到的
2、发送特定的业务字段,如id、操作类型
7.1.5.实现
生产者MsgSender.java:
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
7
7.2.在B系统接收消息
7.2.1.导入依赖
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
8
7.2.2.配置
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
9
7.2.3.具体处理逻辑
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
0
7.2.4.在界面管理工具中完成绑定关系
选中定义好的交换机(exchange)
2)fanout
3)topic
7.3.在C系统中接收消息
(和B系统配置差不多,无非是Q名和Q对应的处理逻辑变了)
7.3.1.配置
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
message
channel QUEUE_NAME message
out message
channel
connection
9
7.3.2.处理业务逻辑
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
0
7.3.3.在管理工具中绑定队列和交换机
见7.2.4
7.3.4.测试
分别启动B,C两个web应用,然后运行A的MsgSender主方法发送消息,分别测试fanout、direct、topic三种类型
8.Springboot集成RabbitMQ
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
3
8.1.简单队列
1、配置pom文件,主要是添加spring-boot-starter-amqp的支持
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
4
2、配置application.properties文件
配置rabbitmq的安装地址、端口以及账户信息
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
5
3、配置队列
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
6
4、发送者
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
7
5、接收者
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
8
6、测试
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
9
8.2.多对多使用(Work模式)
注册两个Receiver:
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
8
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
1
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
2
8.3.Topic Exchange(主题模式)
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
3
首先对topic规则配置,这里使用两个队列(消费者)来演示。
1)配置队列,绑定交换机
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
4
2)创建2个消费者
q_topic_message 和q_topic_messages
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
8
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
8
3)消息发送者(生产者)
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
7
send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
4)测试
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
8
8.4.Fanout Exchange(订阅模式)
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
9
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
0
2)创建3个消费者
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
1
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
1
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
1
3)生产者
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
4
4)测试
QUEUE_NAME
argv
connection
channel connection
channelQUEUE_NAME
consumer channel
channelQUEUE_NAME consumer
delivery consumer
message delivery
out message
5
结果如下,三个消费者都收到消息:
AReceiver : hi, fanout msg
CReceiver : hi, fanout msg
BReceiver : hi, fanout msg
9.总结
使用MQ实现商品数据的同步优势:
1、降低系统间耦合度
2、便于管理数据的同步(数据一致性)
还没有评论,来说两句吧...