Skip to content

Commit 9257c43

Browse files
authored
MINOR: Fix failed e2e compatibility_test_new_broker_test and upgrade_test.py (#20471)
#20390 Replace the -`-producer.config` for the verifiable producer and `--consumer.config` option by `--command-config` for the verifiable consumer. However, for e2e tests targeting older broker versions, the original configuration should still be used. Fix the following tests: `consumer_protocol_migration_test.py`、`compatibility_test_new_broker_test.py` and `upgrade_test.py`. Reviewers: Chia-Ping Tsai <[email protected]>, Manikumar Reddy <[email protected]>
1 parent 6a1cdf8 commit 9257c43

File tree

4 files changed

+23
-4
lines changed

4 files changed

+23
-4
lines changed

tests/kafkatest/services/verifiable_consumer.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from kafkatest.services.kafka import TopicPartition, consumer_group
2323
from kafkatest.services.kafka.util import get_log4j_config_param, get_log4j_config_for_tools
2424
from kafkatest.services.verifiable_client import VerifiableClientMixin
25-
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0
25+
from kafkatest.version import get_version, DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0
2626

2727

2828
class ConsumerState:
@@ -424,7 +424,12 @@ def start_cmd(self, node):
424424
if self.max_messages > 0:
425425
cmd += " --max-messages %s" % str(self.max_messages)
426426

427-
cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
427+
version = get_version(node)
428+
if version.supports_command_config():
429+
cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
430+
else:
431+
cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
432+
428433
cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
429434
return cmd
430435

tests/kafkatest/services/verifiable_producer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,10 @@ def start_cmd(self, node, idx):
249249
if self.repeating_keys is not None:
250250
cmd += " --repeating-keys %s " % str(self.repeating_keys)
251251

252-
cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
252+
if version.supports_command_config():
253+
cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
254+
else:
255+
cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
253256

254257
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
255258
return cmd

tests/kafkatest/version.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ def supports_fk_joins(self):
9797
def supports_feature_command(self):
9898
return self >= V_3_8_0
9999

100+
def supports_command_config(self):
101+
# According to KIP-1147, --producer.config and --consumer.config have been deprecated and will be removed in future versions
102+
# For backward compatibility, we select the configuration based on node version:
103+
# - For versions 4.2.0 and above, use --command-config
104+
# - For older versions, continue using --producer.config or --consumer.config
105+
return self >= V_4_2_0
106+
100107
def get_version(node=None):
101108
"""Return the version attached to the given node.
102109
Default to DEV_BRANCH if node or node.version is undefined (aka None)
@@ -223,3 +230,7 @@ def get_version(node=None):
223230
# 4.1.x version
224231
V_4_1_0 = KafkaVersion("4.1.0")
225232
LATEST_4_1 = V_4_1_0
233+
234+
# 4.2.x version
235+
V_4_2_0 = KafkaVersion("4.2.0")
236+
LATEST_4_2 = V_4_2_0

tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
650650
}
651651
if (commandConfigFile != null) {
652652
try {
653-
consumerProps.putAll(Utils.loadProps(res.getString(commandConfigFile)));
653+
consumerProps.putAll(Utils.loadProps(commandConfigFile));
654654
} catch (IOException e) {
655655
throw new ArgumentParserException(e.getMessage(), parser);
656656
}

0 commit comments

Comments
 (0)