From bf05d2c91439ad4218f30343215a7ce16b0dc2b4 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 3 Feb 2025 11:19:27 +0100 Subject: [PATCH] KAFKA-18672; CoordinatorRecordSerde must validate value version (#18749) CoordinatorRecordSerde does not validate the version of the value to check whether the version is supported by the current version of the software. This is problematic if a future and unsupported version of the record is read by an older version of the software because it would misinterpret the bytes. Hence CoordinatorRecordSerde must throw an error if the version is unknown. This is also consistent with the handling in the old coordinator. Reviewers: Jeff Kim --- .../common/runtime/CoordinatorLoader.java | 17 --------- .../runtime/CoordinatorRecordSerde.java | 5 +++ .../common/runtime/Deserializer.java | 36 +++++++++++++++++++ .../group/CoordinatorLoaderImpl.scala | 3 +- .../scala/kafka/tools/DumpLogSegments.scala | 2 +- .../group/CoordinatorLoaderImplTest.scala | 2 +- .../group/GroupCoordinatorRecordSerde.java | 5 ++- .../GroupCoordinatorRecordSerdeTest.java | 34 ++++++++++++++++-- .../share/ShareCoordinatorRecordSerde.java | 5 ++- .../ShareCoordinatorRecordSerdeTest.java | 6 ++-- .../TransactionCoordinatorRecordSerde.java | 5 ++- ...TransactionCoordinatorRecordSerdeTest.java | 6 ++-- 12 files changed, 88 insertions(+), 38 deletions(-) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java index e48a4253fa4..4f739082d67 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoader.java @@ -28,23 +28,6 @@ import java.util.concurrent.CompletableFuture; */ public interface CoordinatorLoader extends AutoCloseable { - /** - * UnknownRecordTypeException is thrown when the Deserializer encounters - * an unknown record type. - */ - class UnknownRecordTypeException extends RuntimeException { - private final short unknownType; - - public UnknownRecordTypeException(short unknownType) { - super(String.format("Found an unknown record type %d", unknownType)); - this.unknownType = unknownType; - } - - public short unknownType() { - return unknownType; - } - } - /** * Object that is returned as part of the future from load(). Holds the partition load time and the * end time. diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java index f917a7c6e7d..a8b77eb299d 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java @@ -76,6 +76,11 @@ public abstract class CoordinatorRecordSerde implements Serializer valueMessage.highestSupportedVersion()) { + throw new UnknownRecordVersionException(recordType, valueVersion); + } + readMessage(valueMessage, valueBuffer, valueVersion, "value"); return CoordinatorRecord.record( diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java index 1873a254609..09a964dd754 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/Deserializer.java @@ -24,6 +24,42 @@ import java.nio.ByteBuffer; * @param The record type. */ public interface Deserializer { + /** + * UnknownRecordTypeException is thrown when the Deserializer encounters + * an unknown record type. + */ + class UnknownRecordTypeException extends RuntimeException { + private final short unknownType; + + public UnknownRecordTypeException(short unknownType) { + super(String.format("Found an unknown record type %d", unknownType)); + this.unknownType = unknownType; + } + + public short unknownType() { + return unknownType; + } + } + + class UnknownRecordVersionException extends RuntimeException { + private final short type; + private final short unknownVersion; + + public UnknownRecordVersionException(short type, short unknownVersion) { + super(String.format("Found an unknown record version %d for %d type", unknownVersion, type)); + this.type = type; + this.unknownVersion = unknownVersion; + } + + public short type() { + return type; + } + + public short unknownVersion() { + return unknownVersion; + } + } + /** * Deserializes the key and the value. * diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala index f5e42f099bd..70536abecc0 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala @@ -23,7 +23,8 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException import org.apache.kafka.common.record.{ControlRecordType, FileRecords, MemoryRecords} import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.Time -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.{LoadSummary, UnknownRecordTypeException} +import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary +import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader, CoordinatorPlayback, Deserializer} import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.KafkaScheduler diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index dfa357943c5..09b8eab3d48 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -37,8 +37,8 @@ import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, GroupMetadataValueJsonConverter, CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType} -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde +import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters} diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala index 68a6ba5da1d..dc4bbc830cd 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord} import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{MockTime, Time} -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException +import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback, Deserializer} import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java index 374bae27026..29a54545e59 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerde.java @@ -18,7 +18,6 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; @@ -31,7 +30,7 @@ public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde { try { return CoordinatorRecordType.fromId(recordType).newRecordKey(); } catch (UnsupportedVersionException ex) { - throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + throw new UnknownRecordTypeException(recordType); } } @@ -40,7 +39,7 @@ public class GroupCoordinatorRecordSerde extends CoordinatorRecordSerde { try { return CoordinatorRecordType.fromId(recordType).newRecordValue(); } catch (UnsupportedVersionException ex) { - throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + throw new UnknownRecordTypeException(recordType); } } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java index a17a46e947b..d864b4281d1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordSerdeTest.java @@ -18,8 +18,8 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.Deserializer; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType; @@ -121,8 +121,8 @@ public class GroupCoordinatorRecordSerdeTest { ByteBuffer valueBuffer = ByteBuffer.allocate(64); - CoordinatorLoader.UnknownRecordTypeException ex = - assertThrows(CoordinatorLoader.UnknownRecordTypeException.class, + Deserializer.UnknownRecordTypeException ex = + assertThrows(Deserializer.UnknownRecordTypeException.class, () -> serde.deserialize(keyBuffer, valueBuffer)); assertEquals((short) 255, ex.unknownType()); } @@ -198,6 +198,34 @@ public class GroupCoordinatorRecordSerdeTest { ex.getMessage()); } + @Test + public void testDeserializeWithInvalidValueVersion() { + GroupCoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde(); + + ApiMessage key = new ConsumerGroupMetadataKey().setGroupId("foo"); + ByteBuffer keyBuffer = MessageUtil.toCoordinatorTypePrefixedByteBuffer(key); + + ByteBuffer valueBuffer1 = ByteBuffer.allocate(2); + valueBuffer1.putShort((short) (ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1)); + valueBuffer1.rewind(); + + Deserializer.UnknownRecordVersionException ex = + assertThrows(Deserializer.UnknownRecordVersionException.class, + () -> serde.deserialize(keyBuffer, valueBuffer1)); + assertEquals(key.apiKey(), ex.type()); + assertEquals(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1, ex.unknownVersion()); + + keyBuffer.rewind(); + ByteBuffer valueBuffer2 = ByteBuffer.allocate(2); + valueBuffer2.putShort((short) (ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1)); + valueBuffer2.rewind(); + + ex = assertThrows(Deserializer.UnknownRecordVersionException.class, + () -> serde.deserialize(keyBuffer, valueBuffer2)); + assertEquals(key.apiKey(), ex.type()); + assertEquals(ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1, ex.unknownVersion()); + } + @Test public void testDeserializeAllRecordTypes() { for (CoordinatorRecordType record : CoordinatorRecordType.values()) { diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java index c5451c33cdd..3a6f3318f44 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerde.java @@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.share; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType; @@ -29,7 +28,7 @@ public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde { try { return CoordinatorRecordType.fromId(recordType).newRecordKey(); } catch (UnsupportedVersionException ex) { - throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + throw new UnknownRecordTypeException(recordType); } } @@ -38,7 +37,7 @@ public class ShareCoordinatorRecordSerde extends CoordinatorRecordSerde { try { return CoordinatorRecordType.fromId(recordType).newRecordValue(); } catch (UnsupportedVersionException ex) { - throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + throw new UnknownRecordTypeException(recordType); } } } diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java index d64824e599b..8f0bb7e82d1 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java @@ -19,8 +19,8 @@ package org.apache.kafka.coordinator.share; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.Deserializer; import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType; import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey; import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue; @@ -113,8 +113,8 @@ public class ShareCoordinatorRecordSerdeTest { ByteBuffer valueBuffer = ByteBuffer.allocate(64); - CoordinatorLoader.UnknownRecordTypeException ex = - assertThrows(CoordinatorLoader.UnknownRecordTypeException.class, + Deserializer.UnknownRecordTypeException ex = + assertThrows(Deserializer.UnknownRecordTypeException.class, () -> serde.deserialize(keyBuffer, valueBuffer)); assertEquals((short) 255, ex.unknownType()); } diff --git a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java index 165baceaf23..d8b60900c75 100644 --- a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java +++ b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerde.java @@ -18,7 +18,6 @@ package org.apache.kafka.coordinator.transaction; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiMessage; -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde; import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType; @@ -29,7 +28,7 @@ public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde { try { return CoordinatorRecordType.fromId(recordType).newRecordKey(); } catch (UnsupportedVersionException ex) { - throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + throw new UnknownRecordTypeException(recordType); } } @@ -38,7 +37,7 @@ public class TransactionCoordinatorRecordSerde extends CoordinatorRecordSerde { try { return CoordinatorRecordType.fromId(recordType).newRecordValue(); } catch (UnsupportedVersionException ex) { - throw new CoordinatorLoader.UnknownRecordTypeException(recordType); + throw new UnknownRecordTypeException(recordType); } } } diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java index 315cba0e712..cbee0273d36 100644 --- a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java +++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionCoordinatorRecordSerdeTest.java @@ -18,8 +18,8 @@ package org.apache.kafka.coordinator.transaction; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.MessageUtil; -import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.Deserializer; import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -120,8 +120,8 @@ public class TransactionCoordinatorRecordSerdeTest { ByteBuffer valueBuffer = ByteBuffer.allocate(64); - CoordinatorLoader.UnknownRecordTypeException ex = - assertThrows(CoordinatorLoader.UnknownRecordTypeException.class, + Deserializer.UnknownRecordTypeException ex = + assertThrows(Deserializer.UnknownRecordTypeException.class, () -> serde.deserialize(keyBuffer, valueBuffer)); assertEquals((short) 255, ex.unknownType()); }