mirror of https://github.com/apache/kafka.git
KAFKA-18206: EmbeddedKafkaCluster must set features (#18189)
related to KAFKA-18206, set features in EmbeddedKafkaCluster in both streams and connect module, note that this PR also fix potential transaction with empty records in sendPrivileged method as transaction version 2 doesn't allow this kind of scenario. Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
102de21355
commit
b99be961b8
|
@ -30,7 +30,7 @@ import org.apache.kafka.common.acl.AclOperation;
|
|||
import org.apache.kafka.common.acl.AclPermissionType;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
||||
import org.apache.kafka.common.errors.ProducerFencedException;
|
||||
import org.apache.kafka.common.errors.InvalidProducerEpochException;
|
||||
import org.apache.kafka.common.resource.PatternType;
|
||||
import org.apache.kafka.common.resource.ResourcePattern;
|
||||
import org.apache.kafka.common.resource.ResourceType;
|
||||
|
@ -1225,7 +1225,7 @@ public class ExactlyOnceSourceIntegrationTest {
|
|||
private void assertTransactionalProducerIsFenced(KafkaProducer<byte[], byte[]> producer, String topic) {
|
||||
producer.beginTransaction();
|
||||
assertThrows(
|
||||
ProducerFencedException.class,
|
||||
InvalidProducerEpochException.class,
|
||||
() -> {
|
||||
producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96}));
|
||||
producer.commitTransaction();
|
||||
|
|
|
@ -696,7 +696,8 @@ class KRaftClusterTest {
|
|||
new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), OpType.SET))))))
|
||||
validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
|
||||
("log.roll.ms", "1234567"),
|
||||
("max.connections.per.ip", "60"))), exhaustive = true)
|
||||
("max.connections.per.ip", "60"),
|
||||
("min.insync.replicas", "1"))), exhaustive = true)
|
||||
|
||||
admin.createTopics(util.Arrays.asList(
|
||||
new NewTopic("foo", 2, 3.toShort),
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
|
|||
import org.apache.kafka.metadata.properties.MetaProperties;
|
||||
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
|
||||
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
|
||||
import org.apache.kafka.server.common.Feature;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -55,7 +56,15 @@ public class TestKitNodes {
|
|||
private BootstrapMetadata bootstrapMetadata;
|
||||
|
||||
public Builder() {
|
||||
this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
|
||||
this(BootstrapMetadata.fromVersions(
|
||||
MetadataVersion.latestTesting(),
|
||||
Feature.PRODUCTION_FEATURES.stream()
|
||||
.collect(Collectors.toMap(
|
||||
Feature::featureName,
|
||||
feature -> feature.defaultLevel(MetadataVersion.latestTesting()),
|
||||
(existing, replacement) -> existing,
|
||||
TreeMap::new)),
|
||||
"testkit"));
|
||||
}
|
||||
|
||||
public Builder(BootstrapMetadata bootstrapMetadata) {
|
||||
|
|
Loading…
Reference in New Issue