KAFKA-19350 Don't propagate the error caused by CreateTopicPolicy to FatalFaultHandler (#19857)

`CreateTopicPolicy#validate` may throw unexpected exception  other than
`PolicyViolationException`. We should handle this case as well.

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
S.Y. Wang 2025-06-18 01:51:50 +09:00 committed by GitHub
parent 91ce182ec5
commit bf15205647
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 0 deletions

View File

@ -871,6 +871,13 @@ public class ReplicationControlManager {
createTopicPolicy.get().validate(supplier.get()); createTopicPolicy.get().validate(supplier.get());
} catch (PolicyViolationException e) { } catch (PolicyViolationException e) {
return new ApiError(Errors.POLICY_VIOLATION, e.getMessage()); return new ApiError(Errors.POLICY_VIOLATION, e.getMessage());
} catch (Throwable e) {
// return the corresponding API error, but emit the stack trace first if it is an unknown server error
ApiError apiError = ApiError.fromThrowable(e);
if (apiError.error() == Errors.UNKNOWN_SERVER_ERROR) {
log.error("Unknown server error validating Create Topic", e);
}
return apiError;
} }
} }
return ApiError.NONE; return ApiError.NONE;

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException; import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
@ -143,6 +144,7 @@ import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES
import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION; import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED; import static org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG; import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG;
@ -920,6 +922,34 @@ public class ReplicationControlManagerTest {
ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}}, POLICY_VIOLATION.code()); ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}}, POLICY_VIOLATION.code());
} }
@Test
public void testCreateTopicsWithPolicyUnexpectedException() {
CreateTopicPolicy policy = new CreateTopicPolicy() {
@Override
public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
if (requestMetadata.topic().equals("known_error")) {
throw new InvalidTopicException("Known client-server errors");
}
throw new RuntimeException("Unknown client-server errors");
}
@Override
public void close() throws Exception { /* Nothing to do */ }
@Override
public void configure(Map<String, ?> configs) { /* Nothing to do */ }
};
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
setCreateTopicPolicy(policy).
build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);
ctx.createTestTopic("known_error", 2, (short) 2, INVALID_TOPIC_EXCEPTION.code());
ctx.createTestTopic("blah_error", 2, (short) 2, UNKNOWN_SERVER_ERROR.code());
}
@Test @Test
public void testCreateTopicWithCollisionChars() { public void testCreateTopicWithCollisionChars() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();