diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 22c8de053a0..9ad4bba3424 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -871,6 +871,13 @@ public class ReplicationControlManager { createTopicPolicy.get().validate(supplier.get()); } catch (PolicyViolationException e) { return new ApiError(Errors.POLICY_VIOLATION, e.getMessage()); + } catch (Throwable e) { + // return the corresponding API error, but emit the stack trace first if it is an unknown server error + ApiError apiError = ApiError.fromThrowable(e); + if (apiError.error() == Errors.UNKNOWN_SERVER_ERROR) { + log.error("Unknown server error validating Create Topic", e); + } + return apiError; } } return ApiError.NONE; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 3a33111a318..ff18775e79c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.StaleBrokerEpochException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -143,6 +144,7 @@ import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION; import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.THROTTLING_QUOTA_EXCEEDED; +import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG; @@ -920,6 +922,34 @@ public class ReplicationControlManagerTest { ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}}, POLICY_VIOLATION.code()); } + @Test + public void testCreateTopicsWithPolicyUnexpectedException() { + CreateTopicPolicy policy = new CreateTopicPolicy() { + @Override + public void validate(RequestMetadata requestMetadata) throws PolicyViolationException { + if (requestMetadata.topic().equals("known_error")) { + throw new InvalidTopicException("Known client-server errors"); + } + + throw new RuntimeException("Unknown client-server errors"); + } + + @Override + public void close() throws Exception { /* Nothing to do */ } + + @Override + public void configure(Map configs) { /* Nothing to do */ } + }; + + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder(). + setCreateTopicPolicy(policy). + build(); + ctx.registerBrokers(0, 1, 2); + ctx.unfenceBrokers(0, 1, 2); + ctx.createTestTopic("known_error", 2, (short) 2, INVALID_TOPIC_EXCEPTION.code()); + ctx.createTestTopic("blah_error", 2, (short) 2, UNKNOWN_SERVER_ERROR.code()); + } + @Test public void testCreateTopicWithCollisionChars() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();