MINOR: Fix NPE from addingReplicas and removingReplicas (#10992)

Fix NPE from addingReplicas and removingReplicas. Make addingReplicas and 
removingReplicas in PartitionRecord non-nullable as described in KIP-746.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
David Arthur 2021-07-07 18:39:43 -04:00 committed by GitHub
parent 61b6539517
commit 03890ff1d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 28 additions and 27 deletions

View File

@ -45,6 +45,7 @@ import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.{ArgumentMatchers, Mockito}
import java.io.File
import java.net.InetAddress
import java.nio.file.Files
@ -52,11 +53,10 @@ import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Collections, Optional, Properties}
import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.image.{TopicImage, TopicsDelta, TopicsImage}
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@ -2543,13 +2543,13 @@ class ReplicaManagerTest {
val topicsByName = new util.HashMap[String, TopicImage]()
val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
fooPartitions.put(0, new PartitionRegistration(Array(1, 2, 3),
Array(1, 2, 3), Array.empty[Int], Array.empty[Int], 1, 100, 200))
Array(1, 2, 3), Replicas.NONE, Replicas.NONE, 1, 100, 200))
fooPartitions.put(1, new PartitionRegistration(Array(4, 5, 6),
Array(4, 5), Array.empty[Int], Array.empty[Int], 5, 300, 400))
Array(4, 5), Replicas.NONE, Replicas.NONE, 5, 300, 400))
val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
val barPartitions = new util.HashMap[Integer, PartitionRegistration]()
barPartitions.put(0, new PartitionRegistration(Array(2, 3, 4),
Array(2, 3, 4), Array.empty[Int], Array.empty[Int], 3, 100, 200))
Array(2, 3, 4), Replicas.NONE, Replicas.NONE, 3, 100, 200))
val bar = new TopicImage("bar", BAR_UUID, barPartitions)
topicsById.put(FOO_UUID, foo)
topicsByName.put("foo", foo)
@ -2601,10 +2601,10 @@ class ReplicaManagerTest {
new TopicPartition("foo", 1) -> true),
Map(new TopicPartition("baz", 0) -> LocalLeaderInfo(BAZ_UUID,
new PartitionRegistration(Array(1, 2, 4), Array(1, 2, 4),
Array.empty[Int], Array.empty[Int], 1, 123, 456))),
Replicas.NONE, Replicas.NONE, 1, 123, 456))),
Map(new TopicPartition("baz", 1) -> LocalLeaderInfo(BAZ_UUID,
new PartitionRegistration(Array(2, 4, 1), Array(2, 4, 1),
Array.empty[Int], Array.empty[Int], 2, 123, 456)))),
Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
replicaManager.calculateDeltaChanges(TEST_DELTA))
}
}

View File

@ -378,7 +378,7 @@ public class ReplicationControlManager {
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
int[] replicas = Replicas.toArray(assignment.brokerIds());
newParts.put(assignment.partitionIndex(), new PartitionRegistration(
replicas, replicas, null, null, replicas[0], 0, 0));
replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 0, 0));
}
} else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@ -401,7 +401,7 @@ public class ReplicationControlManager {
for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
int[] r = Replicas.toArray(replicas.get(partitionId));
newParts.put(partitionId,
new PartitionRegistration(r, r, null, null, r[0], 0, 0));
new PartitionRegistration(r, r, Replicas.NONE, Replicas.NONE, r[0], 0, 0));
}
} catch (InvalidReplicationFactorException e) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@ -928,8 +928,8 @@ public class ReplicationControlManager {
setTopicId(topicId).
setReplicas(placement).
setIsr(placement).
setRemovingReplicas(null).
setAddingReplicas(null).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(placement.get(0)).
setLeaderEpoch(0).
setPartitionEpoch(0), PARTITION_RECORD.highestSupportedVersion()));

View File

@ -28,9 +28,9 @@
"about": "The replicas of this partition, sorted by preferred order." },
{ "name": "Isr", "type": "[]int32", "versions": "0+",
"about": "The in-sync replicas of this partition" },
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "entityType": "brokerId",
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The replicas that we are in the process of removing." },
{ "name": "AddingReplicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "entityType": "brokerId",
{ "name": "AddingReplicas", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
"about": "The replicas that we are in the process of adding." },
{ "name": "Leader", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
"about": "The lead replica, or -1 if there is no leader." },

View File

@ -427,13 +427,13 @@ public class QuorumControllerTest {
setName("foo").setTopicId(fooId), (short) 1),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(null).
setAddingReplicas(null).setLeader(0).setLeaderEpoch(0).
setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 1),
new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(null).
setAddingReplicas(null).setLeader(1).setLeaderEpoch(0).
setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 1),
new ApiMessageAndVersion(new RegisterBrokerRecord().
setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).

View File

@ -185,7 +185,7 @@ public class ReplicationControlManagerTest {
assertEquals(expectedResponse2, result2.response());
ctx.replay(result2.records());
assertEquals(new PartitionRegistration(new int[] {1, 2, 0},
new int[] {1, 2, 0}, null, null, 1, 0, 0),
new int[] {1, 2, 0}, Replicas.NONE, Replicas.NONE, 1, 0, 0),
replicationControl.getPartition(
((TopicRecord) result2.records().get(0).message()).topicId(), 0));
ControllerResult<CreateTopicsResponseData> result3 =
@ -200,7 +200,7 @@ public class ReplicationControlManagerTest {
Arrays.asList(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(0).setTopicId(fooId).
setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).
setRemovingReplicas(null).setAddingReplicas(null).setLeader(1).
setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
setLeaderEpoch(0).setPartitionEpoch(0), (short) 1),
new ApiMessageAndVersion(new TopicRecord().
setTopicId(fooId).setName("foo"), (short) 1))),

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@ -82,9 +83,9 @@ public class TopicsImageTest {
List<TopicImage> topics1 = Arrays.asList(
newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
new PartitionRegistration(new int[] {2, 3, 4},
new int[] {2, 3}, new int[0], new int[0], 2, 1, 345),
new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
new PartitionRegistration(new int[] {3, 4, 5},
new int[] {3, 4, 5}, new int[0], new int[0], 3, 4, 684)),
new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684)),
newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345)));

View File

@ -47,11 +47,11 @@ public class PartitionRegistrationTest {
@Test
public void testPartitionControlInfoMergeAndDiff() {
PartitionRegistration a = new PartitionRegistration(
new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{}, new int[]{}, 1, 0, 0);
new int[]{1, 2, 3}, new int[]{1, 2}, Replicas.NONE, Replicas.NONE, 1, 0, 0);
PartitionRegistration b = new PartitionRegistration(
new int[]{1, 2, 3}, new int[]{3}, new int[]{}, new int[]{}, 3, 1, 1);
new int[]{1, 2, 3}, new int[]{3}, Replicas.NONE, Replicas.NONE, 3, 1, 1);
PartitionRegistration c = new PartitionRegistration(
new int[]{1, 2, 3}, new int[]{1}, new int[]{}, new int[]{}, 1, 0, 1);
new int[]{1, 2, 3}, new int[]{1}, Replicas.NONE, Replicas.NONE, 1, 0, 1);
assertEquals(b, a.merge(new PartitionChangeRecord().
setLeader(3).setIsr(Arrays.asList(3))));
assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, partitionEpoch: 0 -> 1",
@ -63,7 +63,7 @@ public class PartitionRegistrationTest {
@Test
public void testRecordRoundTrip() {
PartitionRegistration registrationA = new PartitionRegistration(
new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{1}, new int[]{}, 1, 0, 0);
new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{1}, Replicas.NONE, 1, 0, 0);
Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
int partitionId = 4;
ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId);
@ -75,9 +75,9 @@ public class PartitionRegistrationTest {
@Test
public void testToLeaderAndIsrPartitionState() {
PartitionRegistration a = new PartitionRegistration(
new int[]{1, 2, 3}, new int[]{1, 2}, new int[]{}, new int[]{}, 1, 123, 456);
new int[]{1, 2, 3}, new int[]{1, 2}, Replicas.NONE, Replicas.NONE, 1, 123, 456);
PartitionRegistration b = new PartitionRegistration(
new int[]{2, 3, 4}, new int[]{2, 3, 4}, new int[]{}, new int[]{}, 2, 234, 567);
new int[]{2, 3, 4}, new int[]{2, 3, 4}, Replicas.NONE, Replicas.NONE, 2, 234, 567);
assertEquals(new LeaderAndIsrPartitionState().
setTopicName("foo").
setPartitionIndex(1).