From 7fc7136ffd28245629f00870fc7f7cbe1fe1c6ce Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Thu, 26 Jul 2018 09:13:50 -0700 Subject: [PATCH] KAFKA-5886; Introduce delivery.timeout.ms producer config (KIP-91) (#5270) Co-authored-by: Sumant Tambe Co-authored-by: Yu Yang Reviewers: Ted Yu , Apurva Mehta , Jason Gustafson --- .../kafka/clients/producer/KafkaProducer.java | 58 +++- .../clients/producer/ProducerConfig.java | 24 +- .../producer/internals/ProducerBatch.java | 95 +++--- .../producer/internals/RecordAccumulator.java | 318 ++++++++++-------- .../clients/producer/internals/Sender.java | 188 ++++++++--- .../kafka/common/config/AbstractConfig.java | 2 +- .../apache/kafka/common/config/ConfigDef.java | 11 +- .../org/apache/kafka/clients/MockClient.java | 5 +- .../clients/consumer/KafkaConsumerTest.java | 4 +- .../producer/internals/ProducerBatchTest.java | 32 +- .../internals/RecordAccumulatorTest.java | 248 +++++++++++--- .../producer/internals/SenderTest.java | 291 +++++++++++++--- .../internals/TransactionManagerTest.java | 6 +- .../apache/kafka/connect/runtime/Worker.java | 1 + .../kafka/api/BaseProducerSendTest.scala | 12 +- .../kafka/api/PlaintextConsumerTest.scala | 4 +- .../kafka/api/PlaintextProducerSendTest.scala | 6 +- .../api/ProducerFailureHandlingTest.scala | 6 +- .../DynamicBrokerReconfigurationTest.scala | 4 +- .../unit/kafka/server/FetchRequestTest.scala | 2 +- .../scala/unit/kafka/utils/TestUtils.scala | 9 +- docs/upgrade.html | 5 + 22 files changed, 900 insertions(+), 431 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3991467b676..b40b09a588a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -16,6 +16,17 @@ */ package org.apache.kafka.clients.producer; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.KafkaClient; @@ -24,6 +35,7 @@ import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.producer.internals.BufferPool; import org.apache.kafka.clients.producer.internals.ProducerInterceptors; import org.apache.kafka.clients.producer.internals.ProducerMetrics; import org.apache.kafka.clients.producer.internals.RecordAccumulator; @@ -69,18 +81,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended; /** @@ -235,6 +235,7 @@ public class KafkaProducer implements Producer { private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; + public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics"; private final String clientId; // Visible for testing @@ -392,18 +393,21 @@ public class KafkaProducer implements Producer { int retries = configureRetries(config, transactionManager != null, log); int maxInflightRequests = configureInflightRequests(config, transactionManager != null); short acks = configureAcks(config, transactionManager != null, log); + int deliveryTimeoutMs = configureDeliveryTimeout(config, log); this.apiVersions = new ApiVersions(); this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), - this.totalMemorySize, this.compressionType, - config.getLong(ProducerConfig.LINGER_MS_CONFIG), + config.getInt(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, + deliveryTimeoutMs, metrics, + PRODUCER_METRIC_GROUP_NAME, time, apiVersions, - transactionManager); + transactionManager, + new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME)); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); if (metadata != null) { this.metadata = metadata; @@ -459,10 +463,30 @@ public class KafkaProducer implements Producer { } } + private static int configureDeliveryTimeout(ProducerConfig config, Logger log) { + int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); + int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG); + int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + + if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) { + if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) { + // throw an exception if the user explicitly set an inconsistent value + throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG + + " should be equal to or larger than " + ProducerConfig.LINGER_MS_CONFIG + + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); + } else { + // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility + deliveryTimeoutMs = lingerMs + requestTimeoutMs; + log.warn("{} should be equal to or larger than {} + {}. Setting it to {}.", + ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG, + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); + } + } + return deliveryTimeoutMs; + } + private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) { - TransactionManager transactionManager = null; - boolean userConfiguredIdempotence = false; if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) userConfiguredIdempotence = true; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 8e7b662e6df..ab55353a667 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -99,6 +99,19 @@ public class ProducerConfig extends AbstractConfig { + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting " + LINGER_MS_CONFIG + "=5, " + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load."; + /** request.timeout.ms */ + public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; + private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC + + " This should be larger than replica.lag.time.max.ms (a broker configuration)" + + " to reduce the possibility of message duplication due to unnecessary producer retries."; + + /** delivery.timeout.ms */ + public static final String DELIVERY_TIMEOUT_MS_CONFIG = "delivery.timeout.ms"; + private static final String DELIVERY_TIMEOUT_MS_DOC = "An upper bound on the time to report success or failure after Producer.send() returns. " + + "Producer may report failure to send a message earlier than this config if all the retries are exhausted or " + + "a record is added to a batch nearing expiration. " + DELIVERY_TIMEOUT_MS_CONFIG + "should be equal to or " + + "greater than " + REQUEST_TIMEOUT_MS_CONFIG + " + " + LINGER_MS_CONFIG; + /** client.id */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; @@ -188,12 +201,6 @@ public class ProducerConfig extends AbstractConfig { public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; private static final String PARTITIONER_CLASS_DOC = "Partitioner class that implements the org.apache.kafka.clients.producer.Partitioner interface."; - /** request.timeout.ms */ - public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; - private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC - + " This should be larger than replica.lag.time.max.ms (a broker configuration)" - + " to reduce the possibility of message duplication due to unnecessary producer retries."; - /** interceptor.classes */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. " @@ -224,7 +231,7 @@ public class ProducerConfig extends AbstractConfig { static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) - .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) + .define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) .define(ACKS_CONFIG, Type.STRING, "1", @@ -233,7 +240,8 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) - .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) + .define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) + .define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index ea0f0f7dff9..6e08185611d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordBatchTooLargeException; -import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionRatioEstimator; @@ -77,13 +76,13 @@ public final class ProducerBatch { private boolean retry; private boolean reopened = false; - public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) { - this(tp, recordsBuilder, now, false); + public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) { + this(tp, recordsBuilder, createdMs, false); } - public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) { - this.createdMs = now; - this.lastAttemptMs = now; + public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) { + this.createdMs = createdMs; + this.lastAttemptMs = createdMs; this.recordsBuilder = recordsBuilder; this.topicPartition = tp; this.lastAppendTime = createdMs; @@ -158,7 +157,17 @@ public final class ProducerBatch { } /** - * Complete the request. If the batch was previously aborted, this is a no-op. + * Finalize the state of a batch. Final state, once set, is immutable. This function may be called + * once or twice on a batch. It may be called twice if + * 1. An inflight batch expires before a response from the broker is received. The batch's final + * state is set to FAILED. But it could succeed on the broker and second time around batch.done() may + * try to set SUCCEEDED final state. + * 2. If a transaction abortion happens or if the producer is closed forcefully, the final state is + * ABORTED but again it could succeed if broker responds with a success. + * + * Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged. + * Attempted transitions from one failure state to the same or a different failed state are ignored. + * Attempted transitions from SUCCEEDED to the same or a failed state throw an exception. * * @param baseOffset The base offset of the messages assigned by the server * @param logAppendTime The log append time or -1 if CreateTime is being used @@ -166,26 +175,34 @@ public final class ProducerBatch { * @return true if the batch was completed successfully and false if the batch was previously aborted */ public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) { - final FinalState finalState; - if (exception == null) { + final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED; + + if (tryFinalState == FinalState.SUCCEEDED) { log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset); - finalState = FinalState.SUCCEEDED; } else { - log.trace("Failed to produce messages to {}.", topicPartition, exception); - finalState = FinalState.FAILED; + log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception); } - if (!this.finalState.compareAndSet(null, finalState)) { - if (this.finalState.get() == FinalState.ABORTED) { - log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition); - return false; + if (this.finalState.compareAndSet(null, tryFinalState)) { + completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception); + return true; + } + + if (this.finalState.get() != FinalState.SUCCEEDED) { + if (tryFinalState == FinalState.SUCCEEDED) { + // Log if a previously unsuccessful batch succeeded later on. + log.debug("ProduceResponse returned {} for {} after batch with base offset {} had already been {}.", + tryFinalState, topicPartition, baseOffset, this.finalState.get()); } else { - throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get()); + // FAILED --> FAILED and ABORTED --> FAILED transitions are ignored. + log.debug("Ignored state transition {} -> {} for {} batch with base offset {}", + this.finalState.get(), tryFinalState, topicPartition, baseOffset); } + } else { + // A SUCCESSFUL batch must not attempt another state change. + throw new IllegalStateException("A " + this.finalState.get() + " batch must not attempt another state change to " + tryFinalState); } - - completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception); - return true; + return false; } private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { @@ -299,37 +316,12 @@ public final class ProducerBatch { return "ProducerBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")"; } - /** - * A batch whose metadata is not available should be expired if one of the following is true: - *
    - *
  1. the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached). - *
  2. the batch is in retry AND request timeout has elapsed after the backoff period ended. - *
- * This methods closes this batch and sets {@code expiryErrorMessage} if the batch has timed out. - */ - boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { - if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) - expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append"; - else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs)) - expiryErrorMessage = (createdTimeMs(now) - lingerMs) + " ms has passed since batch creation plus linger time"; - else if (this.inRetry() && requestTimeoutMs < (waitedTimeMs(now) - retryBackoffMs)) - expiryErrorMessage = (waitedTimeMs(now) - retryBackoffMs) + " ms has passed since last attempt plus backoff time"; - - boolean expired = expiryErrorMessage != null; - if (expired) - abortRecordAppends(); - return expired; + boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) { + return deliveryTimeoutMs <= now - this.createdMs; } - /** - * If {@link #maybeExpire(int, long, long, long, boolean)} returned true, the sender will fail the batch with - * the exception returned by this method. - * @return An exception indicating the batch expired. - */ - TimeoutException timeoutException() { - if (expiryErrorMessage == null) - throw new IllegalStateException("Batch has not expired"); - return new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage); + public FinalState finalState() { + return this.finalState.get(); } int attempts() { @@ -347,10 +339,6 @@ public final class ProducerBatch { return drainedMs - createdMs; } - long createdTimeMs(long nowMs) { - return Math.max(0, nowMs - createdMs); - } - long waitedTimeMs(long nowMs) { return Math.max(0, nowMs - lastAttemptMs); } @@ -467,5 +455,4 @@ public final class ProducerBatch { public boolean sequenceHasBeenReset() { return reopened; } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 31c6d754c9d..964ac3c1558 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -16,6 +16,18 @@ */ package org.apache.kafka.clients.producer.internals; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; @@ -34,10 +46,10 @@ import org.apache.kafka.common.metrics.stats.Meter; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.Record; -import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.LogContext; @@ -45,20 +57,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; - /** * This class acts as a queue that accumulates records into {@link MemoryRecords} * instances to be sent to the server. @@ -76,6 +74,7 @@ public final class RecordAccumulator { private final CompressionType compression; private final long lingerMs; private final long retryBackoffMs; + private final long deliveryTimeoutMs; private final BufferPool free; private final Time time; private final ApiVersions apiVersions; @@ -85,13 +84,13 @@ public final class RecordAccumulator { private final Map muted; private int drainIndex; private final TransactionManager transactionManager; + private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire. /** * Create a new record accumulator * * @param logContext The log context used for logging * @param batchSize The size to use when allocating {@link MemoryRecords} instances - * @param totalSize The maximum memory the record accumulator can use. * @param compression The compression codec for the records * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for * sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some @@ -106,14 +105,16 @@ public final class RecordAccumulator { */ public RecordAccumulator(LogContext logContext, int batchSize, - long totalSize, CompressionType compression, long lingerMs, long retryBackoffMs, + long deliveryTimeoutMs, Metrics metrics, + String metricGrpName, Time time, ApiVersions apiVersions, - TransactionManager transactionManager) { + TransactionManager transactionManager, + BufferPool bufferPool) { this.log = logContext.logger(RecordAccumulator.class); this.drainIndex = 0; this.closed = false; @@ -123,9 +124,9 @@ public final class RecordAccumulator { this.compression = compression; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; + this.deliveryTimeoutMs = deliveryTimeoutMs; this.batches = new CopyOnWriteMap<>(); - String metricGrpName = "producer-metrics"; - this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName); + this.free = bufferPool; this.incomplete = new IncompleteBatches(); this.muted = new HashMap<>(); this.time = time; @@ -227,7 +228,6 @@ public final class RecordAccumulator { // Don't deallocate this buffer in the finally block as it's being used in the record batch buffer = null; - return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { @@ -240,7 +240,7 @@ public final class RecordAccumulator { private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) { if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) { throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " + - "support the required message format (v2). The broker must be version 0.11 or later."); + "support the required message format (v2). The broker must be version 0.11 or later."); } return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L); } @@ -273,37 +273,35 @@ public final class RecordAccumulator { return result; } + public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) { + if (batch.createdMs + deliveryTimeoutMs > 0) { + // the non-negative check is to guard us against potential overflow due to setting + // a large value for deliveryTimeoutMs + nextBatchExpiryTimeMs = Math.min(nextBatchExpiryTimeMs, batch.createdMs + deliveryTimeoutMs); + } else { + log.warn("Skipping next batch expiry time update due to addition overflow: " + + "batch.createMs={}, deliveryTimeoutMs={}", batch.createdMs, deliveryTimeoutMs); + } + } + /** * Get a list of batches which have been sitting in the accumulator too long and need to be expired. */ - public List expiredBatches(int requestTimeout, long now) { + public List expiredBatches(long now) { List expiredBatches = new ArrayList<>(); for (Map.Entry> entry : this.batches.entrySet()) { - Deque dq = entry.getValue(); - TopicPartition tp = entry.getKey(); - // We only check if the batch should be expired if the partition does not have a batch in flight. - // This is to prevent later batches from being expired while an earlier batch is still in progress. - // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection - // is only active in this case. Otherwise the expiration order is not guaranteed. - if (!isMuted(tp, now)) { - synchronized (dq) { - // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut - ProducerBatch lastBatch = dq.peekLast(); - Iterator batchIterator = dq.iterator(); - while (batchIterator.hasNext()) { - ProducerBatch batch = batchIterator.next(); - boolean isFull = batch != lastBatch || batch.isFull(); - // Check if the batch has expired. Expired batches are closed by maybeExpire, but callbacks - // are invoked after completing the iterations, since sends invoked from callbacks - // may append more batches to the deque being iterated. The batch is deallocated after - // callbacks are invoked. - if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { - expiredBatches.add(batch); - batchIterator.remove(); - } else { - // Stop at the first batch that has not expired. - break; - } + // expire the batches in the order of sending + Deque deque = entry.getValue(); + synchronized (deque) { + while (!deque.isEmpty()) { + ProducerBatch batch = deque.getFirst(); + if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) { + deque.poll(); + batch.abortRecordAppends(); + expiredBatches.add(batch); + } else { + maybeUpdateNextBatchExpiryTime(batch); + break; } } } @@ -311,8 +309,13 @@ public final class RecordAccumulator { return expiredBatches; } + public long getDeliveryTimeoutMs() { + return deliveryTimeoutMs; + } + /** - * Re-enqueue the given record batch in the accumulator to retry + * Re-enqueue the given record batch in the accumulator. In Sender.completeBatch method, we check + * whether the batch has reached deliveryTimeoutMs or not. Hence we do not do the delivery timeout check here. */ public void reenqueue(ProducerBatch batch, long now) { batch.reenqueued(now); @@ -356,8 +359,8 @@ public final class RecordAccumulator { } // We will have to do extra work to ensure the queue is in order when requests are being retried and there are - // multiple requests in flight to that partition. If the first inflight request fails to append, then all the subsequent - // in flight requests will also fail because the sequence numbers will not be accepted. + // multiple requests in flight to that partition. If the first in flight request fails to append, then all the + // subsequent in flight requests will also fail because the sequence numbers will not be accepted. // // Further, once batches are being retried, we are reduced to a single in flight request for that partition. So when // the subsequent batches come back in sequence order, they will have to be placed further back in the queue. @@ -368,12 +371,12 @@ public final class RecordAccumulator { private void insertInSequenceOrder(Deque deque, ProducerBatch batch) { // When we are requeing and have enabled idempotence, the reenqueued batch must always have a sequence. if (batch.baseSequence() == RecordBatch.NO_SEQUENCE) - throw new IllegalStateException("Trying to reenqueue a batch which doesn't have a sequence even " + - "though idempotence is enabled."); + throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even " + + "though idempotency is enabled."); if (transactionManager.nextBatchBySequence(batch.topicPartition) == null) - throw new IllegalStateException("We are reenqueueing a batch which is not tracked as part of the in flight " + - "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence()); + throw new IllegalStateException("We are re-enqueueing a batch which is not tracked as part of the in flight " + + "requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence()); ProducerBatch firstBatchInQueue = deque.peekFirst(); if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) { @@ -390,7 +393,7 @@ public final class RecordAccumulator { orderedBatches.add(deque.pollFirst()); log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " + - "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); + "position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size()); // Either we have reached a point where there are batches without a sequence (ie. never been drained // and are hence in order by default), or the batch at the front of the queue has a sequence greater // than the incoming batch. This is the right place to add the incoming batch. @@ -466,7 +469,6 @@ public final class RecordAccumulator { } } } - return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } @@ -484,6 +486,106 @@ public final class RecordAccumulator { return false; } + private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) { + ProducerIdAndEpoch producerIdAndEpoch = null; + if (transactionManager != null) { + if (!transactionManager.isSendToPartitionAllowed(tp)) + return true; + + producerIdAndEpoch = transactionManager.producerIdAndEpoch(); + if (!producerIdAndEpoch.isValid()) + // we cannot send the batch until we have refreshed the producer id + return true; + + if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition)) + // Don't drain any new batches while the state of previous sequence numbers + // is unknown. The previous batches would be unknown if they were aborted + // on the client after being sent to the broker at least once. + return true; + + int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition); + if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence() + && first.baseSequence() != firstInFlightSequence) + // If the queued batch already has an assigned sequence, then it is being retried. + // In this case, we wait until the next immediate batch is ready and drain that. + // We only move on when the next in line batch is complete (either successfully or due to + // a fatal broker error). This effectively reduces our in flight request count to 1. + return true; + } + return false; + } + + private List drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) { + int size = 0; + List parts = cluster.partitionsForNode(node.id()); + List ready = new ArrayList<>(); + /* to make starvation less likely this loop doesn't start at 0 */ + int start = drainIndex = drainIndex % parts.size(); + do { + PartitionInfo part = parts.get(drainIndex); + TopicPartition tp = new TopicPartition(part.topic(), part.partition()); + this.drainIndex = (this.drainIndex + 1) % parts.size(); + + // Only proceed if the partition has no in-flight batches. + if (isMuted(tp, now)) + continue; + + Deque deque = getDeque(tp); + if (deque == null) + continue; + + synchronized (deque) { + // invariant: !isMuted(tp,now) && deque != null + ProducerBatch first = deque.peekFirst(); + if (first == null) + continue; + + // first != null + boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; + // Only drain the batch if it is not during backoff period. + if (backoff) + continue; + + if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { + // there is a rare case that a single batch size is larger than the request size due to + // compression; in this case we will still eventually send this batch in a single request + break; + } else { + if (shouldStopDrainBatchesForPartition(first, tp)) + break; + + boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false; + ProducerIdAndEpoch producerIdAndEpoch = + transactionManager != null ? transactionManager.producerIdAndEpoch() : null; + ProducerBatch batch = deque.pollFirst(); + if (producerIdAndEpoch != null && !batch.hasSequence()) { + // If the batch already has an assigned sequence, then we should not change the producer id and + // sequence number, since this may introduce duplicates. In particular, the previous attempt + // may actually have been accepted, and if we change the producer id and sequence here, this + // attempt will also be accepted, causing a duplicate. + // + // Additionally, we update the next sequence number bound for the partition, and also have + // the transaction manager track the batch so as to ensure that sequence ordering is maintained + // even if we receive out of order responses. + batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); + transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); + log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " + + "{} being sent to partition {}", producerIdAndEpoch.producerId, + producerIdAndEpoch.epoch, batch.baseSequence(), tp); + + transactionManager.addInFlightBatch(batch); + } + batch.close(); + size += batch.records().sizeInBytes(); + ready.add(batch); + + batch.drained(now); + } + } + } while (start != drainIndex); + return ready; + } + /** * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over. @@ -494,106 +596,25 @@ public final class RecordAccumulator { * @param now The current unix time in milliseconds * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize. */ - public Map> drain(Cluster cluster, - Set nodes, - int maxSize, - long now) { + public Map> drain(Cluster cluster, Set nodes, int maxSize, long now) { if (nodes.isEmpty()) return Collections.emptyMap(); Map> batches = new HashMap<>(); for (Node node : nodes) { - int size = 0; - List parts = cluster.partitionsForNode(node.id()); - List ready = new ArrayList<>(); - /* to make starvation less likely this loop doesn't start at 0 */ - int start = drainIndex = drainIndex % parts.size(); - do { - PartitionInfo part = parts.get(drainIndex); - TopicPartition tp = new TopicPartition(part.topic(), part.partition()); - // Only proceed if the partition has no in-flight batches. - if (!isMuted(tp, now)) { - Deque deque = getDeque(tp); - if (deque != null) { - synchronized (deque) { - ProducerBatch first = deque.peekFirst(); - if (first != null) { - boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; - // Only drain the batch if it is not during backoff period. - if (!backoff) { - if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { - // there is a rare case that a single batch size is larger than the request size due - // to compression; in this case we will still eventually send this batch in a single - // request - break; - } else { - ProducerIdAndEpoch producerIdAndEpoch = null; - boolean isTransactional = false; - if (transactionManager != null) { - if (!transactionManager.isSendToPartitionAllowed(tp)) - break; - - producerIdAndEpoch = transactionManager.producerIdAndEpoch(); - if (!producerIdAndEpoch.isValid()) - // we cannot send the batch until we have refreshed the producer id - break; - - isTransactional = transactionManager.isTransactional(); - - if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition)) - // Don't drain any new batches while the state of previous sequence numbers - // is unknown. The previous batches would be unknown if they were aborted - // on the client after being sent to the broker at least once. - break; - - int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition); - if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence() - && first.baseSequence() != firstInFlightSequence) - // If the queued batch already has an assigned sequence, then it is being - // retried. In this case, we wait until the next immediate batch is ready - // and drain that. We only move on when the next in line batch is complete (either successfully - // or due to a fatal broker error). This effectively reduces our - // in flight request count to 1. - break; - } - - ProducerBatch batch = deque.pollFirst(); - if (producerIdAndEpoch != null && !batch.hasSequence()) { - // If the batch already has an assigned sequence, then we should not change the producer id and - // sequence number, since this may introduce duplicates. In particular, - // the previous attempt may actually have been accepted, and if we change - // the producer id and sequence here, this attempt will also be accepted, - // causing a duplicate. - // - // Additionally, we update the next sequence number bound for the partition, - // and also have the transaction manager track the batch so as to ensure - // that sequence ordering is maintained even if we receive out of order - // responses. - batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional); - transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount); - log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " + - "{} being sent to partition {}", producerIdAndEpoch.producerId, - producerIdAndEpoch.epoch, batch.baseSequence(), tp); - - transactionManager.addInFlightBatch(batch); - } - batch.close(); - size += batch.records().sizeInBytes(); - ready.add(batch); - batch.drained(now); - } - } - } - } - } - } - this.drainIndex = (this.drainIndex + 1) % parts.size(); - } while (start != drainIndex); + List ready = drainBatchesForOneNode(cluster, node, maxSize, now); batches.put(node.id(), ready); } return batches; } + /** + * The earliest absolute time a batch will expire (in milliseconds) + */ + public Long nextExpiryTimeMs() { + return this.nextBatchExpiryTimeMs; + } + private Deque getDeque(TopicPartition tp) { return batches.get(tp); } @@ -784,5 +805,4 @@ public final class RecordAccumulator { this.unknownLeaderTopics = unknownLeaderTopics; } } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d1a7bc9fe38..7077f154c6f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.ArrayList; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; @@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -120,6 +122,9 @@ public class Sender implements Runnable { /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */ private final TransactionManager transactionManager; + // A per-partition queue of batches ordered by creation time for tracking the in-flight batches + private final Map> inFlightBatches; + public Sender(LogContext logContext, KafkaClient client, Metadata metadata, @@ -149,6 +154,73 @@ public class Sender implements Runnable { this.retryBackoffMs = retryBackoffMs; this.apiVersions = apiVersions; this.transactionManager = transactionManager; + this.inFlightBatches = new HashMap<>(); + } + + public List inFlightBatches(TopicPartition tp) { + return inFlightBatches.containsKey(tp) ? inFlightBatches.get(tp) : new ArrayList<>(); + } + + public void maybeRemoveFromInflightBatches(ProducerBatch batch) { + List batches = inFlightBatches.get(batch.topicPartition); + if (batches != null) { + batches.remove(batch); + if (batches.isEmpty()) { + inFlightBatches.remove(batch.topicPartition); + } + } + } + + /** + * Get the in-flight batches that has reached delivery timeout. + */ + private List getExpiredInflightBatches(long now) { + List expiredBatches = new ArrayList<>(); + for (Map.Entry> entry : inFlightBatches.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + List partitionInFlightBatches = entry.getValue(); + if (partitionInFlightBatches != null) { + Iterator iter = partitionInFlightBatches.iterator(); + while (iter.hasNext()) { + ProducerBatch batch = iter.next(); + if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) { + iter.remove(); + // expireBatches is called in Sender.sendProducerData, before client.poll. + // The batch.finalState() == null invariant should always hold. An IllegalStateException + // exception will be thrown if the invariant is violated. + if (batch.finalState() == null) { + expiredBatches.add(batch); + } else { + throw new IllegalStateException(batch.topicPartition + " batch created at " + + batch.createdMs + " gets unexpected final state " + batch.finalState()); + } + } else { + accumulator.maybeUpdateNextBatchExpiryTime(batch); + break; + } + } + if (partitionInFlightBatches.isEmpty()) + inFlightBatches.remove(topicPartition); + } + } + return expiredBatches; + } + + private void addToInflightBatches(List batches) { + for (ProducerBatch batch : batches) { + List inflightBatchList = inFlightBatches.get(batch.topicPartition); + if (inflightBatchList == null) { + inflightBatchList = new ArrayList<>(); + inFlightBatches.put(batch.topicPartition, inflightBatchList); + } + inflightBatchList.add(batch); + } + } + + public void addToInflightBatches(Map> batches) { + for (List batchList : batches.values()) { + addToInflightBatches(batchList); + } } /** @@ -204,12 +276,12 @@ public class Sender implements Runnable { if (transactionManager.shouldResetProducerStateAfterResolvingSequences()) // Check if the previous run expired batches which requires a reset of the producer state. transactionManager.resetProducerId(); - if (!transactionManager.isTransactional()) { // this is an idempotent producer, so make sure we have a producer id maybeWaitForProducerId(); } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) { - transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " + + transactionManager.transitionToFatalError( + new KafkaException("The client hasn't received acknowledgment for " + "some previously sent messages and can no longer retry them. It isn't safe to continue.")); } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) { // as long as there are outstanding transactional requests, we simply wait for them to return @@ -241,7 +313,6 @@ public class Sender implements Runnable { private long sendProducerData(long now) { Cluster cluster = metadata.fetch(); - // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); @@ -253,8 +324,8 @@ public class Sender implements Runnable { for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); - log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); - + log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", + result.unknownLeaderTopics); this.metadata.requestUpdate(); } @@ -270,8 +341,8 @@ public class Sender implements Runnable { } // create produce requests - Map> batches = this.accumulator.drain(cluster, result.readyNodes, - this.maxRequestSize, now); + Map> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); + addToInflightBatches(batches); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List batchList : batches.values()) { @@ -280,27 +351,34 @@ public class Sender implements Runnable { } } - List expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs, now); + List expiredInflightBatches = getExpiredInflightBatches(now); + List expiredBatches = this.accumulator.expiredBatches(now); + expiredBatches.addAll(expiredInflightBatches); + // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why // we need to reset the producer id here. if (!expiredBatches.isEmpty()) log.trace("Expired {} batches in accumulator", expiredBatches.size()); for (ProducerBatch expiredBatch : expiredBatches) { - failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false); + String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation"; + failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false); if (transactionManager != null && expiredBatch.inRetry()) { // This ensures that no new batches are drained until the current in flight batches are fully resolved. transactionManager.markSequenceUnresolved(expiredBatch.topicPartition); } } - sensors.updateProduceRequestMetrics(batches); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately - // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data - // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes - // with sendable data that aren't ready to send since they would cause busy looping. + // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry + // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet + // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data + // that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); + pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now); + pollTimeout = Math.max(pollTimeout, 0); if (!result.readyNodes.isEmpty()) { log.trace("Nodes with data ready to send: {}", result.readyNodes); // if some partitions are already ready to be sent, the select time would be 0; @@ -310,7 +388,6 @@ public class Sender implements Runnable { pollTimeout = 0; } sendProduceRequests(batches, now); - return pollTimeout; } @@ -318,7 +395,6 @@ public class Sender implements Runnable { if (transactionManager.isCompleting() && accumulator.hasIncomplete()) { if (transactionManager.isAborting()) accumulator.abortUndrainedBatches(new KafkaException("Failing batch since transaction was aborted")); - // There may still be requests left which are being retried. Since we do not know whether they had // been successfully appended to the broker log, we must resend them until their final status is clear. // If they had been appended and we did not receive the error, then our sequence number would no longer @@ -341,7 +417,6 @@ public class Sender implements Runnable { transactionManager.lookupCoordinator(nextRequestHandler); break; } - if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeoutMs)) { transactionManager.lookupCoordinator(nextRequestHandler); break; @@ -353,12 +428,10 @@ public class Sender implements Runnable { if (targetNode != null) { if (nextRequestHandler.isRetry()) time.sleep(nextRequestHandler.retryBackoffMs()); - - ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), - requestBuilder, now, true, requestTimeoutMs, nextRequestHandler); + ClientRequest clientRequest = client.newClientRequest( + targetNode.idString(), requestBuilder, now, true, requestTimeoutMs, nextRequestHandler); transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId()); log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode); - client.send(clientRequest, now); return true; } @@ -371,11 +444,9 @@ public class Sender implements Runnable { break; } } - time.sleep(retryBackoffMs); metadata.requestUpdate(); } - transactionManager.retry(nextRequestHandler); return true; } @@ -442,8 +513,7 @@ public class Sender implements Runnable { break; } } else { - log.debug("Could not find an available broker to send InitProducerIdRequest to. " + - "We will back off and try again."); + log.debug("Could not find an available broker to send InitProducerIdRequest to. Will back off and retry."); } } catch (UnsupportedVersionException e) { transactionManager.transitionToFatalError(e); @@ -466,7 +536,7 @@ public class Sender implements Runnable { int correlationId = requestHeader.correlationId(); if (response.wasDisconnected()) { log.trace("Cancelled request with header {} due to node {} being disconnected", - requestHeader, response.destination()); + requestHeader, response.destination()); for (ProducerBatch batch : batches.values()) completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L); } else if (response.versionMismatch() != null) { @@ -511,23 +581,25 @@ public class Sender implements Runnable { (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) { // If the batch is too large, we split the batch and send the split batches again. We do not decrement // the retry attempts in this case. - log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", - correlationId, - batch.topicPartition, - this.retries - batch.attempts(), - error); + log.warn( + "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", + correlationId, + batch.topicPartition, + this.retries - batch.attempts(), + error); if (transactionManager != null) transactionManager.removeInFlightBatch(batch); this.accumulator.splitAndReenqueue(batch); this.accumulator.deallocate(batch); this.sensors.recordBatchSplit(); } else if (error != Errors.NONE) { - if (canRetry(batch, response)) { - log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", - correlationId, - batch.topicPartition, - this.retries - batch.attempts() - 1, - error); + if (canRetry(batch, response, now)) { + log.warn( + "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", + correlationId, + batch.topicPartition, + this.retries - batch.attempts() - 1, + error); if (transactionManager == null) { reenqueueBatch(batch, now); } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) { @@ -564,14 +636,14 @@ public class Sender implements Runnable { if (error.exception() instanceof InvalidMetadataException) { if (error.exception() instanceof UnknownTopicOrPartitionException) { log.warn("Received unknown topic or partition error in produce request on partition {}. The " + - "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition); + "topic-partition may not exist or the user may not have Describe access to it", + batch.topicPartition); } else { log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " + "to request metadata update now", batch.topicPartition, error.exception().toString()); } metadata.requestUpdate(); } - } else { completeBatch(batch, response); } @@ -583,35 +655,43 @@ public class Sender implements Runnable { private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) { this.accumulator.reenqueue(batch, currentTimeMs); + maybeRemoveFromInflightBatches(batch); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); } private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) { if (transactionManager != null) { if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) { - transactionManager.maybeUpdateLastAckedSequence(batch.topicPartition, batch.baseSequence() + batch.recordCount - 1); - log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}", batch.producerId(), batch.topicPartition, - transactionManager.lastAckedSequence(batch.topicPartition)); + transactionManager + .maybeUpdateLastAckedSequence(batch.topicPartition, batch.baseSequence() + batch.recordCount - 1); + log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}", + batch.producerId(), + batch.topicPartition, + transactionManager.lastAckedSequence(batch.topicPartition)); } transactionManager.updateLastAckedOffset(response, batch); transactionManager.removeInFlightBatch(batch); } - if (batch.done(response.baseOffset, response.logAppendTime, null)) + if (batch.done(response.baseOffset, response.logAppendTime, null)) { + maybeRemoveFromInflightBatches(batch); this.accumulator.deallocate(batch); + } } - private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) { + private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, + boolean adjustSequenceNumbers) { failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers); } - private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) { + private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, + boolean adjustSequenceNumbers) { if (transactionManager != null) { if (exception instanceof OutOfOrderSequenceException && !transactionManager.isTransactional() && transactionManager.hasProducerId(batch.producerId())) { log.error("The broker returned {} for topic-partition " + - "{} at offset {}. This indicates data loss on the broker, and should be investigated.", + "{} at offset {}. This indicates data loss on the broker, and should be investigated.", exception, batch.topicPartition, baseOffset); // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees @@ -633,19 +713,23 @@ public class Sender implements Runnable { this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); - if (batch.done(baseOffset, logAppendTime, exception)) + if (batch.done(baseOffset, logAppendTime, exception)) { + maybeRemoveFromInflightBatches(batch); this.accumulator.deallocate(batch); + } } /** * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed. - * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the future - * batches are certain to fail with an OutOfOrderSequence exception. + * We can also retry OutOfOrderSequence exceptions for future batches, since if the first batch has failed, the + * future batches are certain to fail with an OutOfOrderSequence exception. */ - private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) { - return batch.attempts() < this.retries && - ((response.error.exception() instanceof RetriableException) || - (transactionManager != null && transactionManager.canRetry(response, batch))); + private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) { + return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) && + batch.attempts() < this.retries && + batch.finalState() == null && + ((response.error.exception() instanceof RetriableException) || + (transactionManager != null && transactionManager.canRetry(response, batch))); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 1713d782078..4f8420b3ace 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -65,7 +65,7 @@ public class AbstractConfig { this.values.put(update.getKey(), update.getValue()); } definition.parse(this.values); - this.used = Collections.synchronizedSet(new HashSet()); + this.used = Collections.synchronizedSet(new HashSet<>()); this.definition = definition; if (doLog) logAll(); diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index c9efb82e061..12e467cee41 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -57,7 +57,7 @@ import java.util.Set; * Map<String, String> props = new HashMap<>(); * props.put("config_with_default", "some value"); * props.put("config_with_dependents", "some other value"); - * + * * Map<String, Object> configs = defs.parse(props); * // will return "some value" * String someConfig = (String) configs.get("config_with_default"); @@ -595,10 +595,8 @@ public class ConfigDef { if (!configKeys.containsKey(name)) { return; } - ConfigKey key = configKeys.get(name); ConfigValue value = configs.get(name); - if (key.recommender != null) { try { List recommendedValues = key.recommender.validValues(name, parsed); @@ -845,6 +843,11 @@ public class ConfigDef { private final Number min; private final Number max; + /** + * A numeric range with inclusive upper bound and inclusive lower bound + * @param min the lower bound + * @param max the upper bound + */ private Range(Number min, Number max) { this.min = min; this.max = max; @@ -860,7 +863,7 @@ public class ConfigDef { } /** - * A numeric range that checks both the upper and lower bound + * A numeric range that checks both the upper (inclusive) and lower bound */ public static Range between(Number min, Number max) { return new Range(min, max); diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 6b41a9e8779..a586af82245 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -280,7 +280,6 @@ public class MockClient implements KafkaClient { checkTimeoutOfPendingRequests(now); List copy = new ArrayList<>(this.responses); - if (metadata != null && metadata.updateRequested()) { MetadataUpdate metadataUpdate = metadataUpdates.poll(); if (cluster != null) @@ -351,7 +350,9 @@ public class MockClient implements KafkaClient { public void respond(AbstractResponse response, boolean disconnected) { - ClientRequest request = requests.remove(); + ClientRequest request = null; + if (requests.size() > 0) + request = requests.remove(); short version = request.requestBuilder().latestAllowedVersion(); responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), disconnected, null, null, response)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c83fe06ebd0..634a1ab77cf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1679,7 +1679,7 @@ public class KafkaConsumerTest { } private ListOffsetResponse listOffsetsResponse(Map offsets) { - return listOffsetsResponse(offsets, Collections.emptyMap()); + return listOffsetsResponse(offsets, Collections.emptyMap()); } private ListOffsetResponse listOffsetsResponse(Map partitionOffsets, @@ -1818,7 +1818,7 @@ public class KafkaConsumerTest { requestTimeoutMs, IsolationLevel.READ_UNCOMMITTED); - return new KafkaConsumer( + return new KafkaConsumer<>( loggerFactory, clientId, consumerCoordinator, diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 2f89d7949e7..6a85449a659 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -226,40 +226,30 @@ public class ProducerBatchTest { } /** - * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create - * time is interpreted correctly as not expired when the linger time is larger than the difference - * between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}. + * A {@link ProducerBatch} configured using a timestamp preceding its create time is interpreted correctly + * as not expired by {@link ProducerBatch#hasReachedDeliveryTimeout(long, long)}. */ @Test - public void testLargeLingerOldNowExpire() { + public void testBatchExpiration() { + long deliveryTimeoutMs = 10240; ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); // Set `now` to 2ms before the create time. - assertFalse(batch.maybeExpire(10240, 100L, now - 2L, Long.MAX_VALUE, false)); + assertFalse(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now - 2)); + // Set `now` to deliveryTimeoutMs. + assertTrue(batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now + deliveryTimeoutMs)); } /** - * A {@link ProducerBatch} configured using a very large retryBackoff value with retry = true and a timestamp - * preceding its create time is interpreted correctly as not expired when the retryBackoff time is larger than the - * difference between now and create time by {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)}. + * A {@link ProducerBatch} configured using a timestamp preceding its create time is interpreted correctly + * * as not expired by {@link ProducerBatch#hasReachedDeliveryTimeout(long, long)}. */ @Test - public void testLargeRetryBackoffOldNowExpire() { + public void testBatchExpirationAfterReenqueue() { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); // Set batch.retry = true batch.reenqueued(now); // Set `now` to 2ms before the create time. - assertFalse(batch.maybeExpire(10240, Long.MAX_VALUE, now - 2L, 10240L, false)); - } - - /** - * A {@link ProducerBatch#maybeExpire(int, long, long, long, boolean)} call with a now value before the create - * time of the ProducerBatch is correctly recognized as not expired when invoked with parameter isFull = true. - */ - @Test - public void testLargeFullOldNowExpire() { - ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); - // Set `now` to 2ms before the create time. - assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true)); + assertFalse(batch.hasReachedDeliveryTimeout(10240, now - 2L)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 5f4841032d2..13b0d1ba1c5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -83,10 +83,9 @@ public class RecordAccumulatorTest { private MockTime time = new MockTime(); private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); - private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, value.length, - Record.EMPTY_HEADERS); + private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, value.length, Record.EMPTY_HEADERS); private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), - Collections.emptySet(), Collections.emptySet()); + Collections.emptySet(), Collections.emptySet()); private Metrics metrics = new Metrics(time); private final long maxBlockTimeMs = 1000; private final LogContext logContext = new LogContext(); @@ -255,7 +254,7 @@ public class RecordAccumulatorTest { final int msgs = 10000; final int numParts = 2; final RecordAccumulator accum = createTestRecordAccumulator( - 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L); + 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0L); List threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { threads.add(new Thread() { @@ -299,8 +298,8 @@ public class RecordAccumulatorTest { // test case assumes that the records do not fill the batch completely int batchSize = 1025; - RecordAccumulator accum = createTestRecordAccumulator( - batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); + RecordAccumulator accum = createTestRecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, + 10 * batchSize, CompressionType.NONE, lingerMs); // Just short of going over the limit so we trigger linger time int appends = expectedNumAppends(batchSize); @@ -332,10 +331,17 @@ public class RecordAccumulatorTest { @Test public void testRetryBackoff() throws Exception { - long lingerMs = Long.MAX_VALUE / 4; - long retryBackoffMs = Long.MAX_VALUE / 2; - final RecordAccumulator accum = new RecordAccumulator(logContext, 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, - CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null); + long lingerMs = Integer.MAX_VALUE / 16; + long retryBackoffMs = Integer.MAX_VALUE / 8; + int requestTimeoutMs = Integer.MAX_VALUE / 4; + long deliveryTimeoutMs = Integer.MAX_VALUE; + long totalSize = 10 * 1024; + int batchSize = 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD; + String metricGrpName = "producer-metrics"; + + final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, + CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null, + new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); long now = time.milliseconds(); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); @@ -371,7 +377,7 @@ public class RecordAccumulatorTest { @Test public void testFlush() throws Exception { - long lingerMs = Long.MAX_VALUE; + long lingerMs = Integer.MAX_VALUE; final RecordAccumulator accum = createTestRecordAccumulator( 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); @@ -413,7 +419,7 @@ public class RecordAccumulatorTest { @Test public void testAwaitFlushComplete() throws Exception { RecordAccumulator accum = createTestRecordAccumulator( - 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE); + 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, Long.MAX_VALUE); accum.append(new TopicPartition(topic, 0), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); accum.beginFlush(); @@ -429,12 +435,12 @@ public class RecordAccumulatorTest { @Test public void testAbortIncompleteBatches() throws Exception { - long lingerMs = Long.MAX_VALUE; + int lingerMs = Integer.MAX_VALUE; int numRecords = 100; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); final RecordAccumulator accum = createTestRecordAccumulator( - 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); + 128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, CompressionType.NONE, lingerMs); class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { @@ -468,7 +474,7 @@ public class RecordAccumulatorTest { @Test public void testAbortUnsentBatches() throws Exception { - long lingerMs = Long.MAX_VALUE; + int lingerMs = Integer.MAX_VALUE; int numRecords = 100; final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0); @@ -509,17 +515,65 @@ public class RecordAccumulatorTest { assertTrue(accum.hasIncomplete()); } + private void doExpireBatchSingle(long deliveryTimeoutMs) throws InterruptedException { + long lingerMs = 300L; + List muteStates = Arrays.asList(false, true); + Set readyNodes = null; + List expiredBatches = new ArrayList<>(); + // test case assumes that the records do not fill the batch completely + int batchSize = 1025; + RecordAccumulator accum = createTestRecordAccumulator(deliveryTimeoutMs, + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); + + // Make the batches ready due to linger. These batches are not in retry + for (Boolean mute: muteStates) { + if (time.milliseconds() < System.currentTimeMillis()) + time.setCurrentTimeMs(System.currentTimeMillis()); + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); + assertEquals("No partition should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + + time.sleep(lingerMs); + readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + + expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("The batch should not expire when just linger has passed", 0, expiredBatches.size()); + + if (mute) + accum.mutePartition(tp1); + else + accum.unmutePartition(tp1, 0L); + + // Advance the clock to expire the batch. + time.sleep(deliveryTimeoutMs - lingerMs); + expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("The batch may expire when the partition is muted", 1, expiredBatches.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + } + } + + @Test + public void testExpiredBatchSingle() throws InterruptedException { + doExpireBatchSingle(3200L); + } + + @Test + public void testExpiredBatchSingleMaxValue() throws InterruptedException { + doExpireBatchSingle(Long.MAX_VALUE); + } + @Test public void testExpiredBatches() throws InterruptedException { long retryBackoffMs = 100L; - long lingerMs = 3000L; + long lingerMs = 30L; int requestTimeout = 60; + long deliveryTimeoutMs = 3200L; // test case assumes that the records do not fill the batch completely int batchSize = 1025; RecordAccumulator accum = createTestRecordAccumulator( - batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); + deliveryTimeoutMs, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); int appends = expectedNumAppends(batchSize); // Test batches not in retry @@ -532,14 +586,14 @@ public class RecordAccumulatorTest { Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); // Advance the clock to expire the batch. - time.sleep(requestTimeout + 1); + time.sleep(deliveryTimeoutMs + 1); accum.mutePartition(tp1); - List expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); - assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); + List expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("The batches will be muted no matter if the partition is muted or not", 2, expiredBatches.size()); accum.unmutePartition(tp1, 0L); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); - assertEquals("The batch should be expired", 1, expiredBatches.size()); + expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("All batches should have been expired earlier", 0, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); // Advance the clock to make the next batch ready due to linger.ms @@ -548,12 +602,12 @@ public class RecordAccumulatorTest { time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1, 0L); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); - assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); + expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("All batches should have been expired", 0, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); // Test batches in retry. @@ -569,17 +623,17 @@ public class RecordAccumulatorTest { // test expiration. time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); accum.mutePartition(tp1); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1, 0L); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); - assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); + expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("All batches should have been expired.", 0, expiredBatches.size()); // Test that when being throttled muted batches are expired before the throttle time is over. accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0); @@ -589,20 +643,20 @@ public class RecordAccumulatorTest { // Advance the clock to expire the batch. time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); long throttleTimeMs = 100L; accum.unmutePartition(tp1, time.milliseconds() + throttleTimeMs); // The batch shouldn't be expired yet. - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); // Once the throttle time is over, the batch can be expired. time.sleep(throttleTimeMs); - expiredBatches = accum.expiredBatches(requestTimeout, time.milliseconds()); - assertEquals("The batch should be expired", 1, expiredBatches.size()); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("All batches should have been expired earlier", 0, expiredBatches.size()); + assertEquals("No partitions should be ready.", 1, accum.ready(cluster, time.milliseconds()).readyNodes.size()); } @Test @@ -646,10 +700,18 @@ public class RecordAccumulatorTest { // Simulate talking to an older broker, ie. one which supports a lower magic. ApiVersions apiVersions = new ApiVersions(); int batchSize = 1025; + int requestTimeoutMs = 1600; + long deliveryTimeoutMs = 3200L; + long lingerMs = 10L; + long retryBackoffMs = 100L; + long totalSize = 10 * batchSize; + String metricGrpName = "producer-metrics"; + apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2)))); - RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, - CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionManager()); + RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, + CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(), + new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0); } @@ -727,9 +789,9 @@ public class RecordAccumulatorTest { assertFalse(drained.get(node1.id()).isEmpty()); } assertTrue("All the batches should have been drained.", - accum.ready(cluster, time.milliseconds()).readyNodes.isEmpty()); + accum.ready(cluster, time.milliseconds()).readyNodes.isEmpty()); assertEquals("The split batches should be allocated off the accumulator", - bufferCapacity, accum.bufferPoolAvailableMemory()); + bufferCapacity, accum.bufferPoolAvailableMemory()); } @Test @@ -760,8 +822,78 @@ public class RecordAccumulatorTest { numSplit += result.numSplit; numBatches += result.numBatches; assertTrue(String.format("Total num batches = %d, split batches = %d, more than 10%% of the batch splits. " - + "Random seed is " + seed, - numBatches, numSplit), (double) numSplit / numBatches < 0.1f); + + "Random seed is " + seed, + numBatches, numSplit), (double) numSplit / numBatches < 0.1f); + } + } + + @Test + public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException { + long lingerMs = 500L; + int batchSize = 1025; + + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); + + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); + Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + Map> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertTrue(drained.isEmpty()); + //assertTrue(accum.soonToExpireInFlightBatches().isEmpty()); + + // advanced clock and send one batch out but it should not be included in soon to expire inflight + // batches because batch's expiry is quite far. + time.sleep(lingerMs + 1); + readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("A batch did not drain after linger", 1, drained.size()); + //assertTrue(accum.soonToExpireInFlightBatches().isEmpty()); + + // Queue another batch and advance clock such that batch expiry time is earlier than request timeout. + accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); + time.sleep(lingerMs * 4); + + // Now drain and check that accumulator picked up the drained batch because its expiry is soon. + readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("A batch did not drain after linger", 1, drained.size()); + } + + @Test + public void testExpiredBatchesRetry() throws InterruptedException { + int lingerMs = 3000; + int rtt = 1000; + int deliveryTimeoutMs = 3200; + Set readyNodes; + List expiredBatches; + List muteStates = Arrays.asList(false, true); + + // test case assumes that the records do not fill the batch completely + int batchSize = 1025; + RecordAccumulator accum = createTestRecordAccumulator( + batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); + + // Test batches in retry. + for (Boolean mute: muteStates) { + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0); + time.sleep(lingerMs); + readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + Map> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("There should be only one batch.", 1, drained.get(node1.id()).size()); + time.sleep(rtt); + accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); + + if (mute) + accum.mutePartition(tp1); + else + accum.unmutePartition(tp1, 0L); + + // test expiration + time.sleep(deliveryTimeoutMs - rtt); + accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()); + expiredBatches = accum.expiredBatches(time.milliseconds()); + assertEquals("RecordAccumulator has expired batches if the partition is not muted", mute ? 1 : 0, expiredBatches.size()); } } @@ -852,7 +984,7 @@ public class RecordAccumulatorTest { int offsetDelta = 0; while (true) { int recordSize = DefaultRecord.sizeInBytes(offsetDelta, 0, key.length, value.length, - Record.EMPTY_HEADERS); + Record.EMPTY_HEADERS); if (size + recordSize > batchSize) return offsetDelta; offsetDelta += 1; @@ -860,20 +992,32 @@ public class RecordAccumulatorTest { } } + + private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) { + long deliveryTimeoutMs = 3200L; + return createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs); + } + /** * Return a test RecordAccumulator instance */ - private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, long lingerMs) { + private RecordAccumulator createTestRecordAccumulator(long deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, long lingerMs) { + long retryBackoffMs = 100L; + int requestTimeoutMs = 1600; + String metricGrpName = "producer-metrics"; + return new RecordAccumulator( - logContext, - batchSize, - totalSize, - type, - lingerMs, - 100L, - metrics, - time, - new ApiVersions(), - null); + logContext, + batchSize, + type, + lingerMs, + retryBackoffMs, + deliveryTimeoutMs, + metrics, + metricGrpName, + time, + new ApiVersions(), + null, + new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index d87c8f9e894..2fbe3df2067 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -16,6 +16,21 @@ */ package org.apache.kafka.clients.producer.internals; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.Metadata; @@ -62,6 +77,7 @@ import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.test.DelayedReceive; import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; @@ -69,25 +85,10 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class SenderTest { @@ -131,10 +132,12 @@ public class SenderTest { sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); assertTrue(client.hasInFlightRequests()); client.respond(produceResponse(tp0, offset, Errors.NONE, 0)); sender.run(time.milliseconds()); assertEquals("All requests completed.", 0, client.inFlightRequestCount()); + assertEquals(0, sender.inFlightBatches(tp0).size()); assertFalse(client.hasInFlightRequests()); sender.run(time.milliseconds()); assertTrue("Request should be completed", future.isDone()); @@ -328,33 +331,42 @@ public class SenderTest { Node node = new Node(Integer.parseInt(id), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); assertTrue(client.hasInFlightRequests()); + assertEquals(1, sender.inFlightBatches(tp0).size()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); client.disconnect(id); assertEquals(0, client.inFlightRequestCount()); assertFalse(client.hasInFlightRequests()); assertFalse("Client ready status should be false", client.isReady(node, 0L)); + // the batch is in accumulator.inFlightBatches until it expires + assertEquals(1, sender.inFlightBatches(tp0).size()); sender.run(time.milliseconds()); // receive error sender.run(time.milliseconds()); // reconnect sender.run(time.milliseconds()); // resend assertEquals(1, client.inFlightRequestCount()); assertTrue(client.hasInFlightRequests()); + assertEquals(1, sender.inFlightBatches(tp0).size()); long offset = 0; client.respond(produceResponse(tp0, offset, Errors.NONE, 0)); sender.run(time.milliseconds()); assertTrue("Request should have retried and completed", future.isDone()); assertEquals(offset, future.get().offset()); + assertEquals(0, sender.inFlightBatches(tp0).size()); // do an unsuccessful retry future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send produce request + assertEquals(1, sender.inFlightBatches(tp0).size()); for (int i = 0; i < maxRetries + 1; i++) { client.disconnect(client.requests().peek().destination()); sender.run(time.milliseconds()); // receive error + assertEquals(0, sender.inFlightBatches(tp0).size()); sender.run(time.milliseconds()); // reconnect sender.run(time.milliseconds()); // resend + assertEquals(i > 0 ? 0 : 1, sender.inFlightBatches(tp0).size()); } sender.run(time.milliseconds()); assertFutureFailure(future, NetworkException.class); + assertEquals(0, sender.inFlightBatches(tp0).size()); } finally { m.close(); } @@ -371,7 +383,7 @@ public class SenderTest { senderMetrics, time, REQUEST_TIMEOUT, 50, null, apiVersions); // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, "test", 2); - metadata.update(cluster1, Collections.emptySet(), time.milliseconds()); + metadata.update(cluster1, Collections.emptySet(), time.milliseconds()); // Send the first message. TopicPartition tp2 = new TopicPartition("test", 1); @@ -384,6 +396,7 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertTrue(client.hasInFlightRequests()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); + assertEquals(1, sender.inFlightBatches(tp2).size()); time.sleep(900); // Now send another message to tp2 @@ -391,11 +404,13 @@ public class SenderTest { // Update metadata before sender receives response from broker 0. Now partition 2 moves to broker 0 Cluster cluster2 = TestUtils.singletonCluster("test", 2); - metadata.update(cluster2, Collections.emptySet(), time.milliseconds()); + metadata.update(cluster2, Collections.emptySet(), time.milliseconds()); // Sender should not send the second message to node 0. - sender.run(time.milliseconds()); + assertEquals(1, sender.inFlightBatches(tp2).size()); + sender.run(time.milliseconds()); // receive the response for the previous send, and send the new batch assertEquals(1, client.inFlightRequestCount()); assertTrue(client.hasInFlightRequests()); + assertEquals(1, sender.inFlightBatches(tp2).size()); } finally { m.close(); } @@ -429,14 +444,18 @@ public class SenderTest { // Advance the clock to expire the first batch. time.sleep(10000); + + Node clusterNode = this.cluster.nodes().get(0); + Map> drainedBatches = + accumulator.drain(cluster, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds()); + sender.addToInflightBatches(drainedBatches); + // Disconnect the target node for the pending produce request. This will ensure that sender will try to // expire the batch. - Node clusterNode = this.cluster.nodes().get(0); client.disconnect(clusterNode.idString()); client.blackout(clusterNode, 100); sender.run(time.milliseconds()); // We should try to flush the batch, but we expire it instead without sending anything. - assertEquals("Callbacks not invoked for expiry", messagesPerBatch, expiryCallbackCount.get()); assertNull("Unexpected exception", unexpectedException.get()); // Make sure that the reconds were appended back to the batch. @@ -463,6 +482,7 @@ public class SenderTest { sender.run(time.milliseconds()); assertEquals("Request completed.", 0, client.inFlightRequestCount()); assertFalse(client.hasInFlightRequests()); + assertEquals(0, sender.inFlightBatches(tp0).size()); sender.run(time.milliseconds()); assertTrue("Request should be completed", future.isDone()); @@ -479,6 +499,7 @@ public class SenderTest { sender.run(time.milliseconds()); assertEquals("Request completed.", 0, client.inFlightRequestCount()); assertFalse(client.hasInFlightRequests()); + assertEquals(0, sender.inFlightBatches(tp0).size()); sender.run(time.milliseconds()); assertTrue("Request should be completed", future.isDone()); } @@ -520,6 +541,7 @@ public class SenderTest { Node node = new Node(Integer.parseInt(id), "localhost", 0); assertEquals(1, client.inFlightRequestCount()); assertTrue(client.hasInFlightRequests()); + assertEquals(1, sender.inFlightBatches(tp0).size()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); assertFalse(future.isDone()); @@ -583,6 +605,7 @@ public class SenderTest { sender.run(time.milliseconds()); // receive response 1 assertEquals(1, transactionManager.lastAckedSequence(tp0)); assertFalse(client.hasInFlightRequests()); + assertEquals(0, sender.inFlightBatches(tp0).size()); assertTrue(request2.isDone()); assertEquals(1, request2.get().offset()); } @@ -654,11 +677,12 @@ public class SenderTest { assertEquals(0, transactionManager.lastAckedSequence(tp0)); assertTrue(request1.isDone()); assertEquals(0, request1.get().offset()); - - assertFalse(client.hasInFlightRequests()); + assertEquals(0, sender.inFlightBatches(tp0).size()); + sender.run(time.milliseconds()); // send request 2; assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L); sender.run(time.milliseconds()); // receive response 2 @@ -667,17 +691,19 @@ public class SenderTest { assertEquals(1, request2.get().offset()); assertFalse(client.hasInFlightRequests()); + assertEquals(0, sender.inFlightBatches(tp0).size()); sender.run(time.milliseconds()); // send request 3 assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L); sender.run(time.milliseconds()); // receive response 3, send request 4 since we are out of 'retry' mode. assertEquals(2, transactionManager.lastAckedSequence(tp0)); assertTrue(request3.isDone()); assertEquals(2, request3.get().offset()); - assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L); sender.run(time.milliseconds()); // receive response 4 @@ -795,7 +821,6 @@ public class SenderTest { setupWithTransactionState(transactionManager); prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest @@ -965,46 +990,54 @@ public class SenderTest { public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); - setupWithTransactionState(transactionManager); + setupWithTransactionState(transactionManager, false, null); prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send request + // We separate the two appends by 1 second so that the two batches + // don't expire at the same time. + time.sleep(1000L); Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send request - assertEquals(2, client.inFlightRequestCount()); + assertEquals(2, sender.inFlightBatches(tp0).size()); sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1); sender.run(time.milliseconds()); // receive first response + assertEquals(1, sender.inFlightBatches(tp0).size()); Node node = this.cluster.nodes().get(0); - time.sleep(10000L); + // We add 600 millis to expire the first batch but not the second. + // Note deliveryTimeoutMs is 1500. + time.sleep(600L); client.disconnect(node.idString()); client.blackout(node, 10); sender.run(time.milliseconds()); // now expire the first batch. assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); + assertEquals(0, sender.inFlightBatches(tp0).size()); + // let's enqueue another batch, which should not be dequeued until the unresolved state is clear. Future request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; - time.sleep(20); - assertFalse(request2.isDone()); sender.run(time.milliseconds()); // send second request sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1); + assertEquals(1, sender.inFlightBatches(tp0).size()); + sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state. assertTrue(request2.isDone()); assertEquals(1, request2.get().offset()); - Deque batches = accumulator.batches().get(tp0); + assertEquals(0, sender.inFlightBatches(tp0).size()); + Deque batches = accumulator.batches().get(tp0); assertEquals(1, batches.size()); assertFalse(batches.peekFirst().hasSequence()); assertFalse(client.hasInFlightRequests()); @@ -1017,6 +1050,7 @@ public class SenderTest { assertEquals(0, batches.size()); assertEquals(1, client.inFlightRequestCount()); assertFalse(request3.isDone()); + assertEquals(1, sender.inFlightBatches(tp0).size()); } @Test @@ -1026,13 +1060,13 @@ public class SenderTest { setupWithTransactionState(transactionManager); prepareAndReceiveInitProducerId(producerId, Errors.NONE); assertTrue(transactionManager.hasProducerId()); - assertEquals(0, transactionManager.sequenceNumber(tp0).longValue()); // Send first ProduceRequest Future request1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send request + time.sleep(1000L); Future request2 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send request @@ -1042,7 +1076,7 @@ public class SenderTest { sender.run(time.milliseconds()); // receive first response Node node = this.cluster.nodes().get(0); - time.sleep(10000L); + time.sleep(1000L); client.disconnect(node.idString()); client.blackout(node, 10); @@ -1053,9 +1087,7 @@ public class SenderTest { Future request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; time.sleep(20); - assertFalse(request2.isDone()); - sender.run(time.milliseconds()); // send second request sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1); sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state. @@ -1087,12 +1119,12 @@ public class SenderTest { Future request1 = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // send request sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1); - sender.run(time.milliseconds()); // receive response + sender.run(time.milliseconds()); // receive response assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue()); Node node = this.cluster.nodes().get(0); - time.sleep(10000L); + time.sleep(15000L); client.disconnect(node.idString()); client.blackout(node, 10); @@ -1520,7 +1552,6 @@ public class SenderTest { RecordBatch firstBatch = batchIterator.next(); assertFalse(batchIterator.hasNext()); assertEquals(expectedSequence, firstBatch.baseSequence()); - return true; } }, produceResponse(tp, responseOffset, responseError, 0, logStartOffset)); @@ -1754,11 +1785,13 @@ public class SenderTest { sender.run(time.milliseconds()); // send. assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0)); sender.run(time.milliseconds()); assertTrue(responseFuture.isDone()); + assertEquals(0, sender.inFlightBatches(tp0).size()); assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId()); } @@ -1794,11 +1827,15 @@ public class SenderTest { TopicPartition tp) throws Exception { int maxRetries = 1; String topic = tp.topic(); + long deliveryTimeoutMs = 3000L; + long totalSize = 1024 * 1024; + String metricGrpName = "producer-metrics"; // Set a good compression ratio. CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); try (Metrics m = new Metrics()) { - accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, - new ApiVersions(), txnManager); + accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.GZIP, + 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager, + new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); @@ -1865,9 +1902,153 @@ public class SenderTest { assertEquals("The last ack'd sequence number should be 1", 1, txnManager.lastAckedSequence(tp)); assertEquals("Offset of the first message should be 1", 1L, f2.get().offset()); assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty()); + assertTrue("There should be a split", m.metrics().get(senderMetrics.batchSplitRate).value() > 0); + } + } - assertTrue("There should be a split", - m.metrics().get(senderMetrics.batchSplitRate).value() > 0); + @Test + public void testNoDoubleDeallocation() throws Exception { + long deliverTimeoutMs = 1500L; + long totalSize = 1024 * 1024; + String metricGrpName = "producer-custom-metrics"; + MatchingBufferPool pool = new MatchingBufferPool(totalSize, batchSize, metrics, time, metricGrpName); + setupWithTransactionState(null, false, pool); + + // Send first ProduceRequest + Future request1 = + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); // send request + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); + + time.sleep(deliverTimeoutMs); + assertFalse(pool.allMatch()); + + sender.run(time.milliseconds()); // expire the batch + assertTrue(request1.isDone()); + assertTrue("The batch should have been de-allocated", pool.allMatch()); + assertTrue(pool.allMatch()); + + sender.run(time.milliseconds()); + assertTrue("The batch should have been de-allocated", pool.allMatch()); + assertEquals(0, client.inFlightRequestCount()); + assertEquals(0, sender.inFlightBatches(tp0).size()); + } + + @Test + public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException { + long deliveryTimeoutMs = 1500L; + setupWithTransactionState(null, true, null); + + // Send first ProduceRequest + Future request = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); // send request + assertEquals(1, client.inFlightRequestCount()); + assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size()); + + Map responseMap = new HashMap<>(); + responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); + client.respond(new ProduceResponse(responseMap)); + + time.sleep(deliveryTimeoutMs); + sender.run(time.milliseconds()); // receive first response + assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size()); + try { + request.get(); + fail("The expired batch should throw a TimeoutException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + } + + @Test + public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() throws InterruptedException { + long deliveryTimeoutMs = 1500L; + setupWithTransactionState(null, true, null); + + // Send first ProduceRequest + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); + sender.run(time.milliseconds()); // send request + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); + + time.sleep(deliveryTimeoutMs / 2); + + // Send second ProduceRequest + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); + sender.run(time.milliseconds()); // must not send request because the partition is muted + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); + + time.sleep(deliveryTimeoutMs / 2); // expire the first batch only + + client.respond(produceResponse(tp0, 0L, Errors.NONE, 0, 0L)); + sender.run(time.milliseconds()); // receive response (offset=0) + assertEquals(0, client.inFlightRequestCount()); + assertEquals(0, sender.inFlightBatches(tp0).size()); + + sender.run(time.milliseconds()); // Drain the second request only this time + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, sender.inFlightBatches(tp0).size()); + } + + @Test + public void testExpiredBatchDoesNotRetry() throws Exception { + long deliverTimeoutMs = 1500L; + setupWithTransactionState(null, false, null); + + // Send first ProduceRequest + Future request1 = + accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, + MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); // send request + assertEquals(1, client.inFlightRequestCount()); + time.sleep(deliverTimeoutMs); + + Map responseMap = new HashMap<>(); + responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); + client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1)); // return a retriable error + + sender.run(time.milliseconds()); // expire the batch + assertTrue(request1.isDone()); + assertEquals(0, client.inFlightRequestCount()); + assertEquals(0, sender.inFlightBatches(tp0).size()); + + sender.run(time.milliseconds()); // receive first response and do not reenqueue. + assertEquals(0, client.inFlightRequestCount()); + assertEquals(0, sender.inFlightBatches(tp0).size()); + + sender.run(time.milliseconds()); // run again and must not send anything. + assertEquals(0, client.inFlightRequestCount()); + assertEquals(0, sender.inFlightBatches(tp0).size()); + } + + private class MatchingBufferPool extends BufferPool { + IdentityHashMap allocatedBuffers; + + MatchingBufferPool(long totalSize, int batchSize, Metrics metrics, Time time, String metricGrpName) { + super(totalSize, batchSize, metrics, time, metricGrpName); + allocatedBuffers = new IdentityHashMap<>(); + } + + @Override + public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { + ByteBuffer buffer = super.allocate(size, maxTimeToBlockMs); + allocatedBuffers.put(buffer, Boolean.TRUE); + return buffer; + } + + @Override + public void deallocate(ByteBuffer buffer, int size) { + if (!allocatedBuffers.containsKey(buffer)) { + throw new IllegalStateException("Deallocating a buffer that is not allocated"); + } + allocatedBuffers.remove(buffer); + super.deallocate(buffer, size); + } + + public boolean allMatch() { + return allocatedBuffers.isEmpty(); } } @@ -1931,17 +2112,29 @@ public class SenderTest { } private void setupWithTransactionState(TransactionManager transactionManager) { + setupWithTransactionState(transactionManager, false, null); + } + + private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, BufferPool customPool) { + long totalSize = 1024 * 1024; + String metricGrpName = "producer-metrics"; Map metricTags = new LinkedHashMap<>(); metricTags.put("client-id", CLIENT_ID); MetricConfig metricConfig = new MetricConfig().tags(metricTags); this.metrics = new Metrics(metricConfig, time); - this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, - apiVersions, transactionManager); - this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics); + BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool; + setupWithTransactionState(transactionManager, guaranteeOrder, metricTags, pool); + } - this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, - Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); - this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds()); + private void setupWithTransactionState(TransactionManager transactionManager, boolean guaranteeOrder, Map metricTags, BufferPool pool) { + long deliveryTimeoutMs = 1500L; + String metricGrpName = "producer-metrics"; + this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L, + deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, pool); + this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics); + this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL, + Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); + this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds()); } private void assertSendFailure(Class expectedError) throws Exception { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 558ec721096..550d003406b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -118,6 +118,10 @@ public class TransactionManagerTest { Map metricTags = new LinkedHashMap<>(); metricTags.put("client-id", CLIENT_ID); int batchSize = 16 * 1024; + int requestTimeoutMs = 1500; + long deliveryTimeoutMs = 3000L; + long totalSize = 1024 * 1024; + String metricGrpName = "producer-metrics"; MetricConfig metricConfig = new MetricConfig().tags(metricTags); this.brokerNode = new Node(0, "localhost", 2211); this.transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, @@ -125,7 +129,7 @@ public class TransactionManagerTest { Metrics metrics = new Metrics(metricConfig, time); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics); - this.accumulator = new RecordAccumulator(logContext, batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager); + this.accumulator = new RecordAccumulator(logContext, batchSize, CompressionType.NONE, 0L, 0L, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions); this.metadata.update(this.cluster, Collections.emptySet(), time.milliseconds()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 7291d4f6e88..1f62103213c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -139,6 +139,7 @@ public class Worker { producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE)); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); + producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); // User-specified overrides producerProps.putAll(config.originalsWithPrefix("producer.")); } diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index dc4041f1d63..739675ead6e 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -68,7 +68,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { super.tearDown() } - protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = { + protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Int = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = { val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props) registerProducer(producer) @@ -170,13 +170,13 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { def testSendCompressedMessageWithCreateTime() { val producerProps = new Properties() producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip") - val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps)) + val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps)) sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) } @Test def testSendNonCompressedMessageWithCreateTime() { - val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue) + val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) } @@ -409,7 +409,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { */ @Test def testFlush() { - val producer = createProducer(brokerList, lingerMs = Long.MaxValue) + val producer = createProducer(brokerList, lingerMs = Int.MaxValue) try { createTopic(topic, 2, 2) val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, @@ -438,7 +438,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { // Test closing from caller thread. for (_ <- 0 until 50) { - val producer = createProducer(brokerList, lingerMs = Long.MaxValue) + val producer = createProducer(brokerList, lingerMs = Int.MaxValue) val responses = (0 until numRecords) map (_ => producer.send(record0)) assertTrue("No request is complete.", responses.forall(!_.isDone())) producer.close(0, TimeUnit.MILLISECONDS) @@ -478,7 +478,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } } for (i <- 0 until 50) { - val producer = createProducer(brokerList, lingerMs = Long.MaxValue) + val producer = createProducer(brokerList, lingerMs = Int.MaxValue) try { // send message to partition 0 // Only send the records in the first callback since we close the producer in the callback and no records diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 02396dd1bb2..ba4df7d64f6 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -617,9 +617,9 @@ class PlaintextConsumerTest extends BaseConsumerTest { private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) { val producerProps = new Properties() producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name) - producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString) + producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Int.MaxValue.toString) val producer = TestUtils.createProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, - saslProperties = clientSaslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps)) + saslProperties = clientSaslProperties, retries = 0, lingerMs = Int.MaxValue, props = Some(producerProps)) (0 until numRecords).foreach { i => producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes)) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 929dbe40291..1da6f9ee369 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -45,7 +45,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { def testBatchSizeZero() { val producerProps = new Properties() producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0") - val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps)) + val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps)) sendAndVerify(producer) } @@ -53,13 +53,13 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { def testSendCompressedMessageWithLogAppendTime() { val producerProps = new Properties() producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip") - val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps)) + val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue, props = Some(producerProps)) sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) } @Test def testSendNonCompressedMessageWithLogAppendTime() { - val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue) + val producer = createProducer(brokerList = brokerList, lingerMs = Int.MaxValue) sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME) } diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 9b77c2d4169..0227690e928 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -64,11 +64,11 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { override def setUp() { super.setUp() - producer1 = TestUtils.createProducer(brokerList, acks = 0, requestTimeoutMs = 30000L, maxBlockMs = 10000L, + producer1 = TestUtils.createProducer(brokerList, acks = 0, requestTimeoutMs = 30000, maxBlockMs = 10000L, bufferSize = producerBufferSize) - producer2 = TestUtils.createProducer(brokerList, acks = 1, requestTimeoutMs = 30000L, maxBlockMs = 10000L, + producer2 = TestUtils.createProducer(brokerList, acks = 1, requestTimeoutMs = 30000, maxBlockMs = 10000L, bufferSize = producerBufferSize) - producer3 = TestUtils.createProducer(brokerList, acks = -1, requestTimeoutMs = 30000L, maxBlockMs = 10000L, + producer3 = TestUtils.createProducer(brokerList, acks = -1, requestTimeoutMs = 30000, maxBlockMs = 10000L, bufferSize = producerBufferSize) } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 49553e8d071..61d59192154 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -1368,11 +1368,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] { private var _retries = 0 private var _acks = -1 - private var _requestTimeoutMs = 30000L + private var _requestTimeoutMs = 30000 def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this } def acks(acks: Int): ProducerBuilder = { _acks = acks; this } - def requestTimeoutMs(timeoutMs: Long): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this } + def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this } override def build(): KafkaProducer[String, String] = { val producer = TestUtils.createProducer(bootstrapServers, diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 67f33ebdb45..57aca1e73ba 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -204,7 +204,7 @@ class FetchRequestTest extends BaseRequestTest { val propsOverride = new Properties propsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString) val producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers), - retries = 5, lingerMs = Long.MaxValue, + retries = 5, lingerMs = Int.MaxValue, keySerializer = new StringSerializer, valueSerializer = new ByteArraySerializer, props = Some(propsOverride)) val bytes = new Array[Byte](msgValueLen) val futures = try { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index cf60c780d39..aa902f2582b 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -548,8 +548,8 @@ object TestUtils extends Logging { maxBlockMs: Long = 60 * 1000L, bufferSize: Long = 1024L * 1024L, retries: Int = 0, - lingerMs: Long = 0, - requestTimeoutMs: Long = 30 * 1000L, + lingerMs: Int = 0, + requestTimeoutMs: Int = 30 * 1000, securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, trustStoreFile: Option[File] = None, saslProperties: Option[Properties] = None, @@ -564,6 +564,11 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString) + + // In case of overflow set maximum possible value for deliveryTimeoutMs + val deliveryTimeoutMs = if (lingerMs + requestTimeoutMs < 0) Int.MaxValue else lingerMs + requestTimeoutMs + producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString) /* Only use these if not already set */ val defaultProps = Map( diff --git a/docs/upgrade.html b/docs/upgrade.html index ccfab95897f..ac1388eb440 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -30,6 +30,11 @@ offset retention period (or the one set by broker) has passed since their last commit.
  • The default for console consumer's enable.auto.commit property when no group.id is provided is now set to false. This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.
  • +
  • The default value for the producer's retries config was changed to Integer.MAX_VALUE, as we introduced delivery.timeout.ms + in KIP-91, + which sets an upper bound on the total time between sending a record and receiving acknowledgement from the broker. By default, + the delivery timeout is set to 2 minutes. +