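Building the Kafka source and importing it into IntelliJ IDEA

The most convenient way to read the Kafka source is to build it, import it into IntelliJ IDEA, and start the broker under the debugger. The setup takes a few steps because Kafka depends on ZooKeeper, the build uses Gradle, and the core module is written in Scala, so at my teammates' request I wrote this guide.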

Preparing the environment

For reading the source I chose a commonly used Kafka version:

  • Kafka 0.10.x

Download kafka-0.10.0.1 (the source release): https://kafka.apache.org/downloads

  • Scala 2.10.6

https://www.scala-lang.org/download/2.10.6.html

  • Gradle 3.1

https://gradle.org/install/

  • ZooKeeper 3.4.9

https://archive.apache.org/dist/zookeeper/

Installing Scala, Gradle, and ZooKeeper is not covered in detail here.
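Before building, it can help to confirm that the tools are on the PATH. The following is a minimal sketch rather than part of the original setup: the helper names check_tool and check_prerequisites are hypothetical, a JDK is assumed to be required (the startup log further down shows Java 1.8), and the script only checks that each command exists, not which version it is.

```shell
#!/bin/sh
# check_tool (hypothetical helper): report whether a command is on the PATH.
check_tool() {
    if command -v "$1" >/dev/null 2>&1; then
        echo "found:   $1"
    else
        echo "missing: $1"
        return 1
    fi
}

# check_prerequisites (hypothetical helper): check every tool the build uses.
# Returns non-zero if at least one tool is missing.
check_prerequisites() {
    status=0
    for tool in java scala gradle; do
        check_tool "$tool" || status=1
    done
    return "$status"
}
```

After sourcing the file, run check_prerequisites, then compare the output of gradle -v and scala -version with the versions listed above.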

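Building the Kafka source locally

1. Unpack the Kafka source archive, change into the extracted directory, and run gradle idea

A successful build looks like this: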

image-20221210101339701
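Step 1 can be sketched as a small shell function. This is an illustration under assumptions: the helper name generate_idea_project is hypothetical, the archive and directory names in the comments (kafka-0.10.0.1-src.tgz, kafka-0.10.0.1-src) are assumed from the paths in the startup log, and the GRADLE variable is only a convenience for pointing at a specific Gradle 3.1 binary.

```shell
#!/bin/sh
# generate_idea_project (hypothetical helper): unpack the source archive
# and generate IntelliJ IDEA project files with the Gradle "idea" task.
#   $1 = source archive,      e.g. kafka-0.10.0.1-src.tgz (assumed name)
#   $2 = extracted directory, e.g. kafka-0.10.0.1-src     (assumed name)
# The body runs in a subshell, so the caller's working directory is unchanged.
generate_idea_project() (
    archive=$1
    src_dir=$2
    tar -xzf "$archive" || exit 1
    cd "$src_dir" || exit 1
    # GRADLE may be set to the path of a specific Gradle installation.
    "${GRADLE:-gradle}" idea
)
```

For example: generate_idea_project kafka-0.10.0.1-src.tgz kafka-0.10.0.1-src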

If the build fails with an error, open the project's build.gradle file and add the following configuration:

ScalaCompileOptions.metaClass.daemonServer = true
ScalaCompileOptions.metaClass.fork = true
ScalaCompileOptions.metaClass.useAnt = false
ScalaCompileOptions.metaClass.useCompileDaemon = false

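2. Import into IntelliJ IDEA

Install the Scala plugin before importing

Search for Scala in the IDE's plugin marketplace, install it, and restart the IDE.

Import the built Kafka source into IntelliJ IDEA and let the IDE compile it automatically. The import succeeded if you see the following: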

image-20221210104443192

  • Copy log4j.properties from the config module into the src/main/scala directory of the core module

image-20221210111057645

Note: the placeholders in the log file paths in log4j.properties must be replaced with absolute paths, otherwise startup fails with an error.
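The copy and the placeholder replacement can be done in one pass. The sketch below assumes the placeholder is ${kafka.logs.dir}, as in the log4j.properties shipped with Kafka; the helper name install_log4j_config and the example log directory are hypothetical.

```shell
#!/bin/sh
# install_log4j_config (hypothetical helper): copy log4j.properties and
# replace every ${kafka.logs.dir} placeholder with an absolute directory.
#   $1 = source file, e.g. config/log4j.properties
#   $2 = target file, e.g. core/src/main/scala/log4j.properties
#   $3 = absolute log directory (must not contain '|' or '&',
#        which are special in the sed replacement below)
install_log4j_config() {
    sed 's|[$]{kafka\.logs\.dir}|'"$3"'|g' "$1" > "$2"
}
```

For example, from the source root: install_log4j_config config/log4j.properties core/src/main/scala/log4j.properties /absolute/path/to/logs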

  • Edit server.properties in the config module

image-20221210110850884
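The screenshot does not survive as text, so the exact settings that were changed are not known. The startup log further down shows log.dirs pointing at a local directory and zookeeper.connect set to localhost:2181, so those are plausible candidates. As a sketch under that assumption, a hypothetical helper set_property can rewrite a single key in the file:

```shell
#!/bin/sh
# set_property (hypothetical helper): set key=value in a properties file.
# Any existing "key=..." line is dropped and the new definition is
# appended at the end of the file; commented-out lines are kept.
#   $1 = properties file, $2 = key, $3 = value
set_property() (
    file=$1 key=$2 value=$3
    tmp="$file.tmp"
    # Keep every line whose text before the first '=' differs from the key.
    awk -F= -v k="$key" '$1 != k' "$file" > "$tmp" || exit 1
    printf '%s=%s\n' "$key" "$value" >> "$tmp"
    mv "$tmp" "$file"
)
```

For example: set_property config/server.properties log.dirs /absolute/path/to/kafka-log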

  • Set up the run configuration

image-20221210110743277
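The broker connects to ZooKeeper as soon as it starts, so ZooKeeper has to be running before the run configuration is launched. The following sketch checks this with ZooKeeper's ruok four-letter command. It assumes the nc utility is installed and that ZooKeeper listens on localhost:2181, the address shown in the startup log; the helper name zk_is_ok is hypothetical.

```shell
#!/bin/sh
# zk_is_ok (hypothetical helper): send ZooKeeper's "ruok" four-letter
# command and succeed only if the server answers "imok".
#   $1 = host (default localhost), $2 = port (default 2181)
zk_is_ok() {
    reply=$(printf 'ruok' | nc "${1:-localhost}" "${2:-2181}" 2>/dev/null)
    [ "$reply" = "imok" ]
}
```

If zk_is_ok fails, start ZooKeeper first, for example with bin/zkServer.sh start from the ZooKeeper installation directory.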

  • Start the broker

The log of a successful start follows:

12:26:50: Executing ':core:Kafka.main()'...

Starting Gradle Daemon...
Gradle Daemon started in 778 ms
Building project 'core' with Scala version 2.10.6
:clients:compileJava UP-TO-DATE
:clients:processResources UP-TO-DATE
:clients:classes UP-TO-DATE
:clients:determineCommitId UP-TO-DATE
:clients:createVersionFile
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala UP-TO-DATE
:core:processResources
:core:classes
:core:Kafka.main()
[2022-12-10 12:26:56,736] INFO KafkaConfig values:
advertised.host.name = null
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 50
log.flush.interval.messages = 9223372036854775807
auto.create.topics.enable = true
controller.socket.timeout.ms = 30000
log.flush.interval.ms = null
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
replica.socket.receive.buffer.bytes = 65536
min.insync.replicas = 1
replica.fetch.wait.max.ms = 500
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
sasl.mechanism.inter.broker.protocol = GSSAPI
default.replication.factor = 1
ssl.truststore.password = null
log.preallocate = false
sasl.kerberos.principal.to.local.rules = [DEFAULT]
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = null
replica.socket.timeout.ms = 30000
message.max.bytes = 1000012
num.io.threads = 8
offsets.commit.required.acks = -1
log.flush.offset.checkpoint.interval.ms = 60000
delete.topic.enable = false
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
quota.window.num = 11
zookeeper.connect = localhost:2181
authorizer.class.name =
num.replica.fetchers = 1
log.retention.ms = null
log.roll.jitter.hours = 0
log.cleaner.enable = true
offsets.load.buffer.size = 5242880
log.cleaner.delete.retention.ms = 86400000
ssl.client.auth = none
controlled.shutdown.max.retries = 3
queued.max.requests = 500
offsets.topic.replication.factor = 3
log.cleaner.threads = 1
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
socket.request.max.bytes = 104857600
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 6000
log.retention.bytes = -1
log.message.timestamp.type = CreateTime
sasl.kerberos.min.time.before.relogin = 60000
zookeeper.set.acl = false
connections.max.idle.ms = 600000
offsets.retention.minutes = 1440
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 0.10.0-IV1
log.retention.hours = 168
num.partitions = 1
broker.id.generation.enable = true
listeners = null
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
log.roll.ms = null
log.flush.scheduler.interval.ms = 9223372036854775807
ssl.cipher.suites = null
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
security.inter.broker.protocol = PLAINTEXT
replica.fetch.max.bytes = 1048576
advertised.port = null
log.cleaner.dedupe.buffer.size = 134217728
replica.high.watermark.checkpoint.interval.ms = 5000
log.cleaner.io.buffer.size = 524288
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = 60000
controlled.shutdown.retry.backoff.ms = 5000
log.roll.hours = 168
log.cleanup.policy = delete
host.name =
log.roll.jitter.ms = null
max.connections.per.ip = 2147483647
offsets.topic.segment.bytes = 104857600
background.threads = 10
quota.consumer.default = 9223372036854775807
request.timeout.ms = 30000
log.message.format.version = 0.10.0-IV1
log.index.interval.bytes = 4096
log.dir = /tmp/kafka-logs
log.segment.bytes = 1073741824
log.cleaner.backoff.ms = 15000
offset.metadata.max.bytes = 4096
ssl.truststore.location = null
group.max.session.timeout.ms = 300000
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
port = 9092
log.retention.minutes = null
log.segment.delete.delay.ms = 60000
log.dirs = /Users/shuangfan/source-code/development/kafka-log
controlled.shutdown.enable = true
compression.type = producer
max.connections.per.ip.overrides =
log.message.timestamp.difference.max.ms = 9223372036854775807
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 10000
num.network.threads = 3
ssl.key.password = null
reserved.broker.max.id = 1000
metrics.num.samples = 2
socket.send.buffer.bytes = 102400
ssl.protocol = TLS
socket.receive.buffer.bytes = 102400
ssl.keystore.location = null
replica.fetch.min.bytes = 1
broker.rack = null
unclean.leader.election.enable = true
sasl.enabled.mechanisms = [GSSAPI]
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
metrics.sample.window.ms = 30000
broker.id = 0
offsets.topic.compression.codec = 0
log.retention.check.interval.ms = 300000
advertised.listeners = null
leader.imbalance.per.broker.percentage = 10
(kafka.server.KafkaConfig)
[2022-12-10 12:26:56,811] INFO starting (kafka.server.KafkaServer)
[2022-12-10 12:26:56,819] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2022-12-10 12:26:56,830] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2022-12-10 12:26:57,009] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:java.version=1.8.0_271 (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_271.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:java.class.path=/Users/shuangfan/source-code/kafka-0.10.0.1-src/core/build/classes/main:/Users/shuangfan/source-code/kafka-0.10.0.1-src/core/build/resources/main:/Users/shuangfan/source-code/kafka-0.10.0.1-src/clients/build/libs/kafka-clients-0.10.0.1.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/net.sf.jopt-simple/jopt-simple/4.9/ee9e9eaa0a35360dcfeac129ff4923215fd65904/jopt-simple-4.9.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/com.yammer.metrics/metrics-core/2.2.0/f82c035cfa786d3cbec362c38c22a5f5b1bc8724/metrics-core-2.2.0.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/org.scala-lang/scala-library/2.10.6/421989aa8f95a05a4f894630aad96b8c7b828732/scala-library-2.10.6.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.21/7238b064d1aba20da2ac03217d700d91e02460fa/slf4j-log4j12-1.7.21.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/com.101tec/zkclient/0.8/c0f700a4a3b386279d7d8dd164edecbe836cbfdb/zkclient-0.8.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/org.apache.zookeeper/zookeeper/3.4.6/1b2502e29da1ebaade2357cd1de35a855fa3755/zookeeper-3.4.6.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/net.jpountz.lz4/lz4/1.3.0/c708bb2590c0652a642236ef45d9f99ff842a2ce/lz4-1.3.0.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.2.6/48d92871ca286a47f230feb375f0bbffa83b85f6/snappy-java-1.1.2.6.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.21/139535a69a4239db087de9bab0bee568bf8e0b70/slf4j-api-1.7.21.jar:/Users/shuangfan/opt/gradle/gradle-3.1/caches/modules-2/files-2.1/log4j/log4j/1.2.17/5af35056b4d257e4b64b9e8069c0746e8b08629f/log4j-1.2.17.jar (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:java.library.path=/Users/shuangfan/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:java.io.tmpdir=/var/folders/wq/k5zn0k617bn81djpplx7m0hc0000gn/T/ (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:os.name=Mac OS X (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:os.arch=x86_64 (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:os.version=10.16 (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:user.name=shuangfan (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:user.home=/Users/shuangfan (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,009] INFO Client environment:user.dir=/Users/shuangfan/source-code/kafka-0.10.0.1-src (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,010] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2b546384 (org.apache.zookeeper.ZooKeeper)
[2022-12-10 12:26:57,026] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2022-12-10 12:26:57,028] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2022-12-10 12:26:57,039] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2022-12-10 12:26:57,046] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x184f9bf015a0008, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2022-12-10 12:26:57,047] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2022-12-10 12:26:57,105] INFO Loading logs. (kafka.log.LogManager)
[2022-12-10 12:26:57,114] INFO Logs loading complete. (kafka.log.LogManager)
[2022-12-10 12:26:57,226] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2022-12-10 12:26:57,230] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2022-12-10 12:26:57,235] WARN No meta.properties file under dir /Users/shuangfan/source-code/development/kafka-log/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2022-12-10 12:26:57,284] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2022-12-10 12:26:57,289] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2022-12-10 12:26:57,323] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-10 12:26:57,325] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-10 12:26:57,400] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2022-12-10 12:26:57,408] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2022-12-10 12:26:57,409] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2022-12-10 12:26:57,485] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-10 12:26:57,487] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-12-10 12:26:57,513] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
[2022-12-10 12:26:57,514] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2022-12-10 12:26:57,517] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 20 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2022-12-10 12:26:57,535] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2022-12-10 12:26:57,536] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2022-12-10 12:26:57,543] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2022-12-10 12:26:57,553] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2022-12-10 12:26:57,565] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2022-12-10 12:26:57,569] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2022-12-10 12:26:57,570] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(localhost,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2022-12-10 12:26:57,571] WARN No meta.properties file under dir /Users/shuangfan/source-code/development/kafka-log/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2022-12-10 12:26:57,582] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-12-10 12:26:57,582] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser)
[2022-12-10 12:26:57,583] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
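
The last line of the log above, "[Kafka Server 0], started", is the one that confirms a successful start. If the console output is saved to a file, a small check can look for it. This is a sketch only: the helper name broker_started is hypothetical, and the file path is whatever location the output was saved to.

```shell
#!/bin/sh
# broker_started (hypothetical helper): succeed if the saved console
# output contains the KafkaServer "started" line.
#   $1 = file holding the broker's console output
broker_started() {
    grep -q 'started (kafka.server.KafkaServer)' "$1"
}
```

For example: broker_started /path/to/console-output.log && echo "broker is up"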