From c527530e806c7d9f79348656d801b1b78e8f2bec Mon Sep 17 00:00:00 2001 From: Xuan-Zhang Gong Date: Tue, 15 Apr 2025 16:34:47 +0800 Subject: [PATCH] KAFKA-19042 Move ProducerCompressionTest, ProducerFailureHandlingTest, and ProducerIdExpirationTest to client-integration-tests module (#19319) include three test case - ProducerCompressionTest - ProducerFailureHandlingTest - ProducerIdExpirationTest Reviewers: Ken Huang , PoAn Yang , Chia-Ping Tsai --- ...port-control-clients-integration-tests.xml | 16 +- .../producer/ProducerCompressionTest.java | 178 ++++++++++ .../producer/ProducerFailureHandlingTest.java | 305 ++++++++++++++++++ .../producer/ProducerIdExpirationTest.java | 287 ++++++++++++++++ .../kafka/api/ProducerCompressionTest.scala | 167 ---------- .../api/ProducerFailureHandlingTest.scala | 261 --------------- .../kafka/api/ProducerIdExpirationTest.scala | 254 --------------- 7 files changed, 780 insertions(+), 688 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java delete mode 100755 core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala delete mode 100644 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala delete mode 100644 core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala diff --git a/checkstyle/import-control-clients-integration-tests.xml b/checkstyle/import-control-clients-integration-tests.xml index 880873676f6..5ca6c03e8a7 100644 --- a/checkstyle/import-control-clients-integration-tests.xml +++ b/checkstyle/import-control-clients-integration-tests.xml @@ -19,12 +19,16 @@ --> - - + + - - - - + + + + + + + + diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java new file mode 100644 index 00000000000..190507ea9b0 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + + +import org.apache.kafka.clients.consumer.CloseOptions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + + +@ClusterTestDefaults(types = {Type.KRAFT}) +class ProducerCompressionTest { + + private final String topicName = "topic"; + private final int numRecords = 2000; + + /** + * testCompression + *

+ * Compressed messages should be able to sent and consumed correctly + */ + @ClusterTest + void testCompression(ClusterInstance cluster) throws ExecutionException, InterruptedException { + for (CompressionType compression : CompressionType.values()) { + processCompressionTest(cluster, compression); + } + } + + + void processCompressionTest(ClusterInstance cluster, CompressionType compression) throws InterruptedException, + ExecutionException { + String compressionTopic = topicName + "_" + compression.name; + cluster.createTopic(compressionTopic, 1, (short) 1); + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression.name); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200"); + Consumer classicConsumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")); + Consumer consumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")); + try (Producer producer = cluster.producer(producerProps)) { + int partition = 0; + // prepare the messages + List messages = IntStream.range(0, numRecords).mapToObj(this::messageValue).toList(); + Header[] headerArr = new Header[]{new RecordHeader("key", "value".getBytes())}; + RecordHeaders headers = new RecordHeaders(headerArr); + + // make sure the returned messages are correct + long now = System.currentTimeMillis(); + List> responses = new ArrayList<>(); + messages.forEach(message -> { + // 1. send message without key and header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, null, + message.getBytes()))); + // 2. send message with key, without header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, + String.valueOf(message.length()).getBytes(), message.getBytes()))); + // 3. send message with key and header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, + String.valueOf(message.length()).getBytes(), message.getBytes(), headers))); + }); + for (int offset = 0; offset < responses.size(); offset++) { + assertEquals(offset, responses.get(offset).get().offset(), compression.name); + } + verifyConsumerRecords(consumer, messages, now, headerArr, partition, compressionTopic, compression.name); + verifyConsumerRecords(classicConsumer, messages, now, headerArr, partition, compressionTopic, + compression.name); + } finally { + // This consumer close very slowly, which may cause the entire test to time out, and we can't wait for + // it to auto close + consumer.close(CloseOptions.timeout(Duration.ofSeconds(1))); + classicConsumer.close(CloseOptions.timeout(Duration.ofSeconds(1))); + } + } + + private void verifyConsumerRecords(Consumer consumer, List messages, long now, + Header[] headerArr, int partition, String topic, String compression) { + TopicPartition tp = new TopicPartition(topic, partition); + consumer.assign(List.of(tp)); + consumer.seek(tp, 0); + AtomicInteger num = new AtomicInteger(0); + AtomicInteger flag = new AtomicInteger(0); + consumeRecords(consumer, numRecords * 3, TestUtils.DEFAULT_MAX_WAIT_MS).foreach(record -> { + String messageValue = messages.get(num.get()); + long offset = num.get() * 3L + flag.get(); + if (flag.get() == 0) { + // verify message without key and header + assertNull(record.key(), errorMessage(compression)); + assertEquals(messageValue, new String(record.value()), errorMessage(compression)); + assertEquals(0, record.headers().toArray().length, errorMessage(compression)); + assertEquals(now, record.timestamp(), errorMessage(compression)); + assertEquals(offset, record.offset(), errorMessage(compression)); + } else if (flag.get() == 1) { + // verify message with key, without header + assertEquals(String.valueOf(messageValue.length()), new String(record.key()), errorMessage(compression)); + assertEquals(messageValue, new String(record.value()), errorMessage(compression)); + assertEquals(0, record.headers().toArray().length, errorMessage(compression)); + assertEquals(now, record.timestamp(), errorMessage(compression)); + assertEquals(offset, record.offset(), errorMessage(compression)); + } else if (flag.get() == 2) { + // verify message with key and header + assertEquals(String.valueOf(messageValue.length()), new String(record.key()), errorMessage(compression)); + assertEquals(messageValue, new String(record.value()), errorMessage(compression)); + assertEquals(1, record.headers().toArray().length, errorMessage(compression)); + assertEquals(headerArr[0], record.headers().toArray()[0], errorMessage(compression)); + assertEquals(now, record.timestamp(), errorMessage(compression)); + assertEquals(offset, record.offset(), errorMessage(compression)); + } else { + fail(); + } + flagLoop(num, flag); + return null; + }); + } + + private void flagLoop(AtomicInteger num, AtomicInteger flag) { + if (flag.get() == 2) { + num.incrementAndGet(); + flag.set(0); + } else { + flag.incrementAndGet(); + } + } + + private String messageValue(int length) { + Random random = new Random(); + return IntStream.range(0, length) + .map(i -> random.nextInt(TestUtils.LETTERS_AND_DIGITS.length())) + .mapToObj(TestUtils.LETTERS_AND_DIGITS::charAt) + .map(String::valueOf) + .collect(Collectors.joining()); + } + + private String errorMessage(String compression) { + return String.format("Compression type: %s - Assertion failed", compression); + } + +} diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java new file mode 100644 index 00000000000..82cac8ae0ba --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.record.DefaultRecord; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC; +import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // 15000 is filed serverMessageMaxBytes + @ClusterConfigProperty(key = MESSAGE_MAX_BYTES_CONFIG, value = "15000"), + // 15200 is filed replicaFetchMaxBytes + @ClusterConfigProperty(key = REPLICA_FETCH_MAX_BYTES_CONFIG, value = "15200"), + // 15400 is filed replicaFetchMaxResponseBytes + @ClusterConfigProperty(key = REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, value = "15400"), + // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + } +) +public class ProducerFailureHandlingTest { + + private final int producerBufferSize = 30000; + private final int serverMessageMaxBytes = producerBufferSize / 2; + private final int replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 200; + private final int replicaFetchMaxResponseBytes = replicaFetchMaxPartitionBytes + 200; + private final String topic1 = "topic-1"; + private final String topic2 = "topic-2"; + + + /** + * With ack == 0 the future metadata will have no exceptions with offset -1 + */ + @ClusterTest + void testTooLargeRecordWithAckZero(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + try (Producer producer = clusterInstance.producer(producerConfig(0))) { + // send a too-large record + ProducerRecord record = + new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]); + + RecordMetadata recordMetadata = producer.send(record).get(); + assertNotNull(recordMetadata); + assertFalse(recordMetadata.hasOffset()); + assertEquals(-1L, recordMetadata.offset()); + } + } + + /** + * With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException + */ + @ClusterTest + void testTooLargeRecordWithAckOne(ClusterInstance clusterInstance) throws InterruptedException { + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + + try (Producer producer = clusterInstance.producer(producerConfig(1))) { + // send a too-large record + ProducerRecord record = + new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]); + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + + /** + * This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 + */ + @ClusterTest + void testPartitionTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxPartitionBytes); + } + + /** + * This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 + */ + @ClusterTest + void testResponseTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxResponseBytes); + } + + + /** + * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException + */ + @ClusterTest + void testNonExistentTopic(ClusterInstance clusterInstance) { + // send a record with non-exist topic + ProducerRecord record = + new ProducerRecord<>(topic2, null, "key".getBytes(), "value".getBytes()); + try (Producer producer = clusterInstance.producer(producerConfig(0))) { + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + + /** + * With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException + */ + @ClusterTest + void testWrongBrokerList(ClusterInstance clusterInstance) throws InterruptedException { + clusterInstance.createTopic(topic1, 1, (short) 1); + // producer with incorrect broker list + Map producerConfig = new HashMap<>(producerConfig(1)); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8686,localhost:4242"); + try (Producer producer = clusterInstance.producer(producerConfig)) { + // send a record with incorrect broker list + ProducerRecord record = + new ProducerRecord<>(topic1, null, "key".getBytes(), "value".getBytes()); + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + /** + * Send with invalid partition id should return ExecutionException caused by TimeoutException + * when partition is higher than the upper bound of partitions. + */ + @ClusterTest + void testInvalidPartition(ClusterInstance clusterInstance) throws InterruptedException { + // create topic with a single partition + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + + // create a record with incorrect partition id (higher than the number of partitions), send should fail + try (Producer producer = clusterInstance.producer(producerConfig(0))) { + ProducerRecord higherRecord = + new ProducerRecord<>(topic1, 1, "key".getBytes(), "value".getBytes()); + Exception e = assertThrows(ExecutionException.class, () -> producer.send(higherRecord).get()); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + } + + + /** + * The send call after producer closed should throw IllegalStateException + */ + @ClusterTest + void testSendAfterClosed(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + // create topic + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + + Producer producer1 = clusterInstance.producer(producerConfig(0)); + Producer producer2 = clusterInstance.producer(producerConfig(1)); + Producer producer3 = clusterInstance.producer(producerConfig(-1)); + + ProducerRecord record = + new ProducerRecord<>(topic1, null, "key".getBytes(), "value".getBytes()); + // first send a message to make sure the metadata is refreshed + producer1.send(record).get(); + producer2.send(record).get(); + producer3.send(record).get(); + + producer1.close(); + assertThrows(IllegalStateException.class, () -> producer1.send(record)); + producer2.close(); + assertThrows(IllegalStateException.class, () -> producer2.send(record)); + producer3.close(); + assertThrows(IllegalStateException.class, () -> producer3.send(record)); + } + + @ClusterTest + void testCannotSendToInternalTopic(ClusterInstance clusterInstance) throws InterruptedException { + try (Admin admin = clusterInstance.admin()) { + Map topicConfig = new HashMap<>(); + clusterInstance.brokers().get(0) + .groupCoordinator() + .groupMetadataTopicConfigs() + .forEach((k, v) -> topicConfig.put(k.toString(), v.toString())); + admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1).configs(topicConfig))); + clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 0); + } + + try (Producer producer = clusterInstance.producer(producerConfig(1))) { + Exception thrown = assertThrows(ExecutionException.class, + () -> producer.send(new ProducerRecord<>(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes(), + "test".getBytes())).get()); + assertInstanceOf(InvalidTopicException.class, thrown.getCause(), + () -> "Unexpected exception while sending to an invalid topic " + thrown.getCause()); + } + } + + @ClusterTest + void testNotEnoughReplicasAfterBrokerShutdown(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + String topicName = "minisrtest2"; + int brokerNum = clusterInstance.brokers().size(); + Map topicConfig = Map.of(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(brokerNum)); + try (Admin admin = clusterInstance.admin()) { + admin.createTopics(List.of(new NewTopic(topicName, 1, (short) brokerNum).configs(topicConfig))); + } + + ProducerRecord record = + new ProducerRecord<>(topicName, null, "key".getBytes(), "value".getBytes()); + + try (Producer producer = clusterInstance.producer(producerConfig(-1))) { + // this should work with all brokers up and running + producer.send(record).get(); + // shut down one broker + KafkaBroker oneBroker = clusterInstance.brokers().get(0); + oneBroker.shutdown(); + oneBroker.awaitShutdown(); + + Exception e = assertThrows(ExecutionException.class, () -> producer.send(record).get()); + assertTrue(e.getCause() instanceof NotEnoughReplicasException || + e.getCause() instanceof NotEnoughReplicasAfterAppendException || + e.getCause() instanceof TimeoutException); + + // restart the server + oneBroker.startup(); + } + } + + private void checkTooLargeRecordForReplicationWithAckAll(ClusterInstance clusterInstance, int maxFetchSize) throws InterruptedException, ExecutionException { + int maxMessageSize = maxFetchSize + 100; + int brokerSize = clusterInstance.brokers().size(); + Map topicConfig = new HashMap<>(); + topicConfig.put(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(brokerSize)); + topicConfig.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(maxMessageSize)); + + // create topic + String topic10 = "topic10"; + try (Admin admin = clusterInstance.admin()) { + admin.createTopics(List.of(new NewTopic(topic10, brokerSize, (short) brokerSize).configs(topicConfig))); + clusterInstance.waitTopicDeletion("topic10"); + } + + // send a record that is too large for replication, but within the broker max message limit + byte[] value = + new byte[maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD]; + Producer producer = clusterInstance.producer(producerConfig(-1)); + try (producer) { + ProducerRecord producerRecord = new ProducerRecord<>(topic10, null, value); + RecordMetadata recordMetadata = producer.send(producerRecord).get(); + + assertEquals(topic10, recordMetadata.topic()); + } + } + + private Map producerConfig(int acks) { + return Map.of( + ACKS_CONFIG, String.valueOf(acks), + RETRIES_CONFIG, 0, + REQUEST_TIMEOUT_MS_CONFIG, 30000, + MAX_BLOCK_MS_CONFIG, 10000, + BUFFER_MEMORY_CONFIG, producerBufferSize); + } + +} diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java new file mode 100644 index 00000000000..f79b3786253 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.consumer.CloseOptions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.InvalidPidMappingException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; + +import org.opentest4j.AssertionFailedError; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@ClusterTestDefaults( + brokers = 3, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively + // long. + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"), + @ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + // ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is not a constant + @ClusterConfigProperty(key = "unclean.leader.election.enable", value = "false"), + @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = + "200"), + @ClusterConfigProperty(key = TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"), + @ClusterConfigProperty(key = TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = + "500"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "10000"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500") + } +) +public class ProducerIdExpirationTest { + private final String topic1 = "topic1"; + private final int numPartitions = 1; + private final short replicationFactor = 3; + private final TopicPartition tp0 = new TopicPartition(topic1, 0); + private final String transactionalId = "transactionalProducer"; + private final ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + @ClusterTest + void testProducerIdExpirationWithNoTransactions(ClusterInstance cluster) throws InterruptedException, ExecutionException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer producer = cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true)); + // Send records to populate producer state cache. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + try (Admin admin = cluster.admin(); producer) { + assertEquals(1, producerStates(admin).size()); + + waitProducerIdExpire(admin); + + // Send more records to send producer ID back to brokers. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + + // Producer IDs should repopulate. + assertEquals(1, producerStates(admin).size()); + } + } + + @ClusterTest + void testTransactionAfterTransactionIdExpiresButProducerIdRemains(ClusterInstance cluster) throws InterruptedException, ExecutionException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer producer = cluster.producer(transactionalProducerConfig()); + producer.initTransactions(); + + // Start and then abort a transaction to allow the producer ID to expire. + producer.beginTransaction(); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", false)); + producer.flush(); + Consumer consumer = cluster.consumer(Map.of(ISOLATION_LEVEL_CONFIG, "read_committed")); + + try (Admin admin = cluster.admin()) { + // Ensure producer IDs are added. + TestUtils.waitUntilTrue(() -> { + try { + return producerStates(admin).size() == 1; + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + }, () -> "Producer IDs were not added.", DEFAULT_MAX_WAIT_MS, 100); + + producer.abortTransaction(); + + // Wait for the transactional ID to expire. + waitUntilTransactionalStateExpires(admin); + + // Producer IDs should be retained. + assertEquals(1, producerStates(admin).size()); + + // Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail + // due to the expired transactional ID, resulting in a fatal error. + producer.beginTransaction(); + Future failedFuture = + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", false)); + TestUtils.waitUntilTrue(failedFuture::isDone, () -> "Producer future never completed.", + DEFAULT_MAX_WAIT_MS, 100); + assertFutureThrows(InvalidPidMappingException.class, failedFuture); + + // Assert that aborting the transaction throws a KafkaException due to the fatal error. + assertThrows(KafkaException.class, producer::abortTransaction); + + // Close the producer and reinitialize to recover from the fatal error. + producer.close(); + producer = cluster.producer(transactionalProducerConfig()); + producer.initTransactions(); + + producer.beginTransaction(); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", true)); + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "3", "3", true)); + + // Producer IDs should be retained. + assertFalse(producerStates(admin).isEmpty()); + + producer.commitTransaction(); + + // Check we can still consume the transaction. + consumer.subscribe(List.of(topic1)); + consumeRecords(consumer, 2, DEFAULT_MAX_WAIT_MS).foreach(TestUtils::assertCommittedAndGetValue); + } finally { + producer.close(); + consumer.close(CloseOptions.timeout(Duration.ofSeconds(1))); + } + } + + @ClusterTest + void testDynamicProducerIdExpirationMs(ClusterInstance cluster) throws InterruptedException, ExecutionException { + cluster.createTopic(topic1, numPartitions, replicationFactor); + Producer producer = cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true)); + + // Send records to populate producer state cache. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + + try (Admin admin = cluster.admin(); producer) { + assertEquals(1, producerStates(admin).size()); + + waitProducerIdExpire(admin); + + // Update the producer ID expiration ms to a very high value. + admin.incrementalAlterConfigs(producerIdExpirationConfig("100000")); + + cluster.brokers().values().forEach(broker -> { + TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100000, + () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100); + }); + // Send more records to send producer ID back to brokers. + producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes())); + producer.flush(); + + // Producer IDs should repopulate. + assertEquals(1, producerStates(admin).size()); + + // Ensure producer ID does not expire within 4 seconds. + assertThrows(AssertionFailedError.class, () -> waitProducerIdExpire(admin, TimeUnit.SECONDS.toMillis(4))); + + // Update the expiration time to a low value again. + admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get(); + + KafkaBroker kafkaBroker = cluster.brokers().get(0); + kafkaBroker.shutdown(Duration.ofMinutes(5)); + kafkaBroker.awaitShutdown(); + kafkaBroker.startup(); + cluster.waitForReadyBrokers(); + cluster.brokers().values().forEach(broker -> { + TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100, + () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100); + }); + + // Ensure producer ID expires quickly again. + waitProducerIdExpire(admin); + } + } + + + private void waitProducerIdExpire(Admin admin) { + waitProducerIdExpire(admin, DEFAULT_MAX_WAIT_MS); + } + + private void waitProducerIdExpire(Admin admin, long timeout) { + TestUtils.waitUntilTrue(() -> { + try { + return producerStates(admin).isEmpty(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + }, () -> "Producer ID expired.", timeout, 100); + } + + private Map> producerIdExpirationConfig(String configValue) { + ConfigEntry producerIdCfg = new ConfigEntry(PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue); + return Map.of(configResource, List.of(new AlterConfigOp(producerIdCfg, AlterConfigOp.OpType.SET))); + } + + + private void waitUntilTransactionalStateExpires(Admin admin) { + TestUtils.waitUntilTrue(() -> { + boolean removedTransactionState = false; + try { + admin.describeTransactions(List.of(transactionalId)) + .description(transactionalId) + .get(); + } catch (Exception e) { + removedTransactionState = e.getCause() instanceof TransactionalIdNotFoundException; + } + return removedTransactionState; + }, () -> "Transaction state never expired.", DEFAULT_MAX_WAIT_MS, 100); + } + + private Map transactionalProducerConfig() { + return Map.of( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId, + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true, + ProducerConfig.ACKS_CONFIG, "all"); + } + + private List producerStates(Admin admin) throws ExecutionException, InterruptedException { + return admin.describeProducers(Collections.singletonList(tp0)) + .partitionResult(tp0) + .get() + .activeProducers(); + } +} diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala deleted file mode 100755 index 2782a46f18a..00000000000 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api.test - -import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness} -import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.GroupProtocol -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.header.Header -import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.CsvSource - -import java.util.concurrent.Future -import java.util.{Collections, Properties} -import scala.collection.mutable.ListBuffer -import scala.util.Random - -class ProducerCompressionTest extends QuorumTestHarness { - - private val brokerId = 0 - private val topic = "topic" - private val numRecords = 2000 - - private var broker: KafkaBroker = _ - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - super.setUp(testInfo) - val props = TestUtils.createBrokerConfig(brokerId) - broker = createBroker(new KafkaConfig(props)) - } - - @AfterEach - override def tearDown(): Unit = { - TestUtils.shutdownServers(Seq(broker)) - super.tearDown() - } - - /** - * testCompression - * - * Compressed messages should be able to sent and consumed correctly - */ - @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.compression={2}") - @CsvSource(value = Array( - "kraft,classic,none", - "kraft,consumer,none", - "kraft,classic,gzip", - "kraft,consumer,gzip", - "kraft,classic,snappy", - "kraft,consumer,snappy", - "kraft,classic,lz4", - "kraft,consumer,lz4", - "kraft,classic,zstd", - "kraft,consumer,zstd" - )) - def testCompression(quorum: String, groupProtocol: String, compression: String): Unit = { - val producerProps = new Properties() - val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker)) - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) - producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) - producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000") - producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200") - val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer) - val consumer = TestUtils.createConsumer(bootstrapServers, GroupProtocol.of(groupProtocol)) - - try { - // create topic - val admin = TestUtils.createAdminClient(Seq(broker), - ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - try { - TestUtils.createTopicWithAdmin(admin, topic, Seq(broker), controllerServers) - } finally { - admin.close() - } - val partition = 0 - - def messageValue(length: Int): String = { - val random = new Random(0) - new String(random.alphanumeric.take(length).toArray) - } - - // prepare the messages - val messageValues = (0 until numRecords).map(i => messageValue(i)) - val headerArr = Array[Header](new RecordHeader("key", "value".getBytes)) - val headers = new RecordHeaders(headerArr) - - // make sure the returned messages are correct - val now = System.currentTimeMillis() - val responses: ListBuffer[Future[RecordMetadata]] = new ListBuffer[Future[RecordMetadata]]() - - for (message <- messageValues) { - // 1. send message without key and header - responses += producer.send(new ProducerRecord(topic, null, now, null, message.getBytes)) - // 2. send message with key, without header - responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes)) - // 3. send message with key and header - responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes, headers)) - } - for ((future, offset) <- responses.zipWithIndex) { - assertEquals(offset.toLong, future.get.offset) - } - - val tp = new TopicPartition(topic, partition) - // make sure the fetched message count match - consumer.assign(Collections.singleton(tp)) - consumer.seek(tp, 0) - val records = TestUtils.consumeRecords(consumer, numRecords*3) - - for (i <- 0 until numRecords) { - val messageValue = messageValues(i) - // 1. verify message without key and header - var offset = i * 3 - var record = records(offset) - assertNull(record.key()) - assertEquals(messageValue, new String(record.value)) - assertEquals(0, record.headers().toArray.length) - assertEquals(now, record.timestamp) - assertEquals(offset.toLong, record.offset) - - // 2. verify message with key, without header - offset = i * 3 + 1 - record = records(offset) - assertEquals(messageValue.length.toString, new String(record.key())) - assertEquals(messageValue, new String(record.value)) - assertEquals(0, record.headers().toArray.length) - assertEquals(now, record.timestamp) - assertEquals(offset.toLong, record.offset) - - // 3. verify message with key and header - offset = i * 3 + 2 - record = records(offset) - assertEquals(messageValue.length.toString, new String(record.key())) - assertEquals(messageValue, new String(record.value)) - assertEquals(1, record.headers().toArray.length) - assertEquals(headerArr.apply(0), record.headers().toArray.apply(0)) - assertEquals(now, record.timestamp) - assertEquals(offset.toLong, record.offset) - } - } finally { - producer.close() - consumer.close() - } - } -} diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala deleted file mode 100644 index 7f578ad98eb..00000000000 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -import java.util.concurrent.ExecutionException -import java.util.Properties -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.{TestInfoUtils, TestUtils} -import org.apache.kafka.clients.producer._ -import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.errors._ -import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch} -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs} -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{MethodSource, ValueSource} - -class ProducerFailureHandlingTest extends KafkaServerTestHarness { - private val producerBufferSize = 30000 - private val serverMessageMaxBytes = producerBufferSize/2 - private val replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 200 - private val replicaFetchMaxResponseBytes = replicaFetchMaxPartitionBytes + 200 - - val numServers = 2 - - val overridingProps = new Properties() - overridingProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString) - overridingProps.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, serverMessageMaxBytes.toString) - overridingProps.put(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG, replicaFetchMaxPartitionBytes.toString) - overridingProps.put(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, replicaFetchMaxResponseBytes.toString) - // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) - // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long - overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString) - - def generateConfigs = - TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) - - private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _ - private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _ - private var producer3: KafkaProducer[Array[Byte], Array[Byte]] = _ - private var producer4: KafkaProducer[Array[Byte], Array[Byte]] = _ - - private val topic1 = "topic-1" - private val topic2 = "topic-2" - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - super.setUp(testInfo) - - producer1 = TestUtils.createProducer(bootstrapServers(), acks = 0, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L, - bufferSize = producerBufferSize) - producer2 = TestUtils.createProducer(bootstrapServers(), acks = 1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L, - bufferSize = producerBufferSize) - producer3 = TestUtils.createProducer(bootstrapServers(), acks = -1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L, - bufferSize = producerBufferSize) - } - - @AfterEach - override def tearDown(): Unit = { - if (producer1 != null) producer1.close() - if (producer2 != null) producer2.close() - if (producer3 != null) producer3.close() - if (producer4 != null) producer4.close() - - super.tearDown() - } - - /** - * With ack == 0 the future metadata will have no exceptions with offset -1 - */ - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testTooLargeRecordWithAckZero(groupProtocol: String): Unit = { - // create topic - createTopic(topic1, replicationFactor = numServers) - - // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) - - val recordMetadata = producer1.send(record).get() - assertNotNull(recordMetadata) - assertFalse(recordMetadata.hasOffset) - assertEquals(-1L, recordMetadata.offset) - } - - /** - * With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException - */ - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testTooLargeRecordWithAckOne(groupProtocol: String): Unit = { - // create topic - createTopic(topic1, replicationFactor = numServers) - - // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) - assertThrows(classOf[ExecutionException], () => producer2.send(record).get) - } - - private def checkTooLargeRecordForReplicationWithAckAll(maxFetchSize: Int): Unit = { - val maxMessageSize = maxFetchSize + 100 - val topicConfig = new Properties - topicConfig.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numServers.toString) - topicConfig.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize.toString) - - // create topic - val topic10 = "topic10" - createTopic(topic10, numPartitions = brokers.size, replicationFactor = numServers, topicConfig) - - // send a record that is too large for replication, but within the broker max message limit - val value = new Array[Byte](maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD) - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic10, null, value) - val recordMetadata = producer3.send(record).get - - assertEquals(topic10, recordMetadata.topic) - } - - /** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */ - @ParameterizedTest - @ValueSource(strings = Array("kraft")) - def testPartitionTooLargeForReplicationWithAckAll(quorum: String): Unit = { - checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxPartitionBytes) - } - - /** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */ - @ParameterizedTest - @ValueSource(strings = Array("kraft")) - def testResponseTooLargeForReplicationWithAckAll(quorum: String): Unit = { - checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxResponseBytes) - } - - /** - * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException - */ - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testNonExistentTopic(groupProtocol: String): Unit = { - // send a record with non-exist topic - val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) - assertThrows(classOf[ExecutionException], () => producer1.send(record).get) - } - - /** - * With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException - * - * TODO: other exceptions that can be thrown in ExecutionException: - * UnknownTopicOrPartitionException - * NotLeaderOrFollowerException - * LeaderNotAvailableException - * CorruptRecordException - * TimeoutException - */ - @ParameterizedTest - @ValueSource(strings = Array("kraft")) - def testWrongBrokerList(quorum: String): Unit = { - // create topic - createTopic(topic1, replicationFactor = numServers) - - // producer with incorrect broker list - producer4 = TestUtils.createProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize) - - // send a record with incorrect broker list - val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) - assertThrows(classOf[ExecutionException], () => producer4.send(record).get) - } - - /** - * Send with invalid partition id should return ExecutionException caused by TimeoutException - * when partition is higher than the upper bound of partitions. - */ - @ParameterizedTest - @ValueSource(strings = Array("kraft")) - def testInvalidPartition(quorum: String): Unit = { - // create topic with a single partition - createTopic(topic1, replicationFactor = numServers) - - // create a record with incorrect partition id (higher than the number of partitions), send should fail - val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes) - val e = assertThrows(classOf[ExecutionException], () => producer1.send(higherRecord).get) - assertEquals(classOf[TimeoutException], e.getCause.getClass) - } - - /** - * The send call after producer closed should throw IllegalStateException - */ - @ParameterizedTest - @ValueSource(strings = Array("kraft")) - def testSendAfterClosed(quorum: String): Unit = { - // create topic - createTopic(topic1, replicationFactor = numServers) - - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) - - // first send a message to make sure the metadata is refreshed - producer1.send(record).get - producer2.send(record).get - producer3.send(record).get - - producer1.close() - assertThrows(classOf[IllegalStateException], () => producer1.send(record)) - producer2.close() - assertThrows(classOf[IllegalStateException], () => producer2.send(record)) - producer3.close() - assertThrows(classOf[IllegalStateException], () => producer3.send(record)) - } - - @ParameterizedTest - @ValueSource(strings = Array("kraft")) - def testCannotSendToInternalTopic(quorum: String): Unit = { - - createOffsetsTopic() - val thrown = assertThrows(classOf[ExecutionException], - () => producer2.send(new ProducerRecord(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes, "test".getBytes)).get) - assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException], "Unexpected exception while sending to an invalid topic " + thrown.getCause) - } - - @ParameterizedTest - @ValueSource(strings = Array("kraft")) - def testNotEnoughReplicasAfterBrokerShutdown(quorum: String): Unit = { - val topicName = "minisrtest2" - val topicProps = new Properties() - topicProps.put("min.insync.replicas", numServers.toString) - - createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps) - - val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes) - // this should work with all brokers up and running - producer3.send(record).get - - // shut down one broker - brokers.head.shutdown() - brokers.head.awaitShutdown() - val e = assertThrows(classOf[ExecutionException], () => producer3.send(record).get) - assertTrue(e.getCause.isInstanceOf[NotEnoughReplicasException] || - e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] || - e.getCause.isInstanceOf[TimeoutException]) - - // restart the server - brokers.head.startup() - } - -} diff --git a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala deleted file mode 100644 index 906de28e34f..00000000000 --- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -import java.util -import java.util.{Collections, Properties} -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.{TestInfoUtils, TestUtils} -import kafka.utils.TestUtils.{consumeRecords, createAdminClient} -import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, ProducerState} -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException} -import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig} -import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} -import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource -import org.opentest4j.AssertionFailedError - -import scala.collection.Seq - -class ProducerIdExpirationTest extends KafkaServerTestHarness { - val topic1 = "topic1" - val numPartitions = 1 - val replicationFactor = 3 - val tp0 = new TopicPartition(topic1, 0) - val configResource = new ConfigResource(ConfigResource.Type.BROKER, "") - - var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ - var consumer: Consumer[Array[Byte], Array[Byte]] = _ - var admin: Admin = _ - - override def generateConfigs: Seq[KafkaConfig] = { - TestUtils.createBrokerConfigs(3).map(KafkaConfig.fromProps(_, serverProps())) - } - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - super.setUp(testInfo) - consumer = TestUtils.createConsumer(bootstrapServers(), - groupProtocolFromTestParameters(), - enableAutoCommit = false, - readCommitted = true) - admin = createAdminClient(brokers, listenerName) - - createTopic(topic1, numPartitions, 3) - } - - @AfterEach - override def tearDown(): Unit = { - if (producer != null) - producer.close() - if (consumer != null) - consumer.close() - if (admin != null) - admin.close() - - super.tearDown() - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testProducerIdExpirationWithNoTransactions(groupProtocol: String): Unit = { - producer = TestUtils.createProducer(bootstrapServers(), enableIdempotence = true) - - // Send records to populate producer state cache. - producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes)) - producer.flush() - - // Ensure producer IDs are added. - ensureConsistentKRaftMetadata() - assertEquals(1, producerState.size) - - // Wait for the producer ID to expire. - TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.") - - // Send more records to send producer ID back to brokers. - producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes)) - producer.flush() - - // Producer IDs should repopulate. - assertEquals(1, producerState.size) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testTransactionAfterTransactionIdExpiresButProducerIdRemains(groupProtocol: String): Unit = { - producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers) - producer.initTransactions() - - // Start and then abort a transaction to allow the producer ID to expire. - producer.beginTransaction() - producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false)) - producer.flush() - - // Ensure producer IDs are added. - TestUtils.waitUntilTrue(() => producerState.size == 1, "Producer IDs were not added.") - - producer.abortTransaction() - - // Wait for the transactional ID to expire. - waitUntilTransactionalStateExpires() - - // Producer IDs should be retained. - assertEquals(1, producerState.size) - - // Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail - // due to the expired transactional ID, resulting in a fatal error. - producer.beginTransaction() - val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", willBeCommitted = false)) - TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never completed.") - org.apache.kafka.test.TestUtils.assertFutureThrows(classOf[InvalidPidMappingException], failedFuture) - - // Assert that aborting the transaction throws a KafkaException due to the fatal error. - assertThrows(classOf[KafkaException], () => producer.abortTransaction()) - - // Close the producer and reinitialize to recover from the fatal error. - producer.close() - producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers) - producer.initTransactions() - - producer.beginTransaction() - producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true)) - producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "3", "3", willBeCommitted = true)) - - // Producer IDs should be retained. - assertTrue(producerState.size() > 0) - - producer.commitTransaction() - - // Check we can still consume the transaction. - consumer.subscribe(Collections.singletonList(topic1)) - - val records = consumeRecords(consumer, 2) - records.foreach { record => - TestUtils.assertCommittedAndGetValue(record) - } - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testDynamicProducerIdExpirationMs(groupProtocol: String): Unit = { - producer = TestUtils.createProducer(bootstrapServers(), enableIdempotence = true) - - // Send records to populate producer state cache. - producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes)) - producer.flush() - - // Ensure producer IDs are added. - ensureConsistentKRaftMetadata() - assertEquals(1, producerState.size) - - // Wait for the producer ID to expire. - TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.") - - // Update the producer ID expiration ms to a very high value. - admin.incrementalAlterConfigs(producerIdExpirationConfig("100000")) - - brokers.foreach(broker => TestUtils.waitUntilTrue(() => broker.logManager.producerStateManagerConfig.producerIdExpirationMs == 100000, "Configuration was not updated.")) - - // Send more records to send producer ID back to brokers. - producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes)) - producer.flush() - - // Producer IDs should repopulate. - assertEquals(1, producerState.size) - - // Ensure producer ID does not expire within 4 seconds. - assertThrows(classOf[AssertionFailedError], () => - TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.", 4000) - ) - - // Update the expiration time to a low value again. - admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get() - - // restart a broker to ensure that dynamic config changes are picked up on restart - killBroker(0) - restartDeadBrokers() - - brokers.foreach(broker => TestUtils.waitUntilTrue(() => broker.logManager.producerStateManagerConfig.producerIdExpirationMs == 100, "Configuration was not updated.")) - - // Ensure producer ID expires quickly again. - TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.") - } - - private def producerState: util.List[ProducerState] = { - val describeResult = admin.describeProducers(Collections.singletonList(tp0)) - val activeProducers = describeResult.partitionResult(tp0).get().activeProducers - activeProducers - } - - private def producerIdExpirationConfig(configValue: String): util.Map[ConfigResource, util.Collection[AlterConfigOp]] = { - val producerIdCfg = new ConfigEntry(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue) - val configs = Collections.singletonList(new AlterConfigOp(producerIdCfg, AlterConfigOp.OpType.SET)) - Collections.singletonMap(configResource, configs) - } - - private def waitUntilTransactionalStateExpires(): Unit = { - TestUtils.waitUntilTrue(() => { - var removedTransactionState = false - val txnDescribeResult = admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer") - try { - txnDescribeResult.get() - } catch { - case e: Exception => { - removedTransactionState = e.getCause.isInstanceOf[TransactionalIdNotFoundException] - } - } - removedTransactionState - }, "Transaction state never expired.") - } - - private def serverProps(): Properties = { - val serverProps = new Properties() - serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString) - // Set a smaller value for the number of partitions for the __consumer_offsets topic - // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long. - serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString) - serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString) - serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString) - serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString) - serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString) - serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString) - serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString) - serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0") - serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200") - serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "5000") - serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500") - serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, "10000") - serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500") - serverProps - } -}