mirror of https://github.com/apache/kafka.git
KAFKA-19455: Retry persister request on metadata image issues. (#20078)
* If we get an `UNKNOWN_TOPIC_OR_PARTITION` response from the `ShareCoordinator` is could imply a transient issue where the metadata image is not upto date. * In this case it makes sense to retry the request to give time for data to be available. * In this PR, we are making the required change. Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
ad934d3202
commit
ac583ad2c0
|
@ -454,6 +454,7 @@ public class PersisterStateManager {
|
|||
case COORDINATOR_NOT_AVAILABLE: // retriable error codes
|
||||
case COORDINATOR_LOAD_IN_PROGRESS:
|
||||
case NOT_COORDINATOR:
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
log.debug("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message());
|
||||
if (!findCoordBackoff.canAttempt()) {
|
||||
log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey());
|
||||
|
@ -581,6 +582,7 @@ public class PersisterStateManager {
|
|||
case COORDINATOR_NOT_AVAILABLE:
|
||||
case COORDINATOR_LOAD_IN_PROGRESS:
|
||||
case NOT_COORDINATOR:
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
log.debug("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message());
|
||||
if (!initializeStateBackoff.canAttempt()) {
|
||||
log.error("Exhausted max retries for initialize state RPC for key {} without success.", partitionKey());
|
||||
|
@ -739,6 +741,7 @@ public class PersisterStateManager {
|
|||
case COORDINATOR_NOT_AVAILABLE:
|
||||
case COORDINATOR_LOAD_IN_PROGRESS:
|
||||
case NOT_COORDINATOR:
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
log.debug("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message());
|
||||
if (!writeStateBackoff.canAttempt()) {
|
||||
log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey());
|
||||
|
@ -881,6 +884,7 @@ public class PersisterStateManager {
|
|||
case COORDINATOR_NOT_AVAILABLE:
|
||||
case COORDINATOR_LOAD_IN_PROGRESS:
|
||||
case NOT_COORDINATOR:
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
log.debug("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message());
|
||||
if (!readStateBackoff.canAttempt()) {
|
||||
log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey());
|
||||
|
@ -1023,6 +1027,7 @@ public class PersisterStateManager {
|
|||
case COORDINATOR_NOT_AVAILABLE:
|
||||
case COORDINATOR_LOAD_IN_PROGRESS:
|
||||
case NOT_COORDINATOR:
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
log.debug("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message());
|
||||
if (!readStateSummaryBackoff.canAttempt()) {
|
||||
log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey());
|
||||
|
@ -1162,6 +1167,7 @@ public class PersisterStateManager {
|
|||
case COORDINATOR_NOT_AVAILABLE:
|
||||
case COORDINATOR_LOAD_IN_PROGRESS:
|
||||
case NOT_COORDINATOR:
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
log.debug("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message());
|
||||
if (!deleteStateBackoff.canAttempt()) {
|
||||
log.error("Exhausted max retries for delete state RPC for key {} without success.", partitionKey());
|
||||
|
|
|
@ -207,6 +207,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
|
|||
|
||||
@Override
|
||||
public void onLoaded(MetadataImage newImage) {
|
||||
this.metadataImage = newImage;
|
||||
coordinatorMetrics.activateMetricsShard(metricsShard);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue