-
Notifications
You must be signed in to change notification settings - Fork 14.6k
KAFKA-13722: Refactor SerdeGetter #18242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13722: Refactor SerdeGetter #18242
Conversation
Refactor SerdeGetter to not use old ProcessorContext any longer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM and one minor comment
newProcessorContext != null ? newProcessorContext.valueSerde() : stateStorecontext.valueSerde(); | ||
|
||
public Serde<?> valueSerde() { | ||
return processorContext != null ? processorContext.valueSerde() : stateStorecontext.valueSerde(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The IDE incorrectly assumes that stateStoreContext.valueSerde()
might throw a NPE. To eliminate this false alert, perhaps we can retain the Supplier<Serde<?>> instead.
private final Supplier<Serde<?>> keySerdeSupplier;
private final Supplier<Serde<?>> valueSerdeSupplier;
public SerdeGetter(final ProcessorContext<?, ?> context) {
keySerdeSupplier = context::keySerde;
valueSerdeSupplier = context::valueSerde;
}
public SerdeGetter(final StateStoreContext context) {
keySerdeSupplier = context::keySerde;
valueSerdeSupplier = context::valueSerde;
}
public Serde<?> keySerde() {
return keySerdeSupplier.get();
}
public Serde<?> valueSerde() {
return valueSerdeSupplier.get();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chia7712 I like your proposal!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @mjsax !
LGTM!
newProcessorContext != null ? newProcessorContext.valueSerde() : stateStorecontext.valueSerde(); | ||
|
||
public Serde<?> valueSerde() { | ||
return processorContext != null ? processorContext.valueSerde() : stateStorecontext.valueSerde(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chia7712 I like your proposal!
Java 23:
|
Java 17:
|
testThrottledProducerConsumer -> https://issues.apache.org/jira/browse/KAFKA-8073 |
Reviewers: Bruno Cadonna <[email protected]>, Chia-Ping Tsai <[email protected]>
Cherry-picked to |
…e-old-protocol-versions * apache-github/trunk: (25 commits) KAFKA-18270: FindCoordinator v0 incorrectly tagged as deprecated (apache#18262) KAFKA-18284: Add group coordinator records for Streams rebalance protocol (apache#18228) MINOR: Fix flaky state updater test (apache#18253) MINOR: improve StreamsResetter logging (apache#18237) KAFKA-18227: Ensure v2 partitions are not added to last transaction during upgrade (apache#18176) Add IT for share consumer with duration base offet auto reset (apache#18251) KAFKA-18283: Add StreamsGroupDescribe RPC definitions (apache#18230) KAFKA-18241: add docs check to CI (apache#18183) KAFKA-18223 Improve flaky test report (apache#18212) MINOR Remove triage label in nightly job (apache#18147) KAFKA-18294 Remove deprecated SourceTask#commitRecord (apache#18260) KAFKA-18264 Remove NotLeaderForPartitionException (apache#18211) KAFKA-13722: Refactor SerdeGetter (apache#18242) KAFKA-18094 Remove deprecated TopicListing(String, Boolean) (apache#18248) KAFKA-18282: Add StreamsGroupHeartbeat RPC definitions (apache#18227) KAFKA-18026: KIP-1112 migrate KTableSuppressProcessorSupplier (apache#18150) KAFKA-18026: transition KTable#filter impl to use processor wrapper (apache#18205) KAFKA-18293 Remove `org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler` and `org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler` (apache#18244) MINOR: add assertion about groupEpoch and targetAssignmentEpoch to testConsumerGroups (apache#18203) KAFKA-17960; PlaintextAdminIntegrationTest.testConsumerGroups fails with CONSUMER group protocol (apache#18234) ...
Reviewers: Bruno Cadonna <[email protected]>, Chia-Ping Tsai <[email protected]>
Refactor SerdeGetter to not use old ProcessorContext any longer.