KAFKA-13116: Fix message_format_change_test and compatibility_test_new_broker_test failures (#11108)

These failures were caused by a46b82bea9. Details for each test:

* message_format_change_test: use IBP 2.8 so that we can write in older message
formats.
* compatibility_test_new_broker_test: fix the down-conversion path to handle
empty record batches correctly. The record scan in the old code ensured that
empty record batches were never down-converted, which hid this bug.
* upgrade_test: set the IBP to 2.8 when the message format is < 0.11 to ensure we
actually write with the old message format (the test was passing even without
this change).

Verified with ducker that some variants of these tests failed without these changes
and passed with them. Also added a unit test for the down-conversion bug fix.
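
As a concrete illustration of the down-conversion fix, here is a minimal sketch of
the new behavior. It mirrors the unit test added in this commit and assumes
JUnit-style assertions; it is not additional production code:

    TopicPartition tp = new TopicPartition("topic-1", 0);
    // Ask for an empty record set to be down-converted to message format v0.
    LazyDownConversionRecords lazy = new LazyDownConversionRecords(
            tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0, 0, Time.SYSTEM);
    // Previously this reported MIN_OVERFLOW_MESSAGE_LENGTH even though there was
    // nothing to down-convert; it now reports 0 and yields no batches.
    assertEquals(0, lazy.sizeInBytes());
    assertFalse(lazy.iterator(16 * 1024L).hasNext(), "No messages should be returned");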

Reviewers: Jason Gustafson <jason@confluent.io>
Author: Ismael Juma
Date: 2021-07-23 13:43:31 -07:00 (committed by GitHub)
Commit: f34bb28ab6, parent: 273d66479d
5 changed files with 28 additions and 12 deletions


@@ -55,19 +55,20 @@ public class LazyDownConversionRecords implements BaseRecords {
         this.firstOffset = firstOffset;
         this.time = Objects.requireNonNull(time);
 
-        // Kafka consumers expect at least one full batch of messages for every topic-partition. To guarantee this, we
-        // need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
-        // this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least
-        // its size.
+        // To make progress, kafka consumers require at least one full record batch per partition, i.e. we need to
+        // ensure we can accommodate one full batch of down-converted messages. We achieve this by having `sizeInBytes`
+        // factor in the size of the first down-converted batch and we return at least that many bytes.
         java.util.Iterator<ConvertedRecords<?>> it = iterator(0);
         if (it.hasNext()) {
             firstConvertedBatch = it.next();
             sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
         } else {
-            // If there are no messages we got after down-conversion, make sure we are able to send at least an overflow
-            // message to the consumer. Typically, the consumer would need to increase the fetch size in such cases.
+            // If there are messages before down-conversion and no messages after down-conversion,
+            // make sure we are able to send at least an overflow message to the consumer so that it can throw
+            // a RecordTooLargeException. Typically, the consumer would need to increase the fetch size in such cases.
+            // If there are no messages before down-conversion, we return an empty record batch.
             firstConvertedBatch = null;
-            sizeInBytes = LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH;
+            sizeInBytes = records.batches().iterator().hasNext() ? LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH : 0;
         }
     }
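
In condensed form, the constructor now distinguishes three cases. The following is
a hedged sketch with hypothetical names (downConvertedSizeInBytes and its
parameters are illustrative, not the verbatim implementation):

    import java.util.OptionalInt;

    // firstConvertedSize: size of the first down-converted batch, if any survived;
    // hasInputBatches: whether the source records contained any batches at all.
    static int downConvertedSizeInBytes(int inputSizeInBytes,
                                        OptionalInt firstConvertedSize,
                                        boolean hasInputBatches,
                                        int minOverflowMessageLength) {
        if (firstConvertedSize.isPresent())
            // Case 1: a batch survived; report at least its size so the consumer
            // always receives one full batch and can make progress.
            return Math.max(inputSizeInBytes, firstConvertedSize.getAsInt());
        if (hasInputBatches)
            // Case 2: batches existed but none survived down-conversion; report
            // the overflow length so the consumer throws RecordTooLargeException
            // and can increase its fetch size.
            return minOverflowMessageLength;
        // Case 3: nothing to convert; report 0 so the consumer sees an empty response.
        return 0;
    }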


@@ -430,6 +430,16 @@ public class FileRecordsTest {
         assertFalse(it.hasNext(), "No messages should be returned");
     }
 
+    @Test
+    public void testFormatConversionWithNoMessages() throws IOException {
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+        LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(tp, MemoryRecords.EMPTY, RecordBatch.MAGIC_VALUE_V0,
+                0, Time.SYSTEM);
+        assertEquals(0, lazyRecords.sizeInBytes());
+        Iterator<ConvertedRecords<?>> it = lazyRecords.iterator(16 * 1024L);
+        assertFalse(it.hasNext(), "No messages should be returned");
+    }
+
     @Test
     public void testSearchForTimestamp() throws IOException {
         for (RecordVersion version : RecordVersion.values()) {


@@ -769,8 +769,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       // supported by the fetch request version.
       // If the inter-broker protocol version is `3.0` or higher, the log config message format version is
       // always `3.0` (i.e. magic value is `v2`). As a result, we always go through the down-conversion
-      // path if the fetch version is 3 or lower (in rare cases the down-conversion may not be needed,
-      // but it's not worth optimizing for them).
+      // path if the fetch version is 3 or lower (in rare cases the down-conversion may not be needed, but
+      // it's not worth optimizing for them).
       // If the inter-broker protocol version is lower than `3.0`, we rely on the log config message format
       // version as a proxy for the on-disk magic value to maintain the long-standing behavior originally
       // introduced in Kafka 0.10.0. An important implication is that it's unsafe to downgrade the message
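
The decision this comment describes can be summarized as follows. This is a
simplified sketch (needsDownConversion and its parameters are hypothetical helper
names, not broker code); magic value 2 corresponds to message format v2, which
requires fetch request version 4 or higher:

    // maxMagicForFetchVersion: the newest magic value the fetch request version
    // can read (fetch versions <= 3 cap out below magic 2).
    static boolean needsDownConversion(boolean ibpAtLeast30, byte logConfigMagic,
                                       byte maxMagicForFetchVersion) {
        if (ibpAtLeast30)
            // Message format is pinned to v2 (magic 2), so any fetch version that
            // cannot read v2 must go through the down-conversion path.
            return maxMagicForFetchVersion < 2;
        // Pre-3.0 IBP: use the configured message format version as a proxy for
        // the on-disk magic value (behavior dating back to Kafka 0.10.0).
        return logConfigMagic > maxMagicForFetchVersion;
    }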


@@ -15,12 +15,12 @@ from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.kafka import config_property, KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, V_2_8_0, DEV_BRANCH, KafkaVersion
 
 
 class MessageFormatChangeTest(ProduceConsumeValidateTest):

@@ -82,6 +82,8 @@ class MessageFormatChangeTest(ProduceConsumeValidateTest):
                                                   "replication-factor": 3,
                                                   'configs': {"min.insync.replicas": 2}}},
                               controller_num_nodes_override=1)
+        for node in self.kafka.nodes:
+            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) # required for writing old message formats
         self.kafka.start()
 
         self.logger.info("First format change to 0.9.0")

@@ -24,7 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 from kafkatest.utils.remote_account import java_version
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, V_3_0_0, DEV_BRANCH, KafkaVersion
 from kafkatest.services.kafka.util import new_jdk_not_supported
 
 
 class TestUpgrade(ProduceConsumeValidateTest):

@@ -83,6 +83,9 @@ class TestUpgrade(ProduceConsumeValidateTest):
             if to_message_format_version is None:
                 del node.config[config_property.MESSAGE_FORMAT_VERSION]
             else:
+                # older message formats are not supported with IBP 3.0 or higher
+                if to_message_format_version < V_0_11_0_0:
+                    node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0)
                 node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
             self.kafka.start_node(node)
             self.wait_until_rejoin()