一、前言
1、kafka的自我介绍
kafka在 Apache Kafka官方网站 给自己的介绍是这样的,一个开源的分布式的流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications。
因此严格意义上来讲,kafka并不是一个消息队列,它是一个用来做分布式流处理的平台。它能做消息队列,但它自己给自己的定义显然不想做消息队列,实际上也确实如此,目前消息队列大型公司用rocketMQ,中小型公司用RabbitMQ。而kafka更多是被用来做日志系统。
2、思考
前面我们选择了MQ中的RabbitMQ做了消息队列,那同为MQ,为什么还要去学习和使用kafka。
主要原因是kafka相较于RabbitMQ拥有以下特点:
-
吞吐量更高,而日志,用户行为、应用信息监控等数据恰恰是大批量涌入;
-
持久化日志,这些日志可以被重复读取和无限期保留;
-
支持实时的流式处理;
这些特点使得kafka在应用监控、日志采集、分析,网站用户行为追踪这些业务上,比RabbitMQ更适合。
但是在作为消息队列这一块,RabbitMQ拥有的一些高级特性使其比kafka更适合做消息队列。比如confirm机制可以确保消息百分百投递成功,ack、nack机制可以确保百分百消费成功,死信队列机制为消息做了最后一层保障,qos限流机制保证了消费者不会因为激增的高流量就把自己弄挂等等。
只能说,rabbitmq和kafka虽然都是消息中间件,但是其适用场景并不同,这就像MySQL的PXC集群和replication集群一样,虽然都是集群,但功能是互补的,而非互斥。
二、Kafka适用场景
1、应用监控
利用Kafka采集应用程序和服务器健康相关的指标,如CPU占用率、IO、内存、连接数、TPS、QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用Kafka与ELK(ElasticSearch、Logstash和Kibana)整合构建应用服务监控系统。
2、网站用户行为追踪
为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到Kafka集群上,通过Hadoop、Spark或Strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。
3、流处理
需要将已收集的流数据提供给其他流式计算框架进行处理,用Kafka收集流数据是一个不错的选择,而且当前版本的Kafka提供了Kafka Streams支持对流数据的处理。
4、持久性日志
Kafka可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份,Kafka为故障节点数据恢复提供了一种重新同步的机制。同时,Kafka很方便与HDFS和Flume进行整合,这样就方便将Kafka采集的数据持久化到其他外部系统。
三、Kafka中的基本概念
1、Topic
一个虚拟的概念,由1到多个Partitions组成,可以理解为存放消息的地方
2、Partition
实际消息存储单位
3、Producer
消息生产者
4、Consumer
消息消费者
四、Kafka的基本使用
注:kafka的单机安装可以查看文章 centos安装zookeeper和kafka
1、kafka常用命令
# 启动kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
# 停止kafka
/usr/local/kafka/bin/kafka-server-stop.sh
# 创建Topic
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.78.102:9092 --replication-factor 1 --partitions 1 --topic haveyb-topic
# 查看已经创建的Topic列表
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.78.102:9092
# 发送消息
[root@v2 /]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.78.102:9092 --topic haveyb-topic
>haveyb
>heihei
>{"sku_sn":"100002","price":"12000"}
>
# 接收消息,执行开始就会接收到我们刚刚发送的消息
[root@v2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.78.102:9092 --topic haveyb-topic --from-beginning
haveyb
heihei
{"sku_sn":"100002","price":"12000"}
注:新版本的kafka命令较旧版本有一些变化,旧版本的--zookeeper zookeeperIp:zookeeperPort 都要换成 --bootstrap-server kafkaIp:kafkaPort