KAFKA-18646: Null records in fetch response breaks librdkafka (#18726)

Ensure we always return empty records (including cases where an error is returned).
We also remove `nullable` from `records` since it is effectively expected to be
non-null by a large percentage of clients in the wild.
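
To make the failure mode concrete (this example is not part of the patch): clients that assume the field is always present dereference it unconditionally, so a `null` `records` breaks them, while `MemoryRecords.EMPTY` iterates zero batches and is safe. A minimal Java sketch against the public `FetchResponseData`/`MemoryRecords` APIs:

```java
// Sketch (not from this patch) of why a null records field breaks naive
// clients: iterating batches throws NullPointerException when the broker
// leaves records unset, while MemoryRecords.EMPTY yields zero batches.
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;

public class NullRecordsRepro {

    static int batchCount(FetchResponseData.PartitionData partition) {
        // Clients that assume the field is always present do the equivalent of:
        Records records = (Records) partition.records(); // was null on error paths before this fix
        int count = 0;
        for (Object batch : records.batches()) { // NPE here if records == null
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        FetchResponseData.PartitionData fixed = new FetchResponseData.PartitionData()
            .setPartitionIndex(0)
            .setRecords(MemoryRecords.EMPTY); // what the broker now always sets
        System.out.println(batchCount(fixed)); // prints 0, no NPE
    }
}
```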

This behavior regressed in fe56fc9 (KAFKA-18269). Empty records were
previously set via `FetchResponse.recordsOrFail(partitionData)` in the
now-removed `maybeConvertFetchedData` method.
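
For reference, a minimal sketch of the conversion that helper provides (hedged: standalone usage written for illustration, not code from this patch). `FetchResponse.recordsOrFail` maps a `null` field to `MemoryRecords.EMPTY`, which is the guarantee the removed call site used to supply:

```java
// Illustration of FetchResponse.recordsOrFail: converts the decoded field to
// Records, mapping null to MemoryRecords.EMPTY (per the commit message above).
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchResponse;

public class RecordsOrFailSketch {
    public static void main(String[] args) {
        // records deliberately left unset to mimic the regressed error path
        FetchResponseData.PartitionData partition = new FetchResponseData.PartitionData()
            .setPartitionIndex(0);
        Records records = FetchResponse.recordsOrFail(partition);
        System.out.println(records.sizeInBytes()); // 0: null mapped to MemoryRecords.EMPTY
    }
}
```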

Added an integration test that fails without this fix, and updated many
tests to set `records` to `MemoryRecords.EMPTY` instead of leaving them `null`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
Authored by Ismael Juma on 2025-01-29 07:04:12 -08:00; committed by GitHub
parent 97a228070e
commit ca5d2cf76d
5 changed files with 42 additions and 6 deletions

FetchResponse.java

@@ -196,7 +196,8 @@ public class FetchResponse extends AbstractResponse {
         return new FetchResponseData.PartitionData()
                 .setPartitionIndex(partition)
                 .setErrorCode(error.code())
-                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK);
+                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+                .setRecords(MemoryRecords.EMPTY);
     }

     /**
@@ -285,4 +286,4 @@ public class FetchResponse extends AbstractResponse {
             .setSessionId(sessionId)
             .setResponses(topicResponseList);
     }
 }

FetchResponse.json

@@ -106,7 +106,7 @@
       ]},
       { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId",
        "about": "The preferred read replica for the consumer to use on its next fetch request."},
-      { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
+      { "name": "Records", "type": "records", "versions": "0+", "about": "The record data."}
     ]}
   ]},
   { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "16+", "tag": 0,

RequestResponseTest.java

@@ -2024,7 +2024,8 @@ public class RequestResponseTest {
                 .setPartitionIndex(1)
                 .setHighWatermark(1000000)
                 .setLogStartOffset(0)
-                .setAbortedTransactions(abortedTransactions));
+                .setAbortedTransactions(abortedTransactions)
+                .setRecords(MemoryRecords.EMPTY));
         return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, sessionId,
                 responseData).serialize(FETCH.latestVersion()), FETCH.latestVersion());
     }
@@ -2048,7 +2049,8 @@ public class RequestResponseTest {
                 .setPartitionIndex(1)
                 .setHighWatermark(1000000)
                 .setLogStartOffset(0)
-                .setAbortedTransactions(abortedTransactions));
+                .setAbortedTransactions(abortedTransactions)
+                .setRecords(MemoryRecords.EMPTY));
         return FetchResponse.parse(FetchResponse.of(Errors.NONE, 25, INVALID_SESSION_ID,
                 responseData).serialize(FETCH.latestVersion()), FETCH.latestVersion());
     }

AuthorizerIntegrationTest.scala

@@ -37,7 +37,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
 import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ConsumerGroupDescribeRequestData, ConsumerGroupHeartbeatRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FetchResponseData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, ListTransactionsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData, WriteTxnMarkersRequestData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
@@ -59,6 +59,7 @@ import java.util.Collections.singletonList
 import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
 import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
 import org.apache.kafka.coordinator.group.GroupConfig
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.function.Executable

 import scala.collection.mutable
@@ -808,6 +809,34 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
     sendRequestAndVerifyResponseError(request, resources, isAuthorized = true)
   }
+
+  @Test
+  def testFetchConsumerRequest(): Unit = {
+    createTopicWithBrokerPrincipal(topic)
+
+    val request = createFetchRequest
+    val topicNames = getTopicNames().asJava
+
+    def partitionDatas(response: AbstractResponse): Iterable[FetchResponseData.PartitionData] = {
+      assertTrue(response.isInstanceOf[FetchResponse])
+      response.asInstanceOf[FetchResponse].responseData(topicNames, ApiKeys.FETCH.latestVersion).values().asScala
+    }
+    removeAllClientAcls()
+    val resources = Set(topicResource.resourceType, clusterResource.resourceType)
+    val failedResponse = sendRequestAndVerifyResponseError(request, resources, isAuthorized = false)
+    val failedPartitionDatas = partitionDatas(failedResponse)
+    assertEquals(1, failedPartitionDatas.size)
+    // Some clients (like librdkafka) always expect non-null records - even for the cases where an error is returned
+    failedPartitionDatas.foreach(partitionData => assertEquals(MemoryRecords.EMPTY, partitionData.records))
+
+    val readAcls = topicReadAcl(topicResource)
+    addAndVerifyAcls(readAcls, topicResource)
+
+    val succeededResponse = sendRequestAndVerifyResponseError(request, resources, isAuthorized = true)
+    val succeededPartitionDatas = partitionDatas(succeededResponse)
+    assertEquals(1, succeededPartitionDatas.size)
+    succeededPartitionDatas.foreach(partitionData => assertEquals(MemoryRecords.EMPTY, partitionData.records))
+  }

   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(quorum: String): Unit = {

RaftClientTestContext.java

@@ -1802,6 +1802,8 @@ public final class RaftClientTestContext {
                 partitionData.divergingEpoch()
                     .setEpoch(divergingEpoch)
                     .setEndOffset(divergingEpochEndOffset);
+
+                partitionData.setRecords(MemoryRecords.EMPTY);
             }
         );
     }
@@ -1830,6 +1832,8 @@ public final class RaftClientTestContext {
                 partitionData.snapshotId()
                     .setEpoch(snapshotId.epoch())
                     .setEndOffset(snapshotId.offset());
+
+                partitionData.setRecords(MemoryRecords.EMPTY);
             }
         );
     }