mirror of https://github.com/apache/kafka.git
MINOR: Improve consistency of acknowledge type terminology (#20282)
The code had a mixture of "acknowledgement type" and "acknowledge type". The latter is preferred. Reviewers: TengYao Chi <frankvicky@apache.org>, Lan Ding <isDing_L@163.com>
This commit is contained in:
parent
e1f45218c9
commit
b909544e99
|
@ -362,7 +362,7 @@ public class ShareConsumerTest {
|
||||||
return partitionOffsetsMap.containsKey(tp);
|
return partitionOffsetsMap.containsKey(tp);
|
||||||
}, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to callback");
|
}, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to callback");
|
||||||
|
|
||||||
// We expect no exception as the acknowledgment error code is null.
|
// We expect no exception as the acknowledgement error code is null.
|
||||||
assertFalse(partitionExceptionMap.containsKey(tp));
|
assertFalse(partitionExceptionMap.containsKey(tp));
|
||||||
verifyShareGroupStateTopicRecordsProduced();
|
verifyShareGroupStateTopicRecordsProduced();
|
||||||
}
|
}
|
||||||
|
@ -391,7 +391,7 @@ public class ShareConsumerTest {
|
||||||
shareConsumer.poll(Duration.ofMillis(1000));
|
shareConsumer.poll(Duration.ofMillis(1000));
|
||||||
shareConsumer.close();
|
shareConsumer.close();
|
||||||
|
|
||||||
// We expect no exception as the acknowledgment error code is null.
|
// We expect no exception as the acknowledgement error code is null.
|
||||||
assertFalse(partitionExceptionMap.containsKey(tp));
|
assertFalse(partitionExceptionMap.containsKey(tp));
|
||||||
verifyShareGroupStateTopicRecordsProduced();
|
verifyShareGroupStateTopicRecordsProduced();
|
||||||
}
|
}
|
||||||
|
@ -1500,7 +1500,7 @@ public class ShareConsumerTest {
|
||||||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
|
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
|
||||||
shareConsumer.subscribe(Set.of(tp.topic()));
|
shareConsumer.subscribe(Set.of(tp.topic()));
|
||||||
|
|
||||||
// The acknowledgment commit callback will try to call a method of ShareConsumer
|
// The acknowledgement commit callback will try to call a method of ShareConsumer
|
||||||
shareConsumer.poll(Duration.ofMillis(5000));
|
shareConsumer.poll(Duration.ofMillis(5000));
|
||||||
// The second poll sends the acknowledgements implicitly.
|
// The second poll sends the acknowledgements implicitly.
|
||||||
// The acknowledgement commit callback will be called and the exception is thrown.
|
// The acknowledgement commit callback will be called and the exception is thrown.
|
||||||
|
@ -1540,14 +1540,14 @@ public class ShareConsumerTest {
|
||||||
producer.send(record);
|
producer.send(record);
|
||||||
producer.flush();
|
producer.flush();
|
||||||
|
|
||||||
// The acknowledgment commit callback will try to call a method of ShareConsumer
|
// The acknowledgement commit callback will try to call a method of ShareConsumer
|
||||||
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
|
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
|
||||||
shareConsumer.subscribe(Set.of(tp.topic()));
|
shareConsumer.subscribe(Set.of(tp.topic()));
|
||||||
|
|
||||||
TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
|
TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
|
||||||
DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
|
DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
|
||||||
|
|
||||||
// The second poll sends the acknowledgments implicitly.
|
// The second poll sends the acknowledgements implicitly.
|
||||||
shareConsumer.poll(Duration.ofMillis(2000));
|
shareConsumer.poll(Duration.ofMillis(2000));
|
||||||
|
|
||||||
// Till now acknowledgement commit callback has not been called, so no exception thrown yet.
|
// Till now acknowledgement commit callback has not been called, so no exception thrown yet.
|
||||||
|
|
|
@ -195,7 +195,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
|
* <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
|
||||||
* This example demonstrates using different acknowledgement types depending on the outcome of processing the records.
|
* This example demonstrates using different acknowledge types depending on the outcome of processing the records.
|
||||||
* Here the {@code share.acknowledgement.mode} property is set to "explicit" so the consumer must explicitly acknowledge each record.
|
* Here the {@code share.acknowledgement.mode} property is set to "explicit" so the consumer must explicitly acknowledge each record.
|
||||||
* <pre>
|
* <pre>
|
||||||
* Properties props = new Properties();
|
* Properties props = new Properties();
|
||||||
|
|
|
@ -185,7 +185,7 @@ public class Acknowledgements {
|
||||||
currentBatch.acknowledgeTypes().add(ACKNOWLEDGE_TYPE_GAP);
|
currentBatch.acknowledgeTypes().add(ACKNOWLEDGE_TYPE_GAP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgementTypes(currentBatch);
|
List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgeTypes(currentBatch);
|
||||||
|
|
||||||
optimalBatches.forEach(batch -> {
|
optimalBatches.forEach(batch -> {
|
||||||
if (canOptimiseForSingleAcknowledgeType(batch)) {
|
if (canOptimiseForSingleAcknowledgeType(batch)) {
|
||||||
|
@ -204,7 +204,7 @@ public class Acknowledgements {
|
||||||
*/
|
*/
|
||||||
private AcknowledgementBatch maybeCreateNewBatch(AcknowledgementBatch currentBatch, Long nextOffset, List<AcknowledgementBatch> batches) {
|
private AcknowledgementBatch maybeCreateNewBatch(AcknowledgementBatch currentBatch, Long nextOffset, List<AcknowledgementBatch> batches) {
|
||||||
if (nextOffset != currentBatch.lastOffset() + 1) {
|
if (nextOffset != currentBatch.lastOffset() + 1) {
|
||||||
List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgementTypes(currentBatch);
|
List<AcknowledgementBatch> optimalBatches = maybeOptimiseAcknowledgeTypes(currentBatch);
|
||||||
|
|
||||||
optimalBatches.forEach(batch -> {
|
optimalBatches.forEach(batch -> {
|
||||||
if (canOptimiseForSingleAcknowledgeType(batch)) {
|
if (canOptimiseForSingleAcknowledgeType(batch)) {
|
||||||
|
@ -228,7 +228,7 @@ public class Acknowledgements {
|
||||||
* whose count exceeds the default value. In this case, the batch is split into 2 such that the
|
* whose count exceeds the default value. In this case, the batch is split into 2 such that the
|
||||||
* batch with the continuous records has only 1 acknowledge type in its array.
|
* batch with the continuous records has only 1 acknowledge type in its array.
|
||||||
*/
|
*/
|
||||||
private List<AcknowledgementBatch> maybeOptimiseAcknowledgementTypes(AcknowledgementBatch currentAcknowledgeBatch) {
|
private List<AcknowledgementBatch> maybeOptimiseAcknowledgeTypes(AcknowledgementBatch currentAcknowledgeBatch) {
|
||||||
List<AcknowledgementBatch> batches = new ArrayList<>();
|
List<AcknowledgementBatch> batches = new ArrayList<>();
|
||||||
if (currentAcknowledgeBatch == null) return batches;
|
if (currentAcknowledgeBatch == null) return batches;
|
||||||
|
|
||||||
|
|
|
@ -154,7 +154,7 @@ public class ShareCompletedFetch {
|
||||||
* @param maxRecords The number of records to return; the number returned may be {@code 0 <= maxRecords}
|
* @param maxRecords The number of records to return; the number returned may be {@code 0 <= maxRecords}
|
||||||
* @param checkCrcs Whether to check the CRC of fetched records
|
* @param checkCrcs Whether to check the CRC of fetched records
|
||||||
*
|
*
|
||||||
* @return {@link ShareInFlightBatch The ShareInFlightBatch containing records and their acknowledgments}
|
* @return {@link ShareInFlightBatch The ShareInFlightBatch containing records and their acknowledgements}
|
||||||
*/
|
*/
|
||||||
<K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deserializers,
|
<K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deserializers,
|
||||||
final int maxRecords,
|
final int maxRecords,
|
||||||
|
|
|
@ -110,7 +110,7 @@ public class ShareFetch<K, V> {
|
||||||
* Acknowledge a single record in the current batch.
|
* Acknowledge a single record in the current batch.
|
||||||
*
|
*
|
||||||
* @param record The record to acknowledge
|
* @param record The record to acknowledge
|
||||||
* @param type The acknowledgment type which indicates whether it was processed successfully
|
* @param type The acknowledge type which indicates whether it was processed successfully
|
||||||
*/
|
*/
|
||||||
public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType type) {
|
public void acknowledge(final ConsumerRecord<K, V> record, final AcknowledgeType type) {
|
||||||
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
|
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
|
||||||
|
@ -129,7 +129,7 @@ public class ShareFetch<K, V> {
|
||||||
* @param topic The topic of the record to acknowledge
|
* @param topic The topic of the record to acknowledge
|
||||||
* @param partition The partition of the record
|
* @param partition The partition of the record
|
||||||
* @param offset The offset of the record
|
* @param offset The offset of the record
|
||||||
* @param type The acknowledgment type which indicates whether it was processed successfully
|
* @param type The acknowledge type which indicates whether it was processed successfully
|
||||||
*/
|
*/
|
||||||
public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
|
public void acknowledge(final String topic, final int partition, final long offset, final AcknowledgeType type) {
|
||||||
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
|
for (Map.Entry<TopicIdPartition, ShareInFlightBatch<K, V>> tipBatch : batches.entrySet()) {
|
||||||
|
|
|
@ -83,7 +83,7 @@ public class AcknowledgementsTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleAcknowledgementTypeExceedingLimit() {
|
public void testSingleAcknowledgeTypeExceedingLimit() {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (; i < maxRecordsWithSameAcknowledgeType; i++) {
|
for (; i < maxRecordsWithSameAcknowledgeType; i++) {
|
||||||
acks.add(i, AcknowledgeType.ACCEPT);
|
acks.add(i, AcknowledgeType.ACCEPT);
|
||||||
|
@ -119,7 +119,7 @@ public class AcknowledgementsTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleAcknowledgementTypeWithGap() {
|
public void testSingleAcknowledgeTypeWithGap() {
|
||||||
for (int i = 0; i < maxRecordsWithSameAcknowledgeType; i++) {
|
for (int i = 0; i < maxRecordsWithSameAcknowledgeType; i++) {
|
||||||
acks.add(i, null);
|
acks.add(i, null);
|
||||||
}
|
}
|
||||||
|
@ -186,7 +186,7 @@ public class AcknowledgementsTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleAcknowledgementTypeWithinLimit() {
|
public void testSingleAcknowledgeTypeWithinLimit() {
|
||||||
acks.add(0L, AcknowledgeType.ACCEPT);
|
acks.add(0L, AcknowledgeType.ACCEPT);
|
||||||
acks.add(1L, AcknowledgeType.ACCEPT);
|
acks.add(1L, AcknowledgeType.ACCEPT);
|
||||||
acks.add(2L, AcknowledgeType.ACCEPT);
|
acks.add(2L, AcknowledgeType.ACCEPT);
|
||||||
|
|
|
@ -4462,7 +4462,7 @@ public class SharePartitionTest {
|
||||||
assertEquals(20, sharePartition.startOffset());
|
assertEquals(20, sharePartition.startOffset());
|
||||||
assertEquals(36, sharePartition.endOffset());
|
assertEquals(36, sharePartition.endOffset());
|
||||||
|
|
||||||
// For cached state corresponding to entry 2, the batch state will be ACKNOWLEDGED, hence it will be cleared as part of acknowledgment.
|
// For cached state corresponding to entry 2, the batch state will be ACKNOWLEDGED, hence it will be cleared as part of acknowledgement.
|
||||||
assertEquals(6, sharePartition.cachedState().size());
|
assertEquals(6, sharePartition.cachedState().size());
|
||||||
|
|
||||||
assertEquals(MEMBER_ID, sharePartition.cachedState().get(7L).batchMemberId());
|
assertEquals(MEMBER_ID, sharePartition.cachedState().get(7L).batchMemberId());
|
||||||
|
@ -4768,7 +4768,7 @@ public class SharePartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLsoMovementAheadOfEndOffsetPostAcknowledgment() {
|
public void testLsoMovementAheadOfEndOffsetPostAcknowledgement() {
|
||||||
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
||||||
|
|
||||||
fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
|
fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
|
||||||
|
@ -4884,7 +4884,7 @@ public class SharePartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLsoMovementPostGapsInAcknowledgments() {
|
public void testLsoMovementPostGapsInAcknowledgements() {
|
||||||
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
||||||
|
|
||||||
MemoryRecords records1 = memoryRecords(2, 5);
|
MemoryRecords records1 = memoryRecords(2, 5);
|
||||||
|
@ -5733,7 +5733,7 @@ public class SharePartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaybeUpdateCachedStateWhenAcknowledgementTypeAccept() {
|
public void testMaybeUpdateCachedStateWhenAcknowledgeTypeAccept() {
|
||||||
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
||||||
|
|
||||||
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
|
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
|
||||||
|
@ -5753,7 +5753,7 @@ public class SharePartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaybeUpdateCachedStateWhenAcknowledgementTypeReject() {
|
public void testMaybeUpdateCachedStateWhenAcknowledgeTypeReject() {
|
||||||
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
||||||
|
|
||||||
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
|
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
|
||||||
|
@ -5773,7 +5773,7 @@ public class SharePartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaybeUpdateCachedStateWhenAcknowledgementTypeRelease() {
|
public void testMaybeUpdateCachedStateWhenAcknowledgeTypeRelease() {
|
||||||
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
||||||
|
|
||||||
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
|
fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250);
|
||||||
|
@ -5937,7 +5937,7 @@ public class SharePartitionTest {
|
||||||
fetchAcquiredRecords(sharePartition, memoryRecords(100, 80), 100);
|
fetchAcquiredRecords(sharePartition, memoryRecords(100, 80), 100);
|
||||||
assertFalse(sharePartition.canAcquireRecords());
|
assertFalse(sharePartition.canAcquireRecords());
|
||||||
|
|
||||||
// Final Acknowledgment, all records are acknowledged here.
|
// Final Acknowledgement, all records are acknowledged here.
|
||||||
sharePartition.acknowledge(MEMBER_ID, List.of(
|
sharePartition.acknowledge(MEMBER_ID, List.of(
|
||||||
new ShareAcknowledgementBatch(50, 179, List.of((byte) 3))));
|
new ShareAcknowledgementBatch(50, 179, List.of((byte) 3))));
|
||||||
|
|
||||||
|
@ -5984,7 +5984,7 @@ public class SharePartitionTest {
|
||||||
fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
|
fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
|
||||||
assertTrue(sharePartition.canAcquireRecords());
|
assertTrue(sharePartition.canAcquireRecords());
|
||||||
|
|
||||||
// Sending acknowledgment for the first batch from 11 to 20
|
// Sending acknowledgement for the first batch from 11 to 20
|
||||||
sharePartition.acknowledge(MEMBER_ID, List.of(
|
sharePartition.acknowledge(MEMBER_ID, List.of(
|
||||||
new ShareAcknowledgementBatch(11, 20, List.of((byte) 1))));
|
new ShareAcknowledgementBatch(11, 20, List.of((byte) 1))));
|
||||||
|
|
||||||
|
|
|
@ -7979,12 +7979,12 @@ class KafkaApisTest extends Logging {
|
||||||
|
|
||||||
private def compareAcknowledgementBatches(baseOffset: Long,
|
private def compareAcknowledgementBatches(baseOffset: Long,
|
||||||
endOffset: Long,
|
endOffset: Long,
|
||||||
acknowledgementType: Byte,
|
acknowledgeType: Byte,
|
||||||
acknowledgementBatch: ShareAcknowledgementBatch
|
acknowledgementBatch: ShareAcknowledgementBatch
|
||||||
): Boolean = {
|
): Boolean = {
|
||||||
if (baseOffset == acknowledgementBatch.firstOffset()
|
if (baseOffset == acknowledgementBatch.firstOffset()
|
||||||
&& endOffset == acknowledgementBatch.lastOffset()
|
&& endOffset == acknowledgementBatch.lastOffset()
|
||||||
&& acknowledgementType == acknowledgementBatch.acknowledgeTypes().get(0)) {
|
&& acknowledgeType == acknowledgementBatch.acknowledgeTypes().get(0)) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
false
|
false
|
||||||
|
|
|
@ -21,7 +21,7 @@ import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The ShareAcknowledgementBatch containing the fields required to acknowledge the fetched records.
|
* The ShareAcknowledgementBatch containing the fields required to acknowledge the fetched records.
|
||||||
* The class abstracts the acknowledgment request for <code>SharePartition</code> class constructed
|
* The class abstracts the acknowledgement request for <code>SharePartition</code> class constructed
|
||||||
* from {@link org.apache.kafka.common.message.ShareFetchRequestData.AcknowledgementBatch} and
|
* from {@link org.apache.kafka.common.message.ShareFetchRequestData.AcknowledgementBatch} and
|
||||||
* {@link org.apache.kafka.common.message.ShareAcknowledgeRequestData.AcknowledgementBatch} classes.
|
* {@link org.apache.kafka.common.message.ShareAcknowledgeRequestData.AcknowledgementBatch} classes.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -48,7 +48,7 @@ public class InFlightBatch {
|
||||||
|
|
||||||
// The offset state map is used to track the state of the records per offset. However, the
|
// The offset state map is used to track the state of the records per offset. However, the
|
||||||
// offset state map is only required when the state of the offsets within same batch are
|
// offset state map is only required when the state of the offsets within same batch are
|
||||||
// different. The states can be different when explicit offset acknowledgment is done which
|
// different. The states can be different when explicit offset acknowledgement is done which
|
||||||
// is different from the batch state.
|
// is different from the batch state.
|
||||||
private NavigableMap<Long, InFlightState> offsetState;
|
private NavigableMap<Long, InFlightState> offsetState;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue