Skip to content

Commit c28d9a3

Browse files
authored
KAFKA-18435 Remove zookeeper dependencies in build.gradle (#18450)
Remove Apache ZooKeeper from the Apache Kafka build. Also remove commons IO, commons CLI, and netty, which were dependencies we took only because of ZooKeeper. In order to keep the size of this PR manageable, I did not remove all classes which formerly interfaced with ZK. I just removed the ZK types. Fortunately, Kafka generally wrapped ZK data structures rather than using them directly. Some classes were pretty entangled with ZK, so it was easier just to stub them out. For ZkNodeChangeNotificationListener.scala, PartitionStateMachine.scala, ReplicaStateMachine.scala, KafkaZkClient.scala, and ZookeeperClient.scala, I replaced all the functions with "throw new UnsupportedOperationException". Since the tests for these classes have been removed, as well as the ZK-based broker code, this should be OK as an incremental step. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent d1b1d9d commit c28d9a3

16 files changed

+182
-2772
lines changed

LICENSE-binary

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,8 @@ License Version 2.0:
208208
audience-annotations-0.12.0
209209
caffeine-3.1.1
210210
commons-beanutils-1.9.4
211-
commons-cli-1.4
212211
commons-collections-3.2.2
213212
commons-digester-2.1
214-
commons-io-2.14.0
215213
commons-lang3-3.12.0
216214
commons-logging-1.3.2
217215
commons-validator-1.9.0
@@ -257,15 +255,6 @@ lz4-java-1.8.0
257255
maven-artifact-3.9.6
258256
metrics-core-4.1.12.1
259257
metrics-core-2.2.0
260-
netty-buffer-4.1.115.Final
261-
netty-codec-4.1.115.Final
262-
netty-common-4.1.115.Final
263-
netty-handler-4.1.115.Final
264-
netty-resolver-4.1.115.Final
265-
netty-transport-4.1.115.Final
266-
netty-transport-classes-epoll-4.1.115.Final
267-
netty-transport-native-epoll-4.1.115.Final
268-
netty-transport-native-unix-common-4.1.115.Final
269258
opentelemetry-proto-1.0.0-alpha
270259
plexus-utils-3.5.1
271260
rocksdbjni-7.9.2
@@ -275,8 +264,6 @@ scala-reflect-2.13.15
275264
snappy-java-1.1.10.5
276265
snakeyaml-2.2
277266
swagger-annotations-2.2.25
278-
zookeeper-3.8.4
279-
zookeeper-jute-3.8.4
280267

281268
===============================================================================
282269
This product bundles various third-party components under other open source

NOTICE-binary

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -596,45 +596,6 @@ limitations under the License.
596596
This software includes projects with other licenses -- see `doc/LICENSE.md`.
597597

598598

599-
Apache ZooKeeper - Server
600-
Copyright 2008-2021 The Apache Software Foundation
601-
602-
This product includes software developed at
603-
The Apache Software Foundation (http://www.apache.org/).
604-
605-
606-
Apache ZooKeeper - Jute
607-
Copyright 2008-2021 The Apache Software Foundation
608-
609-
This product includes software developed at
610-
The Apache Software Foundation (http://www.apache.org/).
611-
612-
613-
The Netty Project
614-
=================
615-
616-
Please visit the Netty web site for more information:
617-
618-
* https://netty.io/
619-
620-
Copyright 2014 The Netty Project
621-
622-
The Netty Project licenses this file to you under the Apache License,
623-
version 2.0 (the "License"); you may not use this file except in compliance
624-
with the License. You may obtain a copy of the License at:
625-
626-
https://www.apache.org/licenses/LICENSE-2.0
627-
628-
Unless required by applicable law or agreed to in writing, software
629-
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
630-
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
631-
License for the specific language governing permissions and limitations
632-
under the License.
633-
634-
Also, please refer to each LICENSE.<component>.txt file, which is located in
635-
the 'license' directory of the distribution file, for the license terms of the
636-
components that this product depends on.
637-
638599
-------------------------------------------------------------------------------
639600
This product contains the extensions to Java Collections Framework which has
640601
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:

build.gradle

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,6 @@ allprojects {
190190
libs.scalaReflect,
191191
libs.jacksonAnnotations,
192192
libs.jacksonDatabindYaml,
193-
// be explicit about the Netty dependency version instead of relying on the version set by
194-
// ZooKeeper (potentially older and containing CVEs)
195-
libs.nettyHandler,
196-
libs.nettyTransportNativeEpoll,
197193
libs.log4j2Api,
198194
libs.log4j2Core,
199195
libs.log4j1Bridge2Api
@@ -1107,20 +1103,6 @@ project(':core') {
11071103
// only needed transitively, but set it explicitly to ensure it has the same version as scala-library
11081104
implementation libs.scalaReflect
11091105
implementation libs.scalaLogging
1110-
implementation libs.commonsIo // ZooKeeper dependency. Do not use, this is going away.
1111-
implementation(libs.zookeeper) {
1112-
// Dropwizard Metrics are required by ZooKeeper as of v3.6.0,
1113-
// but the library should *not* be used in Kafka code
1114-
implementation libs.dropwizardMetrics
1115-
exclude module: 'slf4j-log4j12'
1116-
exclude module: 'log4j'
1117-
// Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to logback in v3.8.0.
1118-
// We are removing Zookeeper's dependency on logback so we have a singular logging backend.
1119-
exclude module: 'logback-classic'
1120-
exclude module: 'logback-core'
1121-
}
1122-
// ZooKeeperMain depends on commons-cli but declares the dependency as `provided`
1123-
implementation libs.commonsCli
11241106
implementation log4jLibs
11251107

11261108
runtimeOnly log4jRuntimeLibs

checkstyle/import-control.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,6 @@
451451
<subpackage name="internals">
452452
<allow pkg="com.fasterxml.jackson" />
453453
<allow pkg="kafka.utils" />
454-
<allow pkg="org.apache.zookeeper" />
455454
<allow pkg="org.apache.log4j" />
456455
</subpackage>
457456
</subpackage>

core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala

Lines changed: 7 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,9 @@
1616
*/
1717
package kafka.common
1818

19-
import java.nio.charset.StandardCharsets.UTF_8
20-
import java.util.concurrent.LinkedBlockingQueue
21-
import java.util.concurrent.atomic.AtomicBoolean
22-
2319
import kafka.utils.Logging
24-
import kafka.zk.{KafkaZkClient, StateChangeHandlers}
25-
import kafka.zookeeper.{StateChangeHandler, ZNodeChildChangeHandler}
20+
import kafka.zk.KafkaZkClient
2621
import org.apache.kafka.common.utils.Time
27-
import org.apache.kafka.server.util.ShutdownableThread
28-
29-
import scala.collection.Seq
30-
import scala.util.{Failure, Try}
3122

3223
/**
3324
* Handle the notificationMessage.
@@ -56,105 +47,20 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
5647
private val notificationHandler: NotificationHandler,
5748
private val changeExpirationMs: Long = 15 * 60 * 1000,
5849
private val time: Time = Time.SYSTEM) extends Logging {
59-
private var lastExecutedChange = -1L
60-
private val queue = new LinkedBlockingQueue[ChangeNotification]
61-
private val thread = new ChangeEventProcessThread(s"$seqNodeRoot-event-process-thread")
62-
private val isClosed = new AtomicBoolean(false)
63-
6450
def init(): Unit = {
65-
zkClient.registerStateChangeHandler(ZkStateChangeHandler)
66-
zkClient.registerZNodeChildChangeHandler(ChangeNotificationHandler)
67-
addChangeNotification()
68-
thread.start()
51+
throw new UnsupportedOperationException()
6952
}
7053

7154
def close(): Unit = {
72-
isClosed.set(true)
73-
zkClient.unregisterStateChangeHandler(ZkStateChangeHandler.name)
74-
zkClient.unregisterZNodeChildChangeHandler(ChangeNotificationHandler.path)
75-
queue.clear()
76-
thread.shutdown()
55+
throw new UnsupportedOperationException()
7756
}
7857

79-
/**
80-
* Process notifications
81-
*/
82-
private def processNotifications(): Unit = {
83-
try {
84-
val notifications = zkClient.getChildren(seqNodeRoot).sorted
85-
if (notifications.nonEmpty) {
86-
info(s"Processing notification(s) to $seqNodeRoot")
87-
val now = time.milliseconds
88-
for (notification <- notifications) {
89-
val changeId = changeNumber(notification)
90-
if (changeId > lastExecutedChange) {
91-
processNotification(notification)
92-
lastExecutedChange = changeId
93-
}
94-
}
95-
purgeObsoleteNotifications(now, notifications)
96-
}
97-
} catch {
98-
case e: InterruptedException => if (!isClosed.get) error(s"Error while processing notification change for path = $seqNodeRoot", e)
99-
case e: Exception => error(s"Error while processing notification change for path = $seqNodeRoot", e)
100-
}
101-
}
58+
object ZkStateChangeHandler {
59+
val name: String = null
10260

103-
private def processNotification(notification: String): Unit = {
104-
val changeZnode = seqNodeRoot + "/" + notification
105-
val (data, _) = zkClient.getDataAndStat(changeZnode)
106-
data match {
107-
case Some(d) => Try(notificationHandler.processNotification(d)) match {
108-
case Failure(e) => error(s"error processing change notification ${new String(d, UTF_8)} from $changeZnode", e)
109-
case _ =>
110-
}
111-
case None => warn(s"read null data from $changeZnode")
61+
def afterInitializingSession(): Unit = {
62+
throw new UnsupportedOperationException()
11263
}
11364
}
114-
115-
private def addChangeNotification(): Unit = {
116-
if (!isClosed.get && queue.peek() == null)
117-
queue.put(new ChangeNotification)
118-
}
119-
120-
private class ChangeNotification {
121-
def process(): Unit = processNotifications()
122-
}
123-
124-
/**
125-
* Purges expired notifications.
126-
*
127-
* @param now
128-
* @param notifications
129-
*/
130-
private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]): Unit = {
131-
for (notification <- notifications.sorted) {
132-
val notificationNode = seqNodeRoot + "/" + notification
133-
val (data, stat) = zkClient.getDataAndStat(notificationNode)
134-
if (data.isDefined) {
135-
if (now - stat.getCtime > changeExpirationMs) {
136-
debug(s"Purging change notification $notificationNode")
137-
zkClient.deletePath(notificationNode)
138-
}
139-
}
140-
}
141-
}
142-
143-
/* get the change number from a change notification znode */
144-
private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong
145-
146-
private class ChangeEventProcessThread(name: String) extends ShutdownableThread(name) {
147-
override def doWork(): Unit = queue.take().process()
148-
}
149-
150-
private object ChangeNotificationHandler extends ZNodeChildChangeHandler {
151-
override val path: String = seqNodeRoot
152-
override def handleChildChange(): Unit = addChangeNotification()
153-
}
154-
155-
object ZkStateChangeHandler extends StateChangeHandler {
156-
override val name: String = StateChangeHandlers.zkNodeChangeListenerHandler(seqNodeRoot)
157-
override def afterInitializingSession(): Unit = addChangeNotification()
158-
}
15965
}
16066

core/src/main/scala/kafka/controller/KafkaController.scala

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,9 @@ import kafka.utils._
2828
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
2929
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
3030
import kafka.zk.{FeatureZNodeStatus, _}
31-
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler}
31+
import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler, ZooKeeperClientException}
3232
import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType
3333
import org.apache.kafka.common.ElectionType
34-
import org.apache.kafka.common.KafkaException
3534
import org.apache.kafka.common.TopicPartition
3635
import org.apache.kafka.common.Uuid
3736
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
@@ -45,8 +44,6 @@ import org.apache.kafka.server.BrokerFeatures
4544
import org.apache.kafka.server.common.{AdminOperationException, ProducerIdsBlock}
4645
import org.apache.kafka.server.metrics.KafkaMetricsGroup
4746
import org.apache.kafka.server.util.KafkaScheduler
48-
import org.apache.zookeeper.KeeperException
49-
import org.apache.zookeeper.KeeperException.Code
5047

5148
import scala.collection.{Map, Seq, Set, immutable, mutable}
5249
import scala.collection.mutable.ArrayBuffer
@@ -1068,21 +1065,7 @@ class KafkaController(val config: KafkaConfig,
10681065
}
10691066

10701067
private def updateReplicaAssignmentForPartition(topicPartition: TopicPartition, assignment: ReplicaAssignment): Unit = {
1071-
val topicAssignment = mutable.Map() ++=
1072-
controllerContext.partitionFullReplicaAssignmentForTopic(topicPartition.topic) +=
1073-
(topicPartition -> assignment)
1074-
1075-
val setDataResponse = zkClient.setTopicAssignmentRaw(topicPartition.topic,
1076-
controllerContext.topicIds.get(topicPartition.topic),
1077-
topicAssignment, controllerContext.epochZkVersion)
1078-
setDataResponse.resultCode match {
1079-
case Code.OK =>
1080-
info(s"Successfully updated assignment of partition $topicPartition to $assignment")
1081-
case Code.NONODE =>
1082-
throw new IllegalStateException(s"Failed to update assignment for $topicPartition since the topic " +
1083-
"has no current assignment")
1084-
case _ => throw new KafkaException(setDataResponse.resultException.get)
1085-
}
1068+
throw new UnsupportedOperationException()
10861069
}
10871070

10881071
private def startNewReplicasForReassignedPartition(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
@@ -1186,7 +1169,7 @@ class KafkaController(val config: KafkaConfig,
11861169
try {
11871170
zkClient.setOrCreatePartitionReassignment(updatedPartitionsBeingReassigned, controllerContext.epochZkVersion)
11881171
} catch {
1189-
case e: KeeperException => throw new AdminOperationException(e)
1172+
case e: ZooKeeperClientException => throw new AdminOperationException(e)
11901173
}
11911174
}
11921175
}

0 commit comments

Comments
 (0)