HOTFIX: Controller topic deletion should be atomic (#10264)

Topic deletions should be atomic. This fixes a build error caused by merging of both https://github.com/apache/kafka/pull/10253 and https://github.com/apache/kafka/pull/10184 at about the same time. 

Reviewers: David Arthur <mumrah@gmail.com>
This commit is contained in:
Jason Gustafson 2021-03-04 12:19:34 -08:00 committed by GitHub
parent eebc6f279e
commit 60a097ae40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 12 deletions

View File

@ -624,7 +624,7 @@ public class ReplicationControlManager {
results.put(id, ApiError.fromThrowable(e)); results.put(id, ApiError.fromThrowable(e));
} }
} }
return new ControllerResult<>(records, results); return ControllerResult.atomicOf(records, results);
} }
void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) { void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) {

View File

@ -67,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(40) @Timeout(40)
@ -451,19 +452,19 @@ public class ReplicationControlManagerTest {
unfenceBroker(0, ctx); unfenceBroker(0, ctx);
registerBroker(1, ctx); registerBroker(1, ctx);
unfenceBroker(1, ctx); unfenceBroker(1, ctx);
ControllerResult<CreateTopicsResponseData> result = ControllerResult<CreateTopicsResponseData> createResult =
replicationControl.createTopics(request); replicationControl.createTopics(request);
CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
Uuid topicId = result.response().topics().find("foo").topicId(); Uuid topicId = createResult.response().topics().find("foo").topicId();
expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). expectedResponse.topics().add(new CreatableTopicResult().setName("foo").
setNumPartitions(3).setReplicationFactor((short) 2). setNumPartitions(3).setReplicationFactor((short) 2).
setErrorMessage(null).setErrorCode((short) 0). setErrorMessage(null).setErrorCode((short) 0).
setTopicId(topicId)); setTopicId(topicId));
assertEquals(expectedResponse, result.response()); assertEquals(expectedResponse, createResult.response());
// Until the records are replayed, no changes are made // Until the records are replayed, no changes are made
assertNull(replicationControl.getPartition(topicId, 0)); assertNull(replicationControl.getPartition(topicId, 0));
assertEmptyTopicConfigs(ctx, "foo"); assertEmptyTopicConfigs(ctx, "foo");
ctx.replay(result.records()); ctx.replay(createResult.records());
assertNotNull(replicationControl.getPartition(topicId, 0)); assertNotNull(replicationControl.getPartition(topicId, 0));
assertNotNull(replicationControl.getPartition(topicId, 1)); assertNotNull(replicationControl.getPartition(topicId, 1));
assertNotNull(replicationControl.getPartition(topicId, 2)); assertNotNull(replicationControl.getPartition(topicId, 2));
@ -483,17 +484,18 @@ public class ReplicationControlManagerTest {
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar"))); replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
ControllerResult<Map<Uuid, ApiError>> result1 = replicationControl. ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.
deleteTopics(Collections.singletonList(invalidId)); deleteTopics(Collections.singletonList(invalidId));
assertEquals(0, result1.records().size()); assertEquals(0, invalidDeleteResult.records().size());
assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)), assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
result1.response()); invalidDeleteResult.response());
ControllerResult<Map<Uuid, ApiError>> result2 = replicationControl. ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
deleteTopics(Collections.singletonList(topicId)); deleteTopics(Collections.singletonList(topicId));
assertTrue(deleteResult.isAtomic());
assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)), assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
result2.response()); deleteResult.response());
assertEquals(1, result2.records().size()); assertEquals(1, deleteResult.records().size());
ctx.replay(result2.records()); ctx.replay(deleteResult.records());
assertNull(replicationControl.getPartition(topicId, 0)); assertNull(replicationControl.getPartition(topicId, 0));
assertNull(replicationControl.getPartition(topicId, 1)); assertNull(replicationControl.getPartition(topicId, 1));
assertNull(replicationControl.getPartition(topicId, 2)); assertNull(replicationControl.getPartition(topicId, 2));