diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java index 96ba599450b..b64e6d1a000 100644 --- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java +++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java @@ -166,7 +166,7 @@ public interface ClusterInstance { } default Producer producer() { - return new KafkaProducer<>(Map.of()); + return producer(Map.of()); } default Consumer consumer(Map configs) { @@ -180,7 +180,7 @@ public interface ClusterInstance { } default Consumer consumer() { - return new KafkaConsumer<>(Map.of()); + return consumer(Map.of()); } default Admin admin(Map configs, boolean usingBootstrapControllers) { diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java index 7fa28e64580..b96e8c28c08 100644 --- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java +++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java @@ -32,6 +32,8 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Assertions; @@ -292,4 +294,32 @@ public class ClusterTestExtensionsTest { assertEquals(value, records.get(0).value()); } } + + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + }) + public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster) throws InterruptedException { + String topic = "topic"; + Bytes key = Bytes.wrap("key".getBytes()); + Bytes value = Bytes.wrap("value".getBytes()); + try (Admin adminClient = cluster.admin(); + Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer() + ) { + adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1))); + assertNotNull(producer); + assertNotNull(consumer); + producer.send(new ProducerRecord<>(topic, key, value)); + producer.flush(); + consumer.subscribe(singletonList(topic)); + List> records = new ArrayList<>(); + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100)).forEach(records::add); + return records.size() == 1; + }, "Failed to receive message"); + assertEquals(key, records.get(0).key()); + assertEquals(value, records.get(0).value()); + } + } }