From 03ab5632065eb6ccac24924e90d296d22122319b Mon Sep 17 00:00:00 2001 From: vamossagar12 Date: Wed, 31 May 2023 19:56:31 +0530 Subject: [PATCH] MINOR: Fixing typo in ExactlyOnceWorkerSourceTask#committableRecords (#13755) Co-authored-by: Sagar Rao Reviewers: Yash Mayya , Viktor Somogyi-Vass --- .../runtime/ExactlyOnceWorkerSourceTask.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index 30dafaac81d..f8f0d9f393c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -61,7 +61,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class); private boolean transactionOpen; - private final LinkedHashMap commitableRecords; + private final LinkedHashMap committableRecords; private final TransactionBoundaryManager transactionBoundaryManager; private final TransactionMetricsGroup transactionMetrics; @@ -101,7 +101,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor); this.transactionOpen = false; - this.commitableRecords = new LinkedHashMap<>(); + this.committableRecords = new LinkedHashMap<>(); this.preProducerCheck = preProducerCheck; this.postProducerCheck = postProducerCheck; @@ -146,8 +146,8 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { @Override protected void recordDropped(SourceRecord record) { - synchronized (commitableRecords) { - commitableRecords.put(record, null); + synchronized (committableRecords) { + committableRecords.put(record, null); } transactionBoundaryManager.maybeCommitTransactionForRecord(record); } @@ -192,8 +192,8 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { ProducerRecord producerRecord, RecordMetadata recordMetadata ) { - synchronized (commitableRecords) { - commitableRecords.put(sourceRecord, recordMetadata); + synchronized (committableRecords) { + committableRecords.put(sourceRecord, recordMetadata); } } @@ -329,9 +329,9 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis); // Synchronize in order to guarantee that writes on other threads are picked up by this one - synchronized (commitableRecords) { - commitableRecords.forEach(this::commitTaskRecord); - commitableRecords.clear(); + synchronized (committableRecords) { + committableRecords.forEach(this::commitTaskRecord); + committableRecords.clear(); } commitSourceTask(); }