MINOR: Fixing typo in ExactlyOnceWorkerSourceTask#committableRecords (#13755)

Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>

Reviewers: Yash Mayya <yash.mayya@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
This commit is contained in:
vamossagar12 2023-05-31 19:56:31 +05:30 committed by GitHub
parent 1957be19d9
commit 03ab563206
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 9 additions and 9 deletions

View File

@ -61,7 +61,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
private boolean transactionOpen;
private final LinkedHashMap<SourceRecord, RecordMetadata> commitableRecords;
private final LinkedHashMap<SourceRecord, RecordMetadata> committableRecords;
private final TransactionBoundaryManager transactionBoundaryManager;
private final TransactionMetricsGroup transactionMetrics;
@ -101,7 +101,7 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
this.transactionOpen = false;
this.commitableRecords = new LinkedHashMap<>();
this.committableRecords = new LinkedHashMap<>();
this.preProducerCheck = preProducerCheck;
this.postProducerCheck = postProducerCheck;
@ -146,8 +146,8 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
@Override
protected void recordDropped(SourceRecord record) {
synchronized (commitableRecords) {
commitableRecords.put(record, null);
synchronized (committableRecords) {
committableRecords.put(record, null);
}
transactionBoundaryManager.maybeCommitTransactionForRecord(record);
}
@ -192,8 +192,8 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
ProducerRecord<byte[], byte[]> producerRecord,
RecordMetadata recordMetadata
) {
synchronized (commitableRecords) {
commitableRecords.put(sourceRecord, recordMetadata);
synchronized (committableRecords) {
committableRecords.put(sourceRecord, recordMetadata);
}
}
@ -329,9 +329,9 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
log.debug("{} Finished commitOffsets successfully in {} ms", this, durationMillis);
// Synchronize in order to guarantee that writes on other threads are picked up by this one
synchronized (commitableRecords) {
commitableRecords.forEach(this::commitTaskRecord);
commitableRecords.clear();
synchronized (committableRecords) {
committableRecords.forEach(this::commitTaskRecord);
committableRecords.clear();
}
commitSourceTask();
}