kafka设计思想
Apache Kafka是一个分布式的基于push-subscribe的消息系统,它具备快速,可扩展、可持久化,高吞吐量等的特点。如此优秀的中间件,设计肯定有其独到之处,下面我们就探寻一下它的设计思想。
搭建环境
官网地址
下载版本
http://kafka.apache.org/downloads
最新的版本已经到了2.3.0,本文基于2.2.0来搭建
下载kafka_2.11-2.2.0.tgz文件然后上传到centos 7服务器上解压安装
安装
1
tar zxvf kafka_2.11-2.2.0.tgz -C /usr/local/
配置
1
2
3进入kafka解压后配置文件的目录
cd /usr/local/kafka_2.11-2.2.0/config
vi server.properties然后修改
1.broker.id
1
2broker.id=0
注:设置集群在不同broker上设置不同序号如broker.id=0,broker.id=1 ……2.listeners
1
2listeners=PLAINTEXT://0.0.0.0:9092
注:这里ip如果是在云服务器上要设置成内网ip,由于我的云服务器不是在同一个内网所以设置成0.0.0.03.advertised.listeners
1
2
3advertised.listeners=PLAINTEXT://120.79.232.219:9092
注:这里如果要外网访问kafka,要设置此项,本来此项是注释掉的;
ip设置成外网ip4.zookeeper.connect,配置zookeeper
1
zookeeper.connect=xx.xx.xx.xx:2181
启动
1
2
3
4
5
6
7cd /usr/local/kafka_2.11-2.2.0/bin/
./kafka-server-start.sh -daemon ../config/server.properties
ps -ef|grep kafka 查看broker是否启动成功
或者查看日志
cd /usr/local/kafka_2.11-2.2.0/logs
less kafkaServer.out如果出现
说明你的机器的内存设置不够需要调整内存
1
2
3
4
cd /usr/local/kafka_2.11-2.2.0/bin
vi kafka-server-start.sh
然后根据自己试验机修改合适内存,配置高的试验机可以忽略
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
常用客户端命令
1
2
3
4
5
6
7
8
9./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
kafka应用场景
- 日志收集:kafa可以手机系统的各种log,然后统一接口开放给consumer
- 消息系统:系统的解耦
- 用户活动跟踪:如用户画像,用户操作浏览网页的各种活动等信息都可以发送到kafka的topic中,然后订阅者通过订阅各种tipic来对数据进行分析监控等
- 运营数据:kafka可以用来记录各种运营的指标数据,如收集各种分布式应用的数据
思考一下设计一个消息中间件?
基本需求
- 最基本是要能支持消息的发送和接收,需要设计网络通信就一定要涉及到NIO
- 消息中心消息存储(持久化/非持久)
- 消息的序列化和反序列化
- 消息的确认机制,如何避免消息重发
高级功能
- 消息有序性
- 是否支持事物消息
- 消息的收发性能,对高并发大数据量的支持
- 是否支持集群
- 消息的可靠性存储
- 是否支持多协议
kafka的架构
一个典型的kafka集群包含若干个Producer,若干Broker(kafka支持水平扩展,)若干个Consumer Group,以及一个zookeeper集群。kafka通过zookeeper管理集群配置及服务协同,Producer使用push模式将消息发布到broker,Consumer通过监听使用pull模式从broker订阅并消费消息。
多个broker协同工作,Producer和Consumer部署到各个业务逻辑中,三者通过zookeeper管理协调请求和转发,这样就组成一个高性能的分布式消息发布和订阅系统。
注意点:Producer发送消息到bioker是采用push方式,Consumer从broker消费消息采用主动pull方式,这个是和其他mq消息的一个不同点。
Topic
在kafka中,topic是一个存储消息的逻辑概念,可以认为是一个消息集合,每条消息发送到kafka集群的消息都有一个类别,物理上来说,不同的topic的消息是分开存储的。
每个topic可以有多个生产者向他发送消息,也可以有多个消费者去消费其中的消息。
Partition
每个topic可以划分多个分区(每个topic至少有一个分区),同一个topic下的不同分区包含的消息是不同的,每个消息在被添加到分区时,都会被分配一个offset(偏移量),它是此消息在此分区中唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,换句话说kafka只保证在同一个分区内的消息是有序的。
如下图所示,对于名字为test的topic,有三个分区,分别是p0,p1,p2
注:每一条消息发送到broker时,会根据partition的规则选择存储到那个partition
topic和partition的存储
Partition是以文件的形式存储在文件系统中,比如创建一个testTopic的topic,其中有三个partition,那么kafka的数据目录(/tmp/kafka-log)中就有3个目录,testTopic0-3,命名规则是
命令方式创建一个topic
1 | sh kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 3 --topic testTopic |
生产端
关于消息的分发
生产端消息发送方式
同步发送
同步发送消息需要阻塞,无非是通过future.get()来等待消息的发送返回结果,但是这种方法会严重影响消息发送性能。
异步发送
本质上来说kafka采用异步方式来发送消息到broker,但是kafka每次发送消息都会直接发送到broker上,而是把消息放到一个发送队列中,然后通过一个后台线程不断从队列取出消息进行发送,发送成功后会触发callback,kafka客户端会积累到一定量的消息统一组装成一个批量消息发送出去,触发的条件是batch.seize和linger.ms参数。当然具体参数设置需要在效率和时效性方面做一个权衡。
好处:可以减少网络请求和磁盘IO的次数
kafka消息的分发策略
在kafka中一条消息有key,value两部分组成,在发送一条消息我们可以指定这个key,那么producer会根据key和partition机制来判断当前这条消息应该发送并存储到那个partition分区中。
另外我们还可以根据需要自定义进行扩展prioducer的partition机制
消息默认的分发机制
默认情况下kafka采用hash取模的分区算法,如果key为null,则会随机分配一个分区,这个随机是在参数“metadata.max.age.ms”d的时间范围内随机选择一个。对于时间段内,如果key为null,则只会发送到唯一的分区,这个值默认情况下是10分钟更新一次。
问题?
- 生产者如何保证发送到broker的消息成功?
- 如果生产者发送消息到broker失败如何处理?
- kafka中数据存储如何保证高可用,不丢失
- kafka中数据是怎么样的存储方式?
带着这些问题,我们继续探索。
分区副本机制
我们知道kafka的每个topic都可以有多个partition,并且多个partition会均匀的分布在集群的各个节点上,虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说都是单点,当其中一个partition不可用的时候,那么这部分消息是没办法消费的。所以kafka为了提高partiition的可靠性而提供了副本的概念,通过副本机制来实现冗余备份解决高可用。
每个分区可以有多个副本,并且在副本集合中会在一个leader的副本,所有的读写请求都是有leader副本来进行处理,剩余其他的副本都作为follower副本,follower副本会从leader副本同步消息日志。
这个设计点类似zookeeper的leader和follower的概念,但是具体的实现方式还是有比较大的差异,所以我们可以认为,副本集会存在一主多从的关系。
一般情况下,同一个分区的多个副本被均匀的分配到集群中的不同的broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务,通过这样的副本机制来提高kafka的集群的高可用性。
创建一个带副本机制的topic
通过命令创建带2个副本3个分区的topic
1 | sh kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 2 --partitions 3 --topic testTopic |
然后我们会在/tmp/kafka-log路径下看到对应的topic的副本信息
如何知道那个各个分区中对应的leader是谁呢?
在zookeeper服务器上,通过如下命令去获取对应分区的信息,如获取testTopic第一个分区的状态信息
1 | get /brokers/topics/testTopic/partitions/1/state |
副本的leader选举
kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的leader节点发生故障,此时如何处理呢?
肯定是kafka必须要保证从follower副本中选择一个新的leader副本,那么kafka如何实现选举呢?
要了解leader选举先要知道几个概念
kafka分区下副本根据角色的不同可分为3类:
leader副本:相应clients端读写请求的副本
follower副本:被动的备份leader副本中数据,不能系那个硬clients端读写请求
ISR副本:包含了leader副本和所有与leader副本保持同步的follow副本
每个kafka副本对象都有两个重要的属性:
LEO:即日志末端位移(log end offset),记录了该副本底层日志log中下一条消息的位移值。如果LEO=10,那么
表示该副本保存了10条消息,位移值范围[0,9],另外leader LEO和follower LEO的更新是有区别的。
HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值,小于等于HW值的所有消息都被认为是“已备份”。同理,leader副本和follower副本的HW更新是有区别的
注:从生产者发出一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW,进而消费者可以消费到这条消息。
副本的协同机制
上述已经提到消息的读写操作只会由leader及诶按来接收和处理。follower副本只会同步数据以及当leader副本所在broker挂了以后,会从follow副本中选取新的leader。
写请求首先有leader副本处理,之后follower副本会从leader副本上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍,但是一个follower副本出现异常,比如宕机,网络断开等原因长时间没有同步到消息,那这个时候leader就会把它踢出去,kafka通过ISR集合来维护一个分区副本消息
ISR
AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示由于落后被剔除的副本集合。
所以公式如下:ISR = Leader + 没有落后太多的副本;AR = OSR+ ISR。
ISR集合中副本必须满足两个条件:
1.副本所在的节点必须维持着与zookeeper的连接
2.副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值,如果该follower在此时间间隔内一直没有追上leader的所有消息,则该follower就会被剔除isr列表
3.ISR数据保存在zookeeper的/brokers/topics/
follower副本把leader副本LEO之前的日志全部同步完成时,则认为follower副本已经追赶上leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标示,kafka副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtTimeMs的差值是否大于参数replica.lag.time.max.ms的值,如果大于,则会把这个副本剔除ISR集合。
副本数据的同步原理
疑问:在向消息发送端返回ack之前需要保证多少个Replica已经接受到这个消息?
procuce发布消息到某个partition时:
- 先通过zookeeper找到该partition的Leader,producer只将该消息发送到Partition的leader
- leader会将该消息写入其本地的log,每个follower都从leader pull数据,
- follower在收到该消息并写入期其og后,向leader发送ACK
- 一旦leader收到ISR中所有Replica的ACK,该消息就被认为已经commit了,leader将增加HW,并且向produce发送ACK。
上述图,说明了HW和LEO的关系
随着Follower副本的不断和leader副本进行数据同步,follower副本的LEO会逐渐后移并且追赶到leader副本,这个追赶上的判断标准是当前副本LEO是否大于等于leader的HW,这个追赶上也会使得原来被剔除follower副本重新加入到ISR集合中。
如图中最右侧的副本剔除ISR后,会导致这个分区的HW也发生变化,变成3.