KAFKA-18295 Remove deprecated function Partitioner#onNewBatch (#18282)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-12-27 18:40:19 +08:00 committed by GitHub
parent dfb178a1d8
commit e6d2421136
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 69 additions and 321 deletions

View File

@ -946,15 +946,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throw new IllegalStateException("Cannot perform operation after producer has been closed"); throw new IllegalStateException("Cannot perform operation after producer has been closed");
} }
/**
* Call deprecated {@link Partitioner#onNewBatch}
*/
@SuppressWarnings("deprecation")
private void onNewBatch(String topic, Cluster cluster, int prevPartition) {
assert partitioner != null;
partitioner.onNewBatch(topic, cluster, prevPartition);
}
/** /**
* Implementation of asynchronously send a record to a topic. * Implementation of asynchronously send a record to a topic.
*/ */
@ -1009,32 +1000,15 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ensureValidRecordSize(serializedSize); ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp(); long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
// A custom partitioner may take advantage on the onNewBatch callback.
boolean abortOnNewBatch = partitioner != null;
// Append the record to the accumulator. Note, that the actual partition may be // Append the record to the accumulator. Note, that the actual partition may be
// calculated there and can be accessed via appendCallbacks.topicPartition. // calculated there and can be accessed via appendCallbacks.topicPartition.
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey, RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster); serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster);
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION; assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;
if (result.abortForNewBatch) {
int prevPartition = partition;
// IMPORTANT NOTE: the following onNewBatch and partition calls should not interrupted to allow
// the custom partitioner to correctly track its state
onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, false, nowMs, cluster);
}
// Add the partition to the transaction (if in progress) after it has been successfully // Add the partition to the transaction (if in progress) after it has been successfully
// appended to the accumulator. We cannot do it before because the partition may be // appended to the accumulator. We cannot do it before because the partition may be
// unknown or the initially selected partition may be changed when the batch is closed // unknown. Note that the `Sender` will refuse to dequeue
// (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
// batches from the accumulator until they have been added to the transaction. // batches from the accumulator until they have been added to the transaction.
if (transactionManager != null) { if (transactionManager != null) {
transactionManager.maybeAddPartition(appendCallbacks.topicPartition()); transactionManager.maybeAddPartition(appendCallbacks.topicPartition());

View File

@ -42,22 +42,4 @@ public interface Partitioner extends Configurable, Closeable {
* This is called when partitioner is closed. * This is called when partitioner is closed.
*/ */
void close(); void close();
/**
* Note this method is only implemented in DefaultPartitioner and UniformStickyPartitioner which
* are now deprecated. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner">KIP-794</a> for more info.
* <p>
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
* this method can change the chosen sticky partition for the new batch.
* <p>
* After onNewBatch, the {@link #partition(String, Object, byte[], Object, byte[], Cluster)} method is called again
* which allows the implementation to "redirect" the message on new batch creation.
* @param topic The topic name
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record that triggered a new batch
* @deprecated Since 3.3.0
*/
@Deprecated
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
} }

View File

@ -78,11 +78,5 @@ public class RoundRobinPartitioner implements Partitioner {
return counter.getAndIncrement(); return counter.getAndIncrement();
} }
@SuppressWarnings("deprecation")
@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
previousPartition.set(new TopicPartition(topic, prevPartition));
}
public void close() {} public void close() {}
} }

View File

@ -277,8 +277,6 @@ public class RecordAccumulator {
* @param headers the Headers for the record * @param headers the Headers for the record
* @param callbacks The callbacks to execute * @param callbacks The callbacks to execute
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
* running the partitioner's onNewBatch method before trying to append again
* @param nowMs The current time, in milliseconds * @param nowMs The current time, in milliseconds
* @param cluster The cluster metadata * @param cluster The cluster metadata
*/ */
@ -290,7 +288,6 @@ public class RecordAccumulator {
Header[] headers, Header[] headers,
AppendCallbacks callbacks, AppendCallbacks callbacks,
long maxTimeToBlock, long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs, long nowMs,
Cluster cluster) throws InterruptedException { Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize))); TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
@ -336,12 +333,6 @@ public class RecordAccumulator {
} }
} }
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true, 0);
}
if (buffer == null) { if (buffer == null) {
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound( int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers)); RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, headers));
@ -415,7 +406,7 @@ public class RecordAccumulator {
dq.addLast(batch); dq.addLast(batch);
incomplete.add(batch); incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes()); return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
} }
private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) { private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
@ -451,7 +442,7 @@ public class RecordAccumulator {
last.closeForRecordAppends(); last.closeForRecordAppends();
} else { } else {
int appendedBytes = last.estimatedSizeInBytes() - initialBytes; int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes); return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, appendedBytes);
} }
} }
return null; return null;
@ -1213,18 +1204,15 @@ public class RecordAccumulator {
public final FutureRecordMetadata future; public final FutureRecordMetadata future;
public final boolean batchIsFull; public final boolean batchIsFull;
public final boolean newBatchCreated; public final boolean newBatchCreated;
public final boolean abortForNewBatch;
public final int appendedBytes; public final int appendedBytes;
public RecordAppendResult(FutureRecordMetadata future, public RecordAppendResult(FutureRecordMetadata future,
boolean batchIsFull, boolean batchIsFull,
boolean newBatchCreated, boolean newBatchCreated,
boolean abortForNewBatch,
int appendedBytes) { int appendedBytes) {
this.future = future; this.future = future;
this.batchIsFull = batchIsFull; this.batchIsFull = batchIsFull;
this.newBatchCreated = newBatchCreated; this.newBatchCreated = newBatchCreated;
this.abortForNewBatch = abortForNewBatch;
this.appendedBytes = appendedBytes; this.appendedBytes = appendedBytes;
} }
} }

View File

@ -2315,40 +2315,6 @@ public class KafkaProducerTest {
} }
} }
@SuppressWarnings("deprecation")
@Test
public void testPartitionAddedToTransactionAfterFullBatchRetry() throws Exception {
StringSerializer serializer = new StringSerializer();
KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
String topic = "foo";
TopicPartition topicPartition0 = new TopicPartition(topic, 0);
TopicPartition topicPartition1 = new TopicPartition(topic, 1);
Cluster cluster = TestUtils.singletonCluster(topic, 2);
when(ctx.sender.isRunning()).thenReturn(true);
when(ctx.metadata.fetch()).thenReturn(cluster);
long timestamp = ctx.time.milliseconds();
ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, timestamp, "key", "value");
FutureRecordMetadata future = expectAppendWithAbortForNewBatch(
ctx,
record,
topicPartition0,
topicPartition1,
cluster
);
try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
assertEquals(future, producer.send(record));
assertFalse(future.isDone());
verify(ctx.partitioner).onNewBatch(topic, cluster, 0);
verify(ctx.transactionManager, never()).maybeAddPartition(topicPartition0);
verify(ctx.transactionManager).maybeAddPartition(topicPartition1);
}
}
private <T> FutureRecordMetadata expectAppend( private <T> FutureRecordMetadata expectAppend(
KafkaProducerTestContext<T> ctx, KafkaProducerTestContext<T> ctx,
ProducerRecord<T, T> record, ProducerRecord<T, T> record,
@ -2387,7 +2353,6 @@ public class KafkaProducerTest {
eq(Record.EMPTY_HEADERS), // 5 eq(Record.EMPTY_HEADERS), // 5
any(RecordAccumulator.AppendCallbacks.class), // 6 <-- any(RecordAccumulator.AppendCallbacks.class), // 6 <--
anyLong(), anyLong(),
eq(true),
anyLong(), anyLong(),
any() any()
)).thenAnswer(invocation -> { )).thenAnswer(invocation -> {
@ -2398,96 +2363,12 @@ public class KafkaProducerTest {
futureRecordMetadata, futureRecordMetadata,
false, false,
false, false,
false,
0); 0);
}); });
return futureRecordMetadata; return futureRecordMetadata;
} }
private <T> FutureRecordMetadata expectAppendWithAbortForNewBatch(
KafkaProducerTestContext<T> ctx,
ProducerRecord<T, T> record,
TopicPartition initialSelectedPartition,
TopicPartition retrySelectedPartition,
Cluster cluster
) throws InterruptedException {
byte[] serializedKey = ctx.serializer.serialize(topic, record.key());
byte[] serializedValue = ctx.serializer.serialize(topic, record.value());
long timestamp = record.timestamp() == null ? ctx.time.milliseconds() : record.timestamp();
ProduceRequestResult requestResult = new ProduceRequestResult(retrySelectedPartition);
FutureRecordMetadata futureRecordMetadata = new FutureRecordMetadata(
requestResult,
0,
timestamp,
serializedKey.length,
serializedValue.length,
ctx.time
);
when(ctx.partitioner.partition(
initialSelectedPartition.topic(),
record.key(),
serializedKey,
record.value(),
serializedValue,
cluster
)).thenReturn(initialSelectedPartition.partition())
.thenReturn(retrySelectedPartition.partition());
when(ctx.accumulator.append(
eq(initialSelectedPartition.topic()), // 0
eq(initialSelectedPartition.partition()), // 1
eq(timestamp), // 2
eq(serializedKey), // 3
eq(serializedValue), // 4
eq(Record.EMPTY_HEADERS), // 5
any(RecordAccumulator.AppendCallbacks.class), // 6 <--
anyLong(),
eq(true), // abortOnNewBatch
anyLong(),
any()
)).thenAnswer(invocation -> {
RecordAccumulator.AppendCallbacks callbacks =
(RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
callbacks.setPartition(initialSelectedPartition.partition());
return new RecordAccumulator.RecordAppendResult(
null,
false,
false,
true,
0);
});
when(ctx.accumulator.append(
eq(retrySelectedPartition.topic()), // 0
eq(retrySelectedPartition.partition()), // 1
eq(timestamp), // 2
eq(serializedKey), // 3
eq(serializedValue), // 4
eq(Record.EMPTY_HEADERS), // 5
any(RecordAccumulator.AppendCallbacks.class), // 6 <--
anyLong(),
eq(false), // abortOnNewBatch
anyLong(),
any()
)).thenAnswer(invocation -> {
RecordAccumulator.AppendCallbacks callbacks =
(RecordAccumulator.AppendCallbacks) invocation.getArguments()[6];
callbacks.setPartition(retrySelectedPartition.partition());
return new RecordAccumulator.RecordAppendResult(
futureRecordMetadata,
false,
true,
false,
0);
});
return futureRecordMetadata;
}
private static final List<String> CLIENT_IDS = new ArrayList<>(); private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class SerializerForClientId implements Serializer<byte[]> { public static class SerializerForClientId implements Serializer<byte[]> {

View File

@ -96,77 +96,4 @@ public class RoundRobinPartitionerTest {
assertEquals(10, partitionCount.get(1).intValue()); assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue()); assertEquals(10, partitionCount.get(2).intValue());
} }
@SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithNullKeyBytes() {
final String topicA = "topicA";
final String topicB = "topicB";
List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());
final Map<Integer, Integer> partitionCount = new HashMap<>();
Partitioner partitioner = new RoundRobinPartitioner();
for (int i = 0; i < 30; ++i) {
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
// Simulate single-message batches
partitioner.onNewBatch(topicA, testCluster, partition);
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
Integer count = partitionCount.get(partition);
if (null == count)
count = 0;
partitionCount.put(partition, count + 1);
if (i % 5 == 0) {
partitioner.partition(topicB, null, null, null, null, testCluster);
}
}
assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
}
@SuppressWarnings("deprecation")
@Test
public void testRoundRobinWithNullKeyBytesAndEvenPartitionCount() {
final String topicA = "topicA";
final String topicB = "topicB";
List<PartitionInfo> allPartitions = asList(new PartitionInfo(topicA, 0, NODES[0], NODES, NODES),
new PartitionInfo(topicA, 1, NODES[1], NODES, NODES), new PartitionInfo(topicA, 2, NODES[2], NODES, NODES),
new PartitionInfo(topicB, 0, NODES[0], NODES, NODES), new PartitionInfo(topicA, 3, NODES[0], NODES, NODES));
Cluster testCluster = new Cluster("clusterId", asList(NODES[0], NODES[1], NODES[2]), allPartitions,
Collections.emptySet(), Collections.emptySet());
final Map<Integer, Integer> partitionCount = new HashMap<>();
Partitioner partitioner = new RoundRobinPartitioner();
for (int i = 0; i < 40; ++i) {
int partition = partitioner.partition(topicA, null, null, null, null, testCluster);
// Simulate single-message batches
partitioner.onNewBatch(topicA, testCluster, partition);
int nextPartition = partitioner.partition(topicA, null, null, null, null, testCluster);
assertEquals(partition, nextPartition, "New batch creation should not affect the partition selection");
Integer count = partitionCount.get(partition);
if (null == count)
count = 0;
partitionCount.put(partition, count + 1);
if (i % 5 == 0) {
partitioner.partition(topicB, null, null, null, null, testCluster);
}
}
assertEquals(10, partitionCount.get(0).intValue());
assertEquals(10, partitionCount.get(1).intValue());
assertEquals(10, partitionCount.get(2).intValue());
assertEquals(10, partitionCount.get(3).intValue());
}
} }

View File

@ -161,18 +161,18 @@ public class RecordAccumulatorTest {
// initial data // initial data
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
// drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
Map<Integer, List<ProducerBatch>> batches1 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); Map<Integer, List<ProducerBatch>> batches1 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches1, tp1, tp3); verifyTopicPartitionInBatches(batches1, tp1, tp3);
// add record for tp1, tp3 // add record for tp1, tp3
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
// drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
// The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4 // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
@ -184,18 +184,18 @@ public class RecordAccumulatorTest {
verifyTopicPartitionInBatches(batches3, tp1, tp3); verifyTopicPartitionInBatches(batches3, tp1, tp3);
// add record for tp2, tp3, tp4 and mute the tp4 // add record for tp2, tp3, tp4 and mute the tp4
accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.mutePartition(tp4); accum.mutePartition(tp4);
// drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted) // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
Map<Integer, List<ProducerBatch>> batches4 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); Map<Integer, List<ProducerBatch>> batches4 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches4, tp2, tp3); verifyTopicPartitionInBatches(batches4, tp2, tp3);
// add record for tp1, tp2, tp3, and unmute tp4 // add record for tp1, tp2, tp3, and unmute tp4
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.unmutePartition(tp4); accum.unmutePartition(tp4);
// set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4] // set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
Map<Integer, List<ProducerBatch>> batches5 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0); Map<Integer, List<ProducerBatch>> batches5 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
@ -229,7 +229,7 @@ public class RecordAccumulatorTest {
int appends = expectedNumAppends(batchSize); int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) { for (int i = 0; i < appends; i++) {
// append to the first batch // append to the first batch
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster());
Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1); Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1);
assertEquals(1, partitionBatches.size()); assertEquals(1, partitionBatches.size());
@ -240,7 +240,7 @@ public class RecordAccumulatorTest {
// this append doesn't fit in the first batch, so a new batch is created and the first batch is closed // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster());
Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1); Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1);
assertEquals(2, partitionBatches.size()); assertEquals(2, partitionBatches.size());
Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator(); Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
@ -275,7 +275,7 @@ public class RecordAccumulatorTest {
byte[] value = new byte[2 * batchSize]; byte[] value = new byte[2 * batchSize];
RecordAccumulator accum = createTestRecordAccumulator( RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compression, 0); batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compression, 0);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster());
assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
Deque<ProducerBatch> batches = accum.getDeque(tp1); Deque<ProducerBatch> batches = accum.getDeque(tp1);
@ -313,7 +313,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator( RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compression, 0); batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compression, 0);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster());
assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
Deque<ProducerBatch> batches = accum.getDeque(tp1); Deque<ProducerBatch> batches = accum.getDeque(tp1);
@ -337,7 +337,7 @@ public class RecordAccumulatorTest {
int lingerMs = 10; int lingerMs = 10;
RecordAccumulator accum = createTestRecordAccumulator( RecordAccumulator accum = createTestRecordAccumulator(
1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, Compression.NONE, lingerMs); 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, Compression.NONE, lingerMs);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready"); assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready");
time.sleep(10); time.sleep(10);
assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
@ -360,7 +360,7 @@ public class RecordAccumulatorTest {
List<TopicPartition> partitions = asList(tp1, tp2); List<TopicPartition> partitions = asList(tp1, tp2);
for (TopicPartition tp : partitions) { for (TopicPartition tp : partitions) {
for (int i = 0; i < appends; i++) for (int i = 0; i < appends; i++)
accum.append(tp.topic(), tp.partition(), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(tp.topic(), tp.partition(), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
} }
assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Partition's leader should be ready"); assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Partition's leader should be ready");
@ -381,7 +381,7 @@ public class RecordAccumulatorTest {
threads.add(new Thread(() -> { threads.add(new Thread(() -> {
for (int j = 0; j < msgs; j++) { for (int j = 0; j < msgs; j++) {
try { try {
accum.append(topic, j % numParts, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, j % numParts, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -424,7 +424,7 @@ public class RecordAccumulatorTest {
// Partition on node1 only // Partition on node1 only
for (int i = 0; i < appends; i++) for (int i = 0; i < appends; i++)
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());
assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(0, result.readyNodes.size(), "No nodes should be ready.");
assertEquals(lingerMs, result.nextReadyCheckDelayMs, "Next check time should be the linger time"); assertEquals(lingerMs, result.nextReadyCheckDelayMs, "Next check time should be the linger time");
@ -433,14 +433,14 @@ public class RecordAccumulatorTest {
// Add partition on node2 only // Add partition on node2 only
for (int i = 0; i < appends; i++) for (int i = 0; i < appends; i++)
accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
result = accum.ready(metadataCache, time.milliseconds()); result = accum.ready(metadataCache, time.milliseconds());
assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(0, result.readyNodes.size(), "No nodes should be ready.");
assertEquals(lingerMs / 2, result.nextReadyCheckDelayMs, "Next check time should be defined by node1, half remaining linger time"); assertEquals(lingerMs / 2, result.nextReadyCheckDelayMs, "Next check time should be defined by node1, half remaining linger time");
// Add data for another partition on node1, enough to make data sendable immediately // Add data for another partition on node1, enough to make data sendable immediately
for (int i = 0; i < appends + 1; i++) for (int i = 0; i < appends + 1; i++)
accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
result = accum.ready(metadataCache, time.milliseconds()); result = accum.ready(metadataCache, time.milliseconds());
assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready");
// Note this can actually be < linger time because it may use delays from partitions that aren't sendable // Note this can actually be < linger time because it may use delays from partitions that aren't sendable
@ -464,7 +464,7 @@ public class RecordAccumulatorTest {
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
long now = time.milliseconds(); long now = time.milliseconds();
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now + lingerMs + 1); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now + lingerMs + 1);
assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready");
Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
@ -476,7 +476,7 @@ public class RecordAccumulatorTest {
accum.reenqueue(batches.get(0).get(0), now); accum.reenqueue(batches.get(0).get(0), now);
// Put message for partition 1 into accumulator // Put message for partition 1 into accumulator
accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
result = accum.ready(metadataCache, now + lingerMs + 1); result = accum.ready(metadataCache, now + lingerMs + 1);
assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready");
@ -530,7 +530,7 @@ public class RecordAccumulatorTest {
long now = time.milliseconds(); long now = time.milliseconds();
long initial = now; long initial = now;
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
// No backoff for initial attempt // No backoff for initial attempt
Map<Integer, List<ProducerBatch>> batches = drainAndCheckBatchAmount(metadataCache, node1, accum, now + lingerMs + 1, 1); Map<Integer, List<ProducerBatch>> batches = drainAndCheckBatchAmount(metadataCache, node1, accum, now + lingerMs + 1, 1);
@ -591,7 +591,7 @@ public class RecordAccumulatorTest {
long now = time.milliseconds(); long now = time.milliseconds();
long initial = now; long initial = now;
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
// No backoff for initial attempt // No backoff for initial attempt
Map<Integer, List<ProducerBatch>> batches = drainAndCheckBatchAmount(metadataCache, node1, accum, now + lingerMs + 1, 1); Map<Integer, List<ProducerBatch>> batches = drainAndCheckBatchAmount(metadataCache, node1, accum, now + lingerMs + 1, 1);
@ -648,7 +648,7 @@ public class RecordAccumulatorTest {
4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, Compression.NONE, lingerMs); 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, Compression.NONE, lingerMs);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
accum.append(topic, i % 3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, i % 3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
assertTrue(accum.hasIncomplete()); assertTrue(accum.hasIncomplete());
} }
RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());
@ -684,7 +684,7 @@ public class RecordAccumulatorTest {
public void testAwaitFlushComplete() throws Exception { public void testAwaitFlushComplete() throws Exception {
RecordAccumulator accum = createTestRecordAccumulator( RecordAccumulator accum = createTestRecordAccumulator(
4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, Compression.NONE, Integer.MAX_VALUE); 4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024, Compression.NONE, Integer.MAX_VALUE);
accum.append(topic, 0, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, 0, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
accum.beginFlush(); accum.beginFlush();
assertTrue(accum.flushInProgress()); assertTrue(accum.flushInProgress());
@ -717,7 +717,7 @@ public class RecordAccumulatorTest {
} }
} }
for (int i = 0; i < numRecords; i++) for (int i = 0; i < numRecords; i++)
accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, time.milliseconds(), cluster);
RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());
assertFalse(result.readyNodes.isEmpty()); assertFalse(result.readyNodes.isEmpty());
Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
@ -762,7 +762,7 @@ public class RecordAccumulatorTest {
} }
} }
for (int i = 0; i < numRecords; i++) for (int i = 0; i < numRecords; i++)
accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, time.milliseconds(), cluster);
RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());
assertFalse(result.readyNodes.isEmpty()); assertFalse(result.readyNodes.isEmpty());
Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE,
@ -801,7 +801,7 @@ public class RecordAccumulatorTest {
for (Boolean mute: muteStates) { for (Boolean mute: muteStates) {
if (time.milliseconds() < System.currentTimeMillis()) if (time.milliseconds() < System.currentTimeMillis())
time.setCurrentTimeMs(System.currentTimeMillis()); time.setCurrentTimeMs(System.currentTimeMillis());
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partition should be ready."); assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partition should be ready.");
time.sleep(lingerMs); time.sleep(lingerMs);
@ -850,11 +850,11 @@ public class RecordAccumulatorTest {
// Test batches not in retry // Test batches not in retry
for (int i = 0; i < appends; i++) { for (int i = 0; i < appends; i++) {
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
} }
// Make the batches ready due to batch full // Make the batches ready due to batch full
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster);
Set<Node> readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; Set<Node> readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
// Advance the clock to expire the batch. // Advance the clock to expire the batch.
@ -884,7 +884,7 @@ public class RecordAccumulatorTest {
// Test batches in retry. // Test batches in retry.
// Create a retried batch // Create a retried batch
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster);
time.sleep(lingerMs); time.sleep(lingerMs);
readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
@ -908,7 +908,7 @@ public class RecordAccumulatorTest {
assertEquals(0, expiredBatches.size(), "All batches should have been expired."); assertEquals(0, expiredBatches.size(), "All batches should have been expired.");
// Test that when being throttled muted batches are expired before the throttle time is over. // Test that when being throttled muted batches are expired before the throttle time is over.
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster);
time.sleep(lingerMs); time.sleep(lingerMs);
readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
@ -941,7 +941,7 @@ public class RecordAccumulatorTest {
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, Compression.NONE, 10); batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, Compression.NONE, 10);
int appends = expectedNumAppends(batchSize); int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) { for (int i = 0; i < appends; i++) {
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(0, accum.ready(metadataCache, now).readyNodes.size(), "No partitions should be ready."); assertEquals(0, accum.ready(metadataCache, now).readyNodes.size(), "No partitions should be ready.");
} }
time.sleep(2000); time.sleep(2000);
@ -988,9 +988,9 @@ public class RecordAccumulatorTest {
Mockito.when(transactionManager.isCompleting()).thenReturn(false); Mockito.when(transactionManager.isCompleting()).thenReturn(false);
accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
false, time.milliseconds(), cluster); time.milliseconds(), cluster);
accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, accumulator.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs,
false, time.milliseconds(), cluster); time.milliseconds(), cluster);
assertTrue(accumulator.hasUndrained()); assertTrue(accumulator.hasUndrained());
RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(metadataCache, time.milliseconds()); RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(metadataCache, time.milliseconds());
@ -1105,7 +1105,7 @@ public class RecordAccumulatorTest {
int dice = random.nextInt(100); int dice = random.nextInt(100);
byte[] value = (dice < goodCompRatioPercentage) ? byte[] value = (dice < goodCompRatioPercentage) ?
bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100); bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100);
accum.append(topic, partition1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, null, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster);
BatchDrainedResult result = completeOrSplitBatches(accum, batchSize); BatchDrainedResult result = completeOrSplitBatches(accum, batchSize);
numSplit += result.numSplit; numSplit += result.numSplit;
numBatches += result.numBatches; numBatches += result.numBatches;
@ -1128,7 +1128,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator( RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, Compression.NONE, lingerMs); batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, Compression.NONE, lingerMs);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
Set<Node> readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; Set<Node> readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes;
Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertTrue(drained.isEmpty()); assertTrue(drained.isEmpty());
@ -1143,7 +1143,7 @@ public class RecordAccumulatorTest {
//assertTrue(accum.soonToExpireInFlightBatches().isEmpty()); //assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
// Queue another batch and advance clock such that batch expiry time is earlier than request timeout. // Queue another batch and advance clock such that batch expiry time is earlier than request timeout.
accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), cluster);
time.sleep(lingerMs * 4); time.sleep(lingerMs * 4);
// Now drain and check that accumulator picked up the drained batch because its expiry is soon. // Now drain and check that accumulator picked up the drained batch because its expiry is soon.
@ -1168,7 +1168,7 @@ public class RecordAccumulatorTest {
// Test batches in retry. // Test batches in retry.
for (Boolean mute : muteStates) { for (Boolean mute : muteStates) {
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster);
time.sleep(lingerMs); time.sleep(lingerMs);
readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready");
@ -1221,7 +1221,7 @@ public class RecordAccumulatorTest {
// Produce small record, we should switch to first partition. // Produce small record, we should switch to first partition.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, value, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, value, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(partition1, partition.get()); assertEquals(partition1, partition.get());
assertEquals(1, mockRandom.get()); assertEquals(1, mockRandom.get());
@ -1230,28 +1230,28 @@ public class RecordAccumulatorTest {
// because of incomplete batch. // because of incomplete batch.
byte[] largeValue = new byte[batchSize]; byte[] largeValue = new byte[batchSize];
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(partition1, partition.get()); assertEquals(partition1, partition.get());
assertEquals(1, mockRandom.get()); assertEquals(1, mockRandom.get());
// Produce large record, we should switch to next partition as we complete // Produce large record, we should switch to next partition as we complete
// previous batch and exceeded sticky limit. // previous batch and exceeded sticky limit.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(partition2, partition.get()); assertEquals(partition2, partition.get());
assertEquals(2, mockRandom.get()); assertEquals(2, mockRandom.get());
// Produce large record, we should switch to next partition as we complete // Produce large record, we should switch to next partition as we complete
// previous batch and exceeded sticky limit. // previous batch and exceeded sticky limit.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(partition3, partition.get()); assertEquals(partition3, partition.get());
assertEquals(3, mockRandom.get()); assertEquals(3, mockRandom.get());
// Produce large record, we should switch to next partition as we complete // Produce large record, we should switch to next partition as we complete
// previous batch and exceeded sticky limit. // previous batch and exceeded sticky limit.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(partition1, partition.get()); assertEquals(partition1, partition.get());
assertEquals(4, mockRandom.get()); assertEquals(4, mockRandom.get());
} }
@ -1283,7 +1283,7 @@ public class RecordAccumulatorTest {
for (int c = queueSizes[i]; c-- > 0; ) { for (int c = queueSizes[i]; c-- > 0; ) {
// Add large records to each partition, so that each record creates a batch. // Add large records to each partition, so that each record creates a batch.
accum.append(topic, i, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, i, 0L, null, largeValue, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster); null, maxBlockTimeMs, time.milliseconds(), cluster);
} }
assertEquals(queueSizes[i], accum.getDeque(new TopicPartition(topic, i)).size()); assertEquals(queueSizes[i], accum.getDeque(new TopicPartition(topic, i)).size());
} }
@ -1308,7 +1308,7 @@ public class RecordAccumulatorTest {
// Prime built-in partitioner so that it'd switch on every record, as switching only // Prime built-in partitioner so that it'd switch on every record, as switching only
// happens after the "sticky" limit is exceeded. // happens after the "sticky" limit is exceeded.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
// Issue a certain number of partition calls to validate that the partitions would be // Issue a certain number of partition calls to validate that the partitions would be
// distributed with frequencies that are reciprocal to the queue sizes. The number of // distributed with frequencies that are reciprocal to the queue sizes. The number of
@ -1320,7 +1320,7 @@ public class RecordAccumulatorTest {
for (int i = 0; i < numberOfIterations; i++) { for (int i = 0; i < numberOfIterations; i++) {
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
++frequencies[partition.get()]; ++frequencies[partition.get()];
} }
@ -1337,11 +1337,11 @@ public class RecordAccumulatorTest {
// Do one append, because partition gets switched after append. // Do one append, because partition gets switched after append.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
for (int c = 10; c-- > 0; ) { for (int c = 10; c-- > 0; ) {
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS,
callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); callbacks, maxBlockTimeMs, time.milliseconds(), cluster);
assertEquals(partition3, partition.get()); assertEquals(partition3, partition.get());
} }
@ -1361,7 +1361,7 @@ public class RecordAccumulatorTest {
// Produce about 2/3 of the batch size. // Produce about 2/3 of the batch size.
for (int recCount = batchSize * 2 / 3 / valSize; recCount-- > 0; ) { for (int recCount = batchSize * 2 / 3 / valSize; recCount-- > 0; ) {
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0, null, value, Record.EMPTY_HEADERS, accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0, null, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster); null, maxBlockTimeMs, time.milliseconds(), cluster);
} }
// Advance the time to make the batch ready. // Advance the time to make the batch ready.
@ -1404,7 +1404,7 @@ public class RecordAccumulatorTest {
// Create 1 batch(batchA) to be produced to partition1. // Create 1 batch(batchA) to be produced to partition1.
long now = time.milliseconds(); long now = time.milliseconds();
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, now, cluster); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, now, cluster);
// 1st attempt(not a retry) to produce batchA, it should be ready & drained to be produced. // 1st attempt(not a retry) to produce batchA, it should be ready & drained to be produced.
{ {
@ -1524,7 +1524,7 @@ public class RecordAccumulatorTest {
CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f);
// Append 20 records of 100 bytes size with poor compression ratio should make the batch too big. // Append 20 records of 100 bytes size with poor compression ratio should make the batch too big.
for (int i = 0; i < numRecords; i++) { for (int i = 0; i < numRecords; i++) {
accum.append(topic, partition1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); accum.append(topic, partition1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, time.milliseconds(), cluster);
} }
RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());

View File

@ -407,7 +407,7 @@ public class SenderTest {
expiryCallbackCount.incrementAndGet(); expiryCallbackCount.incrementAndGet();
try { try {
accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); Record.EMPTY_HEADERS, null, maxBlockTimeMs, time.milliseconds(), metadataCache.cluster());
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException("Unexpected interruption", e); throw new RuntimeException("Unexpected interruption", e);
} }
@ -418,7 +418,7 @@ public class SenderTest {
final long nowMs = time.milliseconds(); final long nowMs = time.milliseconds();
for (int i = 0; i < messagesPerBatch; i++) for (int i = 0; i < messagesPerBatch; i++)
accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, false, nowMs, metadataCache.cluster()); accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, nowMs, metadataCache.cluster());
// Advance the clock to expire the first batch. // Advance the clock to expire the first batch.
time.sleep(10000); time.sleep(10000);
@ -2350,9 +2350,9 @@ public class SenderTest {
long nowMs = time.milliseconds(); long nowMs = time.milliseconds();
Cluster cluster = TestUtils.singletonCluster(); Cluster cluster = TestUtils.singletonCluster();
Future<RecordMetadata> f1 = Future<RecordMetadata> f1 =
accumulator.append(tp.topic(), tp.partition(), 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs, cluster).future; accumulator.append(tp.topic(), tp.partition(), 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, nowMs, cluster).future;
Future<RecordMetadata> f2 = Future<RecordMetadata> f2 =
accumulator.append(tp.topic(), tp.partition(), 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, false, nowMs, cluster).future; accumulator.append(tp.topic(), tp.partition(), 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT, nowMs, cluster).future;
sender.runOnce(); // connect sender.runOnce(); // connect
sender.runOnce(); // send produce request sender.runOnce(); // send produce request
@ -3515,7 +3515,7 @@ public class SenderTest {
private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException { private FutureRecordMetadata appendToAccumulator(TopicPartition tp, long timestamp, String key, String value) throws InterruptedException {
return accumulator.append(tp.topic(), tp.partition(), timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS, return accumulator.append(tp.topic(), tp.partition(), timestamp, key.getBytes(), value.getBytes(), Record.EMPTY_HEADERS,
null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), TestUtils.singletonCluster()).future; null, MAX_BLOCK_TIMEOUT, time.milliseconds(), TestUtils.singletonCluster()).future;
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")

View File

@ -747,7 +747,7 @@ public class TransactionManagerTest {
assertEquals(0, transactionManager.sequenceNumber(tp0)); assertEquals(0, transactionManager.sequenceNumber(tp0));
Future<RecordMetadata> responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), Future<RecordMetadata> responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
"1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), "1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, time.milliseconds(),
TestUtils.singletonCluster()).future; TestUtils.singletonCluster()).future;
sender.runOnce(); sender.runOnce();
assertEquals(1, transactionManager.sequenceNumber(tp0)); assertEquals(1, transactionManager.sequenceNumber(tp0));
@ -778,7 +778,7 @@ public class TransactionManagerTest {
assertEquals(0, transactionManager.sequenceNumber(tp0)); assertEquals(0, transactionManager.sequenceNumber(tp0));
Future<RecordMetadata> responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(), Future<RecordMetadata> responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
"2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(), "2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, time.milliseconds(),
TestUtils.singletonCluster()).future; TestUtils.singletonCluster()).future;
sender.runOnce(); sender.runOnce();
sender.runOnce(); sender.runOnce();
@ -3978,7 +3978,7 @@ public class TransactionManagerTest {
private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException { private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
final long nowMs = time.milliseconds(); final long nowMs = time.milliseconds();
return accumulator.append(tp.topic(), tp.partition(), nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, return accumulator.append(tp.topic(), tp.partition(), nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,
null, MAX_BLOCK_TIMEOUT, false, nowMs, TestUtils.singletonCluster()).future; null, MAX_BLOCK_TIMEOUT, nowMs, TestUtils.singletonCluster()).future;
} }
private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTransactionResult, private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTransactionResult,

View File

@ -63,6 +63,8 @@
</li> </li>
<li>The <code>log.message.format.version</code> and <code>message.format.version</code> configs were removed. <li>The <code>log.message.format.version</code> and <code>message.format.version</code> configs were removed.
</li> </li>
<li>The function <code>onNewBatch</code> in <code>org.apache.kafka.clients.producer.Partitioner</code> class was removed.
</li>
</ul> </ul>
</li> </li>
<li><b>Broker</b> <li><b>Broker</b>