From b99be961b8a05e75b7225280c9592574423cbfe1 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Thu, 6 Feb 2025 01:14:36 +0800 Subject: [PATCH] KAFKA-18206: EmbeddedKafkaCluster must set features (#18189) related to KAFKA-18206, set features in EmbeddedKafkaCluster in both streams and connect module, note that this PR also fix potential transaction with empty records in sendPrivileged method as transaction version 2 doesn't allow this kind of scenario. Reviewers: Justine Olshan --- .../integration/ExactlyOnceSourceIntegrationTest.java | 4 ++-- .../integration/kafka/server/KRaftClusterTest.scala | 3 ++- .../org/apache/kafka/common/test/TestKitNodes.java | 11 ++++++++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 6bb12e8f178..494af3358e0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -30,7 +30,7 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; -import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; @@ -1225,7 +1225,7 @@ public class ExactlyOnceSourceIntegrationTest { private void assertTransactionalProducerIsFenced(KafkaProducer producer, String topic) { producer.beginTransaction(); assertThrows( - ProducerFencedException.class, + InvalidProducerEpochException.class, () -> { producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96})); producer.commitTransaction(); diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 44bfa9f1734..de9ea1a72c0 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -696,7 +696,8 @@ class KRaftClusterTest { new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), OpType.SET)))))) validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq( ("log.roll.ms", "1234567"), - ("max.connections.per.ip", "60"))), exhaustive = true) + ("max.connections.per.ip", "60"), + ("min.insync.replicas", "1"))), exhaustive = true) admin.createTopics(util.Arrays.asList( new NewTopic("foo", 2, 3.toShort), diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java index c78462c213a..d608972e1d7 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java @@ -25,6 +25,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.metadata.properties.MetaPropertiesVersion; +import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.MetadataVersion; import java.io.File; @@ -55,7 +56,15 @@ public class TestKitNodes { private BootstrapMetadata bootstrapMetadata; public Builder() { - this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit")); + this(BootstrapMetadata.fromVersions( + MetadataVersion.latestTesting(), + Feature.PRODUCTION_FEATURES.stream() + .collect(Collectors.toMap( + Feature::featureName, + feature -> feature.defaultLevel(MetadataVersion.latestTesting()), + (existing, replacement) -> existing, + TreeMap::new)), + "testkit")); } public Builder(BootstrapMetadata bootstrapMetadata) {