AmqpTemplate,RabbitTemplate
Spring AMQP提供了一个发送和接收消息的操作模板类AmqpTemplate。 AmqpTemplate它定义包含了发送和接收消息等的一些基本的操作功能。RabbitTemplate是AmqpTemplate的一个实现。
RabbitTemplate支持消息的确认与返回,为了返回消息,RabbitTemplate 需要设置mandatory 属性为true,并且CachingConnectionFactory 的publisherReturns属性也需要设置为true。返回的消息会根据它注册的RabbitTemplate.ReturnCallback setReturnCallback 回调发送到给客户端,
一个RabbitTemplate仅能支持一个ReturnCallback 。
为了确认Confirms消息, CachingConnectionFactory 的publisherConfirms 属性也需要设置为true,确认的消息会根据它注册的RabbitTemplate.ConfirmCallback setConfirmCallback回调发送到给客户端。一个RabbitTemplate也仅能支持一个ConfirmCallback.
org.springframework.boot
spring-boot-starter-amqp
SpringBoot集成RabbitMQ
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
RabbitTemplate
默认一个
RabbitTemplate在RabbitMQ中相当于一个connection,每发送一次消息相当于channel,MQ接收消息后释放channel。每个connection最多支持2048个channel,假如从一个connection同时超过2048个线程并发发送,channel超过2048,会报错
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
测试启动publisher-confirms后,400个线程通过一个
RabbitTemplate并发发送10000消息,同时就可能产生1000左右的channel。因为发送端channel等待confirm回调所以没有释放。10000消息全部发送在几秒内完成,10000消息全部confirm回调完成用时22秒。(测试机/测试服务器配置都很低)
后台管理页面查看connection+channel
此connection中有10个线程并发发送消息,监控到10个channel生成,MQ完成接收后释放channel。如果是publisher-confirms模式,channel会保持到confirm回调完成再释放,
影响并发性能。每个connection最多支持2048个channel。
测试启动publisher-confirms后,500个线程并发发送,部分消息报
AmqpResourceNotAvailableException。400个线程通过一个
RabbitTemplate并发发送10000消息,最高同时就可能产生1000多的channel。因为channel在等待执行confirm回调。10000消息全部发送在几秒内完成,10000消息全部confirm回调完成用时22秒,此时所有channel全部释放。
绑定队列
若在rabbitmq的管理页面手动创建队列和交换机,则可以不再代码中声明
* Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
fanoutExchange
fanoutExchange
fanoutExchange
fanoutExchange
**消息发送者 **
ConfirmCallback :ACK=true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中, ACK=false标示消息由于Broker处理错误,消息并未处理成功。如未找到对应交换机返回ACK=false。
ReturnCallback:当消息发送出去找不到对应路由队列时,将会把消息退回 。如果有任何一个路由队列接收投递消息成功,则不会退回消息。MQ成功接收,但是未找到对应队列触发
通过以上异步确认机制,增加降级、补偿处理。比如发送时保存信息和消息ID,ConfirmCallback 通过ID找到对应信息重发,注意要保证
幂等性。
rabbitTemplate
context
out context
rabbitTemplate
rabbitTemplatecorrelationData confirm cause
confirm
out cause correlationData
out
rabbitTemplate context
obj
obj
obj
obj
obj
out obj
rabbitTemplate obj
message i cause exchange queue
测试发送:
使用Spring默认的rabbitTemplate发送消息,CorrelationData可以重复。
交换机+路由键+消息Object+CorrelationData
rabbitTemplate
在rabbitmq控制台上getmessage查看 ,rabbitTemplate默认发送deliverymode=2消息,已经设置了消息持久化。
测试速度:
测试100个线程同时并发向同一队列发送简单消息(15左右长度的字符串)。从发送到100个消息全部完成ConfirmCallback,用时为600ms左右。此过程不计入消费速度。
400个线程通过一个
RabbitTemplate并发发送10000消息,同时就可能产生1000左右的channel。因为channel等在confirm。10000消息全部发送在几秒内完成,10000消息全部confirm回调完成用时22秒。
测试ConfirmCallback回调:
public void confirm(CorrelationData correlationData, boolean confirm, String cause) ;
confirm==true仅仅标示消息已被Broker接收到,并不表示已成功投放至消息队列中, confirm==false标示消息由于Broker处理错误,消息并未处理成功。如未找到对应交换机返回confirm==false。
在此方法中针对confirm==false的消息实现
降级/补偿处理:重发、本地缓存、计入数据库/Redis等、更新状态.....
测试环境:实例化一个ConfirmCallback接口对象,作为rabbitTemplate共用回调处理对象。
回调测试结果:
1 先发送到MQ的消息,先完成confirm回调。
2 ConfirmCallback默认是由同一个线程执行回调,打印线程名可以看到线程名为【AMQP Connection rabbitmqIp:port】
为了提高效率ConfirmCallback可以把任务提交到线程池,避免阻塞后边的Confirm任务。
3 若发送时没有携带CorrelationData,回调时这里correlationData==null
4.设置消息确认会影响并发性能,每个线程发送生成一个channel,channel会保持到confirm回调完成再释放。因为每个connection最多支持2048个channel,当channel达到2048时,会报错
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later。
测试ReturnCallback 回调:
public void returnedMessage(Message message, int i, String cause, String exchange, String queue) ;
MQ成功接收消息,但是未找到对应路由键的队列后回调。实现
降级/补偿处理。
测试环境:实例化一个ReturnCallback接口对象,作为rabbitTemplate共用回调处理对象。
回调测试结果:
默认是由同一个线程执行回调,打印线程名可以看到线程名为【AMQP Connection rabbitmqIp:port】
**message=**返回的Message对象中的成员,Body为发送时的消息内容 ,receivedDeliveryMode=PERSISTENT=2 为持久化消息。spring_returned_message_correlation=发送时的CorrelationData
(Body:'String:message' MessageProperties [headers={spring_returned_message_correlation=111}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
**cause=**NO_ROUTE
**exchange、queue **为发送时的配置
消息消费者
设置QOS,避免触发流控机制
#消费者每次从队列获取的消息数量 (默认一次250个)
spring.rabbitmq.listener.simple.prefetch= 5
当QUEUE达到5条Unacked消息时,不会再推送消息给Consumer。查看后台管理器中queue的unacked数量
queues
msg channel message
out msg
channelmessage
out
e
e
out
msg是消息内容,相当于
Message对象中的body。
Message对象的成员:
可以看到有消息信息BODY,发送方生成的消息CorrelationData,还有执行的Method对象(@RabbitListener标注的方法),目标BEAN
备注:我们用注解的方式来接受消息 就不要用 自己创建对象实现ChannelAwareMessageListener的方式来接受消息 这种方式还要去全局里面配置麻烦,直接用@RabbitListener(queues = "hello")最简单
消息确认 因为我在属性配置文件里面开启了ACK确认 所以如果代码没有执行ACK确认 你在RabbitMQ的后台会看到消息会一直留在队列里面未消费掉 只要程序一启动开始接受该队列消息的时候 又会收到
方法参数详解:https://www.cnblogs.com/piaolingzxh/p/5448927.html
channelmessage deliveryTag该消息的index,由发送方生成
multiple:是否批量将一次性ack所有小于deliveryTag的消息。
channelmessage deliveryTag该消息的index
multiple:是否批量将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列, 放在队首 消息进入绑定的DLX。一定注意:若此消息一直重入队会导致的死循环
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
channelmessage deliveryTag该消息的index
requeue:被拒绝的是否重新入队列。 消息进入绑定的DLX
ShutdownSignalException
1 队列名找不到
2 代码中有ack,但是没有配置手动ACK
消费超时
消费超时,queue中unacked的消息会退回到queue中,且消费者ACK时会失败。
使用@Payload和@Headers注解
queues
body headers
out
outheaders
outbody
@RabbitListener 和 @RabbitHandler 搭配使用
@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
0
序列化
当中默认的序列化类为SimpleMessageConverter。
仅仅有调用了convertAndSend方法才会使用对应的MessageConvert进行消息的序列化与反序列化。
SimpleMessageConverter对于要发送的消息体body为字节数组时。不进行处理。
对于假设是String。则将String转成字节数组。
对于假设是Java对象,则使用jdk序列化Serializable将消息转成字节数组。转出来的结果较大,含class类名。类对应方法等信息。因此性能较差。
当使用RabbitMq作为中间件时,数据量比較大,此时就要考虑使用类似Jackson2JsonMessageConverter。hessian等序列化形式。以此提高性能。
使用 JSON 序列化与反序列化
https://www.daimajiaoliu.com/daima/56a7c0bc754cc04
发送
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
1
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
2
接收
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
3
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
4
消费者+json反序列化 造成手动ACK配置失效
解决方案: https://www.daimajiaoliu.com/daima/4796ad98110041c
这是springboot集成RabbitMQ的一个大坑。当消费者配置JSON反序列化时,配置文件中的手动ACK会失效,消费者会变成自动ACK模式。
spring.rabbitmq.listener.direct.acknowledge-mode=
manual
,spring.rabbitmq.listener.simple.acknowledge-mode=
manual 配置失效。
解决方法是消费者配置RabbitListenerContainerFactory这个Bean时(见上),设置factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)。把消费者强制转换为手动ACK。
如果配置失效切换为自动ACK,但是代码中又使用channel.basicAck手动ACK。这样会造成双ACK的ERROR,接着信道会重启重连。如下:
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
5
unknown delivery tag 1表示当前Channel中找不到delivery-tag=1的消息,其实是这个消息已经自动ACK了,basicAck时就会出错。测试显示,消息并不会丢失而是在出现ERROR异常后走向Nack后重新入队,再多次重复消费后最终ACK成功,严重降低消费者的执行效率。
Delivery Tags投递的标识
当一个消费者向RabbitMQ注册后,RabbitMQ会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它在一个channel中唯一代表了一次投递。delivery tag的唯一标识范围限于channel. delivery tag是单调递增的正整数,客户端获取投递的方法用用dellivery tag作为一个参数。
TestController测试
server.portspring.application.namerabbitmq-hello-sending
spring.rabbitmq.addresses:ip1:port1,ip2:port2,ip3:port3
spring.rabbitmq.usernamelinpeng
spring.rabbitmq.password
spring.rabbitmq.virtual-host/test
spring.rabbitmq.connection-timeout5s
spring.rabbitmq.publisher-confirmstrue
spring.rabbitmq.publisher-returnstrue
spring.rabbitmq.listener.direct.acknowledge-modemanual
spring.rabbitmq.listener.simple.acknowledge-modemanual
spring.rabbitmq.listener.concurrency //最小消息监听线程数
spring.rabbitmq.listener.max-concurrency //最大消息监听线程数
spring.rabbitmq.listener.simple.prefetch
spring.rabbitmq.listener.simple.auto-startuptrue
spring.rabbitmq.listener.simple.default-requeue-rejected
spring.rabbitmq.template.retry.enabledtrue
spring.rabbitmq.template.retry.initial-interval
spring.rabbitmq.template.retry.max-attempts
spring.rabbitmq.template.retry.max-interval
spring.rabbitmq.template.retry.multiplier
6
ACK场景测试
我们把HelloReceiver的ACK确认代码注释掉 ,那消息就算程序收到了, 但是未确认ACK导致消息服务器以为他是未成功消费的,若此时消费者断开则消息返回队列,后续还会再发。
还没有评论,来说两句吧...