RabbitMQ中保证消息100%投递成功、100%消费成功、消息不丢失

1、使用confirm机制保证消息100%投递成功

// 开启confirm模式,confirm模式下,投递消息后,RabbitMQ会异步返回是否投递成功,confirm模式不可以和事务模式同时存在
$channel->confirm_select();

// 推送消息到RabbitMQ成功的异步回调,如果消息推送成功,想做什么业务处理写在这里
$channel->set_ack_handler(function (AMQPMessage $message) {

});

// 推送消息到RabbitMQ失败的异步回调,如果消息推送失败,想做什么业务处理写在这里
$channel->set_nack_handler(function (AMQPMessage $message) {

});

这里为了避免之后消费者消费消息时可能产生的重复消费问题,我们最好在消息中添加一个唯一ID(自己设计生成),这样之后消费者消费消息时先去缓存中查有没有消费过这个消息,如果有消费过,则不再处理并且直接ack让rabbitmq删除这条消息。如果缓存中没有这个ID,则说明没有消费过这条消息,那就先消费执行业务逻辑,执行成功后将这个ID写入缓存,然后ack确认让rabbitmq删除掉这条消息。

这里需要注意的是,建议消费者将唯一ID存到缓存中时,设置个有效期TTL,这样可以避免内存爆炸。一般设置为1-2天足以了,因为即使有失败的消息,我们的业务人员也会在1-2天内手动处理好。

 

2、使用ack、nack机制确保消息100%消费成功

在消费消息时,将no_ack参数设置为false,表明要我们确认,rabbitmq才可以删除消息,否则不可以删除消息

// 消费监听,在参数里将第四个参数no_ack设置为false,表示需要我这里确认,你rabbitmq才可以把消息删掉
$channel->basic_consume($queueName, '', false, false, false, false, function ($message) {

    // $message->body是推送过来的消息,业务代码写在这里

    // 标识取出消息后,要执行的业务是否已经执行成功
    $isSuccess = true;

    // 如果业务执行成功,则调用ack方法,告诉rabbitmq可以把这条消息删除了
    if (true == $isSuccess) {
        $message->ack();
    } else {
        // 如果如果业务执行失败,则调用nack方法,告诉rabbitmq不可以删除这条消息,我执行失败了
        $message->nack(true);
    }
});

这里,对于失败的消息,可以将消息及捕捉到的错误信息先记录到日志表,待错误排查后再重新消费

 

3、保证消息不丢失

1、声明交换机和声明队列时,要设置参数durable 为true,表示要持久化

2、发送消息的时候,要将delivery_mode 设置为2,表示消息要持久化

// 将第四个参数设置为true,表示将交换机持久化
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
// 将第三个参数设置为true,表示将队列持久化
$channel->queue_declare($queueName, false, true, false, false, false);
// 构造发送的消息主体
$data = ['data' => ['sku_sn' => $skuSn, 'price' => 12000], 'uniqueId' => 'addGoods'.'_'.$skuSn];
// 设置delivery_mode为DELIVERY_MODE_PERSISTENT,表示将消息持久化
$message = new AMQPMessage(json_encode($data), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'content_type' => 'text/plain'
]);
$channel->basic_publish($message, $exchange, $routingKey);