MINOR: Remove unused `ApiVersions` variable from Sender and RecordAccumulator (#19399)

Remove unused `ApiVersions` variable from Sender and RecordAccumulator.

Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Parker Chang <parkerhiphop027@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Jhen-Yung Hsu 2025-04-11 11:23:41 +08:00 committed by GitHub
parent b3ba7bc929
commit 90e7b53799
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 43 additions and 63 deletions

View File

@ -435,7 +435,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
metrics, metrics,
PRODUCER_METRIC_GROUP_NAME, PRODUCER_METRIC_GROUP_NAME,
time, time,
apiVersions,
transactionManager, transactionManager,
new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME)); new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
@ -538,8 +537,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
time, time,
requestTimeoutMs, requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager, this.transactionManager);
apiVersions);
} }
private static Compression configureCompression(ProducerConfig config) { private static Compression configureCompression(ProducerConfig config) {

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.kafka.clients.producer.internals; package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
@ -82,7 +81,6 @@ public class RecordAccumulator {
private final boolean enableAdaptivePartitioning; private final boolean enableAdaptivePartitioning;
private final BufferPool free; private final BufferPool free;
private final Time time; private final Time time;
private final ApiVersions apiVersions;
private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>(); private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
private final ConcurrentMap<Integer /*nodeId*/, NodeLatencyStats> nodeStats = new CopyOnWriteMap<>(); private final ConcurrentMap<Integer /*nodeId*/, NodeLatencyStats> nodeStats = new CopyOnWriteMap<>();
private final IncompleteBatches incomplete; private final IncompleteBatches incomplete;
@ -109,7 +107,6 @@ public class RecordAccumulator {
* @param metrics The metrics * @param metrics The metrics
* @param metricGrpName The metric group name * @param metricGrpName The metric group name
* @param time The time instance to use * @param time The time instance to use
* @param apiVersions Request API versions for current connected brokers
* @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
* numbers per partition. * numbers per partition.
* @param bufferPool The buffer pool * @param bufferPool The buffer pool
@ -125,7 +122,6 @@ public class RecordAccumulator {
Metrics metrics, Metrics metrics,
String metricGrpName, String metricGrpName,
Time time, Time time,
ApiVersions apiVersions,
TransactionManager transactionManager, TransactionManager transactionManager,
BufferPool bufferPool) { BufferPool bufferPool) {
this.logContext = logContext; this.logContext = logContext;
@ -147,7 +143,6 @@ public class RecordAccumulator {
this.incomplete = new IncompleteBatches(); this.incomplete = new IncompleteBatches();
this.muted = new HashSet<>(); this.muted = new HashSet<>();
this.time = time; this.time = time;
this.apiVersions = apiVersions;
nodesDrainIndex = new HashMap<>(); nodesDrainIndex = new HashMap<>();
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
registerMetrics(metrics, metricGrpName); registerMetrics(metrics, metricGrpName);
@ -169,7 +164,6 @@ public class RecordAccumulator {
* @param metrics The metrics * @param metrics The metrics
* @param metricGrpName The metric group name * @param metricGrpName The metric group name
* @param time The time instance to use * @param time The time instance to use
* @param apiVersions Request API versions for current connected brokers
* @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
* numbers per partition. * numbers per partition.
* @param bufferPool The buffer pool * @param bufferPool The buffer pool
@ -184,7 +178,6 @@ public class RecordAccumulator {
Metrics metrics, Metrics metrics,
String metricGrpName, String metricGrpName,
Time time, Time time,
ApiVersions apiVersions,
TransactionManager transactionManager, TransactionManager transactionManager,
BufferPool bufferPool) { BufferPool bufferPool) {
this(logContext, this(logContext,
@ -198,7 +191,6 @@ public class RecordAccumulator {
metrics, metrics,
metricGrpName, metricGrpName,
time, time,
apiVersions,
transactionManager, transactionManager,
bufferPool); bufferPool);
} }

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.kafka.clients.producer.internals; package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.KafkaClient;
@ -118,9 +117,6 @@ public class Sender implements Runnable {
/* The max time to wait before retrying a request which has failed */ /* The max time to wait before retrying a request which has failed */
private final long retryBackoffMs; private final long retryBackoffMs;
/* current request API versions supported by the known brokers */
private final ApiVersions apiVersions;
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */ /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
private final TransactionManager transactionManager; private final TransactionManager transactionManager;
@ -139,8 +135,7 @@ public class Sender implements Runnable {
Time time, Time time,
int requestTimeoutMs, int requestTimeoutMs,
long retryBackoffMs, long retryBackoffMs,
TransactionManager transactionManager, TransactionManager transactionManager) {
ApiVersions apiVersions) {
this.log = logContext.logger(Sender.class); this.log = logContext.logger(Sender.class);
this.client = client; this.client = client;
this.accumulator = accumulator; this.accumulator = accumulator;
@ -154,7 +149,6 @@ public class Sender implements Runnable {
this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time); this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time);
this.requestTimeoutMs = requestTimeoutMs; this.requestTimeoutMs = requestTimeoutMs;
this.retryBackoffMs = retryBackoffMs; this.retryBackoffMs = retryBackoffMs;
this.apiVersions = apiVersions;
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
this.inFlightBatches = new HashMap<>(); this.inFlightBatches = new HashMap<>();
} }

View File

@ -460,7 +460,7 @@ public class RecordAccumulatorTest {
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs, Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null, deliveryTimeoutMs, metrics, metricGrpName, time, null,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
long now = time.milliseconds(); long now = time.milliseconds();
@ -525,7 +525,7 @@ public class RecordAccumulatorTest {
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs, Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null, deliveryTimeoutMs, metrics, metricGrpName, time, null,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
long now = time.milliseconds(); long now = time.milliseconds();
@ -586,7 +586,7 @@ public class RecordAccumulatorTest {
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs, Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null, deliveryTimeoutMs, metrics, metricGrpName, time, null,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
long now = time.milliseconds(); long now = time.milliseconds();
@ -1266,7 +1266,7 @@ public class RecordAccumulatorTest {
long totalSize = 1024 * 1024; long totalSize = 1024 * 1024;
int batchSize = 128; int batchSize = 128;
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L, RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L,
3200, config, metrics, "producer-metrics", time, new ApiVersions(), null, 3200, config, metrics, "producer-metrics", time, null,
new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")) { new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")) {
@Override @Override
BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
@ -1399,7 +1399,7 @@ public class RecordAccumulatorTest {
String metricGrpName = "producer-metrics"; String metricGrpName = "producer-metrics";
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs, Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null, deliveryTimeoutMs, metrics, metricGrpName, time, null,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
// Create 1 batch(batchA) to be produced to partition1. // Create 1 batch(batchA) to be produced to partition1.
@ -1661,7 +1661,6 @@ public class RecordAccumulatorTest {
metrics, metrics,
metricGrpName, metricGrpName,
time, time,
new ApiVersions(),
txnManager, txnManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)) { new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)) {
@Override @Override

View File

@ -265,7 +265,7 @@ public class SenderTest {
metrics = new Metrics(new MetricConfig().tags(clientTags)); metrics = new Metrics(new MetricConfig().tags(clientTags));
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics); SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null);
// Append a message so that topic metrics are created // Append a message so that topic metrics are created
appendToAccumulator(tp0, 0L, "key", "value"); appendToAccumulator(tp0, 0L, "key", "value");
@ -293,7 +293,7 @@ public class SenderTest {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try { try {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null);
// do a successful retry // do a successful retry
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value"); Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect sender.runOnce(); // connect
@ -351,7 +351,7 @@ public class SenderTest {
try { try {
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2)); MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2));
client.prepareMetadataUpdate(metadataUpdate1); client.prepareMetadataUpdate(metadataUpdate1);
@ -530,12 +530,12 @@ public class SenderTest {
RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(false, 42); RecordAccumulator.PartitionerConfig config = new RecordAccumulator.PartitionerConfig(false, 42);
long totalSize = 1024 * 1024; long totalSize = 1024 * 1024;
accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L, accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L,
DELIVERY_TIMEOUT_MS, config, m, "producer-metrics", time, apiVersions, null, DELIVERY_TIMEOUT_MS, config, m, "producer-metrics", time, null,
new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics")); new BufferPool(totalSize, batchSize, m, time, "producer-internal-metrics"));
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1,
senderMetrics, time, REQUEST_TIMEOUT, 1000L, null, new ApiVersions()); senderMetrics, time, REQUEST_TIMEOUT, 1000L, null);
// Produce and send batch. // Produce and send batch.
long time1 = time.milliseconds(); long time1 = time.milliseconds();
@ -1598,7 +1598,7 @@ public class SenderTest {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
appendToAccumulator(tp0); // failed response appendToAccumulator(tp0); // failed response
Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1); Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
@ -1639,7 +1639,7 @@ public class SenderTest {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
appendToAccumulator(tp0); // failed response appendToAccumulator(tp0); // failed response
appendToAccumulator(tp1); // success response appendToAccumulator(tp1); // success response
@ -1672,7 +1672,7 @@ public class SenderTest {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
appendToAccumulator(tp0); appendToAccumulator(tp0);
Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1); Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
@ -1705,7 +1705,7 @@ public class SenderTest {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
Future<RecordMetadata> outOfOrderResponse = appendToAccumulator(tp0); Future<RecordMetadata> outOfOrderResponse = appendToAccumulator(tp0);
Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1); Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
@ -2243,7 +2243,7 @@ public class SenderTest {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0); Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
client.prepareResponse(body -> { client.prepareResponse(body -> {
@ -2283,7 +2283,7 @@ public class SenderTest {
Metrics m = new Metrics(); Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0); Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // connect. sender.runOnce(); // connect.
@ -2319,7 +2319,7 @@ public class SenderTest {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0); Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
sender.runOnce(); // connect. sender.runOnce(); // connect.
@ -2378,11 +2378,11 @@ public class SenderTest {
CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);
try (Metrics m = new Metrics()) { try (Metrics m = new Metrics()) {
accumulator = new RecordAccumulator(logContext, batchSize, Compression.gzip().build(), accumulator = new RecordAccumulator(logContext, batchSize, Compression.gzip().build(),
0, 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, new ApiVersions(), txnManager, 0, 0L, 0L, deliveryTimeoutMs, m, metricGrpName, time, txnManager,
new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")); new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics"));
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m); SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries,
senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); senderMetrics, time, REQUEST_TIMEOUT, 1000L, txnManager);
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2)); MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2));
client.prepareMetadataUpdate(metadataUpdate1); client.prepareMetadataUpdate(metadataUpdate1);
@ -2696,7 +2696,7 @@ public class SenderTest {
try { try {
TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions); TransactionManager txnManager = new TransactionManager(logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1); TopicPartition tp = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1);
@ -2731,7 +2731,7 @@ public class SenderTest {
setupWithTransactionState(txnManager, lingerMs); setupWithTransactionState(txnManager, lingerMs);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
// Begin a transaction and successfully add one partition to it. // Begin a transaction and successfully add one partition to it.
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
@ -2788,7 +2788,7 @@ public class SenderTest {
setupWithTransactionState(txnManager); setupWithTransactionState(txnManager);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); 1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
// Begin a transaction and successfully add one partition to it. // Begin a transaction and successfully add one partition to it.
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
@ -2857,7 +2857,7 @@ public class SenderTest {
try { try {
TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions); TransactionManager txnManager = new TransactionManager(logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1); TopicPartition tp = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1);
@ -2891,7 +2891,7 @@ public class SenderTest {
try { try {
TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions); TransactionManager txnManager = new TransactionManager(logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions); maxRetries, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager);
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1); TopicPartition tp = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1);
@ -3163,11 +3163,10 @@ public class SenderTest {
// lingerMs is 0 to send batch as soon as any records are available on it. // lingerMs is 0 to send batch as soon as any records are available on it.
this.accumulator = new RecordAccumulator(logContext, batchSize, this.accumulator = new RecordAccumulator(logContext, batchSize,
Compression.NONE, 0, 10L, retryBackoffMaxMs, Compression.NONE, 0, 10L, retryBackoffMaxMs,
DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, null, pool); DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, null, pool);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_REQUEST_SIZE, ACKS_ALL,
10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null);
apiVersions);
// Update metadata with leader-epochs. // Update metadata with leader-epochs.
int tp0LeaderEpoch = 100; int tp0LeaderEpoch = 100;
int epoch = tp0LeaderEpoch; int epoch = tp0LeaderEpoch;
@ -3247,7 +3246,7 @@ public class SenderTest {
TransactionManager transactionManager = mock(TransactionManager.class); TransactionManager transactionManager = mock(TransactionManager.class);
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics); SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); 1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
when(transactionManager.hasOngoingTransaction()).thenReturn(true); when(transactionManager.hasOngoingTransaction()).thenReturn(true);
when(transactionManager.beginAbort()).thenThrow(new IllegalStateException()); when(transactionManager.beginAbort()).thenThrow(new IllegalStateException());
sender.initiateClose(); sender.initiateClose();
@ -3277,11 +3276,10 @@ public class SenderTest {
// lingerMs is 0 to send batch as soon as any records are available on it. // lingerMs is 0 to send batch as soon as any records are available on it.
this.accumulator = new RecordAccumulator(logContext, batchSize, this.accumulator = new RecordAccumulator(logContext, batchSize,
Compression.NONE, 0, 10L, retryBackoffMaxMs, Compression.NONE, 0, 10L, retryBackoffMaxMs,
DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, null, pool); DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, null, pool);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_REQUEST_SIZE, ACKS_ALL,
10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null);
apiVersions);
// Update metadata with leader-epochs. // Update metadata with leader-epochs.
int tp0LeaderEpoch = 100; int tp0LeaderEpoch = 100;
int tp1LeaderEpoch = 200; int tp1LeaderEpoch = 200;
@ -3357,11 +3355,10 @@ public class SenderTest {
// lingerMs is 0 to send batch as soon as any records are available on it. // lingerMs is 0 to send batch as soon as any records are available on it.
this.accumulator = new RecordAccumulator(logContext, batchSize, this.accumulator = new RecordAccumulator(logContext, batchSize,
Compression.NONE, 0, 10L, retryBackoffMaxMs, Compression.NONE, 0, 10L, retryBackoffMaxMs,
DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, null, pool); DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, null, pool);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, Sender sender = new Sender(logContext, client, metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_REQUEST_SIZE, ACKS_ALL,
10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null, 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, null);
apiVersions);
// Update metadata with leader-epochs. // Update metadata with leader-epochs.
int tp0LeaderEpoch = 100; int tp0LeaderEpoch = 100;
int tp1LeaderEpoch = 200; int tp1LeaderEpoch = 200;
@ -3659,10 +3656,10 @@ public class SenderTest {
BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool; BufferPool pool = (customPool == null) ? new BufferPool(totalSize, batchSize, metrics, time, metricGrpName) : customPool;
this.accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, lingerMs, 0L, 0L, this.accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, lingerMs, 0L, 0L,
DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, apiVersions, transactionManager, pool); DELIVERY_TIMEOUT_MS, metrics, metricGrpName, time, transactionManager, pool);
this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics); this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL, this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, guaranteeOrder, MAX_REQUEST_SIZE, ACKS_ALL,
retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions); retries, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager);
metadata.add("test", time.milliseconds()); metadata.add("test", time.milliseconds());
if (updateMetadata) if (updateMetadata)

View File

@ -198,12 +198,12 @@ public class TransactionManagerTest {
this.brokerNode = new Node(0, "localhost", 2211); this.brokerNode = new Node(0, "localhost", 2211);
this.accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L, this.accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, deliveryTimeoutMs, metrics, metricGrpName, time, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, REQUEST_TIMEOUT, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, REQUEST_TIMEOUT,
50, transactionManager, apiVersions); 50, transactionManager);
} }
@Test @Test
@ -737,12 +737,12 @@ public class TransactionManagerTest {
final int deliveryTimeout = 15000; final int deliveryTimeout = 15000;
RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 1024, Compression.NONE, 0, 0L, 0L, RecordAccumulator accumulator = new RecordAccumulator(logContext, 16 * 1024, Compression.NONE, 0, 0L, 0L,
deliveryTimeout, metrics, "", time, apiVersions, transactionManager, deliveryTimeout, metrics, "", time, transactionManager,
new BufferPool(1024 * 1024, 16 * 1024, metrics, time, "")); new BufferPool(1024 * 1024, 16 * 1024, metrics, time, ""));
Sender sender = new Sender(logContext, this.client, this.metadata, accumulator, false, Sender sender = new Sender(logContext, this.client, this.metadata, accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, requestTimeout, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, requestTimeout,
0, transactionManager, apiVersions); 0, transactionManager);
assertEquals(0, transactionManager.sequenceNumber(tp0)); assertEquals(0, transactionManager.sequenceNumber(tp0));
@ -1048,12 +1048,12 @@ public class TransactionManagerTest {
this.brokerNode = new Node(0, "localhost", 2211); this.brokerNode = new Node(0, "localhost", 2211);
this.accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L, this.accumulator = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L,
deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, transactionManager, deliveryTimeoutMs, metrics, metricGrpName, time, transactionManager,
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true, this.sender = new Sender(logContext, this.client, this.metadata, this.accumulator, true,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, REQUEST_TIMEOUT, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, REQUEST_TIMEOUT,
50, transactionManager, apiVersions); 50, transactionManager);
doInitTransactions(); doInitTransactions();
assertFalse(transactionManager.isTransactionV2Enabled()); assertFalse(transactionManager.isTransactionV2Enabled());
@ -3585,7 +3585,7 @@ public class TransactionManagerTest {
initializeTransactionManager(Optional.empty(), transactionV2Enabled); initializeTransactionManager(Optional.empty(), transactionV2Enabled);
Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time,
REQUEST_TIMEOUT, 50, transactionManager, apiVersions); REQUEST_TIMEOUT, 50, transactionManager);
initializeIdempotentProducerId(producerId, epoch); initializeIdempotentProducerId(producerId, epoch);
ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1"); ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
@ -3710,7 +3710,7 @@ public class TransactionManagerTest {
initializeTransactionManager(Optional.empty(), transactionV2Enabled); initializeTransactionManager(Optional.empty(), transactionV2Enabled);
Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false, Sender sender = new Sender(logContext, this.client, this.metadata, this.accumulator, false,
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time, MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(time)), this.time,
REQUEST_TIMEOUT, 50, transactionManager, apiVersions); REQUEST_TIMEOUT, 50, transactionManager);
initializeIdempotentProducerId(producerId, epoch); initializeIdempotentProducerId(producerId, epoch);
ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1"); ProducerBatch tp0b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");