MINOR: Refactor write timeout in CoordinatorRuntime (#15976)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-05-17 19:47:44 +02:00 committed by GitHub
parent 7fea279ff9
commit 5b34574e86
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 40 additions and 35 deletions

View File

@ -541,6 +541,28 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
}
class OperationTimeout extends TimerTask {
private final TopicPartition tp;
private final DeferredEvent event;
public OperationTimeout(
TopicPartition tp,
DeferredEvent event,
long delayMs
) {
super(delayMs);
this.event = event;
this.tp = tp;
}
@Override
public void run() {
String name = event.toString();
scheduleInternalOperation("OperationTimeout(name=" + name + ", tp=" + tp + ")", tp,
() -> event.complete(new TimeoutException(name + " timed out after " + delayMs + "ms")));
}
}
/**
* A coordinator write operation.
*
@ -614,7 +636,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
final Duration writeTimeout;
private TimerTask writeTimeoutTask = null;
/**
* The operation timeout.
*/
private OperationTimeout operationTimeout = null;
/**
* The result of the write operation. It could be null
@ -753,19 +778,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
// Add the response to the deferred queue.
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) {
@Override
public void run() {
if (!future.isDone()) {
scheduleInternalOperation(
"WriteTimeout(name=" + name + ", tp=" + tp + ")",
tp,
() -> complete(new TimeoutException("CoordinatorWriteEvent " + name + " timed out after " + writeTimeout.toMillis() + "ms"))
);
}
}
};
timer.add(writeTimeoutTask);
operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
timer.add(operationTimeout);
} else {
complete(null);
}
@ -798,9 +812,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
future.completeExceptionally(exception);
}
if (writeTimeoutTask != null) {
writeTimeoutTask.cancel();
writeTimeoutTask = null;
if (operationTimeout != null) {
operationTimeout.cancel();
operationTimeout = null;
}
}
@ -988,7 +1002,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
final Duration writeTimeout;
private TimerTask writeTimeoutTask = null;
/**
* The operation timeout.
*/
private OperationTimeout operationTimeout = null;
/**
* The future that will be completed with the response
@ -1057,20 +1074,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
writeTimeoutTask = new TimerTask(writeTimeout.toMillis()) {
@Override
public void run() {
if (!future.isDone()) {
scheduleInternalOperation(
"WriteTimeout(name=" + name + ", tp=" + tp + ")",
tp,
() -> complete(new TimeoutException("CoordinatorCompleteTransactionEvent " + name +
" timed out after " + writeTimeout.toMillis() + "ms"))
);
}
}
};
timer.add(writeTimeoutTask);
operationTimeout = new OperationTimeout(tp, this, writeTimeout.toMillis());
timer.add(operationTimeout);
} else {
complete(null);
}
@ -1098,9 +1103,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
future.completeExceptionally(exception);
}
if (writeTimeoutTask != null) {
writeTimeoutTask.cancel();
writeTimeoutTask = null;
if (operationTimeout != null) {
operationTimeout.cancel();
operationTimeout = null;
}
}