Skip to content

Commit 32c2383

Browse files
authored
KAFKA-19658 Tweak org.apache.kafka.clients.consumer.OffsetAndMetadata (#20451)
1. Optimize the `equals()`, `hashCode()`, and `toString()` methods in `OffsetAndMetadata`. 2. Add UT and IT to these modifications. Reviewers: TengYao Chi <[email protected]>, Sean Quah <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 8076702 commit 32c2383

File tree

4 files changed

+36
-12
lines changed

4 files changed

+36
-12
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -553,14 +553,19 @@ private void testInterceptors(Map<String, Object> consumerConfig) throws Excepti
553553

554554
// commit sync and verify onCommit is called
555555
var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
556-
consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
557-
assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
556+
consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L, "metadata")));
557+
OffsetAndMetadata metadata = consumer.committed(Set.of(TP)).get(TP);
558+
assertEquals(2, metadata.offset());
559+
assertEquals("metadata", metadata.metadata());
558560
assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
559561

560562
// commit async and verify onCommit is called
561-
var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
563+
var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L, null));
562564
sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
563-
assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
565+
metadata = consumer.committed(Set.of(TP)).get(TP);
566+
assertEquals(5, metadata.offset());
567+
// null metadata will be converted to an empty string
568+
assertEquals("", metadata.metadata());
564569
assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
565570
}
566571
// cleanup

clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,7 @@ public OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch, String meta
5454

5555
// The server converts null metadata to an empty string. So we store it as an empty string as well on the client
5656
// to be consistent.
57-
if (metadata == null)
58-
this.metadata = OffsetFetchResponse.NO_METADATA;
59-
else
60-
this.metadata = metadata;
57+
this.metadata = Objects.requireNonNullElse(metadata, OffsetFetchResponse.NO_METADATA);
6158
}
6259

6360
/**
@@ -82,6 +79,11 @@ public long offset() {
8279
return offset;
8380
}
8481

82+
/**
83+
* Get the metadata of the previously consumed record.
84+
*
85+
* @return the metadata or empty string if no metadata
86+
*/
8587
public String metadata() {
8688
return metadata;
8789
}
@@ -106,21 +108,20 @@ public boolean equals(Object o) {
106108
OffsetAndMetadata that = (OffsetAndMetadata) o;
107109
return offset == that.offset &&
108110
Objects.equals(metadata, that.metadata) &&
109-
Objects.equals(leaderEpoch, that.leaderEpoch);
111+
Objects.equals(leaderEpoch(), that.leaderEpoch());
110112
}
111113

112114
@Override
113115
public int hashCode() {
114-
return Objects.hash(offset, metadata, leaderEpoch);
116+
return Objects.hash(offset, metadata, leaderEpoch());
115117
}
116118

117119
@Override
118120
public String toString() {
119121
return "OffsetAndMetadata{" +
120122
"offset=" + offset +
121-
", leaderEpoch=" + leaderEpoch +
123+
", leaderEpoch=" + leaderEpoch().orElse(null) +
122124
", metadata='" + metadata + '\'' +
123125
'}';
124126
}
125-
126127
}

clients/src/test/java/org/apache/kafka/clients/consumer/OffsetAndMetadataTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,19 @@ public void testDeserializationCompatibilityWithLeaderEpoch() throws IOException
6565
assertEquals(new OffsetAndMetadata(10, Optional.of(235), "test commit metadata"), deserializedObject);
6666
}
6767

68+
@Test
69+
public void testEqualsWithNullAndNegativeLeaderEpoch() {
70+
OffsetAndMetadata metadataWithNullEpoch = new OffsetAndMetadata(100L, Optional.empty(), "metadata");
71+
OffsetAndMetadata metadataWithNegativeEpoch = new OffsetAndMetadata(100L, Optional.of(-1), "metadata");
72+
assertEquals(metadataWithNullEpoch, metadataWithNegativeEpoch);
73+
assertEquals(metadataWithNullEpoch.hashCode(), metadataWithNegativeEpoch.hashCode());
74+
}
75+
76+
@Test
77+
public void testEqualsWithNullAndEmptyMetadata() {
78+
OffsetAndMetadata metadataWithNullMetadata = new OffsetAndMetadata(100L, Optional.of(1), null);
79+
OffsetAndMetadata metadataWithEmptyMetadata = new OffsetAndMetadata(100L, Optional.of(1), "");
80+
assertEquals(metadataWithNullMetadata, metadataWithEmptyMetadata);
81+
assertEquals(metadataWithNullMetadata.hashCode(), metadataWithEmptyMetadata.hashCode());
82+
}
6883
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ public void testSuccessfulOffsetFetch() {
766766

767767
// Complete request with a response
768768
long expectedOffset = 100;
769+
String expectedMetadata = "metadata";
769770
NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0);
770771
OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup()
771772
.setGroupId(DEFAULT_GROUP_ID)
@@ -777,6 +778,7 @@ public void testSuccessfulOffsetFetch() {
777778
.setPartitionIndex(tp.partition())
778779
.setCommittedOffset(expectedOffset)
779780
.setCommittedLeaderEpoch(1)
781+
.setMetadata(expectedMetadata)
780782
))
781783
));
782784
req.handler().onComplete(buildOffsetFetchClientResponse(req, groupResponse, false));
@@ -794,6 +796,7 @@ public void testSuccessfulOffsetFetch() {
794796
assertEquals(1, offsetsAndMetadata.size());
795797
assertTrue(offsetsAndMetadata.containsKey(tp));
796798
assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset());
799+
assertEquals(expectedMetadata, offsetsAndMetadata.get(tp).metadata());
797800
assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " +
798801
"request should be removed from the queue when a response is received.");
799802
}

0 commit comments

Comments
 (0)