KAFKA-19658 Tweak org.apache.kafka.clients.consumer.OffsetAndMetadata (#20451)

1. Optimize the `equals()`, `hashCode()`, and `toString()` methods in
`OffsetAndMetadata`.
2. Add UT and IT to these modifications.

Reviewers: TengYao Chi <kitingiao@gmail.com>, Sean Quah
 <squah@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Lan Ding 2025-09-05 06:06:08 +08:00 committed by GitHub
parent 8076702c4c
commit 32c2383bfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 36 additions and 12 deletions

View File

@ -553,14 +553,19 @@ public class PlaintextConsumerTest {
// commit sync and verify onCommit is called
var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L, "metadata")));
OffsetAndMetadata metadata = consumer.committed(Set.of(TP)).get(TP);
assertEquals(2, metadata.offset());
assertEquals("metadata", metadata.metadata());
assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
// commit async and verify onCommit is called
var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L, null));
sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
metadata = consumer.committed(Set.of(TP)).get(TP);
assertEquals(5, metadata.offset());
// null metadata will be converted to an empty string
assertEquals("", metadata.metadata());
assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
}
// cleanup

View File

@ -54,10 +54,7 @@ public class OffsetAndMetadata implements Serializable {
// The server converts null metadata to an empty string. So we store it as an empty string as well on the client
// to be consistent.
if (metadata == null)
this.metadata = OffsetFetchResponse.NO_METADATA;
else
this.metadata = metadata;
this.metadata = Objects.requireNonNullElse(metadata, OffsetFetchResponse.NO_METADATA);
}
/**
@ -82,6 +79,11 @@ public class OffsetAndMetadata implements Serializable {
return offset;
}
/**
* Get the metadata of the previously consumed record.
*
* @return the metadata or empty string if no metadata
*/
public String metadata() {
return metadata;
}
@ -106,21 +108,20 @@ public class OffsetAndMetadata implements Serializable {
OffsetAndMetadata that = (OffsetAndMetadata) o;
return offset == that.offset &&
Objects.equals(metadata, that.metadata) &&
Objects.equals(leaderEpoch, that.leaderEpoch);
Objects.equals(leaderEpoch(), that.leaderEpoch());
}
@Override
public int hashCode() {
return Objects.hash(offset, metadata, leaderEpoch);
return Objects.hash(offset, metadata, leaderEpoch());
}
@Override
public String toString() {
return "OffsetAndMetadata{" +
"offset=" + offset +
", leaderEpoch=" + leaderEpoch +
", leaderEpoch=" + leaderEpoch().orElse(null) +
", metadata='" + metadata + '\'' +
'}';
}
}

View File

@ -65,4 +65,19 @@ public class OffsetAndMetadataTest {
assertEquals(new OffsetAndMetadata(10, Optional.of(235), "test commit metadata"), deserializedObject);
}
@Test
public void testEqualsWithNullAndNegativeLeaderEpoch() {
OffsetAndMetadata metadataWithNullEpoch = new OffsetAndMetadata(100L, Optional.empty(), "metadata");
OffsetAndMetadata metadataWithNegativeEpoch = new OffsetAndMetadata(100L, Optional.of(-1), "metadata");
assertEquals(metadataWithNullEpoch, metadataWithNegativeEpoch);
assertEquals(metadataWithNullEpoch.hashCode(), metadataWithNegativeEpoch.hashCode());
}
@Test
public void testEqualsWithNullAndEmptyMetadata() {
OffsetAndMetadata metadataWithNullMetadata = new OffsetAndMetadata(100L, Optional.of(1), null);
OffsetAndMetadata metadataWithEmptyMetadata = new OffsetAndMetadata(100L, Optional.of(1), "");
assertEquals(metadataWithNullMetadata, metadataWithEmptyMetadata);
assertEquals(metadataWithNullMetadata.hashCode(), metadataWithEmptyMetadata.hashCode());
}
}

View File

@ -766,6 +766,7 @@ public class CommitRequestManagerTest {
// Complete request with a response
long expectedOffset = 100;
String expectedMetadata = "metadata";
NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0);
OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(DEFAULT_GROUP_ID)
@ -777,6 +778,7 @@ public class CommitRequestManagerTest {
.setPartitionIndex(tp.partition())
.setCommittedOffset(expectedOffset)
.setCommittedLeaderEpoch(1)
.setMetadata(expectedMetadata)
))
));
req.handler().onComplete(buildOffsetFetchClientResponse(req, groupResponse, false));
@ -794,6 +796,7 @@ public class CommitRequestManagerTest {
assertEquals(1, offsetsAndMetadata.size());
assertTrue(offsetsAndMetadata.containsKey(tp));
assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset());
assertEquals(expectedMetadata, offsetsAndMetadata.get(tp).metadata());
assertEquals(0, commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " +
"request should be removed from the queue when a response is received.");
}