Skip to content

Commit 1d0c5f2

Browse files
KAFKA-19667: Close ShareConsumer in ShareConsumerPerformance after metrics displayed (#20467)
Ensure that metrics are retrieved and displayed (when requested) before ShareConsumer.close() is called. This is important because metrics are technically supposed to be removed on ShareConsumer.close(), which means retrieving them after close() would yield an empty map. Related to #20267. Reviewers: Apoorv Mittal <[email protected]>
1 parent 9257c43 commit 1d0c5f2

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
2121
import org.apache.kafka.clients.consumer.ConsumerRecords;
2222
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
23+
import org.apache.kafka.clients.consumer.ShareConsumer;
2324
import org.apache.kafka.common.KafkaException;
2425
import org.apache.kafka.common.Metric;
2526
import org.apache.kafka.common.MetricName;
@@ -47,8 +48,8 @@
4748
import java.util.concurrent.Executors;
4849
import java.util.concurrent.Future;
4950
import java.util.concurrent.TimeUnit;
50-
import java.util.concurrent.TimeoutException;
5151
import java.util.concurrent.atomic.AtomicLong;
52+
import java.util.function.Function;
5253

5354
import joptsimple.OptionException;
5455
import joptsimple.OptionSpec;
@@ -59,6 +60,10 @@ public class ShareConsumerPerformance {
5960
private static final Logger LOG = LoggerFactory.getLogger(ShareConsumerPerformance.class);
6061

6162
public static void main(String[] args) {
63+
run(args, KafkaShareConsumer::new);
64+
}
65+
66+
static void run(String[] args, Function<Properties, ShareConsumer<byte[], byte[]>> shareConsumerCreator) {
6267
try {
6368
LOG.info("Starting share consumer/consumers...");
6469
ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args);
@@ -68,9 +73,9 @@ public static void main(String[] args) {
6873
if (!options.hideHeader())
6974
printHeader();
7075

71-
List<KafkaShareConsumer<byte[], byte[]>> shareConsumers = new ArrayList<>();
76+
List<ShareConsumer<byte[], byte[]>> shareConsumers = new ArrayList<>();
7277
for (int i = 0; i < options.threads(); i++) {
73-
shareConsumers.add(new KafkaShareConsumer<>(options.props()));
78+
shareConsumers.add(shareConsumerCreator.apply(options.props()));
7479
}
7580
long startMs = System.currentTimeMillis();
7681
consume(shareConsumers, options, totalMessagesRead, totalBytesRead, startMs);
@@ -83,7 +88,6 @@ public static void main(String[] args) {
8388
shareConsumers.forEach(shareConsumer -> {
8489
@SuppressWarnings("UnusedLocalVariable")
8590
Map<TopicIdPartition, Optional<KafkaException>> ignored = shareConsumer.commitSync();
86-
shareConsumer.close(Duration.ofMillis(500));
8791
});
8892

8993
// Print final stats for share group.
@@ -94,6 +98,7 @@ public static void main(String[] args) {
9498

9599
shareConsumersMetrics.forEach(ToolsUtils::printMetrics);
96100

101+
shareConsumers.forEach(shareConsumer -> shareConsumer.close(Duration.ofMillis(500)));
97102
} catch (Throwable e) {
98103
System.err.println(e.getMessage());
99104
System.err.println(Utils.stackTrace(e));
@@ -106,11 +111,11 @@ protected static void printHeader() {
106111
System.out.printf("start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg%s%n", newFieldsInHeader);
107112
}
108113

109-
private static void consume(List<KafkaShareConsumer<byte[], byte[]>> shareConsumers,
114+
private static void consume(List<ShareConsumer<byte[], byte[]>> shareConsumers,
110115
ShareConsumerPerfOptions options,
111116
AtomicLong totalMessagesRead,
112117
AtomicLong totalBytesRead,
113-
long startMs) throws ExecutionException, InterruptedException, TimeoutException {
118+
long startMs) throws ExecutionException, InterruptedException {
114119
long numMessages = options.numMessages();
115120
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
116121
shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic()));
@@ -180,7 +185,7 @@ private static void consume(List<KafkaShareConsumer<byte[], byte[]>> shareConsum
180185
totalBytesRead.set(bytesRead.get());
181186
}
182187

183-
private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer<byte[], byte[]> shareConsumer,
188+
private static void consumeMessagesForSingleShareConsumer(ShareConsumer<byte[], byte[]> shareConsumer,
184189
AtomicLong totalMessagesRead,
185190
AtomicLong totalBytesRead,
186191
ShareConsumerPerfOptions options,

tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
package org.apache.kafka.tools;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerConfig;
20+
import org.apache.kafka.clients.consumer.MockShareConsumer;
21+
import org.apache.kafka.clients.consumer.ShareConsumer;
2022
import org.apache.kafka.common.utils.Exit;
23+
import org.apache.kafka.common.utils.Utils;
2124

2225
import org.junit.jupiter.api.AfterEach;
2326
import org.junit.jupiter.api.BeforeEach;
@@ -30,6 +33,8 @@
3033
import java.nio.file.Files;
3134
import java.nio.file.Path;
3235
import java.text.SimpleDateFormat;
36+
import java.util.Properties;
37+
import java.util.function.Function;
3338

3439
import static org.junit.jupiter.api.Assertions.assertEquals;
3540
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -120,6 +125,21 @@ public void testDefaultClientId() throws IOException {
120125
assertEquals("perf-share-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
121126
}
122127

128+
@Test
129+
public void testMetricsRetrievedBeforeConsumerClosed() {
130+
String[] args = new String[]{
131+
"--bootstrap-server", "localhost:9092",
132+
"--topic", "test",
133+
"--messages", "0",
134+
"--print-metrics"
135+
};
136+
137+
Function<Properties, ShareConsumer<byte[], byte[]>> shareConsumerCreator = properties -> new MockShareConsumer<>();
138+
139+
String err = ToolsTestUtils.captureStandardErr(() -> ShareConsumerPerformance.run(args, shareConsumerCreator));
140+
assertTrue(Utils.isBlank(err), "Should be no stderr message, but was \"" + err + "\"");
141+
}
142+
123143
private void testHeaderMatchContent(int expectedOutputLineCount, Runnable runnable) {
124144
String out = ToolsTestUtils.captureStandardOut(() -> {
125145
ShareConsumerPerformance.printHeader();

0 commit comments

Comments
 (0)