KAFKA-19042 Move ProducerCompressionTest, ProducerFailureHandlingTest, and ProducerIdExpirationTest to client-integration-tests module (#19319)

Includes three test cases, ported from the Scala test harnesses to the Java ClusterInstance framework:
- ProducerCompressionTest
- ProducerFailureHandlingTest
- ProducerIdExpirationTest

Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>


@@ -19,12 +19,16 @@
 -->
 <import-control pkg="org.apache.kafka">
-  <allow pkg="java" />
-  <allow pkg="org.junit" />
+  <allow pkg="java"/>
+  <allow pkg="org.junit"/>
-  <!-- These are tests, allow whatever -->
-  <allow pkg="org.apache.kafka"/>
-  <allow pkg="org.junit" />
-  <allow pkg="kafka"/>
+  <!-- These are tests, allow whatever -->
+  <allow pkg="org.apache.kafka"/>
+  <allow pkg="org.junit"/>
+  <allow pkg="kafka"/>
+  <subpackage name="clients.producer">
+    <allow pkg="org.opentest4j"/>
+  </subpackage>
 </import-control>
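
The new clients.producer subpackage rule is what lets ProducerIdExpirationTest below import org.opentest4j.AssertionFailedError for its timeout assertion.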


@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static kafka.utils.TestUtils.consumeRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
@ClusterTestDefaults(types = {Type.KRAFT})
class ProducerCompressionTest {
private final String topicName = "topic";
private final int numRecords = 2000;
/**
 * testCompression
 * <p>
 * Compressed messages should be able to be sent and consumed correctly.
 */
@ClusterTest
void testCompression(ClusterInstance cluster) throws ExecutionException, InterruptedException {
for (CompressionType compression : CompressionType.values()) {
processCompressionTest(cluster, compression);
}
}
void processCompressionTest(ClusterInstance cluster, CompressionType compression) throws InterruptedException,
ExecutionException {
String compressionTopic = topicName + "_" + compression.name;
cluster.createTopic(compressionTopic, 1, (short) 1);
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression.name);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000");
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200");
Consumer<byte[], byte[]> classicConsumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"));
Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"));
try (Producer<byte[], byte[]> producer = cluster.producer(producerProps)) {
int partition = 0;
// prepare the messages
List<String> messages = IntStream.range(0, numRecords).mapToObj(this::messageValue).toList();
Header[] headerArr = new Header[]{new RecordHeader("key", "value".getBytes())};
RecordHeaders headers = new RecordHeaders(headerArr);
// make sure the returned messages are correct
long now = System.currentTimeMillis();
List<Future<RecordMetadata>> responses = new ArrayList<>();
messages.forEach(message -> {
// 1. send message without key and header
responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, null,
message.getBytes())));
// 2. send message with key, without header
responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now,
String.valueOf(message.length()).getBytes(), message.getBytes())));
// 3. send message with key and header
responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now,
String.valueOf(message.length()).getBytes(), message.getBytes(), headers)));
});
for (int offset = 0; offset < responses.size(); offset++) {
assertEquals(offset, responses.get(offset).get().offset(), compression.name);
}
verifyConsumerRecords(consumer, messages, now, headerArr, partition, compressionTopic, compression.name);
verifyConsumerRecords(classicConsumer, messages, now, headerArr, partition, compressionTopic,
compression.name);
} finally {
// These consumers close very slowly, which may cause the entire test to time out, so we
// can't wait for them to auto-close.
consumer.close(CloseOptions.timeout(Duration.ofSeconds(1)));
classicConsumer.close(CloseOptions.timeout(Duration.ofSeconds(1)));
}
}
private void verifyConsumerRecords(Consumer<byte[], byte[]> consumer, List<String> messages, long now,
Header[] headerArr, int partition, String topic, String compression) {
TopicPartition tp = new TopicPartition(topic, partition);
consumer.assign(List.of(tp));
consumer.seek(tp, 0);
AtomicInteger num = new AtomicInteger(0);
AtomicInteger flag = new AtomicInteger(0);
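// num indexes the source message; flag cycles 0 -> 1 -> 2 over the three variants
// sent per message (no key, key only, key + header), so the record under
// inspection is expected at offset num * 3 + flag.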
consumeRecords(consumer, numRecords * 3, TestUtils.DEFAULT_MAX_WAIT_MS).foreach(record -> {
String messageValue = messages.get(num.get());
long offset = num.get() * 3L + flag.get();
if (flag.get() == 0) {
// verify message without key and header
assertNull(record.key(), errorMessage(compression));
assertEquals(messageValue, new String(record.value()), errorMessage(compression));
assertEquals(0, record.headers().toArray().length, errorMessage(compression));
assertEquals(now, record.timestamp(), errorMessage(compression));
assertEquals(offset, record.offset(), errorMessage(compression));
} else if (flag.get() == 1) {
// verify message with key, without header
assertEquals(String.valueOf(messageValue.length()), new String(record.key()), errorMessage(compression));
assertEquals(messageValue, new String(record.value()), errorMessage(compression));
assertEquals(0, record.headers().toArray().length, errorMessage(compression));
assertEquals(now, record.timestamp(), errorMessage(compression));
assertEquals(offset, record.offset(), errorMessage(compression));
} else if (flag.get() == 2) {
// verify message with key and header
assertEquals(String.valueOf(messageValue.length()), new String(record.key()), errorMessage(compression));
assertEquals(messageValue, new String(record.value()), errorMessage(compression));
assertEquals(1, record.headers().toArray().length, errorMessage(compression));
assertEquals(headerArr[0], record.headers().toArray()[0], errorMessage(compression));
assertEquals(now, record.timestamp(), errorMessage(compression));
assertEquals(offset, record.offset(), errorMessage(compression));
} else {
fail();
}
flagLoop(num, flag);
return null;
});
}
private void flagLoop(AtomicInteger num, AtomicInteger flag) {
if (flag.get() == 2) {
num.incrementAndGet();
flag.set(0);
} else {
flag.incrementAndGet();
}
}
private String messageValue(int length) {
Random random = new Random();
return IntStream.range(0, length)
.map(i -> random.nextInt(TestUtils.LETTERS_AND_DIGITS.length()))
.mapToObj(TestUtils.LETTERS_AND_DIGITS::charAt)
.map(String::valueOf)
.collect(Collectors.joining());
}
private String errorMessage(String compression) {
return String.format("Compression type: %s - Assertion failed", compression);
}
}


@@ -0,0 +1,305 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ClusterTestDefaults(
types = {Type.KRAFT},
brokers = 2,
serverProperties = {
@ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
// 15000 is the field serverMessageMaxBytes
@ClusterConfigProperty(key = MESSAGE_MAX_BYTES_CONFIG, value = "15000"),
// 15200 is the field replicaFetchMaxPartitionBytes
@ClusterConfigProperty(key = REPLICA_FETCH_MAX_BYTES_CONFIG, value = "15200"),
// 15400 is the field replicaFetchMaxResponseBytes
@ClusterConfigProperty(key = REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG, value = "15400"),
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
}
)
public class ProducerFailureHandlingTest {
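// The sizes are chained: producer buffer (30000) > replica fetch response (15400) >
// replica fetch partition (15200) > broker max message (15000), so a record can be
// accepted by the broker yet exceed the replication fetch limits exercised below.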
private final int producerBufferSize = 30000;
private final int serverMessageMaxBytes = producerBufferSize / 2;
private final int replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 200;
private final int replicaFetchMaxResponseBytes = replicaFetchMaxPartitionBytes + 200;
private final String topic1 = "topic-1";
private final String topic2 = "topic-2";
/**
 * With acks == 0, the returned metadata future completes without exception and reports offset -1.
 */
@ClusterTest
void testTooLargeRecordWithAckZero(ClusterInstance clusterInstance) throws InterruptedException,
ExecutionException {
clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size());
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) {
// send a too-large record
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]);
RecordMetadata recordMetadata = producer.send(record).get();
assertNotNull(recordMetadata);
assertFalse(recordMetadata.hasOffset());
assertEquals(-1L, recordMetadata.offset());
}
}
/**
 * With acks == 1, getting the future metadata throws an ExecutionException caused by RecordTooLargeException.
 */
@ClusterTest
void testTooLargeRecordWithAckOne(ClusterInstance clusterInstance) throws InterruptedException {
clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size());
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(1))) {
// send a too-large record
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]);
assertThrows(ExecutionException.class, () -> producer.send(record).get());
}
}
/**
* This should succeed as the replica fetcher thread can handle oversized messages since KIP-74
*/
@ClusterTest
void testPartitionTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException,
ExecutionException {
checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxPartitionBytes);
}
/**
* This should succeed as the replica fetcher thread can handle oversized messages since KIP-74
*/
@ClusterTest
void testResponseTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException,
ExecutionException {
checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxResponseBytes);
}
/**
 * With a non-existent topic, getting the future metadata throws an ExecutionException caused by TimeoutException.
 */
@ClusterTest
void testNonExistentTopic(ClusterInstance clusterInstance) {
// send a record to a non-existent topic
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topic2, null, "key".getBytes(), "value".getBytes());
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) {
assertThrows(ExecutionException.class, () -> producer.send(record).get());
}
}
/**
 * With an incorrect broker list, getting the future metadata throws an ExecutionException caused by TimeoutException.
 */
@ClusterTest
void testWrongBrokerList(ClusterInstance clusterInstance) throws InterruptedException {
clusterInstance.createTopic(topic1, 1, (short) 1);
// producer with incorrect broker list
Map<String, Object> producerConfig = new HashMap<>(producerConfig(1));
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8686,localhost:4242");
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig)) {
// send a record with incorrect broker list
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topic1, null, "key".getBytes(), "value".getBytes());
assertThrows(ExecutionException.class, () -> producer.send(record).get());
}
}
/**
* Send with invalid partition id should return ExecutionException caused by TimeoutException
* when partition is higher than the upper bound of partitions.
*/
@ClusterTest
void testInvalidPartition(ClusterInstance clusterInstance) throws InterruptedException {
// create topic with a single partition
clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size());
// create a record with incorrect partition id (higher than the number of partitions), send should fail
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) {
ProducerRecord<byte[], byte[]> higherRecord =
new ProducerRecord<>(topic1, 1, "key".getBytes(), "value".getBytes());
Exception e = assertThrows(ExecutionException.class, () -> producer.send(higherRecord).get());
assertEquals(TimeoutException.class, e.getCause().getClass());
}
}
/**
 * A send call after the producer is closed should throw IllegalStateException.
 */
@ClusterTest
void testSendAfterClosed(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
// create topic
clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size());
Producer<byte[], byte[]> producer1 = clusterInstance.producer(producerConfig(0));
Producer<byte[], byte[]> producer2 = clusterInstance.producer(producerConfig(1));
Producer<byte[], byte[]> producer3 = clusterInstance.producer(producerConfig(-1));
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topic1, null, "key".getBytes(), "value".getBytes());
// first send a message to make sure the metadata is refreshed
producer1.send(record).get();
producer2.send(record).get();
producer3.send(record).get();
producer1.close();
assertThrows(IllegalStateException.class, () -> producer1.send(record));
producer2.close();
assertThrows(IllegalStateException.class, () -> producer2.send(record));
producer3.close();
assertThrows(IllegalStateException.class, () -> producer3.send(record));
}
@ClusterTest
void testCannotSendToInternalTopic(ClusterInstance clusterInstance) throws InterruptedException {
try (Admin admin = clusterInstance.admin()) {
Map<String, String> topicConfig = new HashMap<>();
clusterInstance.brokers().get(0)
.groupCoordinator()
.groupMetadataTopicConfigs()
.forEach((k, v) -> topicConfig.put(k.toString(), v.toString()));
admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1).configs(topicConfig)));
clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1);
}
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(1))) {
Exception thrown = assertThrows(ExecutionException.class,
() -> producer.send(new ProducerRecord<>(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes(),
"test".getBytes())).get());
assertInstanceOf(InvalidTopicException.class, thrown.getCause(),
() -> "Unexpected exception while sending to an invalid topic " + thrown.getCause());
}
}
@ClusterTest
void testNotEnoughReplicasAfterBrokerShutdown(ClusterInstance clusterInstance) throws InterruptedException,
ExecutionException {
String topicName = "minisrtest2";
int brokerNum = clusterInstance.brokers().size();
Map<String, String> topicConfig = Map.of(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(brokerNum));
try (Admin admin = clusterInstance.admin()) {
admin.createTopics(List.of(new NewTopic(topicName, 1, (short) brokerNum).configs(topicConfig)));
}
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(topicName, null, "key".getBytes(), "value".getBytes());
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(-1))) {
// this should work with all brokers up and running
producer.send(record).get();
// shut down one broker
KafkaBroker oneBroker = clusterInstance.brokers().get(0);
oneBroker.shutdown();
oneBroker.awaitShutdown();
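// Depending on when the broker observes the shrunken ISR, the failure surfaces before
// the append, after it, or as a client-side timeout, so all three causes are accepted.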
Exception e = assertThrows(ExecutionException.class, () -> producer.send(record).get());
assertTrue(e.getCause() instanceof NotEnoughReplicasException ||
e.getCause() instanceof NotEnoughReplicasAfterAppendException ||
e.getCause() instanceof TimeoutException);
// restart the server
oneBroker.startup();
}
}
private void checkTooLargeRecordForReplicationWithAckAll(ClusterInstance clusterInstance, int maxFetchSize) throws InterruptedException, ExecutionException {
int maxMessageSize = maxFetchSize + 100;
int brokerSize = clusterInstance.brokers().size();
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(brokerSize));
topicConfig.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(maxMessageSize));
// create topic
String topic10 = "topic10";
try (Admin admin = clusterInstance.admin()) {
admin.createTopics(List.of(new NewTopic(topic10, brokerSize, (short) brokerSize).configs(topicConfig)));
clusterInstance.waitTopicDeletion("topic10");
}
// send a record that is too large for replication, but within the broker max message limit
byte[] value =
new byte[maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD];
try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(-1))) {
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic10, null, value);
RecordMetadata recordMetadata = producer.send(producerRecord).get();
assertEquals(topic10, recordMetadata.topic());
}
}
private Map<String, Object> producerConfig(int acks) {
return Map.of(
ACKS_CONFIG, String.valueOf(acks),
RETRIES_CONFIG, 0,
REQUEST_TIMEOUT_MS_CONFIG, 30000,
MAX_BLOCK_MS_CONFIG, 10000,
BUFFER_MEMORY_CONFIG, producerBufferSize);
}
}
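
The value sizing in checkTooLargeRecordForReplicationWithAckAll is the subtle part of the two ack-all tests. Below is a minimal sketch of the arithmetic, assuming only kafka-clients on the classpath; the class name is illustrative, and 15200 mirrors the replica.fetch.max.bytes value set in the test defaults:

import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;

public class ReplicationSizeSketch {
    public static void main(String[] args) {
        int maxFetchSize = 15200;                // replica.fetch.max.bytes from the test defaults
        int maxMessageSize = maxFetchSize + 100; // topic max.message.bytes set by the test
        // Largest value that still fits in a single batch under max.message.bytes:
        int valueSize = maxMessageSize
                - DefaultRecordBatch.RECORD_BATCH_OVERHEAD
                - DefaultRecord.MAX_RECORD_OVERHEAD;
        // The batch is accepted by the broker (<= max.message.bytes) but is larger than
        // the follower's fetch size, which replication tolerates since KIP-74.
        System.out.println("value bytes: " + valueSize);
        System.out.println("batch limit: " + maxMessageSize + " > fetch size: " + maxFetchSize);
    }
}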


@@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;
import kafka.server.KafkaBroker;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.opentest4j.AssertionFailedError;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static kafka.utils.TestUtils.consumeRecords;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ClusterTestDefaults(
brokers = 3,
serverProperties = {
@ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
// Set a smaller value for the number of partitions for the __consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively
// long.
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"),
@ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
@ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"),
@ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
// ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is not a compile-time constant,
// so the literal key is used here
@ClusterConfigProperty(key = "unclean.leader.election.enable", value = "false"),
@ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
@ClusterConfigProperty(key = TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"),
@ClusterConfigProperty(key = TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"),
@ClusterConfigProperty(key = TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = "500"),
@ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "10000"),
@ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500")
}
)
public class ProducerIdExpirationTest {
private final String topic1 = "topic1";
private final int numPartitions = 1;
private final short replicationFactor = 3;
private final TopicPartition tp0 = new TopicPartition(topic1, 0);
private final String transactionalId = "transactionalProducer";
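// An empty resource name addresses the cluster-wide default broker config, so the
// incrementalAlterConfigs calls below update producer.id.expiration.ms on every broker.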
private final ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "");
@ClusterTest
void testProducerIdExpirationWithNoTransactions(ClusterInstance cluster) throws InterruptedException, ExecutionException {
cluster.createTopic(topic1, numPartitions, replicationFactor);
Producer<byte[], byte[]> producer = cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true));
// Send records to populate producer state cache.
producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()));
producer.flush();
try (Admin admin = cluster.admin(); producer) {
assertEquals(1, producerStates(admin).size());
waitProducerIdExpire(admin);
// Send more records to send producer ID back to brokers.
producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()));
producer.flush();
// Producer IDs should repopulate.
assertEquals(1, producerStates(admin).size());
}
}
@ClusterTest
void testTransactionAfterTransactionIdExpiresButProducerIdRemains(ClusterInstance cluster) throws InterruptedException, ExecutionException {
cluster.createTopic(topic1, numPartitions, replicationFactor);
Producer<byte[], byte[]> producer = cluster.producer(transactionalProducerConfig());
producer.initTransactions();
// Start and then abort a transaction to allow the producer ID to expire.
producer.beginTransaction();
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", false));
producer.flush();
Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ISOLATION_LEVEL_CONFIG, "read_committed"));
try (Admin admin = cluster.admin()) {
// Ensure producer IDs are added.
TestUtils.waitUntilTrue(() -> {
try {
return producerStates(admin).size() == 1;
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}, () -> "Producer IDs were not added.", DEFAULT_MAX_WAIT_MS, 100);
producer.abortTransaction();
// Wait for the transactional ID to expire.
waitUntilTransactionalStateExpires(admin);
// Producer IDs should be retained.
assertEquals(1, producerStates(admin).size());
// Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail
// due to the expired transactional ID, resulting in a fatal error.
producer.beginTransaction();
Future<RecordMetadata> failedFuture =
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", false));
TestUtils.waitUntilTrue(failedFuture::isDone, () -> "Producer future never completed.",
DEFAULT_MAX_WAIT_MS, 100);
assertFutureThrows(InvalidPidMappingException.class, failedFuture);
// Assert that aborting the transaction throws a KafkaException due to the fatal error.
assertThrows(KafkaException.class, producer::abortTransaction);
// Close the producer and reinitialize to recover from the fatal error.
producer.close();
producer = cluster.producer(transactionalProducerConfig());
producer.initTransactions();
producer.beginTransaction();
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", true));
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "3", "3", true));
// Producer IDs should be retained.
assertFalse(producerStates(admin).isEmpty());
producer.commitTransaction();
// Check we can still consume the transaction.
consumer.subscribe(List.of(topic1));
consumeRecords(consumer, 2, DEFAULT_MAX_WAIT_MS).foreach(TestUtils::assertCommittedAndGetValue);
} finally {
producer.close();
consumer.close(CloseOptions.timeout(Duration.ofSeconds(1)));
}
}
@ClusterTest
void testDynamicProducerIdExpirationMs(ClusterInstance cluster) throws InterruptedException, ExecutionException {
cluster.createTopic(topic1, numPartitions, replicationFactor);
Producer<byte[], byte[]> producer = cluster.producer(Map.of(ENABLE_IDEMPOTENCE_CONFIG, true));
// Send records to populate producer state cache.
producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()));
producer.flush();
try (Admin admin = cluster.admin(); producer) {
assertEquals(1, producerStates(admin).size());
waitProducerIdExpire(admin);
// Update the producer ID expiration ms to a very high value.
admin.incrementalAlterConfigs(producerIdExpirationConfig("100000"));
cluster.brokers().values().forEach(broker -> {
TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100000,
() -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
});
// Send more records to send producer ID back to brokers.
producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()));
producer.flush();
// Producer IDs should repopulate.
assertEquals(1, producerStates(admin).size());
// Ensure producer ID does not expire within 4 seconds.
assertThrows(AssertionFailedError.class, () -> waitProducerIdExpire(admin, TimeUnit.SECONDS.toMillis(4)));
// Update the expiration time to a low value again.
admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get();
KafkaBroker kafkaBroker = cluster.brokers().get(0);
kafkaBroker.shutdown(Duration.ofMinutes(5));
kafkaBroker.awaitShutdown();
kafkaBroker.startup();
cluster.waitForReadyBrokers();
cluster.brokers().values().forEach(broker -> {
TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100,
() -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
});
// Ensure producer ID expires quickly again.
waitProducerIdExpire(admin);
}
}
private void waitProducerIdExpire(Admin admin) {
waitProducerIdExpire(admin, DEFAULT_MAX_WAIT_MS);
}
private void waitProducerIdExpire(Admin admin, long timeout) {
TestUtils.waitUntilTrue(() -> {
try {
return producerStates(admin).isEmpty();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}, () -> "Producer ID expired.", timeout, 100);
}
private Map<ConfigResource, Collection<AlterConfigOp>> producerIdExpirationConfig(String configValue) {
ConfigEntry producerIdCfg = new ConfigEntry(PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue);
return Map.of(configResource, List.of(new AlterConfigOp(producerIdCfg, AlterConfigOp.OpType.SET)));
}
private void waitUntilTransactionalStateExpires(Admin admin) {
TestUtils.waitUntilTrue(() -> {
boolean removedTransactionState = false;
try {
admin.describeTransactions(List.of(transactionalId))
.description(transactionalId)
.get();
} catch (Exception e) {
removedTransactionState = e.getCause() instanceof TransactionalIdNotFoundException;
}
return removedTransactionState;
}, () -> "Transaction state never expired.", DEFAULT_MAX_WAIT_MS, 100);
}
private Map<String, Object> transactionalProducerConfig() {
return Map.of(
ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId,
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
ProducerConfig.ACKS_CONFIG, "all");
}
private List<ProducerState> producerStates(Admin admin) throws ExecutionException, InterruptedException {
return admin.describeProducers(Collections.singletonList(tp0))
.partitionResult(tp0)
.get()
.activeProducers();
}
}


@@ -1,167 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api.test
import kafka.server.{KafkaBroker, KafkaConfig, QuorumTestHarness}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.GroupProtocol
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import java.util.concurrent.Future
import java.util.{Collections, Properties}
import scala.collection.mutable.ListBuffer
import scala.util.Random
class ProducerCompressionTest extends QuorumTestHarness {
private val brokerId = 0
private val topic = "topic"
private val numRecords = 2000
private var broker: KafkaBroker = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
val props = TestUtils.createBrokerConfig(brokerId)
broker = createBroker(new KafkaConfig(props))
}
@AfterEach
override def tearDown(): Unit = {
TestUtils.shutdownServers(Seq(broker))
super.tearDown()
}
/**
* testCompression
*
* Compressed messages should be able to sent and consumed correctly
*/
@ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.compression={2}")
@CsvSource(value = Array(
"kraft,classic,none",
"kraft,consumer,none",
"kraft,classic,gzip",
"kraft,consumer,gzip",
"kraft,classic,snappy",
"kraft,consumer,snappy",
"kraft,classic,lz4",
"kraft,consumer,lz4",
"kraft,classic,zstd",
"kraft,consumer,zstd"
))
def testCompression(quorum: String, groupProtocol: String, compression: String): Unit = {
val producerProps = new Properties()
val bootstrapServers = TestUtils.plaintextBootstrapServers(Seq(broker))
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000")
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200")
val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
val consumer = TestUtils.createConsumer(bootstrapServers, GroupProtocol.of(groupProtocol))
try {
// create topic
val admin = TestUtils.createAdminClient(Seq(broker),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
try {
TestUtils.createTopicWithAdmin(admin, topic, Seq(broker), controllerServers)
} finally {
admin.close()
}
val partition = 0
def messageValue(length: Int): String = {
val random = new Random(0)
new String(random.alphanumeric.take(length).toArray)
}
// prepare the messages
val messageValues = (0 until numRecords).map(i => messageValue(i))
val headerArr = Array[Header](new RecordHeader("key", "value".getBytes))
val headers = new RecordHeaders(headerArr)
// make sure the returned messages are correct
val now = System.currentTimeMillis()
val responses: ListBuffer[Future[RecordMetadata]] = new ListBuffer[Future[RecordMetadata]]()
for (message <- messageValues) {
// 1. send message without key and header
responses += producer.send(new ProducerRecord(topic, null, now, null, message.getBytes))
// 2. send message with key, without header
responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes))
// 3. send message with key and header
responses += producer.send(new ProducerRecord(topic, null, now, message.length.toString.getBytes, message.getBytes, headers))
}
for ((future, offset) <- responses.zipWithIndex) {
assertEquals(offset.toLong, future.get.offset)
}
val tp = new TopicPartition(topic, partition)
// make sure the fetched message count match
consumer.assign(Collections.singleton(tp))
consumer.seek(tp, 0)
val records = TestUtils.consumeRecords(consumer, numRecords*3)
for (i <- 0 until numRecords) {
val messageValue = messageValues(i)
// 1. verify message without key and header
var offset = i * 3
var record = records(offset)
assertNull(record.key())
assertEquals(messageValue, new String(record.value))
assertEquals(0, record.headers().toArray.length)
assertEquals(now, record.timestamp)
assertEquals(offset.toLong, record.offset)
// 2. verify message with key, without header
offset = i * 3 + 1
record = records(offset)
assertEquals(messageValue.length.toString, new String(record.key()))
assertEquals(messageValue, new String(record.value))
assertEquals(0, record.headers().toArray.length)
assertEquals(now, record.timestamp)
assertEquals(offset.toLong, record.offset)
// 3. verify message with key and header
offset = i * 3 + 2
record = records(offset)
assertEquals(messageValue.length.toString, new String(record.key()))
assertEquals(messageValue, new String(record.value))
assertEquals(1, record.headers().toArray.length)
assertEquals(headerArr.apply(0), record.headers().toArray.apply(0))
assertEquals(now, record.timestamp)
assertEquals(offset.toLong, record.offset)
}
} finally {
producer.close()
consumer.close()
}
}
}


@@ -1,261 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.util.concurrent.ExecutionException
import java.util.Properties
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
private val serverMessageMaxBytes = producerBufferSize/2
private val replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 200
private val replicaFetchMaxResponseBytes = replicaFetchMaxPartitionBytes + 200
val numServers = 2
val overridingProps = new Properties()
overridingProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
overridingProps.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, serverMessageMaxBytes.toString)
overridingProps.put(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG, replicaFetchMaxPartitionBytes.toString)
overridingProps.put(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, replicaFetchMaxResponseBytes.toString)
// Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
def generateConfigs =
TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer3: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer4: KafkaProducer[Array[Byte], Array[Byte]] = _
private val topic1 = "topic-1"
private val topic2 = "topic-2"
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
producer1 = TestUtils.createProducer(bootstrapServers(), acks = 0, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
producer2 = TestUtils.createProducer(bootstrapServers(), acks = 1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
producer3 = TestUtils.createProducer(bootstrapServers(), acks = -1, retries = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L,
bufferSize = producerBufferSize)
}
@AfterEach
override def tearDown(): Unit = {
if (producer1 != null) producer1.close()
if (producer2 != null) producer2.close()
if (producer3 != null) producer3.close()
if (producer4 != null) producer4.close()
super.tearDown()
}
/**
* With ack == 0 the future metadata will have no exceptions with offset -1
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testTooLargeRecordWithAckZero(groupProtocol: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
val recordMetadata = producer1.send(record).get()
assertNotNull(recordMetadata)
assertFalse(recordMetadata.hasOffset)
assertEquals(-1L, recordMetadata.offset)
}
/**
* With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testTooLargeRecordWithAckOne(groupProtocol: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
// send a too-large record
val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
assertThrows(classOf[ExecutionException], () => producer2.send(record).get)
}
private def checkTooLargeRecordForReplicationWithAckAll(maxFetchSize: Int): Unit = {
val maxMessageSize = maxFetchSize + 100
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, numServers.toString)
topicConfig.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, maxMessageSize.toString)
// create topic
val topic10 = "topic10"
createTopic(topic10, numPartitions = brokers.size, replicationFactor = numServers, topicConfig)
// send a record that is too large for replication, but within the broker max message limit
val value = new Array[Byte](maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic10, null, value)
val recordMetadata = producer3.send(record).get
assertEquals(topic10, recordMetadata.topic)
}
/** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testPartitionTooLargeForReplicationWithAckAll(quorum: String): Unit = {
checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxPartitionBytes)
}
/** This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 */
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testResponseTooLargeForReplicationWithAckAll(quorum: String): Unit = {
checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxResponseBytes)
}
/**
* With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testNonExistentTopic(groupProtocol: String): Unit = {
// send a record with non-exist topic
val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
assertThrows(classOf[ExecutionException], () => producer1.send(record).get)
}
/**
* With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException
*
* TODO: other exceptions that can be thrown in ExecutionException:
* UnknownTopicOrPartitionException
* NotLeaderOrFollowerException
* LeaderNotAvailableException
* CorruptRecordException
* TimeoutException
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testWrongBrokerList(quorum: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
// producer with incorrect broker list
producer4 = TestUtils.createProducer("localhost:8686,localhost:4242", acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize)
// send a record with incorrect broker list
val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
assertThrows(classOf[ExecutionException], () => producer4.send(record).get)
}
/**
* Send with invalid partition id should return ExecutionException caused by TimeoutException
* when partition is higher than the upper bound of partitions.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testInvalidPartition(quorum: String): Unit = {
// create topic with a single partition
createTopic(topic1, replicationFactor = numServers)
// create a record with incorrect partition id (higher than the number of partitions), send should fail
val higherRecord = new ProducerRecord(topic1, 1, "key".getBytes, "value".getBytes)
val e = assertThrows(classOf[ExecutionException], () => producer1.send(higherRecord).get)
assertEquals(classOf[TimeoutException], e.getCause.getClass)
}
/**
* The send call after producer closed should throw IllegalStateException
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testSendAfterClosed(quorum: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
// first send a message to make sure the metadata is refreshed
producer1.send(record).get
producer2.send(record).get
producer3.send(record).get
producer1.close()
assertThrows(classOf[IllegalStateException], () => producer1.send(record))
producer2.close()
assertThrows(classOf[IllegalStateException], () => producer2.send(record))
producer3.close()
assertThrows(classOf[IllegalStateException], () => producer3.send(record))
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCannotSendToInternalTopic(quorum: String): Unit = {
createOffsetsTopic()
val thrown = assertThrows(classOf[ExecutionException],
() => producer2.send(new ProducerRecord(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes, "test".getBytes)).get)
assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException], "Unexpected exception while sending to an invalid topic " + thrown.getCause)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testNotEnoughReplicasAfterBrokerShutdown(quorum: String): Unit = {
val topicName = "minisrtest2"
val topicProps = new Properties()
topicProps.put("min.insync.replicas", numServers.toString)
createTopic(topicName, replicationFactor = numServers, topicConfig = topicProps)
val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
// this should work with all brokers up and running
producer3.send(record).get
// shut down one broker
brokers.head.shutdown()
brokers.head.awaitShutdown()
val e = assertThrows(classOf[ExecutionException], () => producer3.send(record).get)
assertTrue(e.getCause.isInstanceOf[NotEnoughReplicasException] ||
e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] ||
e.getCause.isInstanceOf[TimeoutException])
// restart the server
brokers.head.startup()
}
}


@@ -1,254 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.util
import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, ProducerState}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidPidMappingException, TransactionalIdNotFoundException}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.opentest4j.AssertionFailedError
import scala.collection.Seq
class ProducerIdExpirationTest extends KafkaServerTestHarness {
val topic1 = "topic1"
val numPartitions = 1
val replicationFactor = 3
val tp0 = new TopicPartition(topic1, 0)
val configResource = new ConfigResource(ConfigResource.Type.BROKER, "")
var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
var consumer: Consumer[Array[Byte], Array[Byte]] = _
var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(3).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
consumer = TestUtils.createConsumer(bootstrapServers(),
groupProtocolFromTestParameters(),
enableAutoCommit = false,
readCommitted = true)
admin = createAdminClient(brokers, listenerName)
createTopic(topic1, numPartitions, 3)
}
@AfterEach
override def tearDown(): Unit = {
if (producer != null)
producer.close()
if (consumer != null)
consumer.close()
if (admin != null)
admin.close()
super.tearDown()
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testProducerIdExpirationWithNoTransactions(groupProtocol: String): Unit = {
producer = TestUtils.createProducer(bootstrapServers(), enableIdempotence = true)
// Send records to populate producer state cache.
producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes))
producer.flush()
// Ensure producer IDs are added.
ensureConsistentKRaftMetadata()
assertEquals(1, producerState.size)
// Wait for the producer ID to expire.
TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.")
// Send more records to send producer ID back to brokers.
producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes))
producer.flush()
// Producer IDs should repopulate.
assertEquals(1, producerState.size)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testTransactionAfterTransactionIdExpiresButProducerIdRemains(groupProtocol: String): Unit = {
producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
producer.initTransactions()
// Start and then abort a transaction to allow the producer ID to expire.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "2", "2", willBeCommitted = false))
producer.flush()
// Ensure producer IDs are added.
TestUtils.waitUntilTrue(() => producerState.size == 1, "Producer IDs were not added.")
producer.abortTransaction()
// Wait for the transactional ID to expire.
waitUntilTransactionalStateExpires()
// Producer IDs should be retained.
assertEquals(1, producerState.size)
// Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail
// due to the expired transactional ID, resulting in a fatal error.
producer.beginTransaction()
val failedFuture = producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "1", "1", willBeCommitted = false))
TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never completed.")
org.apache.kafka.test.TestUtils.assertFutureThrows(classOf[InvalidPidMappingException], failedFuture)
// Assert that aborting the transaction throws a KafkaException due to the fatal error.
assertThrows(classOf[KafkaException], () => producer.abortTransaction())
// Close the producer and reinitialize to recover from the fatal error.
producer.close()
producer = TestUtils.createTransactionalProducer("transactionalProducer", brokers)
producer.initTransactions()
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "4", "4", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, "3", "3", willBeCommitted = true))
// Producer IDs should be retained.
assertTrue(producerState.size() > 0)
producer.commitTransaction()
// Check we can still consume the transaction.
consumer.subscribe(Collections.singletonList(topic1))
val records = consumeRecords(consumer, 2)
records.foreach { record =>
TestUtils.assertCommittedAndGetValue(record)
}
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testDynamicProducerIdExpirationMs(groupProtocol: String): Unit = {
producer = TestUtils.createProducer(bootstrapServers(), enableIdempotence = true)
// Send records to populate producer state cache.
producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes))
producer.flush()
// Ensure producer IDs are added.
ensureConsistentKRaftMetadata()
assertEquals(1, producerState.size)
// Wait for the producer ID to expire.
TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.")
// Update the producer ID expiration ms to a very high value.
admin.incrementalAlterConfigs(producerIdExpirationConfig("100000"))
brokers.foreach(broker => TestUtils.waitUntilTrue(() => broker.logManager.producerStateManagerConfig.producerIdExpirationMs == 100000, "Configuration was not updated."))
// Send more records to send producer ID back to brokers.
producer.send(new ProducerRecord(topic1, 0, null, "key".getBytes, "value".getBytes))
producer.flush()
// Producer IDs should repopulate.
assertEquals(1, producerState.size)
// Ensure producer ID does not expire within 4 seconds.
assertThrows(classOf[AssertionFailedError], () =>
TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.", 4000)
)
// Update the expiration time to a low value again.
admin.incrementalAlterConfigs(producerIdExpirationConfig("100")).all().get()
// restart a broker to ensure that dynamic config changes are picked up on restart
killBroker(0)
restartDeadBrokers()
brokers.foreach(broker => TestUtils.waitUntilTrue(() => broker.logManager.producerStateManagerConfig.producerIdExpirationMs == 100, "Configuration was not updated."))
// Ensure producer ID expires quickly again.
TestUtils.waitUntilTrue(() => producerState.isEmpty, "Producer ID did not expire.")
}
private def producerState: util.List[ProducerState] = {
val describeResult = admin.describeProducers(Collections.singletonList(tp0))
val activeProducers = describeResult.partitionResult(tp0).get().activeProducers
activeProducers
}
private def producerIdExpirationConfig(configValue: String): util.Map[ConfigResource, util.Collection[AlterConfigOp]] = {
val producerIdCfg = new ConfigEntry(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, configValue)
val configs = Collections.singletonList(new AlterConfigOp(producerIdCfg, AlterConfigOp.OpType.SET))
Collections.singletonMap(configResource, configs)
}
private def waitUntilTransactionalStateExpires(): Unit = {
TestUtils.waitUntilTrue(() => {
var removedTransactionState = false
val txnDescribeResult = admin.describeTransactions(Collections.singletonList("transactionalProducer")).description("transactionalProducer")
try {
txnDescribeResult.get()
} catch {
case e: Exception => {
removedTransactionState = e.getCause.isInstanceOf[TransactionalIdNotFoundException]
}
}
removedTransactionState
}, "Transaction state never expired.")
}
private def serverProps(): Properties = {
val serverProps = new Properties()
serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, false.toString)
// Set a smaller value for the number of partitions for the __consumer_offsets topic
// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
serverProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 3.toString)
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, 2.toString)
serverProps.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 2.toString)
serverProps.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, true.toString)
serverProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false.toString)
serverProps.put(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, false.toString)
serverProps.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, "200")
serverProps.put(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, "5000")
serverProps.put(TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, "500")
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, "10000")
serverProps.put(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, "500")
serverProps
}
}