-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-18529: ConsumerRebootstrapTest should run for async consumer #18554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18529: ConsumerRebootstrapTest should run for async consumer #18554
Conversation
Thanks for the patch @TaiJuWu ! Could you please merge trunk latest changes? Thanks! |
Done~~~~~~ |
assertEquals(1, getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly.count()) | ||
val args = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly | ||
assertEquals(2, getTestQuorumAndGroupProtocolParametersAll.count()) | ||
val args = getTestQuorumAndGroupProtocolParametersAll | ||
.findFirst().get.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we're only keeping the first entry from the full protocol list we still won't be testing with both consumers right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching it, update!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello @TaiJuWu, Thanks for this PR, left a comment
Arguments.of((args :+ true):_*), | ||
Arguments.of((args :+ false):_*) | ||
) | ||
assertEquals(2, getTestQuorumAndGroupProtocolParametersAll.count()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we still need this assertion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense to me to remove it, this test shouldn't care about the elements in the list, just make sure we use them all right? (this is the only comment left here imo @TaiJuWu, please take a look when you can and I'll take another look too). Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, remove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! LGTM
Hey @TaiJuWu, could you please merge trunk latest changes? (some changes just went in on these files). Thanks! |
Done 😀 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious about these changes. Do all of these tests actually create a Consumer
? I don't see that immediately when looking at the test code 🤔
They do by using the That being said, good point that in the admin there seems to be some using only admin/producer, so we could review and clean up a bit if the consumer grp param if not needed. Thanks for pointing it out @kirktrue! Could you take a look @TaiJuWu ? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @TaiJuWu!
I'm not sure if I'm confused about the intent here. The majority of the integration tests that were updated to include the group protocol don't actually create any Consumer
instances during the test. With this change, there are tens of tests that run twice unnecessarily. Is there a reason that I'm not seeing that these should all be changed?
Thanks!
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testDescribeConfigWithOptionTimeoutMs(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testCreatePartitionWithOptionRetryOnQuotaViolation(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testDescribeUserScramCredentials(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testDescribeTransactions(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testDescribeTransactionsTimeout(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testListClientMetricsResources(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testCreateTopicsReturnsConfigs(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test does not create any Consumer
s, so this change should be reverted.
To test whether or not a test needed to be updated, I changed the code that creates a diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
index 74592972b9..fe70aebcee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerDelegateCreator.java
@@ -56,6 +56,9 @@ public class ConsumerDelegateCreator {
public <K, V> ConsumerDelegate<K, V> create(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
+ if (true)
+ throw new RuntimeException("Intentionally blocking attempt to create a Consumer!");
+
try {
GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
@@ -79,6 +82,9 @@ public class ConsumerDelegateCreator {
SubscriptionState subscriptions,
ConsumerMetadata metadata,
List<ConsumerPartitionAssignor> assignors) {
+ if (true)
+ throw new RuntimeException("Intentionally blocking attempt to create a Consumer!");
+
try {
GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
Then I ran Are there other changes in the queue that are going to start creating |
I agree with @kirktrue's suggestion that some tests do not utilize a consumer. Therefore, running these tests twice is redundant. |
Thanks @kirktrue for good point and suggestion , I will revisit all tests. |
Revert some change and do follow check:
three exception is
Also apply https://github.com/apache/kafka/pull/18554/files#r1920843220 |
226a078
to
560df51
Compare
@Timeout(10) | ||
def testDescribeUserScramCredentialsTimeout(quorum: String, groupProtocol: String): Unit = { | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("kraft")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree with this change, but interesting, this shows that PlainTestAdminIntegration
was already running for async, even in cases where it was not needed, right?
So checking all these admin tests that were running for both consumers, I notice that testLogStartOffsetCheckpoint
is somehow similar to this one, should we apply the same fix to it?
It's running for both but it really only uses a consumer to subscribeAndWaitForAssignment
, which is already covered in many consumer-specific tests, is it valuable in this testLogStartOffsetCheckpoint? (seems unrelated). What do you think @kirktrue ? I see you enabled it for both consumers as part of #17670.
Also should we rename the jira/PR? It's only the ConsumerRebotstrap that was not running for async
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, at least one other committer has pushed back on enabling both consumer implementations anywhere that a consumer is created. I filed KAFKA-18087 to remedy that, but I haven't yet thought about the heuristic to apply to know when it's "safe" to just test the one consumer or the other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To address your fourth question, yes, ideally we should update the Jira and PR to accurately reflect what's changing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename jira and PR title.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but I haven't yet thought about the heuristic to apply to know when it's "safe" to just test the one consumer or the other.
we are on the same page :)
It's difficult to definitively confirm that "this functionality of the consumer is already covered by another test." Therefore, a simpler approach is to test both consumers if the test requires the use of a consumer.
Maybe we can revisit only the "slow" tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, that could be a safe start.
Also, it could make sense to start by reviewing Admin and Producer tests that are running for both consumers. It's probably on those where we can have some good candidates to rollback.
…pache#18554) Reviewers: Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>
…pache#18554) Reviewers: Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>
…pache#18554) Reviewers: Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>
…pache#18554) Reviewers: Kirk True <[email protected]>, Chia-Ping Tsai <[email protected]>, Lianet Magrans <[email protected]>
Jira: https://issues.apache.org/jira/browse/KAFKA-18529
Committer Checklist (excluded from commit message)