kafka设计思想

​ Apache Kafka是一个分布式的基于push-subscribe的消息系统,它具备快速,可扩展、可持久化,高吞吐量等的特点。如此优秀的中间件,设计肯定有其独到之处,下面我们就探寻一下它的设计思想。

搭建环境

  • 官网地址

    http://kafka.apache.org/

  • 下载版本

    http://kafka.apache.org/downloads

    最新的版本已经到了2.3.0,本文基于2.2.0来搭建

    下载kafka_2.11-2.2.0.tgz文件然后上传到centos 7服务器上解压安装

  • 安装

    1
    tar zxvf kafka_2.11-2.2.0.tgz -C /usr/local/

    配置

    1
    2
    3
    进入kafka解压后配置文件的目录
    cd /usr/local/kafka_2.11-2.2.0/config
    vi server.properties

    然后修改

    1.broker.id

    1
    2
    broker.id=0
    注:设置集群在不同broker上设置不同序号如broker.id=0,broker.id=1 ……

    2.listeners

    1
    2
    listeners=PLAINTEXT://0.0.0.0:9092
    注:这里ip如果是在云服务器上要设置成内网ip,由于我的云服务器不是在同一个内网所以设置成0.0.0.0

    3.advertised.listeners

    1
    2
    3
    advertised.listeners=PLAINTEXT://120.79.232.219:9092
    注:这里如果要外网访问kafka,要设置此项,本来此项是注释掉的;
    ip设置成外网ip

    4.zookeeper.connect,配置zookeeper

    1
    zookeeper.connect=xx.xx.xx.xx:2181
  • 启动

    1
    2
    3
    4
    5
    6
    7
    cd /usr/local/kafka_2.11-2.2.0/bin/
    ./kafka-server-start.sh -daemon ../config/server.properties

    ps -ef|grep kafka 查看broker是否启动成功
    或者查看日志
    cd /usr/local/kafka_2.11-2.2.0/logs
    less kafkaServer.out

    如果出现

    mark

​ 说明你的机器的内存设置不够需要调整内存

   
1
2
3
4
cd /usr/local/kafka_2.11-2.2.0/bin
vi kafka-server-start.sh
然后根据自己试验机修改合适内存,配置高的试验机可以忽略
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
  • 常用客户端命令

    1
    2
    3
    4
    5
    6
    7
    8
    9
    ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

    ./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test

kafka应用场景

  • 日志收集:kafa可以手机系统的各种log,然后统一接口开放给consumer
  • 消息系统:系统的解耦
  • 用户活动跟踪:如用户画像,用户操作浏览网页的各种活动等信息都可以发送到kafka的topic中,然后订阅者通过订阅各种tipic来对数据进行分析监控等
  • 运营数据:kafka可以用来记录各种运营的指标数据,如收集各种分布式应用的数据

思考一下设计一个消息中间件?

基本需求

  • 最基本是要能支持消息的发送和接收,需要设计网络通信就一定要涉及到NIO
  • 消息中心消息存储(持久化/非持久)
  • 消息的序列化和反序列化
  • 消息的确认机制,如何避免消息重发

高级功能

  • 消息有序性
  • 是否支持事物消息
  • 消息的收发性能,对高并发大数据量的支持
  • 是否支持集群
  • 消息的可靠性存储
  • 是否支持多协议

kafka的架构

mark

一个典型的kafka集群包含若干个Producer,若干Broker(kafka支持水平扩展,)若干个Consumer Group,以及一个zookeeper集群。kafka通过zookeeper管理集群配置及服务协同,Producer使用push模式将消息发布到broker,Consumer通过监听使用pull模式从broker订阅并消费消息。

多个broker协同工作,Producer和Consumer部署到各个业务逻辑中,三者通过zookeeper管理协调请求和转发,这样就组成一个高性能的分布式消息发布和订阅系统。

注意点:Producer发送消息到bioker是采用push方式,Consumer从broker消费消息采用主动pull方式,这个是和其他mq消息的一个不同点。

Topic

在kafka中,topic是一个存储消息的逻辑概念,可以认为是一个消息集合,每条消息发送到kafka集群的消息都有一个类别,物理上来说,不同的topic的消息是分开存储的。

每个topic可以有多个生产者向他发送消息,也可以有多个消费者去消费其中的消息。

mark

Partition

每个topic可以划分多个分区(每个topic至少有一个分区),同一个topic下的不同分区包含的消息是不同的,每个消息在被添加到分区时,都会被分配一个offset(偏移量),它是此消息在此分区中唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,换句话说kafka只保证在同一个分区内的消息是有序的。

如下图所示,对于名字为test的topic,有三个分区,分别是p0,p1,p2

注:每一条消息发送到broker时,会根据partition的规则选择存储到那个partition

mark

topic和partition的存储

Partition是以文件的形式存储在文件系统中,比如创建一个testTopic的topic,其中有三个partition,那么kafka的数据目录(/tmp/kafka-log)中就有3个目录,testTopic0-3,命名规则是-

命令方式创建一个topic

1
sh kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 3 --topic testTopic

生产端

关于消息的分发

生产端消息发送方式

  • 同步发送

    同步发送消息需要阻塞,无非是通过future.get()来等待消息的发送返回结果,但是这种方法会严重影响消息发送性能。

  • 异步发送

    本质上来说kafka采用异步方式来发送消息到broker,但是kafka每次发送消息都会直接发送到broker上,而是把消息放到一个发送队列中,然后通过一个后台线程不断从队列取出消息进行发送,发送成功后会触发callback,kafka客户端会积累到一定量的消息统一组装成一个批量消息发送出去,触发的条件是batch.seize和linger.ms参数。当然具体参数设置需要在效率和时效性方面做一个权衡。

    好处:可以减少网络请求和磁盘IO的次数

kafka消息的分发策略

在kafka中一条消息有key,value两部分组成,在发送一条消息我们可以指定这个key,那么producer会根据key和partition机制来判断当前这条消息应该发送并存储到那个partition分区中。

另外我们还可以根据需要自定义进行扩展prioducer的partition机制

消息默认的分发机制

默认情况下kafka采用hash取模的分区算法,如果key为null,则会随机分配一个分区,这个随机是在参数“metadata.max.age.ms”d的时间范围内随机选择一个。对于时间段内,如果key为null,则只会发送到唯一的分区,这个值默认情况下是10分钟更新一次。

问题?

  • 生产者如何保证发送到broker的消息成功?
  • 如果生产者发送消息到broker失败如何处理?
  • kafka中数据存储如何保证高可用,不丢失
  • kafka中数据是怎么样的存储方式?

带着这些问题,我们继续探索。

分区副本机制

我们知道kafka的每个topic都可以有多个partition,并且多个partition会均匀的分布在集群的各个节点上,虽然这种方式能够有效的对数据进行分片,但是对于每个partition来说都是单点,当其中一个partition不可用的时候,那么这部分消息是没办法消费的。所以kafka为了提高partiition的可靠性而提供了副本的概念,通过副本机制来实现冗余备份解决高可用。

每个分区可以有多个副本,并且在副本集合中会在一个leader的副本,所有的读写请求都是有leader副本来进行处理,剩余其他的副本都作为follower副本,follower副本会从leader副本同步消息日志。

这个设计点类似zookeeper的leader和follower的概念,但是具体的实现方式还是有比较大的差异,所以我们可以认为,副本集会存在一主多从的关系。

一般情况下,同一个分区的多个副本被均匀的分配到集群中的不同的broker上,当leader副本所在的broker出现故障后,可以重新选举新的leader副本继续对外提供服务,通过这样的副本机制来提高kafka的集群的高可用性。

创建一个带副本机制的topic

通过命令创建带2个副本3个分区的topic

1
sh kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 2 --partitions 3 --topic testTopic

然后我们会在/tmp/kafka-log路径下看到对应的topic的副本信息

如何知道那个各个分区中对应的leader是谁呢?

在zookeeper服务器上,通过如下命令去获取对应分区的信息,如获取testTopic第一个分区的状态信息

1
2
3
4
5
6
get /brokers/topics/testTopic/partitions/1/state
或者
sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic
test_partition

--》{" controller_epoch":12,"leader ":0,"version":1,"leader_epoch":0,"isr ":[0,1]}

副本的leader选举

kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的leader节点发生故障,此时如何处理呢?

肯定是kafka必须要保证从follower副本中选择一个新的leader副本,那么kafka如何实现选举呢?

要了解leader选举先要知道几个概念

kafka分区下副本根据角色的不同可分为3类:

  • leader副本:相应clients端读写请求的副本

  • follower副本:被动的备份leader副本中数据,不能系那个硬clients端读写请求

  • ISR副本:包含了leader副本和所有与leader副本保持同步的follow副本

    每个kafka副本对象都有两个重要的属性:

    LEO:即日志末端位移(log end offset),记录了该副本底层日志log中下一条消息的位移值。如果LEO=10,那么

    表示该副本保存了10条消息,位移值范围[0,9],另外leader LEO和follower LEO的更新是有区别的。

    HW:即上面提到的水位值。对于同一个副本对象而言,其HW值不会大于LEO值,小于等于HW值的所有消息都被认为是“已备份”。同理,leader副本和follower副本的HW更新是有区别的

注:从生产者发出一条消息首先会被写入分区的leader副本,不过还需要等待ISR集合中的所有follower副本都同步完之后才能被认为已经提交,之后才会更新分区的HW,进而消费者可以消费到这条消息。

副本的协同机制

上述已经提到消息的读写操作只会由leader及诶按来接收和处理。follower副本只会同步数据以及当leader副本所在broker挂了以后,会从follow副本中选取新的leader。

写请求首先有leader副本处理,之后follower副本会从leader副本上拉取写入的消息,这个过程会有一定的延迟,导致follower副本中保存的消息略少于leader副本,但是只要没有超出阈值都可以容忍,但是一个follower副本出现异常,比如宕机,网络断开等原因长时间没有同步到消息,那这个时候leader就会把它踢出去,kafka通过ISR集合来维护一个分区副本消息

ISR

AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示由于落后被剔除的副本集合。

所以公式如下:ISR = Leader + 没有落后太多的副本;AR = OSR+ ISR。

ISR集合中副本必须满足两个条件:

1.副本所在的节点必须维持着与zookeeper的连接

2.副本最后一条消息的offset与leader副本的最后一条消息的offset之间的差值不能超过指定的阈值,如果该follower在此时间间隔内一直没有追上leader的所有消息,则该follower就会被剔除isr列表

3.ISR数据保存在zookeeper的/brokers/topics//partitions//state节点中

follower副本把leader副本LEO之前的日志全部同步完成时,则认为follower副本已经追赶上leader副本,这个时候会更新这个副本的lastCaughtUpTimeMs标示,kafka副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的lastCaughtTimeMs的差值是否大于参数replica.lag.time.max.ms的值,如果大于,则会把这个副本剔除ISR集合。

副本数据的同步原理

疑问:在向消息发送端返回ack之前需要保证多少个Replica已经接受到这个消息?

procuce发布消息到某个partition时:

  • 先通过zookeeper找到该partition的Leader,producer只将该消息发送到Partition的leader
  • leader会将该消息写入其本地的log,每个follower都从leader pull数据,
  • follower在收到该消息并写入期其og后,向leader发送ACK
  • 一旦leader收到ISR中所有Replica的ACK,该消息就被认为已经commit了,leader将增加HW,并且向produce发送ACK。

mark

上述图,说明了HW和LEO的关系

随着Follower副本的不断和leader副本进行数据同步,follower副本的LEO会逐渐后移并且追赶到leader副本,这个追赶上的判断标准是当前副本LEO是否大于等于leader的HW,这个追赶上也会使得原来被剔除follower副本重新加入到ISR集合中。

如图中最右侧的副本剔除ISR后,会导致这个分区的HW也发生变化,变成3.