一、前言
我们前面完成了 单机zookeeper、kafka的搭建
那个单机的kafka使命到这里就结束了。可以通过 ps -ef | grep kafka 找到进程号,然后 kill -9 进程号杀掉进程,再把kafka的目录删掉了,因为用不上了。
通过单机的kafka,我们了解了kafka的配置文件,以及实践了一些常用命令的使用。本节文章,我们将搭建kafka集群,并且通过PHP操作kafka。
搭建kafka集群请看文章 docker快速搭建kafka集群, 后面我们操作kafka就都是操作这个集群了。
二、命令行操作kafka集群
1、创建topic
# 进入一个kafka容器
docker exec -it kafka1 bash
# 进入kafka所在目录
cd /opt/kafka
# 创建一个topic
./bin/kafka-topics.sh --create --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --replication-factor 1 --partitions 1 --topic test
2、获取当前集群的topic列表
./bin/kafka-topics.sh --list --bootstrap-server kafka1:9092
3、发送消息
bash-5.1# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test
>{"sku_sn":"100002","price":"12000"}
>hello,world!
>
4、接收消息
bash-5.1# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
{"sku_sn":"100002","price":"12000"}
hello,world!
kafka有一个特性,就是消息被消费过后,并不会删除,换个--bootstrap-server仍然还会收到消息。
比如上面我已经通过 ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
消费了消息,如果我再开个消费者 ./bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --from-beginning
,仍然会收到消息,不管重开多少次,消息仍然存在。
这一点是需要注意的,它和RabbitMQ有着非常大的不同。
三、PHP使用kafka进行消息投递(生产者)
1、引入composer组件包
composer require nmred/kafka-php
2、这里,我创建了两个topic,分别名为haveyb-top、test,所以我开两个窗口,分别消费这两个topic
# 窗口1,消费test
./bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic test --from-beginning
# 窗口2,消费haveyb-topic
./bin/kafka-console-consumer.sh --bootstrap-server kafka3:9092 --topic haveyb-topic --from-beginning
3、编写代码 testKafka.php
<?php
declare(strict_types = 1);
require __DIR__ . '/../vendor/autoload.php';
use Kafka\ProducerConfig;
use Kafka\Producer;
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('192.168.78.200:9091');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new Producer();
// 发送消息
$producer->send([
[
'topic' => 'haveyb-topic',
'value' => '{"sku_sn":"101","price":"15000"}',
'key' => ''
],
[
'topic' => 'test',
'value' => '{"sku_sn":"102","price":"12000"}',
'key' => ''
],
[
'topic' => 'test',
'value' => '{"sku_sn":"103","price":"10000"}',
'key' => ''
]
]);
上面的代码,我们给haveyb-topic发了一条消息,给test发了两条消息
执行该代码:
php testKafka.php
我们会发现,消息已经分别到达了我们之前开的两个端口分别消费的topic。
四、PHP使用kafka进行消息消费(消费者)kafkaConsumer.php
<?php
declare(strict_types = 1);
require __DIR__ . '/../vendor/autoload.php';
use Kafka\ConsumerConfig;
use Kafka\Consumer;
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('192.168.78.200:9091');
$config->setGroupId('test_1');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['haveyb-topic']);
$consumer = new Consumer();
$consumer->start(function ($topic, $part, $message) {
print_r($message);
});
我们先执行php kafkaConsumer.php
来消费消息,然后用上面的生产者生产消息,看看能不能收到,结果是可以收到消息的