KAFKA-19112 Unifying LIST-Type Configuration Validation and Default Values #20334
Conversation
- MirrorCheckpointConfig
- MirrorConnectorConfig, ConsumerConfig, ProducerConfig, MirrorSourceConfig
- This reverts commit f603b75.
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@m1a2st Could you please add the KIP to upgrade.html?
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
# Conflicts:
#	docs/upgrade.html
@m1a2st Could you please highlight the changes of …
I'm running an e2e for this patch. Will merge it if everything looks good.
@m1a2st: Thanks for the updated PR. Just a few minor comments.
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
assertEquals(1, CloseInterceptor.CLOSE_COUNT.get());
This is an existing issue. So, the interceptors are called in reverse order?
Yes, this is an existing issue; I can file a Jira issue to track it.

Updated: If the ordering is changed so that MockConsumerInterceptor comes first and CloseInterceptor second, then when MockConsumerInterceptor throws an exception, CloseInterceptor will not be initialized. As a result, its close() method will not be executed. The following test will pass:
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {
    final int targetInterceptor = 1;
    try {
        Properties props = new Properties();
        // skip
        props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                MockConsumerInterceptor.class.getName() + "," + CloseInterceptor.class.getName());
        MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
        assertThrows(KafkaException.class, () -> newConsumer(
                props, new StringDeserializer(), new StringDeserializer()));
        assertEquals(1, MockConsumerInterceptor.CONFIG_COUNT.get());
        assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
        // CloseInterceptor is never constructed, so none of its counters move.
        assertEquals(0, CloseInterceptor.INIT_COUNT.get());
        assertEquals(0, CloseInterceptor.CONFIG_COUNT.get());
        assertEquals(0, CloseInterceptor.CLOSE_COUNT.get());
    } finally {
        MockConsumerInterceptor.resetCounters();
        CloseInterceptor.resetCounters();
    }
}
public static class CloseInterceptor implements ConsumerInterceptor<String, String> {

    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
    public static final AtomicInteger CONFIG_COUNT = new AtomicInteger(0);

    public CloseInterceptor() {
        INIT_COUNT.incrementAndGet();
    }

    // skip

    @Override
    public void close() {
        CLOSE_COUNT.incrementAndGet();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        CONFIG_COUNT.incrementAndGet();
    }

    public static void resetCounters() {
        CLOSE_COUNT.set(0);
        INIT_COUNT.set(0);
        CONFIG_COUNT.set(0);
    }
}
Excuse me, I may have misunderstood your discussion. In this test case, the MockConsumerInterceptor is created after the CloseInterceptor. As a result, CloseInterceptor.CLOSE_COUNT is updated, because the getConfiguredInstances method closes the already-created objects in its exception handler.
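For reference, a minimal sketch of the close-on-failure behavior described above; this is not Kafka's actual getConfiguredInstances implementation, and configuredInstances and the factory list are hypothetical names:

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CloseOnFailureSketch {
    // Creates each instance in order; if any factory throws (e.g. from configure()),
    // the instances created so far are closed before the exception propagates.
    static <T extends AutoCloseable> List<T> configuredInstances(List<Supplier<T>> factories) {
        List<T> created = new ArrayList<>();
        try {
            for (Supplier<T> factory : factories) {
                created.add(factory.get());
            }
            return created;
        } catch (RuntimeException e) {
            for (T instance : created) {
                try {
                    instance.close(); // close() runs even though a later configure() failed
                } catch (Exception suppressed) {
                    e.addSuppressed(suppressed);
                }
            }
            throw e;
        }
    }
}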
docs/upgrade.html
settings.
</li>
<li>
The <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
"The cleanup.policy is empty" => "If cleanup.policy is empty"
Also, could we add: "cleanup.policy supports empty, which means infinite retention."?
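For illustration, a hedged sketch of what that wording means for users, via the standard Admin API (the topic name and bootstrap address are placeholders):

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class EmptyCleanupPolicyExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // An empty cleanup.policy disables both compaction and deletion,
            // i.e. local log segments are retained indefinitely.
            AlterConfigOp setEmptyPolicy = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, ""),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setEmptyPolicy))).all().get();
        }
    }
}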
} else {
    // If cleanup.policy is empty and remote storage is disabled, we should not delete any local
    // log segments
    return deleteLogStartOffsetBreachedSegments();
There's another issue we missed: the RPC DeleteRecordsRequest is not supported if the delete mode is disabled. If this KIP declares that the empty policy is supported, should we also allow DeleteRecordsRequest to pass on topics with the empty policy?
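For context, a minimal sketch of the client call that issues this RPC (the topic, partition, and offset are placeholders); with the broker check quoted below, it fails with PolicyViolationException when cleanup.policy does not include delete:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Sends a DeleteRecordsRequest to delete records below offset 100
            // on partition 0 of my-topic.
            admin.deleteRecords(Map.of(
                    new TopicPartition("my-topic", 0),
                    RecordsToDelete.beforeOffset(100L)
            )).all().get();
        }
    }
}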
if (!leaderLog.config.delete)
    throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy")
Agree, we should allow an empty cleanup.policy when receiving a DeleteRecordsRequest. I will update this change in a follow-up PR.
Please add the changes to #20492
We add the three main changes in this PR:
- Null values are allowed only for configurations whose default value is null, since users cannot explicitly set a configuration to null in a properties file. Therefore, only configurations with a default value of null should be allowed to accept null.
- Duplicate values are no longer allowed, as there are no known configurations in Kafka that require specifying the same value multiple times. Allowing duplicates is both rare in practice and potentially confusing to users.
- Empty lists are no longer allowed except for configurations that explicitly accept them. In practice, setting an empty list for several of these configurations can lead to server startup failures or unexpected behavior. Therefore, enforcing non-empty lists helps prevent misconfiguration and improves system robustness.
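As a rough illustration, these three rules could be expressed with the public ConfigDef.Validator interface; the class below is a hypothetical sketch, not necessarily the validator this PR adds:

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.HashSet;
import java.util.List;

public class ListValidatorSketch implements ConfigDef.Validator {
    private final boolean nullDefault; // whether the config's default value is null

    public ListValidatorSketch(boolean nullDefault) {
        this.nullDefault = nullDefault;
    }

    @Override
    public void ensureValid(String name, Object value) {
        if (value == null) {
            // Rule 1: null is only acceptable when the default itself is null.
            if (!nullDefault)
                throw new ConfigException(name, null, "Configuration must be non-null");
            return;
        }
        List<?> values = (List<?>) value;
        // Rule 3: empty lists are rejected.
        if (values.isEmpty())
            throw new ConfigException(name, value, "Configuration must be non-empty");
        // Rule 2: duplicate entries are rejected.
        if (new HashSet<>(values).size() != values.size())
            throw new ConfigException(name, value, "Configuration entries must be distinct");
    }
}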
These changes may introduce some backward incompatibility, but this
trade-off is justified by the significant improvements in safety,
consistency, and overall user experience.
Additionally, we introduce two minor adjustments:
- Some configurations are migrated from the STRING type to the LIST type, namely those using comma-separated values to represent multiple entries. This change reflects the actual semantics used in Kafka.
- Some default values are aligned with other configs.
These changes will not introduce any compatibility issues.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>