MQ 07 AMQP 高级消息队列协议

2019/08/14 MQ 共 1511 字,约 5 分钟
MiniPa

AMQP

Advanced Message Queue Protocol 高级消息队列协议,应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

AMQP中消息的路由过程和 JMS 存在一些差别,AMQP中增加了ExchangeBinging的角色

生产者把消息发布到 Exchange 上,
消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到哪个队列

amqp

Channel模式 和 Connection模式

1.这是 spring-amqp 中的概念
Sharing of the connection is possible since the "unit of work" for messaging with AMQP 
is actually a "channel"
 (in some ways, this is similar to the relationship between a Connection and a Session in JMS).

Channel:信道

序运行期间 ConnectionFactory 会维护着一个 Connection,所有的操作都会使用这个 Connection,一个Connection 中可以有多个 Channel,操作 rabbitmq 之前都必须先获取到一个 Channel,否则就会阻塞(可以通过 setChannelCheckoutTimeout() 设置等待时间)这些Channel会被缓存,(缓存的数量可以通过setChannelCacheSize() 设置)

Connection:连接

允许创建多个 Connection,会缓存一定数量的 Connection,每个 Connection 中同样会缓存一些 Channel,除了可以有多个 Connection,其它都跟 CHANNEL 模式一样。 The use of separate connections,might be useful in some environments,such as consuming from an HA cluster,in conjunction with a load balancer,to connect to different cluster members。

setChannelCacheSize:

设置每个 Connection 中(注意是每个 Connection)可以缓存的 Channel 数量,注意只是缓存的Channel数量,不是 Channel 的数量上限。

操作 rabbitmq 之前(send/receive message等)要先获取到一个 Channel,获取Channel时会先从缓存中找闲置的 Channel,如果没有则创建新的 Channel。

当 Channe l数量大于缓存数量时,多出来没法放进缓存的会被关闭。

注意,改变这个值不会影响已经存在的Connection,只影响之后创建的Connection。

有时会出现connection closed错误。rabbitTemplate作者对于这种问题的解决方案,他给的方案很简单,单纯的增加connection数:

setChannelCheckoutTimeout:
当这个值大于0时,channelCacheSize不仅是缓存数量,同时也会变成数量上限,
从缓存获取不到可用的Channel时,不会创建新的Channel,
会等待这个值设置的毫秒数,到时间仍然获取不到可用的Channel会抛出AmqpTimeoutException异常

同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。

参考:

文档信息

Search

    Table of Contents