PHP使用kafka进行消息的生产和消费

一、前言

我们前面完成了 单机zookeeper、kafka的搭建

那个单机的kafka使命到这里就结束了。可以通过 ps -ef | grep kafka 找到进程号,然后 kill -9 进程号杀掉进程,再把kafka的目录删掉了,因为用不上了。

通过单机的kafka,我们了解了kafka的配置文件,以及实践了一些常用命令的使用。本节文章,我们将搭建kafka集群,并且通过PHP操作kafka。

搭建kafka集群请看文章 docker快速搭建kafka集群, 后面我们操作kafka就都是操作这个集群了。

 

二、命令行操作kafka集群

1、创建topic
# 进入一个kafka容器
docker exec -it kafka1 bash

# 进入kafka所在目录
cd /opt/kafka

# 创建一个topic
./bin/kafka-topics.sh --create --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --replication-factor 1 --partitions 1 --topic test
2、获取当前集群的topic列表
./bin/kafka-topics.sh --list --bootstrap-server kafka1:9092
3、发送消息
bash-5.1# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test
>{"sku_sn":"100002","price":"12000"}
>hello,world!
>
4、接收消息
bash-5.1# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
{"sku_sn":"100002","price":"12000"}
hello,world!

kafka有一个特性,就是消息被消费过后,并不会删除,换个--bootstrap-server仍然还会收到消息。

比如上面我已经通过 ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning 消费了消息,如果我再开个消费者 ./bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --from-beginning,仍然会收到消息,不管重开多少次,消息仍然存在。

这一点是需要注意的,它和RabbitMQ有着非常大的不同。

 

三、PHP使用kafka进行消息投递(生产者)
1、引入composer组件包
composer require nmred/kafka-php
2、这里,我创建了两个topic,分别名为haveyb-top、test,所以我开两个窗口,分别消费这两个topic
# 窗口1,消费test
./bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --from-beginning

# 窗口2,消费haveyb-topic
./bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic haveyb-topic --from-beginning
3、编写代码 testKafka.php
<?php
declare(strict_types = 1);

require __DIR__ . '/../vendor/autoload.php';

use Kafka\ProducerConfig;
use Kafka\Producer;

$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('192.168.78.200:9091');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new Producer();

// 发送消息
$producer->send([
    [
        'topic' => 'haveyb-topic',
        'value' => '{"sku_sn":"101","price":"15000"}',
        'key' => ''
    ],
    [
        'topic' => 'test',
        'value' => '{"sku_sn":"102","price":"12000"}',
        'key' => ''
    ],
    [
        'topic' => 'test',
        'value' => '{"sku_sn":"103","price":"10000"}',
        'key' => ''
    ]
]);

上面的代码,我们给haveyb-topic发了一条消息,给test发了两条消息

执行该代码:

php testKafka.php

我们会发现,消息已经分别到达了我们之前开的两个端口分别消费的topic。

 

四、PHP使用kafka进行消息消费(消费者)kafkaConsumer.php

<?php
declare(strict_types = 1);

require __DIR__ . '/../vendor/autoload.php';
use Kafka\ConsumerConfig;
use Kafka\Consumer;

$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('192.168.78.200:9091');
$config->setGroupId('test_1');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['haveyb-topic']);

$consumer = new Consumer();
$consumer->start(function ($topic, $part, $message) {
    print_r($message);
});

我们先执行php kafkaConsumer.php来消费消息,然后用上面的生产者生产消息,看看能不能收到,结果是可以收到消息的