Skip to content

Commit b857317

Browse files
committed
Allow configuring CompositeBatchInterceptor
This changes allows configuring CompositeBatchInterceptor on AbstractMessageListenerContainer, in same way as CompositeRecordInterceptor. Signed-off-by: cfredri4 <[email protected]>
1 parent e8ab82a commit b857317

File tree

3 files changed

+19
-3
lines changed

3 files changed

+19
-3
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/message-listener-container.adoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ IMPORTANT: If the interceptor mutates the record (by creating a new one), the `t
2222

2323
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
2424

25-
Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` as a public method.
26-
If the returned interceptor is an instance of `CompositeRecordInterceptor`, additional `RecordInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured.
25+
Starting with version 4.0, `AbstractMessageListenerContainer` exposes `getRecordInterceptor()` and `getBatchInterceptor()` as public methods.
26+
If the returned interceptor is an instance of `CompositeRecordInterceptor` or `CompositeBatchInterceptor`, additional `RecordInterceptor` or `BatchInterceptor` instances can be added to it even after the container instance extending `AbstractMessageListenerContainer` has been created and a `RecordInterceptor` has already been configured.
2727
The following example shows how to do so:
2828

2929
[source, java]

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
* @author Soby Chacko
7373
* @author Sanghyeok An
7474
* @author Lokesh Alamuri
75+
* @author Christian Fredriksson
7576
*/
7677
public abstract class AbstractMessageListenerContainer<K, V>
7778
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
@@ -480,7 +481,12 @@ public void setRecordInterceptor(@Nullable RecordInterceptor<K, V> recordInterce
480481
this.recordInterceptor = recordInterceptor;
481482
}
482483

483-
protected @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
484+
/**
485+
* Get the {@link BatchInterceptor} for modification, if configured.
486+
* @return the {@link BatchInterceptor}, or {@code null} if not configured
487+
* @since 4.0
488+
*/
489+
public @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
484490
return this.batchInterceptor;
485491
}
486492

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
* @param <V> the value type.
3535
*
3636
* @author Gary Russell
37+
* @author Christian Fredriksson
3738
* @since 2.7
3839
*
3940
*/
@@ -85,4 +86,13 @@ public void clearThreadState(Consumer<?, ?> consumer) {
8586
this.delegates.forEach(del -> del.clearThreadState(consumer));
8687
}
8788

89+
/**
90+
* Add an {@link BatchInterceptor} to delegates.
91+
* @param batchInterceptor the interceptor.
92+
* @since 4.0
93+
*/
94+
public void addRecordInterceptor(BatchInterceptor<K, V> batchInterceptor) {
95+
this.delegates.add(batchInterceptor);
96+
}
97+
8898
}

0 commit comments

Comments
 (0)