KAFKA-18206: EmbeddedKafkaCluster must set features (#18189)

Related to KAFKA-18206: set features in EmbeddedKafkaCluster in both the streams and connect modules. Note that this PR also fixes a potential transaction with empty records in the sendPrivileged method, since transaction version 2 does not allow that scenario.
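The sendPrivileged change itself is not shown in the hunks below; a minimal sketch of the idea, with a hypothetical helper signature rather than the actual Connect test code:

import java.util.List;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch of the guard: skip the transaction entirely when there is nothing to
// send, since transaction version 2 does not allow committing a transaction with no records.
static void sendPrivileged(KafkaProducer<byte[], byte[]> producer,
                           List<ProducerRecord<byte[], byte[]>> records) {
    if (records.isEmpty()) {
        return;  // avoid beginTransaction()/commitTransaction() around an empty batch
    }
    producer.beginTransaction();
    records.forEach(producer::send);
    producer.commitTransaction();
}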

Reviewers: Justine Olshan <jolshan@confluent.io>
Kuan-Po Tseng authored 2025-02-06 01:14:36 +08:00, committed by Justine Olshan
parent 6684319185
commit 4c9d335bcb
3 changed files with 14 additions and 4 deletions

ExactlyOnceSourceIntegrationTest.java

@@ -30,7 +30,7 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
@@ -1225,7 +1225,7 @@ public class ExactlyOnceSourceIntegrationTest {
     private void assertTransactionalProducerIsFenced(KafkaProducer<byte[], byte[]> producer, String topic) {
         producer.beginTransaction();
         assertThrows(
-                ProducerFencedException.class,
+                InvalidProducerEpochException.class,
                 () -> {
                     producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96}));
                     producer.commitTransaction();
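For context, the fencing asserted in this test is normally triggered by a second producer calling initTransactions() with the same transactional.id; a minimal sketch, with an illustrative bootstrap address and transactional.id that are not taken from the test:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Illustrative only: starting a second transactional producer with the same
// transactional.id bumps the producer epoch and fences the first one.
static KafkaProducer<byte[], byte[]> createFencedProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // illustrative address
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-source");   // illustrative id
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    KafkaProducer<byte[], byte[]> fenced = new KafkaProducer<>(props);
    fenced.initTransactions();

    // The second producer takes over the transactional.id; under transaction version 2 the
    // first producer's next send/commit fails with InvalidProducerEpochException, which is
    // what the updated assertion above expects.
    KafkaProducer<byte[], byte[]> current = new KafkaProducer<>(props);
    current.initTransactions();

    return fenced;
}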

KRaftClusterTest.scala

@@ -699,7 +699,8 @@ class KRaftClusterTest {
         new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), OpType.SET))))))
       validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
         ("log.roll.ms", "1234567"),
-        ("max.connections.per.ip", "60"))), exhaustive = true)
+        ("max.connections.per.ip", "60"),
+        ("min.insync.replicas", "1"))), exhaustive = true)
       admin.createTopics(util.Arrays.asList(
         new NewTopic("foo", 2, 3.toShort),

TestKitNodes.java

@@ -25,6 +25,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.io.File;
@@ -56,7 +57,15 @@ public class TestKitNodes {
         private BootstrapMetadata bootstrapMetadata;
 
         public Builder() {
-            this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
+            this(BootstrapMetadata.fromVersions(
+                MetadataVersion.latestTesting(),
+                Feature.PRODUCTION_FEATURES.stream()
+                    .collect(Collectors.toMap(
+                        Feature::featureName,
+                        feature -> feature.defaultLevel(MetadataVersion.latestTesting()),
+                        (existing, replacement) -> existing,
+                        TreeMap::new)),
+                "testkit"));
         }
 
         public Builder(BootstrapMetadata bootstrapMetadata) {
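For tests that need a specific feature level rather than the PRODUCTION_FEATURES defaults, the Builder(BootstrapMetadata) overload kept above still works; a minimal sketch, assuming the Feature.TRANSACTION_VERSION constant and the node-count builder options (illustrative, not part of this patch):

import java.util.Map;

import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;

// Illustrative only: bootstrap with one explicitly pinned feature level and hand the
// result to the Builder(BootstrapMetadata) overload instead of relying on the defaults.
static TestKitNodes nodesWithPinnedTransactionVersion() {
    BootstrapMetadata custom = BootstrapMetadata.fromVersions(
        MetadataVersion.latestTesting(),
        Map.of(Feature.TRANSACTION_VERSION.featureName(), (short) 2),  // assumed constant and level
        "testkit");
    return new TestKitNodes.Builder(custom)
        .setNumBrokerNodes(1)        // assumed builder options, shown for completeness
        .setNumControllerNodes(1)
        .build();
}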