PHP使用RabbitMQ

一、准备工作

1、说明

让PHP支持rabbitmq有两种方式,一种是安装amqp扩展,另一种是在项目中composer安装组件php-amqplib/php-amqplib。

选择的方法不同,代码写出来也是不一样的,但本质上都是调用的rabbitmq对外提供的一些方法。

 

2、理解RabbitMQ的消息流转过程

关于exchange、routing key、channel 等消息队列相关概念,可以查看文章 AMQP协议

 

3、搭建rabbitmq

可以查看文章 Docker快速构建RabbitMQ集群

如果想结合haproxy+keepalived做rabbitmq的负载均衡,可以查看文章 Docker快速构建HaProxy集群,并配置好rabbitmq的负载均衡引入KeepAlived

 

三、amqp composer组件方式时,使用PHP向RabbitMQ推送消息

1、PHP代码
public function actionProduceMessage()
{
        $config = ['host' => '192.168.78.200', 'port' => 5600, 'user' => 'root', 'password' => 123456, 'vhost' => '/'];

        // 定义要使用的交换机名称
        $exchange = 'exchange_1';
        // 定义队列名称
        $queue = 'create_goods';
        // 定义使用的路由键
        $routingKey = 'goods';

        // 创建与MQ的连接
        $connection = new AMQPStreamConnection(...array_values($config));

        // 创建一个通道
        $channel = $connection->channel();

        /**
         * 声明交换机
         * 第一个参数 exchange 交换机名称
         * 第二个参数 type 交换机类型 direct-直连,fanout-广播 topic-主题 headers-消息体header匹配
         * 第三个参数 passive 是否检测同名交换机
         * 第四个参数 durable 是否开启交换机持久化
         * 第五个参数 auto_delete 通道关闭后是否删除交换机
         */
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);

        /**
         * 声明队列
         * 第一个参数 queue 队列名称
         * 第二个参数 passive 是否检测同名队列
         * 第三个参数 durable 是否开启队列持久化
         * 第四个参数 exclusive 队列是否可以可以被其他队列访问
         * 第五个参数 auto_delete 通道关闭后是否删除队列
         */
        $channel->queue_declare($queue, false, true, false, false);

        /**
         * 将队列与交换机、路由键绑定
         * 第一个参数 queue 指定队列
         * 第二个参数 exchange 指定交换机
         * 第三个参数 routingKey 指定路由键
         */
        $channel->queue_bind($queue, $exchange, $routingKey);

        // 组装生产的消息
        $messageBody = json_encode([
            'sku_id' => 11101,
            'sku_sn' => 'sn2001',
            'price' => 12000
        ]);

        // 创建要发送的消息对象
        $queueMessage = new AMQPMessage($messageBody, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);

        /**
         * 推送消息
         * 第一个参数 msg 要发送的消息对象 object
         * 第二个参数 exchange 指定使用的交换机
         * 第三个参数 routingKey 指定路由键
         */
        $channel->basic_publish($queueMessage, $exchange, $routingKey);

        // 关闭通道
        $channel->close();
        // 关闭连接
        $connection->close();

        // 返回消息
        return json_encode([
            'status' => 1,
            'msg' => 'publish queue message success'
        ]);

}

 

2、执行结果

可以看到,我们的消息已经推送到rabbitmq集群上了,负载均衡将这条消息分发到了mq3节点。

我们点击 create_goods,进入消息详情,可以看到我们推送的具体消息

 

3、总结

到这里,就已经完成了向rabbitmq推送消息。

 

三、amqp composer组件方式时,使用PHP进行消息的消费