diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index c3598501f50..56ef8e1609a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -55,19 +55,21 @@ public class LazyDownConversionRecords implements BaseRecords {
         this.firstOffset = firstOffset;
         this.time = Objects.requireNonNull(time);
 
-        // Kafka consumers expect at least one full batch of messages for every topic-partition. To guarantee this, we
-        // need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
-        // this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least
-        // its size.
+        // To make progress, kafka consumers require at least one full record batch per partition, i.e. we need to
+        // ensure we can accommodate one full batch of down-converted messages. We achieve this by having `sizeInBytes`
+        // factor in the size of the first down-converted batch and we return at least that many bytes.
         java.util.Iterator<ConvertedRecords<?>> it = iterator(0);
         if (it.hasNext()) {
             firstConvertedBatch = it.next();
             sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
         } else {
-            // If there are no messages we got after down-conversion, make sure we are able to send at least an overflow
-            // message to the consumer. Typically, the consumer would need to increase the fetch size in such cases.
+            // If there are messages before down-conversion and no messages after down-conversion,
+            // make sure we are able to send at least an overflow message to the consumer so that it can throw
+            // a RecordTooLargeException. Typically, the consumer would need to increase the fetch size in such cases.
+            // If there are no messages before down-conversion, we return an empty record batch.
             firstConvertedBatch = null;
-            sizeInBytes = LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH;
+            sizeInBytes = records.batches().iterator().hasNext() ?
+                    LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH : 0;
         }
     }
 
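For review context (not part of the patch): the new ternary gives `sizeInBytes()` three distinct outcomes. A minimal sketch of the three cases, reusing only the constructor call that the new test below also uses; the class name `SizeInBytesSketch` is mine:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.LazyDownConversionRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Time;

public class SizeInBytesSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("topic-1", 0);

        // Case 1 (the new behavior): no batches before down-conversion.
        // Previously this still reported MIN_OVERFLOW_MESSAGE_LENGTH; now it is 0,
        // so the broker sends an empty record set instead of an overflow message.
        LazyDownConversionRecords empty = new LazyDownConversionRecords(
                tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0, 0L, Time.SYSTEM);
        System.out.println(empty.sizeInBytes()); // prints 0

        // Case 2 (unchanged): batches exist but none survive down-conversion
        // (for example, v2 batches that cannot be represented in the older format):
        // sizeInBytes() stays at MIN_OVERFLOW_MESSAGE_LENGTH so the consumer can
        // surface RecordTooLargeException and suggest a larger fetch size.

        // Case 3 (unchanged): down-conversion yields at least one batch:
        // sizeInBytes() is max(original size, first converted batch size),
        // guaranteeing room for one full batch per partition.
    }
}
```

Under these assumptions, an empty partition now fetches as empty rather than forcing the consumer down the overflow path.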
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index c32359b4467..2e9ff330bba 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -430,6 +430,16 @@ public class FileRecordsTest {
         assertFalse(it.hasNext(), "No messages should be returned");
     }
 
+    @Test
+    public void testFormatConversionWithNoMessages() throws IOException {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0,
+            0, Time.SYSTEM);
+        assertEquals(0, lazyRecords.sizeInBytes());
+        Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
+        assertFalse(it.hasNext(), "No messages should be returned");
+    }
+
     @Test
     public void testSearchForTimestamp() throws IOException {
         for (RecordVersion version : RecordVersion.values()) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ca1bcdae4bd..e5e4f535093 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -769,8 +769,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           // supported by the fetch request version.
           // If the inter-broker protocol version is `3.0` or higher, the log config message format version is
           // always `3.0` (i.e. magic value is `v2`). As a result, we always go through the down-conversion
-          // path if the fetch version is 3 or lower (in rare cases the down-conversion may not be needed,
-          // but it's not worth optimizing for them).
+          // path if the fetch version is 3 or lower (in rare cases the down-conversion may not be needed, but
+          // it's not worth optimizing for them).
           // If the inter-broker protocol version is lower than `3.0`, we rely on the log config message format
           // version as a proxy for the on-disk magic value to maintain the long-standing behavior originally
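A side note on the comment reflowed in this hunk: it encodes a concrete decision rule. A hedged sketch of that rule as a standalone predicate (an illustration, not the broker's actual Scala code; the class and method names are hypothetical):

```java
// Illustration only: the predicate described by the comment above.
// Fetch request v4 (Kafka 0.11 / KIP-98) is the first version that understands
// magic v2 batches, so consumers fetching with v3 or lower need down-conversion
// whenever the on-disk magic is v2.
public class DownConversionRule {
    static boolean takesDownConversionPath(boolean ibpAtLeast30, short fetchVersion) {
        // With IBP >= 3.0 the log config message format is always 3.0 (magic v2),
        // so any fetch version <= 3 goes through the down-conversion path, even in
        // the rare cases where the data on disk happens to be in an older format.
        return ibpAtLeast30 && fetchVersion <= 3;
    }
}
```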
          // introduced in Kafka 0.10.0. An important implication is that it's unsafe to downgrade the message
diff --git a/tests/kafkatest/tests/client/message_format_change_test.py b/tests/kafkatest/tests/client/message_format_change_test.py
index 0cd9a2127d6..cb6cf72d22e 100644
--- a/tests/kafkatest/tests/client/message_format_change_test.py
+++ b/tests/kafkatest/tests/client/message_format_change_test.py
@@ -15,12 +15,12 @@
 from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.kafka import config_property, KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, V_2_8_0, DEV_BRANCH, KafkaVersion
 
 
 class MessageFormatChangeTest(ProduceConsumeValidateTest):
@@ -82,6 +82,8 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
                                                                     "replication-factor": 3,
                                                                     'configs': {"min.insync.replicas": 2}}},
                                   controller_num_nodes_override=1)
+        for node in self.kafka.nodes:
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) # required for writing old message formats
         self.kafka.start()
 
         self.logger.info("First format change to 0.9.0")
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index 39e4fa2fe60..7fd4ab84a96 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -24,7 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 from kafkatest.utils.remote_account import java_version
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, V_3_0_0, DEV_BRANCH, KafkaVersion
 from kafkatest.services.kafka.util import new_jdk_not_supported
 
 class TestUpgrade(ProduceConsumeValidateTest):
@@ -83,6 +83,9 @@ class TestUpgrade(ProduceConsumeValidateTest):
         if to_message_format_version is None:
             del node.config[config_property.MESSAGE_FORMAT_VERSION]
         else:
+            # older message formats are not supported with IBP 3.0 or higher
+            if to_message_format_version < V_0_11_0_0:
+                node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0)
             node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
         self.kafka.start_node(node)
         self.wait_until_rejoin()
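Both system-test changes above follow from the rule stated in the `KafkaApis.scala` comment: with IBP `3.0` or higher the effective message format version is always `3.0`, so tests that need to write v0/v1 records must pin the IBP to `2.8`. A simplified sketch of that relationship (hypothetical helper, not broker code):

```java
// Hypothetical helper illustrating why the tests pin IBP to 2.8: with IBP >= 3.0
// the configured message.format.version is ignored and the effective format is 3.0.
public class EffectiveFormat {
    static String effectiveMessageFormatVersion(String interBrokerProtocolVersion,
                                                String configuredMessageFormatVersion) {
        // Simplified string comparison; the broker parses these into version objects.
        return interBrokerProtocolVersion.compareTo("3.0") >= 0
                ? "3.0"
                : configuredMessageFormatVersion;
    }

    public static void main(String[] args) {
        // With IBP 2.8 the old format is honored, which is what these tests rely on.
        System.out.println(effectiveMessageFormatVersion("2.8", "0.9.0")); // 0.9.0
        // With IBP 3.0+ the old format is ignored and v2 (3.0) is used.
        System.out.println(effectiveMessageFormatVersion("3.0", "0.9.0")); // 3.0
    }
}
```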