@@ -66,7 +66,7 @@ import scala.jdk.CollectionConverters._
66
66
import scala .util .control .ControlThrowable
67
67
68
68
class SocketServerTest {
69
- val props = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , port = 0 )
69
+ val props = TestUtils .createBrokerConfig(0 , null , port = 0 )
70
70
props.put(" listeners" , " PLAINTEXT://localhost:0" )
71
71
props.put(" num.network.threads" , " 1" )
72
72
props.put(" socket.send.buffer.bytes" , " 300000" )
@@ -314,73 +314,15 @@ class SocketServerTest {
314
314
)
315
315
}
316
316
317
- @ Test
318
- def testStagedListenerStartup (): Unit = {
319
- shutdownServerAndMetrics(server)
320
- val testProps = new Properties
321
- testProps ++= props
322
- testProps.put(" listeners" , " EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROL_PLANE://localhost:0" )
323
- testProps.put(" listener.security.protocol.map" , " EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROL_PLANE:PLAINTEXT" )
324
- testProps.put(" control.plane.listener.name" , " CONTROL_PLANE" )
325
- testProps.put(" inter.broker.listener.name" , " INTERNAL" )
326
- val config = KafkaConfig .fromProps(testProps)
327
- val testableServer = new TestableSocketServer (config)
328
-
329
- val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
330
- endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
331
- }.map(_.toJava)
332
-
333
- val externalReadyFuture = new CompletableFuture [Void ]()
334
-
335
- def controlPlaneListenerStarted () = {
336
- try {
337
- val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress .getLocalHost)
338
- sendAndReceiveControllerRequest(socket, testableServer)
339
- true
340
- } catch {
341
- case _ : Throwable => false
342
- }
343
- }
344
-
345
- def listenerStarted (listenerName : ListenerName ) = {
346
- try {
347
- val socket = connect(testableServer, listenerName, localAddr = InetAddress .getLocalHost)
348
- sendAndReceiveRequest(socket, testableServer)
349
- true
350
- } catch {
351
- case _ : Throwable => false
352
- }
353
- }
354
-
355
- try {
356
- val externalListener = new ListenerName (" EXTERNAL" )
357
- val externalEndpoint = updatedEndPoints.find(e => e.listenerName.get == externalListener.value).get
358
- val controlPlaneListener = new ListenerName (" CONTROL_PLANE" )
359
- val controlPlaneEndpoint = updatedEndPoints.find(e => e.listenerName.get == controlPlaneListener.value).get
360
- val futures = Map (
361
- externalEndpoint -> externalReadyFuture,
362
- controlPlaneEndpoint -> CompletableFuture .completedFuture[Void ](null ))
363
- val requestProcessingFuture = testableServer.enableRequestProcessing(futures)
364
- TestUtils .waitUntilTrue(() => controlPlaneListenerStarted(), " Control plane listener not started" )
365
- assertFalse(listenerStarted(config.interBrokerListenerName))
366
- assertFalse(listenerStarted(externalListener))
367
- externalReadyFuture.complete(null )
368
- TestUtils .waitUntilTrue(() => listenerStarted(config.interBrokerListenerName), " Inter-broker listener not started" )
369
- TestUtils .waitUntilTrue(() => listenerStarted(externalListener), " External listener not started" )
370
- requestProcessingFuture.get(1 , TimeUnit .MINUTES )
371
- } finally {
372
- shutdownServerAndMetrics(testableServer)
373
- }
374
- }
375
-
376
317
@ Test
377
318
def testStagedListenerShutdownWhenConnectionQueueIsFull (): Unit = {
378
319
shutdownServerAndMetrics(server)
379
320
val testProps = new Properties
380
321
testProps ++= props
381
- testProps.put(" listeners" , " EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0" )
322
+ testProps.put(" listeners" , " EXTERNAL://localhost:0,INTERNAL://localhost:0" )
323
+ testProps.put(" advertised.listeners" , " EXTERNAL://localhost:0,INTERNAL://localhost:0" )
382
324
testProps.put(" listener.security.protocol.map" , " EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" )
383
- testProps.put(" control.plane. listener.name " , " CONTROLLER" )
325
+ testProps.put(" controller. listener.names " , " CONTROLLER" )
384
326
testProps.put(" inter.broker.listener.name" , " INTERNAL" )
385
327
val config = KafkaConfig .fromProps(testProps)
386
328
val connectionQueueSize = 1
@@ -825,7 +767,7 @@ class SocketServerTest {
825
767
826
768
@ Test
827
769
def testZeroMaxConnectionsPerIp (): Unit = {
828
- val newProps = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , port = 0 )
770
+ val newProps = TestUtils .createBrokerConfig(0 , null , port = 0 )
829
771
newProps.setProperty(SocketServerConfigs .MAX_CONNECTIONS_PER_IP_CONFIG , " 0" )
830
772
newProps.setProperty(SocketServerConfigs .MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG , " %s:%s" .format(" 127.0.0.1" , " 5" ))
831
773
val server = new SocketServer (KafkaConfig .fromProps(newProps), new Metrics (),
@@ -864,7 +806,7 @@ class SocketServerTest {
864
806
@ Test
865
807
def testMaxConnectionsPerIpOverrides (): Unit = {
866
808
val overrideNum = server.config.maxConnectionsPerIp + 1
867
- val overrideProps = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , port = 0 )
809
+ val overrideProps = TestUtils .createBrokerConfig(0 , null , port = 0 )
868
810
overrideProps.put(SocketServerConfigs .MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG , s " localhost: $overrideNum" )
869
811
val serverMetrics = new Metrics ()
870
812
val overrideServer = new SocketServer (KafkaConfig .fromProps(overrideProps), serverMetrics,
@@ -923,7 +865,7 @@ class SocketServerTest {
923
865
@ Test
924
866
def testConnectionRatePerIp (): Unit = {
925
867
val defaultTimeoutMs = 2000
926
- val overrideProps = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , port = 0 )
868
+ val overrideProps = TestUtils .createBrokerConfig(0 , null , port = 0 )
927
869
overrideProps.remove(SocketServerConfigs .MAX_CONNECTIONS_PER_IP_CONFIG )
928
870
overrideProps.put(QuotaConfig .NUM_QUOTA_SAMPLES_CONFIG , String .valueOf(2 ))
929
871
val connectionRate = 5
@@ -974,7 +916,7 @@ class SocketServerTest {
974
916
975
917
@ Test
976
918
def testThrottledSocketsClosedOnShutdown (): Unit = {
977
- val overrideProps = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , port = 0 )
919
+ val overrideProps = TestUtils .createBrokerConfig(0 , null , port = 0 )
978
920
overrideProps.remove(" max.connections.per.ip" )
979
921
overrideProps.put(QuotaConfig .NUM_QUOTA_SAMPLES_CONFIG , String .valueOf(2 ))
980
922
val connectionRate = 5
@@ -1051,6 +993,7 @@ class SocketServerTest {
1051
993
val password = " admin-secret"
1052
994
val reauthMs = 1500
1053
995
props.setProperty(" listeners" , " SASL_PLAINTEXT://localhost:0" )
996
+ props.setProperty(" advertised.listeners" , " SASL_PLAINTEXT://localhost:0" )
1054
997
props.setProperty(" security.inter.broker.protocol" , " SASL_PLAINTEXT" )
1055
998
props.setProperty(" listener.name.sasl_plaintext.plain.sasl.jaas.config" ,
1056
999
" org.apache.kafka.common.security.plain.PlainLoginModule required " +
@@ -1059,8 +1002,9 @@ class SocketServerTest {
1059
1002
props.setProperty(" listener.name.sasl_plaintext.sasl.enabled.mechanisms" , " PLAIN" )
1060
1003
props.setProperty(" num.network.threads" , " 1" )
1061
1004
props.setProperty(" connections.max.reauth.ms" , reauthMs.toString)
1062
- val overrideProps = TestUtils .createBrokerConfig(0 , TestUtils .MockZkConnect ,
1063
- saslProperties = Some (props), enableSaslPlaintext = true )
1005
+ props.setProperty(" listener.security.protocol.map" , " SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT" )
1006
+
1007
+ val overrideProps = TestUtils .createBrokerConfig(0 , null , saslProperties = Some (props), enableSaslPlaintext = true )
1064
1008
val time = new MockTime ()
1065
1009
val overrideServer = new TestableSocketServer (KafkaConfig .fromProps(overrideProps), time = time)
1066
1010
try {
@@ -1140,7 +1084,7 @@ class SocketServerTest {
1140
1084
}
1141
1085
1142
1086
private def checkClientDisconnectionUpdatesRequestMetrics (responseBufferSize : Int ): Unit = {
1143
- val props = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , port = 0 )
1087
+ val props = TestUtils .createBrokerConfig(0 , null , port = 0 )
1144
1088
val overrideServer = new TestableSocketServer (KafkaConfig .fromProps(props))
1145
1089
1146
1090
try {
@@ -1173,7 +1117,7 @@ class SocketServerTest {
1173
1117
def testServerShutdownWithoutEnable (): Unit = {
1174
1118
// The harness server has already been enabled, so it's invalid for this test.
1175
1119
shutdownServerAndMetrics(server)
1176
- val props = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , port = 0 )
1120
+ val props = TestUtils .createBrokerConfig(0 , null , port = 0 )
1177
1121
val overrideServer = new TestableSocketServer (KafkaConfig .fromProps(props))
1178
1122
overrideServer.shutdown()
1179
1123
assertFalse(overrideServer.testableAcceptor.isOpen)
@@ -1831,33 +1775,13 @@ class SocketServerTest {
1831
1775
}
1832
1776
}
1833
1777
1834
-
1835
- @ Test
1836
- def testControlPlaneAsPrivilegedListener (): Unit = {
1837
- val testProps = new Properties
1838
- testProps ++= props
1839
- testProps.put(" listeners" , " PLAINTEXT://localhost:0,CONTROLLER://localhost:0" )
1840
- testProps.put(" listener.security.protocol.map" , " PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" )
1841
- testProps.put(" control.plane.listener.name" , " CONTROLLER" )
1842
- val config = KafkaConfig .fromProps(testProps)
1843
- withTestableServer(config, { testableServer =>
1844
- val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
1845
- localAddr = InetAddress .getLocalHost)
1846
- val sentRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
1847
- assertTrue(sentRequest.context.fromPrivilegedListener)
1848
-
1849
- val plainSocket = connect(testableServer, localAddr = InetAddress .getLocalHost)
1850
- val plainRequest = sendAndReceiveRequest(plainSocket, testableServer)
1851
- assertFalse(plainRequest.context.fromPrivilegedListener)
1852
- })
1853
- }
1854
-
1855
1778
@ Test
1856
1779
def testInterBrokerListenerAsPrivilegedListener (): Unit = {
1857
1780
val testProps = new Properties
1858
1781
testProps ++= props
1859
1782
testProps.put(" listeners" , " EXTERNAL://localhost:0,INTERNAL://localhost:0" )
1860
- testProps.put(" listener.security.protocol.map" , " EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT" )
1783
+ testProps.put(" advertised.listeners" , " EXTERNAL://localhost:0,INTERNAL://localhost:0" )
1784
+ testProps.put(" listener.security.protocol.map" , " EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" )
1861
1785
testProps.put(" inter.broker.listener.name" , " INTERNAL" )
1862
1786
val config = KafkaConfig .fromProps(testProps)
1863
1787
withTestableServer(config, { testableServer =>
@@ -1873,33 +1797,6 @@ class SocketServerTest {
1873
1797
})
1874
1798
}
1875
1799
1876
- @ Test
1877
- def testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener (): Unit = {
1878
- val testProps = new Properties
1879
- testProps ++= props
1880
- testProps.put(" listeners" , " EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0" )
1881
- testProps.put(" listener.security.protocol.map" , " EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" )
1882
- testProps.put(" control.plane.listener.name" , " CONTROLLER" )
1883
- testProps.put(" inter.broker.listener.name" , " INTERNAL" )
1884
- val config = KafkaConfig .fromProps(testProps)
1885
- withTestableServer(config, { testableServer =>
1886
- val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
1887
- localAddr = InetAddress .getLocalHost)
1888
- val controlPlaneRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
1889
- assertTrue(controlPlaneRequest.context.fromPrivilegedListener)
1890
-
1891
- val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
1892
- localAddr = InetAddress .getLocalHost)
1893
- val interBrokerRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
1894
- assertFalse(interBrokerRequest.context.fromPrivilegedListener)
1895
-
1896
- val externalSocket = connect(testableServer, new ListenerName (" EXTERNAL" ),
1897
- localAddr = InetAddress .getLocalHost)
1898
- val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
1899
- assertFalse(externalRequest.context.fromPrivilegedListener)
1900
- })
1901
- }
1902
-
1903
1800
@ Test
1904
1801
def testListenBacklogSize (): Unit = {
1905
1802
val backlogSize = 128
@@ -2029,9 +1926,10 @@ class SocketServerTest {
2029
1926
2030
1927
private def sslServerProps : Properties = {
2031
1928
val trustStoreFile = TestUtils .tempFile(" truststore" , " .jks" )
2032
- val sslProps = TestUtils .createBrokerConfig(0 , TestUtils . MockZkConnect , interBrokerSecurityProtocol = Some (SecurityProtocol .SSL ),
1929
+ val sslProps = TestUtils .createBrokerConfig(0 , null , interBrokerSecurityProtocol = Some (SecurityProtocol .SSL ),
2033
1930
trustStoreFile = Some (trustStoreFile))
2034
1931
sslProps.put(SocketServerConfigs .LISTENERS_CONFIG , " SSL://localhost:0" )
1932
+ sslProps.put(SocketServerConfigs .ADVERTISED_LISTENERS_CONFIG , " SSL://localhost:0" )
2035
1933
sslProps.put(SocketServerConfigs .NUM_NETWORK_THREADS_CONFIG , " 1" )
2036
1934
sslProps
2037
1935
}
@@ -2053,11 +1951,6 @@ class SocketServerTest {
2053
1951
}
2054
1952
}
2055
1953
2056
- def sendAndReceiveControllerRequest (socket : Socket , server : SocketServer ): RequestChannel .Request = {
2057
- sendRequest(socket, producerRequestBytes())
2058
- receiveRequest(server.controlPlaneRequestChannelOpt.get)
2059
- }
2060
-
2061
1954
private def assertProcessorHealthy (testableServer : TestableSocketServer , healthySockets : Seq [Socket ] = Seq .empty): Unit = {
2062
1955
val selector = testableServer.testableSelector
2063
1956
selector.reset()
@@ -2203,7 +2096,8 @@ class SocketServerTest {
2203
2096
time : Time = Time .SYSTEM ,
2204
2097
connectionDisconnectListeners : Seq [ConnectionDisconnectListener ] = Seq .empty
2205
2098
) extends SocketServer (
2206
- config, new Metrics , time, credentialProvider, apiVersionManager, connectionDisconnectListeners = connectionDisconnectListeners
2099
+ config, new Metrics , time, credentialProvider, apiVersionManager,
2100
+ connectionDisconnectListeners = connectionDisconnectListeners
2207
2101
) {
2208
2102
2209
2103
override def createDataPlaneAcceptor (endPoint : EndPoint , isPrivilegedListener : Boolean , requestChannel : RequestChannel ) : DataPlaneAcceptor = {
0 commit comments