Skip to content

Commit b72a37f

Browse files
authored
GH-2061: EKB Seek to End Option for Embedded Topics
Resolves #2061
1 parent 6a5fdc6 commit b72a37f

File tree

2 files changed

+59
-16
lines changed

2 files changed

+59
-16
lines changed

spring-kafka-docs/src/main/asciidoc/testing.adoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,18 @@ public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafk
3737
----
3838
====
3939

40-
NOTE: Starting with version 2.5, the `consumerProps` method sets the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest`.
40+
[NOTE]
41+
====
42+
Starting with version 2.5, the `consumerProps` method sets the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` to `earliest`.
4143
This is because, in most cases, you want the consumer to consume any messages sent in a test case.
4244
The `ConsumerConfig` default is `latest` which means that messages already sent by a test, before the consumer starts, will not receive those records.
4345
To revert to the previous behavior, set the property to `latest` after calling the method.
4446
47+
When using the embedded broker, it is generally best practice to use a different topic for each test, to prevent cross-talk.
48+
If this is not possible for some reason, note that the `consumeFromEmbeddedTopics` method's default behavior is to seek the assigned partitions to the beginning after assignment.
49+
Since it does not have access to the consumer properties, you must use the overloaded method that takes a `seekToEnd` boolean parameter to seek to the end instead of the beginning.
50+
====
51+
4552
A JUnit 4 `@Rule` wrapper for the `EmbeddedKafkaBroker` is provided to create an embedded Kafka and an embedded Zookeeper server.
4653
(See <<embedded-kafka-annotation>> for information about using `@EmbeddedKafka` with JUnit 5).
4754
The following listing shows the signatures of those methods:

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,7 +39,6 @@
3939
import java.util.concurrent.ExecutionException;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.TimeoutException;
42-
import java.util.concurrent.atomic.AtomicBoolean;
4342
import java.util.concurrent.atomic.AtomicReference;
4443
import java.util.function.Function;
4544
import java.util.stream.Collectors;
@@ -700,6 +699,16 @@ public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) {
700699
consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0]));
701700
}
702701

702+
/**
703+
* Subscribe a consumer to all the embedded topics.
704+
* @param seekToEnd true to seek to the end instead of the beginning.
705+
* @param consumer the consumer.
706+
* @since 2.8.2
707+
*/
708+
public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd) {
709+
consumeFromEmbeddedTopics(consumer, seekToEnd, this.topics.toArray(new String[0]));
710+
}
711+
703712
/**
704713
* Subscribe a consumer to one of the embedded topics.
705714
* @param consumer the consumer.
@@ -709,6 +718,17 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
709718
consumeFromEmbeddedTopics(consumer, topic);
710719
}
711720

721+
/**
722+
* Subscribe a consumer to one of the embedded topics.
723+
* @param consumer the consumer.
724+
* @param seekToEnd true to seek to the end instead of the beginning.
725+
* @param topic the topic.
726+
* @since 2.8.2
727+
*/
728+
public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic) {
729+
consumeFromEmbeddedTopics(consumer, seekToEnd, topic);
730+
}
731+
712732
/**
713733
* Subscribe a consumer to one or more of the embedded topics.
714734
* @param consumer the consumer.
@@ -717,13 +737,26 @@ public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
717737
* the list of embedded topics (since 2.3.4).
718738
*/
719739
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume) {
740+
consumeFromEmbeddedTopics(consumer, false, topicsToConsume);
741+
}
742+
743+
/**
744+
* Subscribe a consumer to one or more of the embedded topics.
745+
* @param consumer the consumer.
746+
* @param topicsToConsume the topics.
747+
* @param seekToEnd true to seek to the end instead of the beginning.
748+
* @throws IllegalStateException if you attempt to consume from a topic that is not in
749+
* the list of embedded topics.
750+
* @since 2.8.2
751+
*/
752+
public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume) {
720753
List<String> notEmbedded = Arrays.stream(topicsToConsume)
721754
.filter(topic -> !this.topics.contains(topic))
722755
.collect(Collectors.toList());
723756
if (notEmbedded.size() > 0) {
724757
throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
725758
}
726-
final AtomicBoolean assigned = new AtomicBoolean();
759+
final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
727760
consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() {
728761

729762
@Override
@@ -732,27 +765,30 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
732765

733766
@Override
734767
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
735-
assigned.set(true);
768+
assigned.set(partitions);
736769
logger.debug(() -> "partitions assigned: " + partitions);
737770
}
738771

739772
});
740773
ConsumerRecords<?, ?> records = null;
741774
int n = 0;
742-
while (!assigned.get() && n++ < 600) { // NOSONAR magic #
775+
while (assigned.get() == null && n++ < 600) { // NOSONAR magic #
743776
records = consumer.poll(Duration.ofMillis(100)); // force assignment NOSONAR magic #
744777
}
745-
if (records != null && records.count() > 0) {
778+
if (assigned.get() != null) {
746779
final ConsumerRecords<?, ?> theRecords = records;
747-
logger.debug(() -> "Records received on initial poll for assignment; re-seeking to beginning; "
748-
+ theRecords.partitions().stream()
749-
.flatMap(p -> theRecords.records(p).stream())
750-
// map to same format as send metadata toString()
751-
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
752-
.collect(Collectors.toList()));
753-
consumer.seekToBeginning(records.partitions());
754-
}
755-
if (!assigned.get()) {
780+
logger.debug(() -> "Partitions assigned "
781+
+ assigned.get()
782+
+ "; re-seeking to "
783+
+ (seekToEnd ? "end; " : "beginning"));
784+
if (seekToEnd) {
785+
consumer.seekToEnd(assigned.get());
786+
}
787+
else {
788+
consumer.seekToBeginning(assigned.get());
789+
}
790+
}
791+
else {
756792
throw new IllegalStateException("Failed to be assigned partitions from the embedded topics");
757793
}
758794
logger.debug("Subscription Initiated");

0 commit comments

Comments
 (0)