KAFKA-18672; CoordinatorRecordSerde must validate value version (#18749)

CoordinatorRecordSerde does not validate the version of the value to check whether the version is supported by the current version of the software. This is problematic if a future and unsupported version of the record is read by an older version of the software because it would misinterpret the bytes. Hence CoordinatorRecordSerde must throw an error if the version is unknown. This is also consistent with the handling in the old coordinator.

Reviewers: Jeff Kim <jeff.kim@confluent.io>
This commit is contained in:
David Jacot 2025-02-03 11:19:27 +01:00 committed by GitHub
parent eb01221dc0
commit bf05d2c914
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 88 additions and 38 deletions

View File

@ -28,23 +28,6 @@ import java.util.concurrent.CompletableFuture;
*/
public interface CoordinatorLoader<U> extends AutoCloseable {
/**
* UnknownRecordTypeException is thrown when the Deserializer encounters
* an unknown record type.
*/
class UnknownRecordTypeException extends RuntimeException {
private final short unknownType;
public UnknownRecordTypeException(short unknownType) {
super(String.format("Found an unknown record type %d", unknownType));
this.unknownType = unknownType;
}
public short unknownType() {
return unknownType;
}
}
/**
* Object that is returned as part of the future from load(). Holds the partition load time and the
* end time.

View File

@ -76,6 +76,11 @@ public abstract class CoordinatorRecordSerde implements Serializer<CoordinatorRe
final ApiMessage valueMessage = apiMessageValueFor(recordType);
final short valueVersion = readVersion(valueBuffer, "value");
if (valueVersion < valueMessage.lowestSupportedVersion() || valueVersion > valueMessage.highestSupportedVersion()) {
throw new UnknownRecordVersionException(recordType, valueVersion);
}
readMessage(valueMessage, valueBuffer, valueVersion, "value");
return CoordinatorRecord.record(

View File

@ -24,6 +24,42 @@ import java.nio.ByteBuffer;
* @param <T> The record type.
*/
public interface Deserializer<T> {
/**
* UnknownRecordTypeException is thrown when the Deserializer encounters
* an unknown record type.
*/
class UnknownRecordTypeException extends RuntimeException {
private final short unknownType;
public UnknownRecordTypeException(short unknownType) {
super(String.format("Found an unknown record type %d", unknownType));
this.unknownType = unknownType;
}
public short unknownType() {
return unknownType;
}
}
class UnknownRecordVersionException extends RuntimeException {
private final short type;
private final short unknownVersion;
public UnknownRecordVersionException(short type, short unknownVersion) {
super(String.format("Found an unknown record version %d for %d type", unknownVersion, type));
this.type = type;
this.unknownVersion = unknownVersion;
}
public short type() {
return type;
}
public short unknownVersion() {
return unknownVersion;
}
}
/**
* Deserializes the key and the value.
*

View File

@ -23,7 +23,8 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, FileRecords, MemoryRecords}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.{LoadSummary, UnknownRecordTypeException}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader, CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.KafkaScheduler

View File

@ -37,8 +37,8 @@ import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, GroupMetadataValueJsonConverter, CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters}

View File

@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata}

View File

@ -18,7 +18,6 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
@ -31,7 +30,7 @@ public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
@ -40,7 +39,7 @@ public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}

View File

@ -18,8 +18,8 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
@ -121,8 +121,8 @@ public class GroupCoordinatorRecordSerdeTest {
ByteBuffer valueBuffer = ByteBuffer.allocate(64);
CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}
@ -198,6 +198,34 @@ public class GroupCoordinatorRecordSerdeTest {
ex.getMessage());
}
@Test
public void testDeserializeWithInvalidValueVersion() {
GroupCoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();
ApiMessage key = new ConsumerGroupMetadataKey().setGroupId("foo");
ByteBuffer keyBuffer = MessageUtil.toCoordinatorTypePrefixedByteBuffer(key);
ByteBuffer valueBuffer1 = ByteBuffer.allocate(2);
valueBuffer1.putShort((short) (ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1));
valueBuffer1.rewind();
Deserializer.UnknownRecordVersionException ex =
assertThrows(Deserializer.UnknownRecordVersionException.class,
() -> serde.deserialize(keyBuffer, valueBuffer1));
assertEquals(key.apiKey(), ex.type());
assertEquals(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1, ex.unknownVersion());
keyBuffer.rewind();
ByteBuffer valueBuffer2 = ByteBuffer.allocate(2);
valueBuffer2.putShort((short) (ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1));
valueBuffer2.rewind();
ex = assertThrows(Deserializer.UnknownRecordVersionException.class,
() -> serde.deserialize(keyBuffer, valueBuffer2));
assertEquals(key.apiKey(), ex.type());
assertEquals(ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1, ex.unknownVersion());
}
@Test
public void testDeserializeAllRecordTypes() {
for (CoordinatorRecordType record : CoordinatorRecordType.values()) {

View File

@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
@ -29,7 +28,7 @@ public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
@ -38,7 +37,7 @@ public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}

View File

@ -19,8 +19,8 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
@ -113,8 +113,8 @@ public class ShareCoordinatorRecordSerdeTest {
ByteBuffer valueBuffer = ByteBuffer.allocate(64);
CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}

View File

@ -18,7 +18,6 @@ package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;
@ -29,7 +28,7 @@ public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
@ -38,7 +37,7 @@ public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}

View File

@ -18,8 +18,8 @@ package org.apache.kafka.coordinator.transaction;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@ -120,8 +120,8 @@ public class TransactionCoordinatorRecordSerdeTest {
ByteBuffer valueBuffer = ByteBuffer.allocate(64);
CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}