Skip to content

Commit 875da35

Browse files
authored
KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) (#18295)
Clients that support SASL but don't implement KIP-43 (eg Kafka producer/consumer 0.9.0.x) will fail to connect after this change. Added unit tests and also manually tested with the console producer 0.9.0. While testing, I noticed that the logged message when a 0.9.0 Java client is used without sasl is slightly misleading - fixed that too. Reviewers: Manikumar Reddy <[email protected]>
1 parent e6d2421 commit 875da35

File tree

3 files changed

+64
-55
lines changed

3 files changed

+64
-55
lines changed

clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java

Lines changed: 28 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public class SaslServerAuthenticator implements Authenticator {
106106
* state and likewise ends at either {@link #COMPLETE} or {@link #FAILED}.
107107
*/
108108
private enum SaslState {
109-
INITIAL_REQUEST, // May be GSSAPI token, SaslHandshake or ApiVersions for authentication
109+
INITIAL_REQUEST, // May be SaslHandshake or ApiVersions for authentication
110110
HANDSHAKE_OR_VERSIONS_REQUEST, // May be SaslHandshake or ApiVersions
111111
HANDSHAKE_REQUEST, // After an ApiVersions request, next request must be SaslHandshake
112112
AUTHENTICATE, // Authentication tokens (SaslHandshake v1 and above indicate SaslAuthenticate headers)
@@ -277,15 +277,11 @@ public void authenticate() throws IOException {
277277
case REAUTH_PROCESS_HANDSHAKE:
278278
case HANDSHAKE_OR_VERSIONS_REQUEST:
279279
case HANDSHAKE_REQUEST:
280+
case INITIAL_REQUEST:
280281
handleKafkaRequest(clientToken);
281282
break;
282283
case REAUTH_BAD_MECHANISM:
283284
throw new SaslAuthenticationException(reauthInfo.badMechanismErrorMessage);
284-
case INITIAL_REQUEST:
285-
if (handleKafkaRequest(clientToken))
286-
break;
287-
// For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
288-
// This is required for interoperability with 0.9.0.x clients which do not send handshake request
289285
case AUTHENTICATE:
290286
handleSaslToken(clientToken);
291287
// When the authentication exchange is complete and no more tokens are expected from the client,
@@ -503,63 +499,51 @@ private void handleSaslToken(byte[] clientToken) throws IOException {
503499
}
504500
}
505501

506-
private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
507-
boolean isKafkaRequest = false;
508-
String clientMechanism = null;
502+
/**
503+
* @throws InvalidRequestException if the request is not in Kafka format or if the API key is invalid. Clients
504+
* that support SASL without support for KIP-43 (e.g. Kafka Clients 0.9.x) are in the former bucket - the first
505+
* packet such clients send is a GSSAPI token starting with 0x60.
506+
*/
507+
private void handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
509508
try {
510509
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
511510
RequestHeader header = RequestHeader.parse(requestBuffer);
512511
ApiKeys apiKey = header.apiKey();
513512

514-
// A valid Kafka request header was received. SASL authentication tokens are now expected only
515-
// following a SaslHandshakeRequest since this is not a GSSAPI client token from a Kafka 0.9.0.x client.
516-
if (saslState == SaslState.INITIAL_REQUEST)
517-
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
518-
isKafkaRequest = true;
519-
520513
// Raise an error prior to parsing if the api cannot be handled at this layer. This avoids
521514
// unnecessary exposure to some of the more complex schema types.
522515
if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE)
523-
throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
516+
throw new InvalidRequestException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
524517

525518
LOG.debug("Handling Kafka request {} during {}", apiKey, reauthInfo.authenticationOrReauthenticationText());
526519

527-
528520
RequestContext requestContext = new RequestContext(header, connectionId, clientAddress(), Optional.of(clientPort()),
529521
KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol, ClientInformation.EMPTY, false);
530522
RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
523+
524+
// A valid Kafka request was received, we can now update the sasl state
525+
if (saslState == SaslState.INITIAL_REQUEST)
526+
setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
527+
531528
if (apiKey == ApiKeys.API_VERSIONS)
532529
handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
533-
else
534-
clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
530+
else {
531+
String clientMechanism = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
532+
if (!reauthInfo.reauthenticating() || reauthInfo.saslMechanismUnchanged(clientMechanism)) {
533+
createSaslServer(clientMechanism);
534+
setSaslState(SaslState.AUTHENTICATE);
535+
}
536+
}
535537
} catch (InvalidRequestException e) {
536538
if (saslState == SaslState.INITIAL_REQUEST) {
537-
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key
538-
// is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
539-
// starting with 0x60, revert to GSSAPI for both these exceptions.
540-
if (LOG.isDebugEnabled()) {
541-
StringBuilder tokenBuilder = new StringBuilder();
542-
for (byte b : requestBytes) {
543-
tokenBuilder.append(String.format("%02x", b));
544-
if (tokenBuilder.length() >= 20)
545-
break;
546-
}
547-
LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
548-
}
549-
if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
550-
LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
551-
clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
552-
} else
553-
throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
554-
} else
555-
throw e;
556-
}
557-
if (clientMechanism != null && (!reauthInfo.reauthenticating()
558-
|| reauthInfo.saslMechanismUnchanged(clientMechanism))) {
559-
createSaslServer(clientMechanism);
560-
setSaslState(SaslState.AUTHENTICATE);
539+
// InvalidRequestException is thrown if the request is not in Kafka format or if the API key is invalid.
540+
// If it's the initial request, this could be an ancient client (see method documentation for more details),
541+
// a client configured with the wrong security protocol or a non kafka-client altogether (eg http client).
542+
throw new InvalidRequestException("Invalid request, potential reasons: kafka client configured with the " +
543+
"wrong security protocol, it does not support KIP-43 or it is not a kafka client.", e);
544+
}
545+
throw e;
561546
}
562-
return isKafkaRequest;
563547
}
564548

565549
private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {

clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
package org.apache.kafka.common.security.authenticator;
1818

1919
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
20-
import org.apache.kafka.common.errors.IllegalSaslStateException;
20+
import org.apache.kafka.common.errors.InvalidRequestException;
2121
import org.apache.kafka.common.errors.SaslAuthenticationException;
2222
import org.apache.kafka.common.message.ApiMessageType;
23+
import org.apache.kafka.common.message.RequestHeaderData;
2324
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
2425
import org.apache.kafka.common.message.SaslHandshakeRequestData;
2526
import org.apache.kafka.common.network.ChannelBuilders;
@@ -63,6 +64,7 @@
6364
import java.nio.Buffer;
6465
import java.nio.ByteBuffer;
6566
import java.time.Duration;
67+
import java.util.Arrays;
6668
import java.util.Collections;
6769
import java.util.HashMap;
6870
import java.util.List;
@@ -77,7 +79,6 @@
7779
import static org.apache.kafka.common.security.scram.internals.ScramMechanism.SCRAM_SHA_256;
7880
import static org.junit.jupiter.api.Assertions.assertEquals;
7981
import static org.junit.jupiter.api.Assertions.assertThrows;
80-
import static org.junit.jupiter.api.Assertions.fail;
8182
import static org.mockito.ArgumentMatchers.any;
8283
import static org.mockito.ArgumentMatchers.anyMap;
8384
import static org.mockito.ArgumentMatchers.eq;
@@ -107,7 +108,7 @@ public void testOversizeRequest() throws IOException {
107108
}
108109

109110
@Test
110-
public void testUnexpectedRequestType() throws IOException {
111+
public void testUnexpectedRequestTypeWithValidRequestHeader() throws IOException {
111112
TransportLayer transportLayer = mock(TransportLayer.class);
112113
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
113114
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
@@ -126,13 +127,35 @@ public void testUnexpectedRequestType() throws IOException {
126127
return headerBuffer.remaining();
127128
});
128129

129-
try {
130-
authenticator.authenticate();
131-
fail("Expected authenticate() to raise an exception");
132-
} catch (IllegalSaslStateException e) {
133-
// expected exception
134-
}
130+
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
131+
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
132+
}
133+
134+
@Test
135+
public void testInvalidRequestHeader() throws IOException {
136+
TransportLayer transportLayer = mock(TransportLayer.class);
137+
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
138+
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
139+
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer,
140+
SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());
141+
142+
short invalidApiKeyId = (short) (Arrays.stream(ApiKeys.values()).mapToInt(k -> k.id).max().getAsInt() + 1);
143+
ByteBuffer headerBuffer = RequestTestUtils.serializeRequestHeader(new RequestHeader(
144+
new RequestHeaderData()
145+
.setRequestApiKey(invalidApiKeyId)
146+
.setRequestApiVersion((short) 0),
147+
(short) 2));
148+
149+
when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
150+
invocation.<ByteBuffer>getArgument(0).putInt(headerBuffer.remaining());
151+
return 4;
152+
}).then(invocation -> {
153+
// serialize only the request header. the authenticator should not parse beyond this
154+
invocation.<ByteBuffer>getArgument(0).put(headerBuffer.duplicate());
155+
return headerBuffer.remaining();
156+
});
135157

158+
assertThrows(InvalidRequestException.class, () -> authenticator.authenticate());
136159
verify(transportLayer, times(2)).read(any(ByteBuffer.class));
137160
}
138161

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
3333
import org.apache.kafka.common.message.ApiMessageType.ListenerType
3434
import kafka.utils._
3535
import org.apache.kafka.common.config.ConfigException
36-
import org.apache.kafka.common.errors.InvalidRequestException
36+
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
3737
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
3838
import org.apache.kafka.common.metrics._
3939
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
11071107
val header = RequestHeader.parse(buffer)
11081108
if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
11091109
header
1110-
} else {
1110+
} else if (header.isApiVersionDeprecated()) {
11111111
throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
1112+
} else {
1113+
throw new UnsupportedVersionException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not supported")
11121114
}
11131115
}
11141116

0 commit comments

Comments
 (0)