RabbitMQ考题

1、为什么使用MQ

(1)异步处理

异步常见于以下场景:发邮件、发短信,还有数据的异构都应用到了MQ异步的特性

(2)应用解耦

比如商品服务新增了1个商品,该商品信息在库存服务、活动服务、代理服务中都需要,那么我们就需要在商品服务中调用三个服务的接口。

但这其中,还要考虑如果某个接口挂了,那整个添加商品也会导致失败,而且如果这时再有两个服务也要添加商品的数据,是不是又要在商品服务中新增调用代码。

这显然是不太合理的,就可以通过引入MQ来解决这个问题,一个发布订阅消息模型就可以解决了,需要的就订阅,不需要的就不订阅。进而将商品服务和其他服务解耦。

(3)流量削峰

在某段时间,请求数量激增,超过了MySQL处理上线,如果这时不将请求直接打到MySQL,而是存放到RabbitMQ,然后根据MySQL每秒钟处理上限拉取消息,让MySQL进行处理,这样就可以保证MySQL不会挂掉,也就保证了业务不会挂掉。

虽然这样会导致一定量的请求积压,但是随着请求高峰过去,积压的请求很快就会被处理掉。

(4)日志处理

kafka是日志处理的标志性建筑,超高吞吐量使得其能有优秀完成实时日志采集、分析

 

2、RabbitMQ 常见的工作模式有哪些

(1)simple模式(simple)

生产者生产消息,将消息放入队列,消费者监听消息队列,如果队列中有消息就消费掉,消息被拿走后会自动从队列中删除。消费者可以设置ACK确认,消费者处理完后发送ack给队列,队列删除消息。

简单队列的不足:耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中消息就无法实现了。

(2)工作模式(work)

生产者将消息放入队列,消费者可以有多个。

一般有两种模式:

  • 轮询分发(round-robin):MQ不管两个消费者谁忙,数据总是你一个我一个,MQ 给两个消费发数据的时候是不知道消费者性能的,默认就是雨露均沾。此时 autoAck = true。

  • 公平分发:要让消费者消费完毕一条数据后就告知MQ,再让MQ发数据即可。自动应答要关闭,实现按照消费者性能消费。

(3)发布订阅模式(fanout)

类似公众号的订阅跟发布,属于 fanout 模式,不需要指routingKey,我们只需要把队列绑定到交换机,并指定交换机类型为fanout。

在该模式下,生产者并不是直接操作队列,而是将消息发送给交换机,然后由交换机将消息发送给当前与之绑定的队列。

(4)路由模式(direct)

此时生产者发送数据到MQ的时候会指定routingKey,消费者也会指定routingKey,只有key一样消息才会被传送到队列中

(5)主题模式(topic)

将路由键跟某个模式匹配,生产者会带 routingKey,但是消费者的MQ会带模糊routingKey

 

3、RabbitMQ中,消息是怎么路由的

生成者生产消息后消息带有 routing Key,通过routing Key 消费者队列被绑定到交换器上,消息到达交换器根据交换器规则匹配,常见交换器如下:

  • fanout:如果交换器收到消息,将会广播到所有绑定的队列上

  • direct:如果路由键完全匹配,消息就被投递到相应的队列

  • topic:可以使来自不同源头的消息能够到达同一个队列。使用 topic 交换器时,可以使用通配符

 

4、RabbitMQ 消息基于什么传输

信道是生产消费者与rabbit通信的渠道,生产者 publish 或是消费者 subscribe 一个队列都是通过信道来通信的。

信道是建立在TCP连接上的虚拟连接,就是说 RabbitMQ 在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ 都有唯一的ID来保证信道私有性,对应唯一的线程使用。

用信道而不用 TCP 的原因是由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。

 

5、如何保证 RabbitMQ 消息不丢失

消息丢失主要分为 生产者丢失消息、消息列表丢失消息、消费者丢失消息。

(1)保证生产者不丢失消息

通过开启 confirm模式。

在生产者那里设置开启confirm模式之后,每次消息到达MQ都会被分配一个唯一的id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack消息,告诉你说这个消息ok了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack接口,告诉你这个消息接收失败,你可以重试。

而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

(2)保证消息列表不丢失消息

这种情况就是就是 RabbitMQ 自己弄丢了数据。

解决方式是开启持久化磁盘的配置,可以通过开启 RabbitMQ 的持久化来解决。

就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,这样就可以保证数据不会丢。

设置持久化有三个步骤:

第一个是设置交换器持久化:

$exchange = new AMQPExchange($channel);

// 设置交换机持久化,rabbitMq重启时交换机自动恢复
$exchange->setFlags(AMQP_DURABLE);

第二个是设置队列持久化

$queue = new AMQPQueue($channel);
// 设置队列名称
$queue->setName($queueName);
// 设置队列持久化,rabbitMq重启时队列自动恢复
$queue->setFlags(AMQP_DURABLE);

第三个是发送消息到MQ时设置消息持久化:

$exchange->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, ['delivery_mode' => 2])

当交换机、队列和发送的消息都设置了持久化后,MQ即使挂掉,重启后也会恢复。

但也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来。只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

3、保证消费者不丢失消息

消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息。

解决方案:处理消息成功后,再手动回复确认消息。消费者跟消息队列的连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息,保证数据的最终一致性。

注意点:

消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。

如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者,这时可能存在消息重复消费的隐患,需要去重!

4、如何避免消息重复消费

消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。

这个就要求生产者生产消息时,在消息体中附带一个唯一ID,消费者在消费消息前,先去Redis查询是否有记录,如果有记录,说明消息已经被消费过了,就不再处理该消息了。

如果Redis中没有该唯一ID的记录,则说明消息还没被消费过,那就消费消息,并在消费消息后,将消息中的唯一ID写入Redis。

注:唯一ID是对于同一业务全局唯一,如支付ID、订单ID、帖子ID等,也可以直接全局唯一,通过时间戳+随机数+队列名称+机器码的方式生成。

小提醒:写入redis的值别忘了设置好过期时间,要不然太吃内存了

5、说说RabbitMQ 有哪几种集群模式

(1)主备架构模式

主节点提供读写,备用节点不提供读写。

如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点。

(2)多活架构模式

该架构模式是实现异地数据复制的主流模式。

这种模式需要依赖 RabbitMQ的 federation 插件,可以实现持续的,可靠的 AMQP 数据通信,多活模式在实际配置与应用非常的简单。

在多活架构模式下,采用双中心模式(多中心),在两套(或多套)数据中心各部署一套 RabbitMQ 集群,各中心的 RabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。

(3)镜像mirro架构模式【最主流】

多个节点的RabbitMQ互为主备,保证了数据的100%不丢失。使用Ha-proxy和keepalived保证了架构的高负载高可用。

在RabbitMQ镜像mirro架构模式中,有两种写入方式可选,分别是普通队列和镜像队列。如果选择镜像队列,则一条命令在所有节点都会执行完后才判定执行成功;如果选择普通队列,则一条命令再当前节点执行完后就会判定执行成功。

6、说说RabbitMQ的死信队列

(1)、死信队列的生命周期
1、业务消息被投入业务队列

2、消费者消费业务队列的消息,由于处理过程中发生异常,于是进行了nck或者reject操作

3、被nck或reject的消息由RabbitMQ投递到死信交换机中

4、死信交换机将消息投入相应的死信队列

5、死信队列的消费者消费死信消息
(2)死信队列适用场景:

在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,在系统因为参数解析、数据校验、网咯拨打等导致异常后通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息。

死信消息是 RabbitMQ 为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中。

明白死信队列运行机制后就知道这些 Exchange 和 Queue 想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

(3)说明

死信 Dead Letter 是 RabbitMQ 中的一种消息机制,当消费消息时队列里的消息出现以下情况那么该消息将成为死信。死信消息会被RabbitMQ进行特殊处理,如果配置了 死信队列 信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃:

1、消息被否定确认,使用channel.basicNack 或 channel.basicReject  ,并且此时 default-requeue-rejected(由于监听器抛出异常而拒绝的消息是否被重新放回队列) 属性被设置为false。

消息在队列的存活时间超过设置的TTL时间。

消息队列的消息数量已经超过最大队列长度。

7、讲讲RabbitMQ的延迟队列

延时队列中的消息是希望被在指定时间得到取出和处理,所以延时队列中的消息都是带时间属性的,一般用在如下场景:

1、订单在 15 分钟之内未支付则自动取消。

2、账单在一周内未支付,则自动结算。

3、用户注册成功后,如果三天内没有登陆则进行短信提醒。

4、用户发起退款,如果三天内没有得到处理则通知相关运营人员。

5、预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

延迟队列 = 死信队列 + TTL

8、说说rabbitmq生产者消息运转的流程?

1. Producer 先连接到Broker,建立连接Connection,开启一个信道(Channel)。
2. Producer 声明一个交换器并设置好相关属性。
3. Producer 声明一个队列并设置好相关属性。
4. 将队列和交换器、路由键绑定起来。
5. Producer 发送消息到 Broker ,其中包含路由键、交换器等信息。
6. 相应的交换器根据接收到的路由键查找匹配的队列。
7. 如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者。
8. 关闭信道。
9. 管理连接。

9、说说消费者接收消息过程?

1. Producer 先连接到 Broker ,建立连接 Connection ,开启一个信道( Channel )。
2. 向 Broker 请求消费相应的队列中消息,可能会设置响应的回调函数。
3. 等待 Broker 回应并投递相应队列中的消息,接收消息。
4. 消费者确认收到的消息, ack 。
5. RabbitMq 从队列中删除已经确定的消息。
6. 关闭信道。
7. 关闭连接。

10、说说生产者如何将消息可靠投递到RabbitMQ?

1. 生产者发送消息给MQ
2. MQ将消息持久化后,发送Ack消息给生产者,此处有可能因为网络问题导致Ack消息无法发送到生产者,那么生产者在等待超时后,会重传消息;
3. 生产者收到Ack消息后,认为消息已经投递成功。

11、说说RabbitMQ如何将消息可靠投递到消费者?

1. MQ将消息push给消费者(或Client来pull消息)
2. 消费者得到消息并做完业务逻辑
3. 消费者发送Ack消息给MQ,通知MQ删除该消息,此处有可能因为网络问题导致Ack失败,那么消费者就会重复消费消息,这里就引出消费幂等的问题,幂等问题可以通过生产者在消息中添加唯一ID解决;
4. MQ将已消费的消息删除。

12、说一下AMQP的3层协议?

Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。