Skip to content

Commit 0a12eaa

Browse files
authored
KAFKA-19112 Unifying LIST-Type Configuration Validation and Default Values (#20334)
We add the three main changes in this PR - Disallowing null values for most LIST-type configurations makes sense, since users cannot explicitly set a configuration to null in a properties file. Therefore, only configurations with a default value of null should be allowed to accept null. - Disallowing duplicate values is reasonable, as there are currently no known configurations in Kafka that require specifying the same value multiple times. Allowing duplicates is both rare in practice and potentially confusing to users. - Disallowing empty list, even though many configurations currently accept them. In practice, setting an empty list for several of these configurations can lead to server startup failures or unexpected behavior. Therefore, enforcing non-empty lists helps prevent misconfiguration and improves system robustness. These changes may introduce some backward incompatibility, but this trade-off is justified by the significant improvements in safety, consistency, and overall user experience. Additionally, we introduce two minor adjustments: - Reclassify some STRING-type configurations as LIST-type, particularly those using comma-separated values to represent multiple entries. This change reflects the actual semantics used in Kafka. - Update the default values for some configurations to better align with other configs. These changes will not introduce any compatibility issues. Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 548fb18 commit 0a12eaa

File tree

99 files changed

+684
-308
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+684
-308
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,14 @@ public class AdminClientConfig extends AbstractConfig {
155155
static {
156156
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
157157
Type.LIST,
158-
"",
158+
List.of(),
159+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
159160
Importance.HIGH,
160161
BOOTSTRAP_SERVERS_DOC).
161162
define(BOOTSTRAP_CONTROLLERS_CONFIG,
162163
Type.LIST,
163-
"",
164+
List.of(),
165+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
164166
Importance.HIGH,
165167
BOOTSTRAP_CONTROLLERS_DOC)
166168
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
@@ -239,6 +241,7 @@ public class AdminClientConfig extends AbstractConfig {
239241
.define(METRIC_REPORTER_CLASSES_CONFIG,
240242
Type.LIST,
241243
JmxReporter.class.getName(),
244+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
242245
Importance.LOW,
243246
METRIC_REPORTER_CLASSES_DOC)
244247
.define(METRICS_RECORDING_LEVEL_CONFIG,
@@ -284,7 +287,8 @@ public class AdminClientConfig extends AbstractConfig {
284287
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
285288
.define(CONFIG_PROVIDERS_CONFIG,
286289
ConfigDef.Type.LIST,
287-
List.of(),
290+
List.of(),
291+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
288292
ConfigDef.Importance.LOW,
289293
CONFIG_PROVIDERS_DOC);
290294
}

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.kafka.common.utils.Utils;
3838

3939
import java.util.ArrayList;
40-
import java.util.Collections;
4140
import java.util.HashMap;
4241
import java.util.List;
4342
import java.util.Locale;
@@ -63,9 +62,9 @@ public class ConsumerConfig extends AbstractConfig {
6362
// a list contains all the assignor names that only assign subscribed topics to consumer. Should be updated when new assignor added.
6463
// This is to help optimize ConsumerCoordinator#performAssignment method
6564
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS = List.of(
66-
RANGE_ASSIGNOR_NAME,
67-
ROUNDROBIN_ASSIGNOR_NAME,
68-
STICKY_ASSIGNOR_NAME,
65+
RANGE_ASSIGNOR_NAME,
66+
ROUNDROBIN_ASSIGNOR_NAME,
67+
STICKY_ASSIGNOR_NAME,
6968
COOPERATIVE_STICKY_ASSIGNOR_NAME
7069
);
7170

@@ -406,17 +405,17 @@ public class ConsumerConfig extends AbstractConfig {
406405
* A list of configuration keys not supported for CONSUMER protocol.
407406
*/
408407
private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
409-
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
410-
HEARTBEAT_INTERVAL_MS_CONFIG,
408+
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
409+
HEARTBEAT_INTERVAL_MS_CONFIG,
411410
SESSION_TIMEOUT_MS_CONFIG,
412411
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
413412
);
414-
413+
415414
static {
416415
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
417416
Type.LIST,
418-
Collections.emptyList(),
419-
new ConfigDef.NonNullValidator(),
417+
ConfigDef.NO_DEFAULT_VALUE,
418+
ConfigDef.ValidList.anyNonDuplicateValues(false, false),
420419
Importance.HIGH,
421420
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
422421
.define(CLIENT_DNS_LOOKUP_CONFIG,
@@ -446,7 +445,7 @@ public class ConsumerConfig extends AbstractConfig {
446445
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
447446
Type.LIST,
448447
List.of(RangeAssignor.class, CooperativeStickyAssignor.class),
449-
new ConfigDef.NonNullValidator(),
448+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
450449
Importance.MEDIUM,
451450
PARTITION_ASSIGNMENT_STRATEGY_DOC)
452451
.define(METADATA_MAX_AGE_CONFIG,
@@ -573,7 +572,7 @@ public class ConsumerConfig extends AbstractConfig {
573572
.define(METRIC_REPORTER_CLASSES_CONFIG,
574573
Type.LIST,
575574
JmxReporter.class.getName(),
576-
new ConfigDef.NonNullValidator(),
575+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
577576
Importance.LOW,
578577
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
579578
.define(KEY_DESERIALIZER_CLASS_CONFIG,
@@ -614,8 +613,8 @@ public class ConsumerConfig extends AbstractConfig {
614613
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
615614
.define(INTERCEPTOR_CLASSES_CONFIG,
616615
Type.LIST,
617-
Collections.emptyList(),
618-
new ConfigDef.NonNullValidator(),
616+
List.of(),
617+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
619618
Importance.LOW,
620619
INTERCEPTOR_CLASSES_DOC)
621620
.define(MAX_POLL_RECORDS_CONFIG,
@@ -702,6 +701,7 @@ public class ConsumerConfig extends AbstractConfig {
702701
.define(CONFIG_PROVIDERS_CONFIG,
703702
ConfigDef.Type.LIST,
704703
List.of(),
704+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
705705
ConfigDef.Importance.LOW,
706706
CONFIG_PROVIDERS_DOC);
707707
}

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,6 @@ static List<ConsumerPartitionAssignor> getAssignorInstances(List<String> assigno
291291
// a map to store assignor name -> assignor class name
292292
Map<String, String> assignorNameMap = new HashMap<>();
293293

294-
if (assignorClasses == null)
295-
return assignors;
296-
297294
for (Object klass : assignorClasses) {
298295
// first try to get the class if passed in as a string
299296
if (klass instanceof String) {

clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
3737

38-
import java.util.Collections;
3938
import java.util.HashMap;
4039
import java.util.List;
4140
import java.util.Map;
4241
import java.util.Properties;
4342
import java.util.Set;
4443
import java.util.concurrent.atomic.AtomicInteger;
4544

45+
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
4646
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
4747
import static org.apache.kafka.common.config.ConfigDef.Range.between;
4848
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@@ -373,7 +373,12 @@ public class ProducerConfig extends AbstractConfig {
373373
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
374374

375375
static {
376-
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
376+
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
377+
Type.LIST,
378+
NO_DEFAULT_VALUE,
379+
ConfigDef.ValidList.anyNonDuplicateValues(false, false),
380+
Importance.HIGH,
381+
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
377382
.define(CLIENT_DNS_LOOKUP_CONFIG,
378383
Type.STRING,
379384
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
@@ -462,7 +467,7 @@ public class ProducerConfig extends AbstractConfig {
462467
.define(METRIC_REPORTER_CLASSES_CONFIG,
463468
Type.LIST,
464469
JmxReporter.class.getName(),
465-
new ConfigDef.NonNullValidator(),
470+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
466471
Importance.LOW,
467472
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
468473
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
@@ -501,8 +506,8 @@ public class ProducerConfig extends AbstractConfig {
501506
Importance.MEDIUM, PARTITIONER_CLASS_DOC)
502507
.define(INTERCEPTOR_CLASSES_CONFIG,
503508
Type.LIST,
504-
Collections.emptyList(),
505-
new ConfigDef.NonNullValidator(),
509+
List.of(),
510+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
506511
Importance.LOW,
507512
INTERCEPTOR_CLASSES_DOC)
508513
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
@@ -553,10 +558,11 @@ public class ProducerConfig extends AbstractConfig {
553558
atLeast(0),
554559
Importance.LOW,
555560
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
556-
.define(CONFIG_PROVIDERS_CONFIG,
561+
.define(CONFIG_PROVIDERS_CONFIG,
557562
ConfigDef.Type.LIST,
558-
List.of(),
559-
ConfigDef.Importance.LOW,
563+
List.of(),
564+
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
565+
ConfigDef.Importance.LOW,
560566
CONFIG_PROVIDERS_DOC);
561567
}
562568

clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,26 +1006,72 @@ else if (max == null)
10061006
public static class ValidList implements Validator {
10071007

10081008
final ValidString validString;
1009+
final boolean isEmptyAllowed;
1010+
final boolean isNullAllowed;
10091011

1010-
private ValidList(List<String> validStrings) {
1012+
private ValidList(List<String> validStrings, boolean isEmptyAllowed, boolean isNullAllowed) {
10111013
this.validString = new ValidString(validStrings);
1014+
this.isEmptyAllowed = isEmptyAllowed;
1015+
this.isNullAllowed = isNullAllowed;
1016+
}
1017+
1018+
public static ValidList anyNonDuplicateValues(boolean isEmptyAllowed, boolean isNullAllowed) {
1019+
return new ValidList(List.of(), isEmptyAllowed, isNullAllowed);
10121020
}
10131021

10141022
public static ValidList in(String... validStrings) {
1015-
return new ValidList(Arrays.asList(validStrings));
1023+
return new ValidList(List.of(validStrings), true, false);
1024+
}
1025+
1026+
public static ValidList in(boolean isEmptyAllowed, String... validStrings) {
1027+
if (!isEmptyAllowed && validStrings.length == 0) {
1028+
throw new IllegalArgumentException("At least one valid string must be provided when empty values are not allowed");
1029+
}
1030+
return new ValidList(List.of(validStrings), isEmptyAllowed, false);
10161031
}
10171032

10181033
@Override
10191034
public void ensureValid(final String name, final Object value) {
1035+
if (value == null) {
1036+
if (isNullAllowed)
1037+
return;
1038+
else
1039+
throw new ConfigException("Configuration '" + name + "' values must not be null.");
1040+
}
1041+
10201042
@SuppressWarnings("unchecked")
1021-
List<String> values = (List<String>) value;
1022-
for (String string : values) {
1023-
validString.ensureValid(name, string);
1043+
List<Object> values = (List<Object>) value;
1044+
if (!isEmptyAllowed && values.isEmpty()) {
1045+
String validString = this.validString.validStrings.isEmpty() ? "any non-empty value" : this.validString.toString();
1046+
throw new ConfigException("Configuration '" + name + "' must not be empty. Valid values include: " + validString);
1047+
}
1048+
1049+
if (Set.copyOf(values).size() != values.size()) {
1050+
throw new ConfigException("Configuration '" + name + "' values must not be duplicated.");
1051+
}
1052+
1053+
validateIndividualValues(name, values);
1054+
}
1055+
1056+
private void validateIndividualValues(String name, List<Object> values) {
1057+
boolean hasValidStrings = !validString.validStrings.isEmpty();
1058+
1059+
for (Object value : values) {
1060+
if (value instanceof String) {
1061+
String string = (String) value;
1062+
if (string.isEmpty()) {
1063+
throw new ConfigException("Configuration '" + name + "' values must not be empty.");
1064+
}
1065+
if (hasValidStrings) {
1066+
validString.ensureValid(name, value);
1067+
}
1068+
}
10241069
}
10251070
}
10261071

10271072
public String toString() {
1028-
return validString.toString();
1073+
return validString + (isEmptyAllowed ? " (empty config allowed)" : " (empty not allowed)") +
1074+
(isNullAllowed ? " (null config allowed)" : " (null not allowed)");
10291075
}
10301076
}
10311077

clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString;
2020
import org.apache.kafka.common.config.ConfigDef.Range;
2121

22+
import java.util.List;
23+
2224
public class SaslConfigs {
2325

2426
private static final String OAUTHBEARER_NOTE = " Currently applies only to OAUTHBEARER.";
@@ -407,7 +409,7 @@ public static void addClientSaslSupport(ConfigDef config) {
407409
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
408410
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
409411
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, ConfigDef.Type.INT, DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
410-
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
412+
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, ConfigDef.Type.LIST, List.of(), ConfigDef.ValidList.anyNonDuplicateValues(true, false), ConfigDef.Importance.LOW, SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
411413
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC)
412414
.define(SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE, ConfigDef.Type.BOOLEAN, DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_HEADER_URLENCODE_DOC);
413415
}

clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
2020

21+
import java.util.List;
2122
import java.util.Set;
2223

2324
import javax.net.ssl.KeyManagerFactory;
@@ -49,7 +50,9 @@ public class SslConfigs {
4950
public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
5051
+ "The default is 'TLSv1.2,TLSv1.3'. This means that clients and servers will prefer TLSv1.3 if both support it "
5152
+ "and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most use "
52-
+ "cases. Also see the config documentation for <code>ssl.protocol</code> to understand how it can impact the TLS version negotiation behavior.";
53+
+ "cases. If this configuration is set to an empty list, Kafka will use the protocols enabled by default in the underlying SSLEngine, "
54+
+ "which may include additional protocols depending on the JVM version. "
55+
+ "Also see the config documentation for <code>ssl.protocol</code> to understand how it can impact the TLS version negotiation behavior.";
5356
public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.3";
5457

5558
public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
@@ -123,8 +126,8 @@ public class SslConfigs {
123126
public static void addClientSslSupport(ConfigDef config) {
124127
config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
125128
.define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
126-
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
127-
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
129+
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, List.of(), ConfigDef.ValidList.anyNonDuplicateValues(true, false), ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
130+
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.ValidList.anyNonDuplicateValues(true, false), ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
128131
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
129132
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC)
130133
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC)

0 commit comments

Comments
 (0)