mirror of https://github.com/apache/kafka.git
MINOR: Code cleanup in metadata module (#16065)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
parent
ebe1e964ff
commit
226f3c57e3
|
|
@ -258,7 +258,7 @@ public class ClusterControlManager {
|
|||
*/
|
||||
private final boolean zkMigrationEnabled;
|
||||
|
||||
private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;
|
||||
private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;
|
||||
|
||||
/**
|
||||
* Maps controller IDs to controller registrations.
|
||||
|
|
|
|||
|
|
@ -58,8 +58,7 @@ public class PartitionChangeBuilder {
|
|||
if (record.removingReplicas() != null) return false;
|
||||
if (record.addingReplicas() != null) return false;
|
||||
if (record.leaderRecoveryState() != LeaderRecoveryState.NO_CHANGE) return false;
|
||||
if (record.directories() != null) return false;
|
||||
return true;
|
||||
return record.directories() == null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -515,7 +514,7 @@ public class PartitionChangeBuilder {
|
|||
if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 ||
|
||||
partition.lastKnownElr[0] != partition.leader)) {
|
||||
// Only update the last known leader when the first time the partition becomes leaderless.
|
||||
record.setLastKnownElr(Arrays.asList(partition.leader));
|
||||
record.setLastKnownElr(Collections.singletonList(partition.leader));
|
||||
} else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))
|
||||
&& partition.lastKnownElr.length > 0) {
|
||||
// Clear the LastKnownElr field if the partition will have or continues to have a valid leader.
|
||||
|
|
|
|||
|
|
@ -130,7 +130,6 @@ import org.apache.kafka.timeline.SnapshotRegistry;
|
|||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
|
|
@ -1405,7 +1404,7 @@ public final class QuorumController implements Controller {
|
|||
maybeScheduleNextWriteNoOpRecord();
|
||||
|
||||
return ControllerResult.of(
|
||||
Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
|
||||
Collections.singletonList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
|
||||
null
|
||||
);
|
||||
},
|
||||
|
|
|
|||
|
|
@ -1904,7 +1904,7 @@ public class ReplicationControlManager {
|
|||
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
|
||||
}
|
||||
if (brokerWithUncleanShutdown != NO_LEADER) {
|
||||
builder.setUncleanShutdownReplicas(Arrays.asList(brokerWithUncleanShutdown));
|
||||
builder.setUncleanShutdownReplicas(Collections.singletonList(brokerWithUncleanShutdown));
|
||||
}
|
||||
|
||||
// Note: if brokerToRemove and brokerWithUncleanShutdown were passed as NO_LEADER, this is a no-op (the new
|
||||
|
|
|
|||
|
|
@ -37,8 +37,7 @@ public class ControllerExceptions {
|
|||
exception = exception.getCause();
|
||||
if (exception == null) return false;
|
||||
}
|
||||
if (!(exception instanceof TimeoutException)) return false;
|
||||
return true;
|
||||
return exception instanceof TimeoutException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -53,8 +52,7 @@ public class ControllerExceptions {
|
|||
exception = exception.getCause();
|
||||
if (exception == null) return false;
|
||||
}
|
||||
if (!(exception instanceof NotControllerException)) return false;
|
||||
return true;
|
||||
return exception instanceof NotControllerException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -116,8 +116,7 @@ public final class EventHandlerExceptionInfo {
|
|||
if (a == null) return b == null;
|
||||
if (b == null) return false;
|
||||
if (!a.getClass().equals(b.getClass())) return false;
|
||||
if (!Objects.equals(a.getMessage(), b.getMessage())) return false;
|
||||
return true;
|
||||
return Objects.equals(a.getMessage(), b.getMessage());
|
||||
}
|
||||
|
||||
EventHandlerExceptionInfo(
|
||||
|
|
|
|||
|
|
@ -366,26 +366,24 @@ public class BrokerRegistration {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder bld = new StringBuilder();
|
||||
bld.append("BrokerRegistration(id=").append(id);
|
||||
bld.append(", epoch=").append(epoch);
|
||||
bld.append(", incarnationId=").append(incarnationId);
|
||||
bld.append(", listeners=[").append(
|
||||
listeners.keySet().stream().sorted().
|
||||
map(n -> listeners.get(n).toString()).
|
||||
collect(Collectors.joining(", ")));
|
||||
bld.append("], supportedFeatures={").append(
|
||||
supportedFeatures.keySet().stream().sorted().
|
||||
map(k -> k + ": " + supportedFeatures.get(k)).
|
||||
collect(Collectors.joining(", ")));
|
||||
bld.append("}");
|
||||
bld.append(", rack=").append(rack);
|
||||
bld.append(", fenced=").append(fenced);
|
||||
bld.append(", inControlledShutdown=").append(inControlledShutdown);
|
||||
bld.append(", isMigratingZkBroker=").append(isMigratingZkBroker);
|
||||
bld.append(", directories=").append(directories);
|
||||
bld.append(")");
|
||||
return bld.toString();
|
||||
return "BrokerRegistration(id=" + id +
|
||||
", epoch=" + epoch +
|
||||
", incarnationId=" + incarnationId +
|
||||
", listeners=[" +
|
||||
listeners.keySet().stream().sorted().
|
||||
map(n -> listeners.get(n).toString()).
|
||||
collect(Collectors.joining(", ")) +
|
||||
"], supportedFeatures={" +
|
||||
supportedFeatures.keySet().stream().sorted().
|
||||
map(k -> k + ": " + supportedFeatures.get(k)).
|
||||
collect(Collectors.joining(", ")) +
|
||||
"}" +
|
||||
", rack=" + rack +
|
||||
", fenced=" + fenced +
|
||||
", inControlledShutdown=" + inControlledShutdown +
|
||||
", isMigratingZkBroker=" + isMigratingZkBroker +
|
||||
", directories=" + directories +
|
||||
")";
|
||||
}
|
||||
|
||||
public BrokerRegistration cloneWith(
|
||||
|
|
|
|||
|
|
@ -214,20 +214,18 @@ public class ControllerRegistration {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder bld = new StringBuilder();
|
||||
bld.append("ControllerRegistration(id=").append(id);
|
||||
bld.append(", incarnationId=").append(incarnationId);
|
||||
bld.append(", zkMigrationReady=").append(zkMigrationReady);
|
||||
bld.append(", listeners=[").append(
|
||||
listeners.keySet().stream().sorted().
|
||||
map(n -> listeners.get(n).toString()).
|
||||
collect(Collectors.joining(", ")));
|
||||
bld.append("], supportedFeatures={").append(
|
||||
supportedFeatures.keySet().stream().sorted().
|
||||
map(k -> k + ": " + supportedFeatures.get(k)).
|
||||
collect(Collectors.joining(", ")));
|
||||
bld.append("}");
|
||||
bld.append(")");
|
||||
return bld.toString();
|
||||
return "ControllerRegistration(id=" + id +
|
||||
", incarnationId=" + incarnationId +
|
||||
", zkMigrationReady=" + zkMigrationReady +
|
||||
", listeners=[" +
|
||||
listeners.keySet().stream().sorted().
|
||||
map(n -> listeners.get(n).toString()).
|
||||
collect(Collectors.joining(", ")) +
|
||||
"], supportedFeatures={" +
|
||||
supportedFeatures.keySet().stream().sorted().
|
||||
map(k -> k + ": " + supportedFeatures.get(k)).
|
||||
collect(Collectors.joining(", ")) +
|
||||
"}" +
|
||||
")";
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,11 +70,9 @@ public class FinalizedControllerFeatures {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder bld = new StringBuilder();
|
||||
bld.append("{");
|
||||
bld.append("featureMap=").append(featureMap.toString());
|
||||
bld.append(", epoch=").append(epoch);
|
||||
bld.append("}");
|
||||
return bld.toString();
|
||||
return "FinalizedControllerFeatures(" +
|
||||
"featureMap=" + featureMap.toString() +
|
||||
", epoch=" + epoch +
|
||||
")";
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -448,20 +448,18 @@ public class PartitionRegistration {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder("PartitionRegistration(");
|
||||
builder.append("replicas=").append(Arrays.toString(replicas));
|
||||
builder.append(", directories=").append(Arrays.toString(directories));
|
||||
builder.append(", isr=").append(Arrays.toString(isr));
|
||||
builder.append(", removingReplicas=").append(Arrays.toString(removingReplicas));
|
||||
builder.append(", addingReplicas=").append(Arrays.toString(addingReplicas));
|
||||
builder.append(", elr=").append(Arrays.toString(elr));
|
||||
builder.append(", lastKnownElr=").append(Arrays.toString(lastKnownElr));
|
||||
builder.append(", leader=").append(leader);
|
||||
builder.append(", leaderRecoveryState=").append(leaderRecoveryState);
|
||||
builder.append(", leaderEpoch=").append(leaderEpoch);
|
||||
builder.append(", partitionEpoch=").append(partitionEpoch);
|
||||
builder.append(")");
|
||||
return builder.toString();
|
||||
return "PartitionRegistration(" + "replicas=" + Arrays.toString(replicas) +
|
||||
", directories=" + Arrays.toString(directories) +
|
||||
", isr=" + Arrays.toString(isr) +
|
||||
", removingReplicas=" + Arrays.toString(removingReplicas) +
|
||||
", addingReplicas=" + Arrays.toString(addingReplicas) +
|
||||
", elr=" + Arrays.toString(elr) +
|
||||
", lastKnownElr=" + Arrays.toString(lastKnownElr) +
|
||||
", leader=" + leader +
|
||||
", leaderRecoveryState=" + leaderRecoveryState +
|
||||
", leaderEpoch=" + leaderEpoch +
|
||||
", partitionEpoch=" + partitionEpoch +
|
||||
")";
|
||||
}
|
||||
|
||||
public boolean hasSameAssignment(PartitionRegistration registration) {
|
||||
|
|
|
|||
|
|
@ -318,16 +318,16 @@ public class AclControlManagerTest {
|
|||
AclBinding aclBinding = new AclBinding(new ResourcePattern(TOPIC, "topic-1", LITERAL),
|
||||
new AccessControlEntry("User:user", "10.0.0.1", AclOperation.ALL, ALLOW));
|
||||
|
||||
ControllerResult<List<AclCreateResult>> createResult = manager.createAcls(Arrays.asList(aclBinding));
|
||||
ControllerResult<List<AclCreateResult>> createResult = manager.createAcls(Collections.singletonList(aclBinding));
|
||||
Uuid id = ((AccessControlEntryRecord) createResult.records().get(0).message()).id();
|
||||
assertEquals(1, createResult.records().size());
|
||||
|
||||
ControllerResult<List<AclDeleteResult>> deleteAclResultsAnyFilter = manager.deleteAcls(Arrays.asList(AclBindingFilter.ANY));
|
||||
ControllerResult<List<AclDeleteResult>> deleteAclResultsAnyFilter = manager.deleteAcls(Collections.singletonList(AclBindingFilter.ANY));
|
||||
assertEquals(1, deleteAclResultsAnyFilter.records().size());
|
||||
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsAnyFilter.records().get(0).message()).id());
|
||||
assertEquals(1, deleteAclResultsAnyFilter.response().size());
|
||||
|
||||
ControllerResult<List<AclDeleteResult>> deleteAclResultsSpecificFilter = manager.deleteAcls(Arrays.asList(aclBinding.toFilter()));
|
||||
ControllerResult<List<AclDeleteResult>> deleteAclResultsSpecificFilter = manager.deleteAcls(Collections.singletonList(aclBinding.toFilter()));
|
||||
assertEquals(1, deleteAclResultsSpecificFilter.records().size());
|
||||
assertEquals(id, ((RemoveAccessControlEntryRecord) deleteAclResultsSpecificFilter.records().get(0).message()).id());
|
||||
assertEquals(1, deleteAclResultsSpecificFilter.response().size());
|
||||
|
|
|
|||
|
|
@ -228,19 +228,19 @@ public class ClientQuotaControlManagerTest {
|
|||
new EntityData().setEntityType("user").setEntityName("user-3"),
|
||||
new EntityData().setEntityType("client-id").setEntityName(null))).
|
||||
setKey("request_percentage").setValue(55.55).setRemove(false), (short) 0),
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
|
||||
new EntityData().setEntityType("user").setEntityName("user-1"))).
|
||||
setKey("request_percentage").setValue(56.56).setRemove(false), (short) 0),
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
|
||||
new EntityData().setEntityType("user").setEntityName("user-2"))).
|
||||
setKey("request_percentage").setValue(57.57).setRemove(false), (short) 0),
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
|
||||
new EntityData().setEntityType("user").setEntityName("user-3"))).
|
||||
setKey("request_percentage").setValue(58.58).setRemove(false), (short) 0),
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
|
||||
new EntityData().setEntityType("user").setEntityName(null))).
|
||||
setKey("request_percentage").setValue(59.59).setRemove(false), (short) 0),
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
|
||||
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Collections.singletonList(
|
||||
new EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
|
||||
setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0));
|
||||
records = new ArrayList<>(records);
|
||||
|
|
@ -323,7 +323,7 @@ public class ClientQuotaControlManagerTest {
|
|||
|
||||
@Test
|
||||
public void testConfigKeysForEntityTypeWithUser() {
|
||||
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.USER),
|
||||
testConfigKeysForEntityType(Collections.singletonList(ClientQuotaEntity.USER),
|
||||
Arrays.asList(
|
||||
"producer_byte_rate",
|
||||
"consumer_byte_rate",
|
||||
|
|
@ -334,7 +334,7 @@ public class ClientQuotaControlManagerTest {
|
|||
|
||||
@Test
|
||||
public void testConfigKeysForEntityTypeWithClientId() {
|
||||
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.CLIENT_ID),
|
||||
testConfigKeysForEntityType(Collections.singletonList(ClientQuotaEntity.CLIENT_ID),
|
||||
Arrays.asList(
|
||||
"producer_byte_rate",
|
||||
"consumer_byte_rate",
|
||||
|
|
@ -356,8 +356,8 @@ public class ClientQuotaControlManagerTest {
|
|||
|
||||
@Test
|
||||
public void testConfigKeysForEntityTypeWithIp() {
|
||||
testConfigKeysForEntityType(Arrays.asList(ClientQuotaEntity.IP),
|
||||
Arrays.asList(
|
||||
testConfigKeysForEntityType(Collections.singletonList(ClientQuotaEntity.IP),
|
||||
Collections.singletonList(
|
||||
"connection_creation_rate"
|
||||
));
|
||||
}
|
||||
|
|
@ -386,7 +386,7 @@ public class ClientQuotaControlManagerTest {
|
|||
|
||||
@Test
|
||||
public void testConfigKeysForEmptyEntity() {
|
||||
testConfigKeysError(Arrays.asList(),
|
||||
testConfigKeysError(Collections.emptyList(),
|
||||
new ApiError(Errors.INVALID_REQUEST, "Invalid empty client quota entity"));
|
||||
}
|
||||
|
||||
|
|
@ -427,7 +427,7 @@ public class ClientQuotaControlManagerTest {
|
|||
static {
|
||||
VALID_CLIENT_ID_QUOTA_KEYS = new HashMap<>();
|
||||
assertEquals(ApiError.NONE, ClientQuotaControlManager.configKeysForEntityType(
|
||||
keysToEntity(Arrays.asList(ClientQuotaEntity.CLIENT_ID)), VALID_CLIENT_ID_QUOTA_KEYS));
|
||||
keysToEntity(Collections.singletonList(ClientQuotaEntity.CLIENT_ID)), VALID_CLIENT_ID_QUOTA_KEYS));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -327,14 +327,14 @@ public class ClusterControlManagerTest {
|
|||
short expectedVersion = metadataVersion.registerBrokerRecordVersion();
|
||||
|
||||
assertEquals(
|
||||
asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
Collections.singletonList(new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerEpoch(123L).
|
||||
setBrokerId(0).
|
||||
setRack(null).
|
||||
setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")).
|
||||
setFenced(true).
|
||||
setLogDirs(logDirs).
|
||||
setFeatures(new RegisterBrokerRecord.BrokerFeatureCollection(asList(
|
||||
setFeatures(new RegisterBrokerRecord.BrokerFeatureCollection(Collections.singletonList(
|
||||
new RegisterBrokerRecord.BrokerFeature().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setMinSupportedVersion((short) 1).
|
||||
|
|
@ -673,7 +673,7 @@ public class ClusterControlManagerTest {
|
|||
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1).setLogDirs(Collections.emptyList());
|
||||
brokerRecord.endPoints().add(new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).setPort((short) 9092).setName("PLAINTEXT").setHost("127.0.0.1"));
|
||||
clusterControl.replay(brokerRecord, 100L);
|
||||
registerNewBrokerWithDirs(clusterControl, 2, asList(Uuid.fromString("singleOnlineDirectoryA")));
|
||||
registerNewBrokerWithDirs(clusterControl, 2, Collections.singletonList(Uuid.fromString("singleOnlineDirectoryA")));
|
||||
registerNewBrokerWithDirs(clusterControl, 3, asList(Uuid.fromString("s4fRmyNFSH6J0vI8AVA5ew"), Uuid.fromString("UbtxBcqYSnKUEMcnTyZFWw")));
|
||||
assertEquals(DirectoryId.MIGRATING, clusterControl.defaultDir(1));
|
||||
assertEquals(Uuid.fromString("singleOnlineDirectoryA"), clusterControl.defaultDir(2));
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ import org.apache.kafka.server.policy.AlterConfigPolicy;
|
|||
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
|
||||
|
||||
import java.util.AbstractMap.SimpleImmutableEntry;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
|
|
@ -57,7 +56,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
|
|||
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
|
||||
import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
|
||||
|
||||
@Timeout(value = 40)
|
||||
|
|
@ -80,9 +79,9 @@ public class ConfigurationControlManagerTest {
|
|||
public static final Map<String, List<ConfigSynonym>> SYNONYMS = new HashMap<>();
|
||||
|
||||
static {
|
||||
SYNONYMS.put("abc", Arrays.asList(new ConfigSynonym("foo.bar")));
|
||||
SYNONYMS.put("def", Arrays.asList(new ConfigSynonym("baz")));
|
||||
SYNONYMS.put("quuux", Arrays.asList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
|
||||
SYNONYMS.put("abc", Collections.singletonList(new ConfigSynonym("foo.bar")));
|
||||
SYNONYMS.put("def", Collections.singletonList(new ConfigSynonym("baz")));
|
||||
SYNONYMS.put("quuux", Collections.singletonList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
|
||||
}
|
||||
|
||||
static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS);
|
||||
|
|
@ -138,7 +137,7 @@ public class ConfigurationControlManagerTest {
|
|||
assertEquals(toMap(entry("abc", "x,y,z"), entry("def", "blah")),
|
||||
manager.getConfigs(MYTOPIC));
|
||||
assertEquals("x,y,z", manager.getTopicConfig(MYTOPIC.name(), "abc"));
|
||||
assertTrue(manager.getTopicConfig(MYTOPIC.name(), "none-exists") == null);
|
||||
assertNull(manager.getTopicConfig(MYTOPIC.name(), "none-exists"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -382,7 +381,7 @@ public class ConfigurationControlManagerTest {
|
|||
for (ApiMessageAndVersion message : expectedRecords1) {
|
||||
manager.replay((ConfigRecord) message.message());
|
||||
}
|
||||
assertEquals(ControllerResult.atomicOf(asList(
|
||||
assertEquals(ControllerResult.atomicOf(Collections.singletonList(
|
||||
new ApiMessageAndVersion(
|
||||
new ConfigRecord()
|
||||
.setResourceType(TOPIC.id())
|
||||
|
|
|
|||
|
|
@ -169,8 +169,8 @@ public class FeatureControlManagerTest {
|
|||
setQuorumFeatures(features("foo", 1, 5, "bar", 0, 3)).
|
||||
setSnapshotRegistry(snapshotRegistry).
|
||||
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
|
||||
Arrays.asList(new SimpleImmutableEntry<>(5, Collections.singletonMap("bar", VersionRange.of(0, 3)))),
|
||||
Arrays.asList())).
|
||||
Collections.singletonList(new SimpleImmutableEntry<>(5, singletonMap("bar", VersionRange.of(0, 3)))),
|
||||
emptyList())).
|
||||
build();
|
||||
|
||||
assertEquals(ControllerResult.atomicOf(emptyList(),
|
||||
|
|
@ -389,14 +389,14 @@ public class FeatureControlManagerTest {
|
|||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, emptyList())).
|
||||
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
|
||||
Arrays.asList(new SimpleImmutableEntry<>(1, Collections.singletonMap("foo", VersionRange.of(0, 3)))),
|
||||
Arrays.asList())).
|
||||
Collections.singletonList(new SimpleImmutableEntry<>(1, singletonMap("foo", VersionRange.of(0, 3)))),
|
||||
emptyList())).
|
||||
build();
|
||||
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
|
||||
Collections.singletonMap("foo", (short) 1),
|
||||
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UPGRADE),
|
||||
false);
|
||||
assertEquals(ControllerResult.atomicOf(Arrays.asList(new ApiMessageAndVersion(
|
||||
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
|
||||
new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 1), (short) 0)),
|
||||
Collections.singletonMap("foo", ApiError.NONE)), result);
|
||||
RecordTestUtils.replayAll(manager, result.records());
|
||||
|
|
@ -406,7 +406,7 @@ public class FeatureControlManagerTest {
|
|||
Collections.singletonMap("foo", (short) 0),
|
||||
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
false);
|
||||
assertEquals(ControllerResult.atomicOf(Arrays.asList(new ApiMessageAndVersion(
|
||||
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
|
||||
new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 0), (short) 0)),
|
||||
Collections.singletonMap("foo", ApiError.NONE)), result2);
|
||||
RecordTestUtils.replayAll(manager, result2.records());
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ public class OffsetControlManagerTest {
|
|||
assertEquals(-1L, offsetControl.transactionStartOffset());
|
||||
assertEquals(-1L, offsetControl.nextWriteOffset());
|
||||
assertFalse(offsetControl.active());
|
||||
assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.singletonList(-1L), offsetControl.snapshotRegistry().epochsList());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -64,7 +64,7 @@ public class OffsetControlManagerTest {
|
|||
assertEquals(1000L, offsetControl.nextWriteOffset());
|
||||
assertTrue(offsetControl.active());
|
||||
assertTrue(offsetControl.metrics().active());
|
||||
assertEquals(Arrays.asList(-1L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.singletonList(-1L), offsetControl.snapshotRegistry().epochsList());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -122,7 +122,7 @@ public class OffsetControlManagerTest {
|
|||
OffsetControlManager offsetControl = new OffsetControlManager.Builder().build();
|
||||
|
||||
offsetControl.handleCommitBatch(newFakeBatch(1000L, 200, 3000L));
|
||||
assertEquals(Arrays.asList(1000L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.singletonList(1000L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(1000L, offsetControl.lastCommittedOffset());
|
||||
assertEquals(200, offsetControl.lastCommittedEpoch());
|
||||
assertEquals(1000L, offsetControl.lastStableOffset());
|
||||
|
|
@ -149,7 +149,7 @@ public class OffsetControlManagerTest {
|
|||
offsetControl.handleCommitBatch(newFakeBatch(2000L, 200, 3000L));
|
||||
assertEquals(2000L, offsetControl.lastStableOffset());
|
||||
assertEquals(2000L, offsetControl.lastCommittedOffset());
|
||||
assertEquals(Arrays.asList(2000L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.singletonList(2000L), offsetControl.snapshotRegistry().epochsList());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -163,14 +163,14 @@ public class OffsetControlManagerTest {
|
|||
assertEquals(Arrays.asList("snapshot[-1]", "reset"), snapshotRegistry.operations());
|
||||
assertEquals(new OffsetAndEpoch(4000L, 300), offsetControl.currentSnapshotId());
|
||||
assertEquals("00000000000000004000-0000000300", offsetControl.currentSnapshotName());
|
||||
assertEquals(Arrays.asList(), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.emptyList(), offsetControl.snapshotRegistry().epochsList());
|
||||
|
||||
offsetControl.endLoadSnapshot(3456L);
|
||||
assertEquals(Arrays.asList("snapshot[-1]", "reset", "snapshot[4000]"),
|
||||
snapshotRegistry.operations());
|
||||
assertNull(offsetControl.currentSnapshotId());
|
||||
assertNull(offsetControl.currentSnapshotName());
|
||||
assertEquals(Arrays.asList(4000L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.singletonList(4000L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(4000L, offsetControl.lastCommittedOffset());
|
||||
assertEquals(300, offsetControl.lastCommittedEpoch());
|
||||
assertEquals(4000L, offsetControl.lastStableOffset());
|
||||
|
|
@ -236,7 +236,7 @@ public class OffsetControlManagerTest {
|
|||
assertEquals(1550L, offsetControl.lastCommittedOffset());
|
||||
assertEquals(100, offsetControl.lastCommittedEpoch());
|
||||
assertEquals(1499L, offsetControl.lastStableOffset());
|
||||
assertEquals(Arrays.asList(1499L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.singletonList(1499L), offsetControl.snapshotRegistry().epochsList());
|
||||
|
||||
if (aborted) {
|
||||
offsetControl.replay(new AbortTransactionRecord(), 1600L);
|
||||
|
|
@ -252,7 +252,7 @@ public class OffsetControlManagerTest {
|
|||
|
||||
offsetControl.handleCommitBatch(newFakeBatch(1650, 100, 2100L));
|
||||
assertEquals(1650, offsetControl.lastStableOffset());
|
||||
assertEquals(Arrays.asList(1650L), offsetControl.snapshotRegistry().epochsList());
|
||||
assertEquals(Collections.singletonList(1650L), offsetControl.snapshotRegistry().epochsList());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -82,13 +82,13 @@ public class PartitionChangeBuilderTest {
|
|||
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
|
||||
setIsr(Arrays.asList(1, 2, 3))));
|
||||
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
|
||||
setRemovingReplicas(Arrays.asList(1))));
|
||||
setRemovingReplicas(Collections.singletonList(1))));
|
||||
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
|
||||
setAddingReplicas(Arrays.asList(4))));
|
||||
setAddingReplicas(Collections.singletonList(4))));
|
||||
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
|
||||
setEligibleLeaderReplicas(Arrays.asList(5))));
|
||||
setEligibleLeaderReplicas(Collections.singletonList(5))));
|
||||
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
|
||||
setLastKnownElr(Arrays.asList(6))));
|
||||
setLastKnownElr(Collections.singletonList(6))));
|
||||
assertFalse(
|
||||
changeRecordIsNoOp(
|
||||
new PartitionChangeRecord()
|
||||
|
|
@ -274,12 +274,12 @@ public class PartitionChangeBuilderTest {
|
|||
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN)
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 3))), 1, false);
|
||||
assertElectLeaderEquals(createFooBuilder(version)
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), NO_LEADER, false);
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Collections.singletonList(3))), NO_LEADER, false);
|
||||
assertElectLeaderEquals(createFooBuilder(version).setElection(Election.UNCLEAN).
|
||||
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))), 2, true);
|
||||
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Collections.singletonList(3))), 2, true);
|
||||
assertElectLeaderEquals(
|
||||
createFooBuilder(version).setElection(Election.UNCLEAN)
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(4))).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Collections.singletonList(4))).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
|
||||
4,
|
||||
false
|
||||
);
|
||||
|
|
@ -424,7 +424,7 @@ public class PartitionChangeBuilderTest {
|
|||
2).
|
||||
setEligibleLeaderReplicasEnabled(metadataVersion.isElrSupported()).
|
||||
setDefaultDirProvider(DEFAULT_DIR_PROVIDER).
|
||||
setTargetReplicas(Arrays.asList());
|
||||
setTargetReplicas(Collections.emptyList());
|
||||
PartitionChangeRecord record = new PartitionChangeRecord();
|
||||
builder.triggerLeaderEpochBumpForIsrShrinkIfNeeded(record);
|
||||
assertEquals(NO_LEADER_CHANGE, record.leader());
|
||||
|
|
@ -593,7 +593,7 @@ public class PartitionChangeBuilderTest {
|
|||
new PartitionChangeRecord()
|
||||
.setTopicId(FOO_ID)
|
||||
.setPartitionId(0)
|
||||
.setIsr(Arrays.asList(2))
|
||||
.setIsr(Collections.singletonList(2))
|
||||
.setLeader(2)
|
||||
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
|
||||
version
|
||||
|
|
@ -601,13 +601,13 @@ public class PartitionChangeBuilderTest {
|
|||
assertEquals(
|
||||
Optional.of(expectedRecord),
|
||||
createFooBuilder(version).setElection(Election.UNCLEAN)
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(3))).build()
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Collections.singletonList(3))).build()
|
||||
);
|
||||
|
||||
PartitionChangeRecord record = new PartitionChangeRecord()
|
||||
.setTopicId(OFFLINE_ID)
|
||||
.setPartitionId(0)
|
||||
.setIsr(Arrays.asList(1))
|
||||
.setIsr(Collections.singletonList(1))
|
||||
.setLeader(1)
|
||||
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value());
|
||||
|
||||
|
|
@ -626,7 +626,7 @@ public class PartitionChangeBuilderTest {
|
|||
assertEquals(
|
||||
Optional.of(expectedRecord),
|
||||
createOfflineBuilder(version).setElection(Election.UNCLEAN)
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(2))).build()
|
||||
.setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Collections.singletonList(2))).build()
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -1017,7 +1017,7 @@ public class PartitionChangeBuilderTest {
|
|||
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
|
||||
.setUseLastKnownLeaderInBalancedRecovery(false);
|
||||
|
||||
builder.setUncleanShutdownReplicas(Arrays.asList(3));
|
||||
builder.setUncleanShutdownReplicas(Collections.singletonList(3));
|
||||
|
||||
PartitionChangeRecord record = new PartitionChangeRecord()
|
||||
.setTopicId(topicId)
|
||||
|
|
@ -1025,8 +1025,8 @@ public class PartitionChangeBuilderTest {
|
|||
.setLeader(-2)
|
||||
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
|
||||
if (version >= 2) {
|
||||
record.setEligibleLeaderReplicas(Arrays.asList(2))
|
||||
.setLastKnownElr(Arrays.asList(3));
|
||||
record.setEligibleLeaderReplicas(Collections.singletonList(2))
|
||||
.setLastKnownElr(Collections.singletonList(3));
|
||||
} else {
|
||||
record.setEligibleLeaderReplicas(Collections.emptyList());
|
||||
}
|
||||
|
|
@ -1146,8 +1146,8 @@ public class PartitionChangeBuilderTest {
|
|||
new PartitionChangeRecord()
|
||||
.setTopicId(topicId)
|
||||
.setPartitionId(0)
|
||||
.setIsr(Arrays.asList(3))
|
||||
.setEligibleLeaderReplicas(Arrays.asList(1))
|
||||
.setIsr(Collections.singletonList(3))
|
||||
.setEligibleLeaderReplicas(Collections.singletonList(1))
|
||||
.setLeader(3)
|
||||
.setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE),
|
||||
version
|
||||
|
|
@ -1200,7 +1200,7 @@ public class PartitionChangeBuilderTest {
|
|||
.setEligibleLeaderReplicas(Arrays.asList(1, 2, 3, 4));
|
||||
|
||||
if (lastKnownLeaderEnabled) {
|
||||
record.setLastKnownElr(Arrays.asList(1));
|
||||
record.setLastKnownElr(Collections.singletonList(1));
|
||||
}
|
||||
|
||||
ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
|
||||
|
|
@ -1213,7 +1213,7 @@ public class PartitionChangeBuilderTest {
|
|||
metadataVersionForPartitionChangeRecordVersion(version), 3)
|
||||
.setElection(Election.PREFERRED)
|
||||
.setEligibleLeaderReplicasEnabled(true)
|
||||
.setUncleanShutdownReplicas(Arrays.asList(2))
|
||||
.setUncleanShutdownReplicas(Collections.singletonList(2))
|
||||
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
|
||||
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
|
||||
PartitionChangeRecord changeRecord = (PartitionChangeRecord) builder.build().get().message();
|
||||
|
|
@ -1253,7 +1253,7 @@ public class PartitionChangeBuilderTest {
|
|||
new PartitionChangeRecord()
|
||||
.setTopicId(topicId)
|
||||
.setPartitionId(0)
|
||||
.setIsr(Arrays.asList(1))
|
||||
.setIsr(Collections.singletonList(1))
|
||||
.setLeader(1)
|
||||
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())
|
||||
.setLastKnownElr(Collections.emptyList()),
|
||||
|
|
|
|||
|
|
@ -202,7 +202,7 @@ public class PartitionReassignmentReplicasTest {
|
|||
partitionAssignment(Arrays.asList(0, 1, 2)), partitionAssignment(Arrays.asList(0, 1, 3)));
|
||||
assertTrue(replicas.isReassignmentInProgress());
|
||||
Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional =
|
||||
replicas.maybeCompleteReassignment(Arrays.asList(3));
|
||||
replicas.maybeCompleteReassignment(Collections.singletonList(3));
|
||||
assertFalse(reassignmentOptional.isPresent());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.kafka.controller;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
|
@ -100,7 +99,7 @@ public class QuorumControllerIntegrationTestUtils {
|
|||
Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA")
|
||||
))
|
||||
.setListeners(new ListenerCollection(
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new Listener()
|
||||
.setName("PLAINTEXT")
|
||||
.setHost("localhost")
|
||||
|
|
|
|||
|
|
@ -484,7 +484,7 @@ public class QuorumControllerTest {
|
|||
assertArrayEquals(lastKnownElr, partition.lastKnownElr, partition.toString());
|
||||
|
||||
// Unfence the last one in the ELR, it should be elected.
|
||||
sendBrokerHeartbeatToUnfenceBrokers(active, Arrays.asList(brokerToBeTheLeader), brokerEpochs);
|
||||
sendBrokerHeartbeatToUnfenceBrokers(active, singletonList(brokerToBeTheLeader), brokerEpochs);
|
||||
TestUtils.waitForCondition(() -> {
|
||||
return active.clusterControl().isUnfenced(brokerToBeTheLeader);
|
||||
}, sessionTimeoutMillis * 3,
|
||||
|
|
@ -798,7 +798,7 @@ public class QuorumControllerTest {
|
|||
setIncarnationId(new Uuid(3465346L, i)).
|
||||
setZkMigrationReady(false).
|
||||
setListeners(new ControllerRegistrationRequestData.ListenerCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new ControllerRegistrationRequestData.Listener().
|
||||
setName("CONTROLLER").
|
||||
setHost("localhost").
|
||||
|
|
@ -807,7 +807,7 @@ public class QuorumControllerTest {
|
|||
).iterator()
|
||||
)).
|
||||
setFeatures(new ControllerRegistrationRequestData.FeatureCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new ControllerRegistrationRequestData.Feature().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()).
|
||||
|
|
@ -823,7 +823,7 @@ public class QuorumControllerTest {
|
|||
setClusterId(active.clusterId()).
|
||||
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
|
||||
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
|
||||
setListeners(new ListenerCollection(Arrays.asList(new Listener().
|
||||
setListeners(new ListenerCollection(singletonList(new Listener().
|
||||
setName("PLAINTEXT").setHost("localhost").
|
||||
setPort(9092 + i)).iterator()))).get();
|
||||
brokerEpochs.put(i, reply.epoch());
|
||||
|
|
@ -872,14 +872,14 @@ public class QuorumControllerTest {
|
|||
setControllerId(0).
|
||||
setIncarnationId(Uuid.fromString("AAAAAAA04IIAAAAAAAAAAA")).
|
||||
setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new RegisterControllerRecord.ControllerEndpoint().
|
||||
setName("CONTROLLER").
|
||||
setHost("localhost").
|
||||
setPort(8000).
|
||||
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).
|
||||
setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new RegisterControllerRecord.ControllerFeature().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()).
|
||||
|
|
@ -889,14 +889,14 @@ public class QuorumControllerTest {
|
|||
setControllerId(1).
|
||||
setIncarnationId(Uuid.fromString("AAAAAAA04IIAAAAAAAAAAQ")).
|
||||
setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new RegisterControllerRecord.ControllerEndpoint().
|
||||
setName("CONTROLLER").
|
||||
setHost("localhost").
|
||||
setPort(8001).
|
||||
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).
|
||||
setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new RegisterControllerRecord.ControllerFeature().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()).
|
||||
|
|
@ -906,14 +906,14 @@ public class QuorumControllerTest {
|
|||
setControllerId(2).
|
||||
setIncarnationId(Uuid.fromString("AAAAAAA04IIAAAAAAAAAAg")).
|
||||
setEndPoints(new RegisterControllerRecord.ControllerEndpointCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new RegisterControllerRecord.ControllerEndpoint().
|
||||
setName("CONTROLLER").
|
||||
setHost("localhost").
|
||||
setPort(8002).
|
||||
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())).
|
||||
setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(
|
||||
Arrays.asList(
|
||||
singletonList(
|
||||
new RegisterControllerRecord.ControllerFeature().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()).
|
||||
|
|
@ -923,7 +923,7 @@ public class QuorumControllerTest {
|
|||
setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).
|
||||
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
|
||||
setEndPoints(new BrokerEndpointCollection(
|
||||
Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
singletonList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
setPort(9092).setSecurityProtocol((short) 0)).iterator())).
|
||||
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
|
||||
setRack(null).
|
||||
|
|
@ -931,7 +931,7 @@ public class QuorumControllerTest {
|
|||
new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1)).
|
||||
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB1")).
|
||||
setEndPoints(new BrokerEndpointCollection(Arrays.asList(
|
||||
setEndPoints(new BrokerEndpointCollection(singletonList(
|
||||
new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
setPort(9093).setSecurityProtocol((short) 0)).iterator())).
|
||||
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
|
||||
|
|
@ -941,7 +941,7 @@ public class QuorumControllerTest {
|
|||
setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2)).
|
||||
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB2")).
|
||||
setEndPoints(new BrokerEndpointCollection(
|
||||
Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
singletonList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
setPort(9094).setSecurityProtocol((short) 0)).iterator())).
|
||||
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
|
||||
setRack(null).
|
||||
|
|
@ -949,7 +949,7 @@ public class QuorumControllerTest {
|
|||
new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3)).
|
||||
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB3")).
|
||||
setEndPoints(new BrokerEndpointCollection(Arrays.asList(
|
||||
setEndPoints(new BrokerEndpointCollection(singletonList(
|
||||
new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
setPort(9095).setSecurityProtocol((short) 0)).iterator())).
|
||||
setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV0)).
|
||||
|
|
@ -1279,7 +1279,7 @@ public class QuorumControllerTest {
|
|||
|
||||
@Test
|
||||
public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
|
||||
InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.unmodifiableList(Arrays.asList(
|
||||
InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.unmodifiableList(singletonList(
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0)))
|
||||
);
|
||||
|
||||
|
|
@ -1304,7 +1304,7 @@ public class QuorumControllerTest {
|
|||
@Test
|
||||
public void testFatalMetadataErrorDuringLogLoading() throws Exception {
|
||||
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) {
|
||||
logEnv.appendInitialRecords(Collections.unmodifiableList(Arrays.asList(
|
||||
logEnv.appendInitialRecords(Collections.unmodifiableList(singletonList(
|
||||
new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
|
||||
));
|
||||
|
||||
|
|
@ -1862,7 +1862,7 @@ public class QuorumControllerTest {
|
|||
setIsMigratingZkBroker(true).
|
||||
setFeatures(brokerFeatures(metadataVersion, metadataVersion)).
|
||||
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
|
||||
setListeners(new ListenerCollection(Arrays.asList(new Listener().
|
||||
setListeners(new ListenerCollection(singletonList(new Listener().
|
||||
setName("PLAINTEXT").setHost("localhost").
|
||||
setPort(9092)).iterator()))).get();
|
||||
|
||||
|
|
|
|||
|
|
@ -49,7 +49,6 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
|||
private Consumer<QuorumController.Builder> controllerBuilderInitializer = __ -> { };
|
||||
private OptionalLong sessionTimeoutMillis = OptionalLong.empty();
|
||||
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
|
||||
private boolean eligibleLeaderReplicasEnabled = false;
|
||||
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
|
||||
fromVersion(MetadataVersion.latestTesting(), "test-provided version");
|
||||
|
||||
|
|
|
|||
|
|
@ -893,7 +893,7 @@ public class ReplicationControlManagerTest {
|
|||
topics.add(new CreatableTopic().setName("foo.bar"));
|
||||
topics.add(new CreatableTopic().setName("woo.bar_foo"));
|
||||
Map<String, Set<String>> collisionMap = new HashMap<>();
|
||||
collisionMap.put("foo_bar", new TreeSet<>(Arrays.asList("foo_bar")));
|
||||
collisionMap.put("foo_bar", new TreeSet<>(singletonList("foo_bar")));
|
||||
collisionMap.put("woo_bar_foo", new TreeSet<>(Arrays.asList("woo.bar.foo", "woo_bar.foo")));
|
||||
ReplicationControlManager.validateNewTopicNames(topicErrors, topics, collisionMap);
|
||||
Map<String, ApiError> expectedTopicErrors = new HashMap<>();
|
||||
|
|
@ -1157,10 +1157,10 @@ public class ReplicationControlManagerTest {
|
|||
AlterPartitionRequestData request = new AlterPartitionRequestData()
|
||||
.setBrokerId(0)
|
||||
.setBrokerEpoch(100)
|
||||
.setTopics(asList(new AlterPartitionRequestData.TopicData()
|
||||
.setTopics(singletonList(new TopicData()
|
||||
.setTopicName(version <= 1 ? topicName : "")
|
||||
.setTopicId(version > 1 ? topicId : Uuid.ZERO_UUID)
|
||||
.setPartitions(asList(new PartitionData()
|
||||
.setPartitions(singletonList(new PartitionData()
|
||||
.setPartitionIndex(0)))));
|
||||
|
||||
ControllerRequestContext requestContext =
|
||||
|
|
@ -1171,10 +1171,10 @@ public class ReplicationControlManagerTest {
|
|||
|
||||
Errors expectedError = version > 1 ? UNKNOWN_TOPIC_ID : UNKNOWN_TOPIC_OR_PARTITION;
|
||||
AlterPartitionResponseData expectedResponse = new AlterPartitionResponseData()
|
||||
.setTopics(asList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopics(singletonList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopicName(version <= 1 ? topicName : "")
|
||||
.setTopicId(version > 1 ? topicId : Uuid.ZERO_UUID)
|
||||
.setPartitions(asList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitionIndex(0)
|
||||
.setErrorCode(expectedError.code())))));
|
||||
|
||||
|
|
@ -1509,17 +1509,17 @@ public class ReplicationControlManagerTest {
|
|||
ctx.replay(createPartitionsResult.records());
|
||||
List<CreatePartitionsTopic> topics2 = new ArrayList<>();
|
||||
topics2.add(new CreatePartitionsTopic().
|
||||
setName("foo").setCount(6).setAssignments(asList(
|
||||
setName("foo").setCount(6).setAssignments(singletonList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(asList(1, 3)))));
|
||||
topics2.add(new CreatePartitionsTopic().
|
||||
setName("bar").setCount(5).setAssignments(asList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(asList(1)))));
|
||||
setName("bar").setCount(5).setAssignments(singletonList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(singletonList(1)))));
|
||||
topics2.add(new CreatePartitionsTopic().
|
||||
setName("quux").setCount(4).setAssignments(asList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
|
||||
setName("quux").setCount(4).setAssignments(singletonList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
|
||||
topics2.add(new CreatePartitionsTopic().
|
||||
setName("foo2").setCount(3).setAssignments(asList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
|
||||
setName("foo2").setCount(3).setAssignments(singletonList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
|
||||
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult2 =
|
||||
replicationControl.createPartitions(requestContext, topics2);
|
||||
assertEquals(asList(new CreatePartitionsTopicResult().
|
||||
|
|
@ -1579,7 +1579,7 @@ public class ReplicationControlManagerTest {
|
|||
// now test the explicit assignment case
|
||||
List<CreatePartitionsTopic> topics2 = new ArrayList<>();
|
||||
topics2.add(new CreatePartitionsTopic().
|
||||
setName("foo").setCount(4).setAssignments(asList(
|
||||
setName("foo").setCount(4).setAssignments(singletonList(
|
||||
new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
|
||||
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult2 =
|
||||
replicationControl.createPartitions(createPartitionsRequestContext, topics2);
|
||||
|
|
@ -1600,7 +1600,7 @@ public class ReplicationControlManagerTest {
|
|||
ControllerRequestContext requestContext =
|
||||
anonymousContextFor(ApiKeys.CREATE_TOPICS);
|
||||
ControllerResult<CreateTopicsResponseData> createTopicResult = replicationControl.
|
||||
createTopics(requestContext, request, new HashSet<>(Arrays.asList("foo")));
|
||||
createTopics(requestContext, request, new HashSet<>(singletonList("foo")));
|
||||
ctx.replay(createTopicResult.records());
|
||||
|
||||
ctx.registerBrokers(0, 1);
|
||||
|
|
@ -1614,7 +1614,7 @@ public class ReplicationControlManagerTest {
|
|||
replicationControl.createPartitions(requestContext, topics);
|
||||
|
||||
assertEquals(
|
||||
asList(new CreatePartitionsTopicResult().
|
||||
singletonList(new CreatePartitionsTopicResult().
|
||||
setName("foo").
|
||||
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
|
||||
setErrorMessage("Unable to replicate the partition 2 time(s): All " +
|
||||
|
|
@ -1640,7 +1640,7 @@ public class ReplicationControlManagerTest {
|
|||
replicationControl.createTopics(requestContext, request, Collections.singleton("foo"));
|
||||
ctx.replay(result.records());
|
||||
|
||||
List<CreatePartitionsTopic> topics = asList(new CreatePartitionsTopic().
|
||||
List<CreatePartitionsTopic> topics = singletonList(new CreatePartitionsTopic().
|
||||
setName("foo").setCount(2).setAssignments(null));
|
||||
|
||||
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
|
||||
|
|
@ -1670,9 +1670,9 @@ public class ReplicationControlManagerTest {
|
|||
public void testValidateGoodManualPartitionAssignments() {
|
||||
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
|
||||
ctx.registerBrokers(1, 2, 3);
|
||||
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1)),
|
||||
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(singletonList(1)),
|
||||
OptionalInt.of(1));
|
||||
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1)),
|
||||
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(singletonList(1)),
|
||||
OptionalInt.empty());
|
||||
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1, 2, 3)),
|
||||
OptionalInt.of(3));
|
||||
|
|
@ -1686,7 +1686,7 @@ public class ReplicationControlManagerTest {
|
|||
ctx.registerBrokers(1, 2);
|
||||
assertEquals("The manual partition assignment includes an empty replica list.",
|
||||
assertThrows(InvalidReplicaAssignmentException.class, () ->
|
||||
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList()),
|
||||
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(Collections.emptyList()),
|
||||
OptionalInt.empty())).getMessage());
|
||||
assertEquals("The manual partition assignment includes broker 3, but no such " +
|
||||
"broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () ->
|
||||
|
|
@ -1748,17 +1748,17 @@ public class ReplicationControlManagerTest {
|
|||
ctx.replay(alterResult.records());
|
||||
ListPartitionReassignmentsResponseData currentReassigning =
|
||||
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
|
||||
setTopics(asList(new OngoingTopicReassignment().
|
||||
setName("foo").setPartitions(asList(
|
||||
new OngoingPartitionReassignment().setPartitionIndex(1).
|
||||
setRemovingReplicas(asList(3)).
|
||||
setAddingReplicas(asList(0)).
|
||||
setReplicas(asList(0, 2, 1, 3))))));
|
||||
setTopics(singletonList(new OngoingTopicReassignment().
|
||||
setName("foo").setPartitions(singletonList(
|
||||
new OngoingPartitionReassignment().setPartitionIndex(1).
|
||||
setRemovingReplicas(singletonList(3)).
|
||||
setAddingReplicas(singletonList(0)).
|
||||
setReplicas(asList(0, 2, 1, 3))))));
|
||||
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
|
||||
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
|
||||
new ListPartitionReassignmentsTopics().setName("bar").
|
||||
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
|
||||
assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
|
||||
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(singletonList(
|
||||
new ListPartitionReassignmentsTopics().setName("bar").
|
||||
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
|
||||
assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList(
|
||||
new ListPartitionReassignmentsTopics().setName("foo").
|
||||
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
|
||||
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
|
||||
|
|
@ -1771,7 +1771,7 @@ public class ReplicationControlManagerTest {
|
|||
setReplicas(null),
|
||||
new ReassignablePartition().setPartitionIndex(2).
|
||||
setReplicas(null))),
|
||||
new ReassignableTopic().setName("bar").setPartitions(asList(
|
||||
new ReassignableTopic().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartition().setPartitionIndex(0).
|
||||
setReplicas(null))))));
|
||||
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
|
||||
|
|
@ -1795,7 +1795,7 @@ public class ReplicationControlManagerTest {
|
|||
new ReassignablePartitionResponse().setPartitionIndex(2).
|
||||
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
|
||||
setErrorMessage("Unable to find partition foo:2."))),
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(asList(
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
|
||||
setErrorMessage(null)))))),
|
||||
|
|
@ -1806,10 +1806,10 @@ public class ReplicationControlManagerTest {
|
|||
AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().
|
||||
setBrokerId(3).
|
||||
setBrokerEpoch(103).
|
||||
setTopics(asList(new TopicData().
|
||||
setTopics(singletonList(new TopicData().
|
||||
setTopicName(version <= 1 ? "foo" : "").
|
||||
setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
|
||||
setPartitions(asList(new PartitionData().
|
||||
setPartitions(singletonList(new PartitionData().
|
||||
setPartitionIndex(1).
|
||||
setPartitionEpoch(1).
|
||||
setLeaderEpoch(0).
|
||||
|
|
@ -1818,14 +1818,14 @@ public class ReplicationControlManagerTest {
|
|||
requestContext,
|
||||
new AlterPartitionRequest.Builder(alterPartitionRequestData, version > 1).build(version).data());
|
||||
Errors expectedError = version > 1 ? NEW_LEADER_ELECTED : FENCED_LEADER_EPOCH;
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(asList(
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
|
||||
new AlterPartitionResponseData.TopicData().
|
||||
setTopicName(version <= 1 ? "foo" : "").
|
||||
setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
|
||||
setPartitions(asList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(1).
|
||||
setErrorCode(expectedError.code()))))),
|
||||
setPartitions(singletonList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(1).
|
||||
setErrorCode(expectedError.code()))))),
|
||||
alterPartitionResult.response());
|
||||
ctx.replay(alterPartitionResult.records());
|
||||
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
|
||||
|
|
@ -1867,10 +1867,10 @@ public class ReplicationControlManagerTest {
|
|||
AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
|
||||
.setBrokerId(1)
|
||||
.setBrokerEpoch(101)
|
||||
.setTopics(asList(new TopicData()
|
||||
.setTopics(singletonList(new TopicData()
|
||||
.setTopicName(version <= 1 ? "foo" : "")
|
||||
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
|
||||
.setPartitions(asList(new PartitionData()
|
||||
.setPartitions(singletonList(new PartitionData()
|
||||
.setPartitionIndex(0)
|
||||
.setPartitionEpoch(1)
|
||||
.setLeaderEpoch(0)
|
||||
|
|
@ -1885,10 +1885,10 @@ public class ReplicationControlManagerTest {
|
|||
Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED : INELIGIBLE_REPLICA;
|
||||
assertEquals(
|
||||
new AlterPartitionResponseData()
|
||||
.setTopics(asList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopics(singletonList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopicName(version <= 1 ? "foo" : "")
|
||||
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
|
||||
.setPartitions(asList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitionIndex(0)
|
||||
.setErrorCode(expectedError.code()))))),
|
||||
alterPartitionResult.response());
|
||||
|
|
@ -1901,10 +1901,10 @@ public class ReplicationControlManagerTest {
|
|||
|
||||
assertEquals(
|
||||
new AlterPartitionResponseData()
|
||||
.setTopics(asList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopics(singletonList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopicName(version <= 1 ? "foo" : "")
|
||||
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
|
||||
.setPartitions(asList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitionIndex(0)
|
||||
.setLeaderId(1)
|
||||
.setLeaderEpoch(0)
|
||||
|
|
@ -1931,10 +1931,10 @@ public class ReplicationControlManagerTest {
|
|||
AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData().
|
||||
setBrokerId(1).
|
||||
setBrokerEpoch(101).
|
||||
setTopics(asList(new TopicData().
|
||||
setTopics(singletonList(new TopicData().
|
||||
setTopicName(version <= 1 ? "foo" : "").
|
||||
setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
|
||||
setPartitions(asList(new PartitionData().
|
||||
setPartitions(singletonList(new PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setPartitionEpoch(1).
|
||||
setLeaderEpoch(0).
|
||||
|
|
@ -1971,10 +1971,10 @@ public class ReplicationControlManagerTest {
|
|||
if (version >= 3) {
|
||||
assertEquals(
|
||||
new AlterPartitionResponseData().
|
||||
setTopics(asList(new AlterPartitionResponseData.TopicData().
|
||||
setTopics(singletonList(new AlterPartitionResponseData.TopicData().
|
||||
setTopicName("").
|
||||
setTopicId(fooId).
|
||||
setPartitions(asList(new AlterPartitionResponseData.PartitionData().
|
||||
setPartitions(singletonList(new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setErrorCode(INELIGIBLE_REPLICA.code()))))),
|
||||
alterPartitionResult.response());
|
||||
|
|
@ -2017,10 +2017,10 @@ public class ReplicationControlManagerTest {
|
|||
AlterPartitionRequestData alterIsrRequest = new AlterPartitionRequestData()
|
||||
.setBrokerId(1)
|
||||
.setBrokerEpoch(101)
|
||||
.setTopics(asList(new TopicData()
|
||||
.setTopics(singletonList(new TopicData()
|
||||
.setTopicName(version <= 1 ? "foo" : "")
|
||||
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
|
||||
.setPartitions(asList(new PartitionData()
|
||||
.setPartitions(singletonList(new PartitionData()
|
||||
.setPartitionIndex(0)
|
||||
.setPartitionEpoch(0)
|
||||
.setLeaderEpoch(0)
|
||||
|
|
@ -2035,10 +2035,10 @@ public class ReplicationControlManagerTest {
|
|||
Errors expectedError = version <= 1 ? OPERATION_NOT_ATTEMPTED : INELIGIBLE_REPLICA;
|
||||
assertEquals(
|
||||
new AlterPartitionResponseData()
|
||||
.setTopics(asList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopics(singletonList(new AlterPartitionResponseData.TopicData()
|
||||
.setTopicName(version <= 1 ? "foo" : "")
|
||||
.setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID)
|
||||
.setPartitions(asList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitions(singletonList(new AlterPartitionResponseData.PartitionData()
|
||||
.setPartitionIndex(0)
|
||||
.setErrorCode(expectedError.code()))))),
|
||||
alterPartitionResult.response());
|
||||
|
|
@ -2081,8 +2081,8 @@ public class ReplicationControlManagerTest {
|
|||
new ReassignablePartition().setPartitionIndex(2).
|
||||
setReplicas(asList(5, 6, 7)),
|
||||
new ReassignablePartition().setPartitionIndex(3).
|
||||
setReplicas(asList()))),
|
||||
new ReassignableTopic().setName("bar").setPartitions(asList(
|
||||
setReplicas(Collections.emptyList()))),
|
||||
new ReassignableTopic().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartition().setPartitionIndex(0).
|
||||
setReplicas(asList(1, 2, 3, 4, 0)))))));
|
||||
assertEquals(new AlterPartitionReassignmentsResponseData().
|
||||
|
|
@ -2100,7 +2100,7 @@ public class ReplicationControlManagerTest {
|
|||
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
|
||||
setErrorMessage("The manual partition assignment includes an empty " +
|
||||
"replica list."))),
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(asList(
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorMessage(null))))),
|
||||
alterResult.response());
|
||||
|
|
@ -2131,27 +2131,27 @@ public class ReplicationControlManagerTest {
|
|||
setAddingReplicas(new int[] {0, 1}).setLeader(4).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(2).build(), replication.getPartition(barId, 0));
|
||||
ListPartitionReassignmentsResponseData currentReassigning =
|
||||
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
|
||||
setTopics(asList(new OngoingTopicReassignment().
|
||||
setName("bar").setPartitions(asList(
|
||||
new OngoingPartitionReassignment().setPartitionIndex(0).
|
||||
setRemovingReplicas(Collections.emptyList()).
|
||||
setAddingReplicas(asList(0, 1)).
|
||||
setReplicas(asList(1, 2, 3, 4, 0))))));
|
||||
setTopics(singletonList(new OngoingTopicReassignment().
|
||||
setName("bar").setPartitions(singletonList(
|
||||
new OngoingPartitionReassignment().setPartitionIndex(0).
|
||||
setRemovingReplicas(Collections.emptyList()).
|
||||
setAddingReplicas(asList(0, 1)).
|
||||
setReplicas(asList(1, 2, 3, 4, 0))))));
|
||||
assertEquals(currentReassigning, replication.listPartitionReassignments(null, Long.MAX_VALUE));
|
||||
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
|
||||
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(singletonList(
|
||||
new ListPartitionReassignmentsTopics().setName("foo").
|
||||
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
|
||||
assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
|
||||
assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList(
|
||||
new ListPartitionReassignmentsTopics().setName("bar").
|
||||
setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
|
||||
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
|
||||
anonymousContextFor(ApiKeys.ALTER_PARTITION),
|
||||
new AlterPartitionRequestData().setBrokerId(4).setBrokerEpoch(104).
|
||||
setTopics(asList(new TopicData().setTopicId(barId).setPartitions(asList(
|
||||
setTopics(singletonList(new TopicData().setTopicId(barId).setPartitions(singletonList(
|
||||
new PartitionData().setPartitionIndex(0).setPartitionEpoch(2).
|
||||
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(4, 1, 2, 0)))))));
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(asList(
|
||||
new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(asList(
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
|
||||
new AlterPartitionResponseData.TopicData().setTopicId(barId).setPartitions(singletonList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setLeaderId(4).
|
||||
|
|
@ -2163,10 +2163,10 @@ public class ReplicationControlManagerTest {
|
|||
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
|
||||
replication.alterPartitionReassignments(
|
||||
new AlterPartitionReassignmentsRequestData().setTopics(asList(
|
||||
new ReassignableTopic().setName("foo").setPartitions(asList(
|
||||
new ReassignableTopic().setName("foo").setPartitions(singletonList(
|
||||
new ReassignablePartition().setPartitionIndex(0).
|
||||
setReplicas(null))),
|
||||
new ReassignableTopic().setName("bar").setPartitions(asList(
|
||||
new ReassignableTopic().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartition().setPartitionIndex(0).
|
||||
setReplicas(null))))));
|
||||
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
|
||||
|
|
@ -2182,10 +2182,10 @@ public class ReplicationControlManagerTest {
|
|||
setRemovingReplicas(null).
|
||||
setAddingReplicas(Collections.emptyList()), MetadataVersion.latestTesting().partitionChangeRecordVersion())),
|
||||
new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
|
||||
new ReassignableTopicResponse().setName("foo").setPartitions(asList(
|
||||
new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(asList(
|
||||
new ReassignableTopicResponse().setName("bar").setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorMessage(null)))))),
|
||||
cancelResult);
|
||||
|
|
@ -2461,15 +2461,15 @@ public class ReplicationControlManagerTest {
|
|||
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
|
||||
anonymousContextFor(ApiKeys.ALTER_PARTITION),
|
||||
new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
|
||||
setTopics(asList(new AlterPartitionRequestData.TopicData().setTopicId(fooId).
|
||||
setTopics(singletonList(new TopicData().setTopicId(fooId).
|
||||
setPartitions(asList(
|
||||
new AlterPartitionRequestData.PartitionData().
|
||||
new PartitionData().
|
||||
setPartitionIndex(0).setPartitionEpoch(0).
|
||||
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)),
|
||||
new AlterPartitionRequestData.PartitionData().
|
||||
new PartitionData().
|
||||
setPartitionIndex(2).setPartitionEpoch(0).
|
||||
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(asList(
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
|
||||
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(0).
|
||||
|
|
@ -2547,12 +2547,12 @@ public class ReplicationControlManagerTest {
|
|||
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
|
||||
anonymousContextFor(ApiKeys.ALTER_PARTITION),
|
||||
new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
|
||||
setTopics(asList(new AlterPartitionRequestData.TopicData().setTopicId(fooId).
|
||||
setPartitions(asList(new AlterPartitionRequestData.PartitionData().
|
||||
setTopics(singletonList(new TopicData().setTopicId(fooId).
|
||||
setPartitions(singletonList(new PartitionData().
|
||||
setPartitionIndex(0).setPartitionEpoch(0).
|
||||
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(1, 2, 3)))))));
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(asList(
|
||||
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
|
||||
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(singletonList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setLeaderId(2).
|
||||
|
|
@ -2570,7 +2570,7 @@ public class ReplicationControlManagerTest {
|
|||
.setPartitionId(0)
|
||||
.setTopicId(fooId)
|
||||
.setLeader(1);
|
||||
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertEquals(singletonList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertTrue(replication.arePartitionLeadersImbalanced());
|
||||
assertFalse(balanceResult.response());
|
||||
|
||||
|
|
@ -2579,12 +2579,12 @@ public class ReplicationControlManagerTest {
|
|||
alterPartitionResult = replication.alterPartition(
|
||||
anonymousContextFor(ApiKeys.ALTER_PARTITION),
|
||||
new AlterPartitionRequestData().setBrokerId(2).setBrokerEpoch(102).
|
||||
setTopics(asList(new AlterPartitionRequestData.TopicData().setTopicId(fooId).
|
||||
setPartitions(asList(new AlterPartitionRequestData.PartitionData().
|
||||
setTopics(singletonList(new TopicData().setTopicId(fooId).
|
||||
setPartitions(singletonList(new PartitionData().
|
||||
setPartitionIndex(2).setPartitionEpoch(0).
|
||||
setLeaderEpoch(0).setNewIsrWithEpochs(isrWithDefaultEpoch(0, 2, 1)))))));
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(asList(
|
||||
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(asList(
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
|
||||
new AlterPartitionResponseData.TopicData().setTopicId(fooId).setPartitions(singletonList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(2).
|
||||
setLeaderId(2).
|
||||
|
|
@ -2602,7 +2602,7 @@ public class ReplicationControlManagerTest {
|
|||
.setPartitionId(2)
|
||||
.setTopicId(fooId)
|
||||
.setLeader(0);
|
||||
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertEquals(singletonList(new ApiMessageAndVersion(expectedChangeRecord, MetadataVersion.latestTesting().partitionChangeRecordVersion())), balanceResult.records());
|
||||
assertFalse(replication.arePartitionLeadersImbalanced());
|
||||
assertFalse(balanceResult.response());
|
||||
}
|
||||
|
|
@ -2773,21 +2773,21 @@ public class ReplicationControlManagerTest {
|
|||
// Reassign to [2, 3]
|
||||
ControllerResult<AlterPartitionReassignmentsResponseData> alterResultOne =
|
||||
replication.alterPartitionReassignments(
|
||||
new AlterPartitionReassignmentsRequestData().setTopics(asList(
|
||||
new ReassignableTopic().setName(topic).setPartitions(asList(
|
||||
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
|
||||
new ReassignableTopic().setName(topic).setPartitions(singletonList(
|
||||
new ReassignablePartition().setPartitionIndex(0).
|
||||
setReplicas(asList(2, 3)))))));
|
||||
assertEquals(new AlterPartitionReassignmentsResponseData().
|
||||
setErrorMessage(null).setResponses(asList(
|
||||
new ReassignableTopicResponse().setName(topic).setPartitions(asList(
|
||||
setErrorMessage(null).setResponses(singletonList(
|
||||
new ReassignableTopicResponse().setName(topic).setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorMessage(null))))), alterResultOne.response());
|
||||
ctx.replay(alterResultOne.records());
|
||||
|
||||
ListPartitionReassignmentsResponseData currentReassigning =
|
||||
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
|
||||
setTopics(asList(new OngoingTopicReassignment().
|
||||
setName(topic).setPartitions(asList(
|
||||
setTopics(singletonList(new OngoingTopicReassignment().
|
||||
setName(topic).setPartitions(singletonList(
|
||||
new OngoingPartitionReassignment().setPartitionIndex(0).
|
||||
setRemovingReplicas(asList(0, 1)).
|
||||
setAddingReplicas(asList(2, 3)).
|
||||
|
|
@ -2802,9 +2802,9 @@ public class ReplicationControlManagerTest {
|
|||
AlterPartitionRequestData alterPartitionRequestData = new AlterPartitionRequestData().
|
||||
setBrokerId(partition.leader).
|
||||
setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).
|
||||
setTopics(asList(new TopicData().
|
||||
setTopics(singletonList(new TopicData().
|
||||
setTopicId(topicId).
|
||||
setPartitions(asList(new PartitionData().
|
||||
setPartitions(singletonList(new PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setPartitionEpoch(partition.partitionEpoch).
|
||||
setLeaderEpoch(partition.leaderEpoch).
|
||||
|
|
@ -2812,15 +2812,15 @@ public class ReplicationControlManagerTest {
|
|||
ControllerResult<AlterPartitionResponseData> alterPartitionResult = replication.alterPartition(
|
||||
anonymousContextFor(ApiKeys.ALTER_PARTITION),
|
||||
new AlterPartitionRequest.Builder(alterPartitionRequestData, true).build().data());
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(asList(
|
||||
new AlterPartitionResponseData.TopicData().
|
||||
setTopicId(topicId).
|
||||
setPartitions(asList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setIsr(Arrays.asList(0, 1, 2)).
|
||||
setPartitionEpoch(partition.partitionEpoch + 1).
|
||||
setErrorCode(NONE.code()))))),
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
|
||||
new AlterPartitionResponseData.TopicData().
|
||||
setTopicId(topicId).
|
||||
setPartitions(singletonList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setIsr(asList(0, 1, 2)).
|
||||
setPartitionEpoch(partition.partitionEpoch + 1).
|
||||
setErrorCode(NONE.code()))))),
|
||||
alterPartitionResult.response());
|
||||
|
||||
ctx.replay(alterPartitionResult.records());
|
||||
|
|
@ -2832,9 +2832,9 @@ public class ReplicationControlManagerTest {
|
|||
);
|
||||
ControllerResult<ElectLeadersResponseData> electLeaderTwoResult = replication.electLeaders(request);
|
||||
ReplicaElectionResult replicaElectionResult = new ReplicaElectionResult().setTopic(topic);
|
||||
replicaElectionResult.setPartitionResult(Arrays.asList(new PartitionResult().setPartitionId(0).setErrorCode(NONE.code()).setErrorMessage(null)));
|
||||
replicaElectionResult.setPartitionResult(singletonList(new PartitionResult().setPartitionId(0).setErrorCode(NONE.code()).setErrorMessage(null)));
|
||||
assertEquals(
|
||||
new ElectLeadersResponseData().setErrorCode(NONE.code()).setReplicaElectionResults(Arrays.asList(replicaElectionResult)),
|
||||
new ElectLeadersResponseData().setErrorCode(NONE.code()).setReplicaElectionResults(singletonList(replicaElectionResult)),
|
||||
electLeaderTwoResult.response()
|
||||
);
|
||||
ctx.replay(electLeaderTwoResult.records());
|
||||
|
|
@ -2845,13 +2845,13 @@ public class ReplicationControlManagerTest {
|
|||
// Reassign to [4, 5]
|
||||
ControllerResult<AlterPartitionReassignmentsResponseData> alterResultTwo =
|
||||
replication.alterPartitionReassignments(
|
||||
new AlterPartitionReassignmentsRequestData().setTopics(asList(
|
||||
new ReassignableTopic().setName(topic).setPartitions(asList(
|
||||
new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
|
||||
new ReassignableTopic().setName(topic).setPartitions(singletonList(
|
||||
new ReassignablePartition().setPartitionIndex(0).
|
||||
setReplicas(asList(4, 5)))))));
|
||||
assertEquals(new AlterPartitionReassignmentsResponseData().
|
||||
setErrorMessage(null).setResponses(asList(
|
||||
new ReassignableTopicResponse().setName(topic).setPartitions(asList(
|
||||
setErrorMessage(null).setResponses(singletonList(
|
||||
new ReassignableTopicResponse().setName(topic).setPartitions(singletonList(
|
||||
new ReassignablePartitionResponse().setPartitionIndex(0).
|
||||
setErrorMessage(null))))), alterResultTwo.response());
|
||||
ctx.replay(alterResultTwo.records());
|
||||
|
|
@ -2859,8 +2859,8 @@ public class ReplicationControlManagerTest {
|
|||
// Make sure the replicas list contains all the previous replicas 0, 1, 2, 3 as well as the new replicas 3, 4
|
||||
currentReassigning =
|
||||
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
|
||||
setTopics(asList(new OngoingTopicReassignment().
|
||||
setName(topic).setPartitions(asList(
|
||||
setTopics(singletonList(new OngoingTopicReassignment().
|
||||
setName(topic).setPartitions(singletonList(
|
||||
new OngoingPartitionReassignment().setPartitionIndex(0).
|
||||
setRemovingReplicas(asList(0, 1, 2, 3)).
|
||||
setAddingReplicas(asList(4, 5)).
|
||||
|
|
@ -2877,9 +2877,9 @@ public class ReplicationControlManagerTest {
|
|||
AlterPartitionRequestData alterPartitionRequestDataTwo = new AlterPartitionRequestData().
|
||||
setBrokerId(partition.leader).
|
||||
setBrokerEpoch(ctx.currentBrokerEpoch(partition.leader)).
|
||||
setTopics(asList(new TopicData().
|
||||
setTopics(singletonList(new TopicData().
|
||||
setTopicId(topicId).
|
||||
setPartitions(asList(new PartitionData().
|
||||
setPartitions(singletonList(new PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setPartitionEpoch(partition.partitionEpoch).
|
||||
setLeaderEpoch(partition.leaderEpoch).
|
||||
|
|
@ -2887,10 +2887,10 @@ public class ReplicationControlManagerTest {
|
|||
ControllerResult<AlterPartitionResponseData> alterPartitionResultTwo = replication.alterPartition(
|
||||
anonymousContextFor(ApiKeys.ALTER_PARTITION),
|
||||
new AlterPartitionRequest.Builder(alterPartitionRequestDataTwo, true).build().data());
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(asList(
|
||||
assertEquals(new AlterPartitionResponseData().setTopics(singletonList(
|
||||
new AlterPartitionResponseData.TopicData().
|
||||
setTopicId(topicId).
|
||||
setPartitions(asList(
|
||||
setPartitions(singletonList(
|
||||
new AlterPartitionResponseData.PartitionData().
|
||||
setPartitionIndex(0).
|
||||
setErrorCode(NEW_LEADER_ELECTED.code()))))),
|
||||
|
|
|
|||
|
|
@ -73,13 +73,13 @@ public class ClientQuotasImageTest {
|
|||
setRemove(true), CLIENT_QUOTA_RECORD.highestSupportedVersion()));
|
||||
// alter quota
|
||||
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
|
||||
setEntity(Arrays.asList(
|
||||
setEntity(Collections.singletonList(
|
||||
new EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("foo"))).
|
||||
setKey(QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG).
|
||||
setValue(234.0), CLIENT_QUOTA_RECORD.highestSupportedVersion()));
|
||||
// add quota to entity with existing quota
|
||||
DELTA1_RECORDS.add(new ApiMessageAndVersion(new ClientQuotaRecord().
|
||||
setEntity(Arrays.asList(
|
||||
setEntity(Collections.singletonList(
|
||||
new EntityData().setEntityType(ClientQuotaEntity.USER).setEntityName("foo"))).
|
||||
setKey(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG).
|
||||
setValue(999.0), CLIENT_QUOTA_RECORD.highestSupportedVersion()));
|
||||
|
|
|
|||
|
|
@ -45,7 +45,6 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
|
@ -83,7 +82,7 @@ public class ClusterImageTest {
|
|||
setId(0).
|
||||
setEpoch(1000).
|
||||
setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(true).
|
||||
|
|
@ -92,7 +91,7 @@ public class ClusterImageTest {
|
|||
setId(1).
|
||||
setEpoch(1001).
|
||||
setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(false).
|
||||
|
|
@ -101,7 +100,7 @@ public class ClusterImageTest {
|
|||
setId(2).
|
||||
setEpoch(123).
|
||||
setIncarnationId(Uuid.fromString("hr4TVh3YQiu3p16Awkka6w")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))).
|
||||
setSupportedFeatures(Collections.emptyMap()).
|
||||
setRack(Optional.of("arack")).
|
||||
setFenced(false).
|
||||
|
|
@ -154,7 +153,7 @@ public class ClusterImageTest {
|
|||
setId(0).
|
||||
setEpoch(1000).
|
||||
setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(false).
|
||||
|
|
@ -163,7 +162,7 @@ public class ClusterImageTest {
|
|||
setId(1).
|
||||
setEpoch(1001).
|
||||
setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(true).
|
||||
|
|
@ -194,7 +193,7 @@ public class ClusterImageTest {
|
|||
DELTA2_RECORDS.add(new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerId(2).setIsMigratingZkBroker(true).setIncarnationId(Uuid.fromString("Am5Yse7GQxaw0b2alM74bP")).
|
||||
setBrokerEpoch(1002).setEndPoints(new BrokerEndpointCollection(
|
||||
Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
Collections.singletonList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
|
||||
setPort(9094).setSecurityProtocol((short) 0)).iterator())).
|
||||
setFeatures(new BrokerFeatureCollection(
|
||||
Collections.singleton(new BrokerFeature().
|
||||
|
|
@ -212,7 +211,7 @@ public class ClusterImageTest {
|
|||
setId(0).
|
||||
setEpoch(1000).
|
||||
setIncarnationId(Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(true).
|
||||
|
|
@ -221,7 +220,7 @@ public class ClusterImageTest {
|
|||
setId(1).
|
||||
setEpoch(1001).
|
||||
setIncarnationId(Uuid.fromString("U52uRe20RsGI0RvpcTx33Q")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 3))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(false).
|
||||
|
|
@ -230,7 +229,7 @@ public class ClusterImageTest {
|
|||
setId(2).
|
||||
setEpoch(1002).
|
||||
setIncarnationId(Uuid.fromString("Am5Yse7GQxaw0b2alM74bP")).
|
||||
setListeners(Arrays.asList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))).
|
||||
setListeners(Collections.singletonList(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9094))).
|
||||
setSupportedFeatures(Collections.singletonMap("metadata.version",
|
||||
VersionRange.of(MetadataVersion.IBP_3_3_IV3.featureLevel(), MetadataVersion.IBP_3_6_IV0.featureLevel()))).
|
||||
setRack(Optional.of("rack3")).
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ public class ImageDowngradeTest {
|
|||
@Test
|
||||
public void testPremodernVersion() {
|
||||
writeWithExpectedLosses(MetadataVersion.IBP_3_2_IV0,
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
"feature flag(s): foo.feature"),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(MetadataVersion.IBP_3_3_IV0),
|
||||
|
|
@ -94,7 +94,8 @@ public class ImageDowngradeTest {
|
|||
setFeatureLevel((short) 4), (short) 0)),
|
||||
Arrays.asList(
|
||||
TEST_RECORDS.get(0),
|
||||
TEST_RECORDS.get(1)));
|
||||
TEST_RECORDS.get(1))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -103,29 +104,30 @@ public class ImageDowngradeTest {
|
|||
@Test
|
||||
public void testPreControlledShutdownStateVersion() {
|
||||
writeWithExpectedLosses(MetadataVersion.IBP_3_3_IV2,
|
||||
Arrays.asList(
|
||||
"the inControlledShutdown state of one or more brokers"),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(MetadataVersion.IBP_3_3_IV3),
|
||||
new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerId(123).
|
||||
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
|
||||
setBrokerEpoch(456).
|
||||
setRack(null).
|
||||
setFenced(false).
|
||||
setInControlledShutdown(true), (short) 1),
|
||||
TEST_RECORDS.get(0),
|
||||
TEST_RECORDS.get(1)),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(MetadataVersion.IBP_3_3_IV2),
|
||||
new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerId(123).
|
||||
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
|
||||
setBrokerEpoch(456).
|
||||
setRack(null).
|
||||
setFenced(false), (short) 0),
|
||||
TEST_RECORDS.get(0),
|
||||
TEST_RECORDS.get(1)));
|
||||
Collections.singletonList(
|
||||
"the inControlledShutdown state of one or more brokers"),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(MetadataVersion.IBP_3_3_IV3),
|
||||
new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerId(123).
|
||||
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
|
||||
setBrokerEpoch(456).
|
||||
setRack(null).
|
||||
setFenced(false).
|
||||
setInControlledShutdown(true), (short) 1),
|
||||
TEST_RECORDS.get(0),
|
||||
TEST_RECORDS.get(1)),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(MetadataVersion.IBP_3_3_IV2),
|
||||
new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerId(123).
|
||||
setIncarnationId(Uuid.fromString("XgjKo16hRWeWrTui0iR5Nw")).
|
||||
setBrokerEpoch(456).
|
||||
setRack(null).
|
||||
setFenced(false), (short) 0),
|
||||
TEST_RECORDS.get(0),
|
||||
TEST_RECORDS.get(1))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -134,7 +136,7 @@ public class ImageDowngradeTest {
|
|||
@Test
|
||||
public void testPreZkMigrationSupportVersion() {
|
||||
writeWithExpectedLosses(MetadataVersion.IBP_3_3_IV3,
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
"the isMigratingZkBroker state of one or more brokers"),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(MetadataVersion.IBP_3_4_IV0),
|
||||
|
|
@ -158,7 +160,8 @@ public class ImageDowngradeTest {
|
|||
setFenced(false).
|
||||
setInControlledShutdown(true), (short) 1),
|
||||
TEST_RECORDS.get(0),
|
||||
TEST_RECORDS.get(1)));
|
||||
TEST_RECORDS.get(1))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -167,22 +170,24 @@ public class ImageDowngradeTest {
|
|||
MetadataVersion inputMetadataVersion = outputMetadataVersion;
|
||||
PartitionRecord testPartitionRecord = (PartitionRecord) TEST_RECORDS.get(1).message();
|
||||
writeWithExpectedLosses(outputMetadataVersion,
|
||||
Collections.singletonList("the directory assignment state of one or more replicas"),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(inputMetadataVersion),
|
||||
TEST_RECORDS.get(0),
|
||||
new ApiMessageAndVersion(
|
||||
testPartitionRecord.duplicate().setDirectories(Arrays.asList(
|
||||
Uuid.fromString("c7QfSi6xSIGQVh3Qd5RJxA"),
|
||||
Uuid.fromString("rWaCHejCRRiptDMvW5Xw0g"))),
|
||||
(short) 2)),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(outputMetadataVersion),
|
||||
new ApiMessageAndVersion(new ZkMigrationStateRecord(), (short) 0),
|
||||
TEST_RECORDS.get(0),
|
||||
new ApiMessageAndVersion(
|
||||
testPartitionRecord.duplicate().setDirectories(Collections.emptyList()),
|
||||
(short) 0)));
|
||||
Collections.singletonList(
|
||||
"the directory assignment state of one or more replicas"),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(inputMetadataVersion),
|
||||
TEST_RECORDS.get(0),
|
||||
new ApiMessageAndVersion(
|
||||
testPartitionRecord.duplicate().setDirectories(Arrays.asList(
|
||||
Uuid.fromString("c7QfSi6xSIGQVh3Qd5RJxA"),
|
||||
Uuid.fromString("rWaCHejCRRiptDMvW5Xw0g"))),
|
||||
(short) 2)),
|
||||
Arrays.asList(
|
||||
metadataVersionRecord(outputMetadataVersion),
|
||||
new ApiMessageAndVersion(new ZkMigrationStateRecord(), (short) 0),
|
||||
TEST_RECORDS.get(0),
|
||||
new ApiMessageAndVersion(
|
||||
testPartitionRecord.duplicate().setDirectories(Collections.emptyList()),
|
||||
(short) 0))
|
||||
);
|
||||
}
|
||||
|
||||
private static void writeWithExpectedLosses(
|
||||
|
|
|
|||
|
|
@ -247,11 +247,11 @@ public class TopicsImageTest {
|
|||
|
||||
LocalReplicaChanges changes = delta.localChanges(localId);
|
||||
assertEquals(
|
||||
new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
|
||||
new HashSet<>(Collections.singletonList(new TopicPartition("baz", 0))),
|
||||
changes.electedLeaders().keySet()
|
||||
);
|
||||
assertEquals(
|
||||
new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
|
||||
new HashSet<>(Collections.singletonList(new TopicPartition("baz", 0))),
|
||||
changes.leaders().keySet()
|
||||
);
|
||||
assertEquals(
|
||||
|
|
@ -303,7 +303,7 @@ public class TopicsImageTest {
|
|||
RecordTestUtils.replayAll(delta, topicRecords);
|
||||
|
||||
LocalReplicaChanges changes = delta.localChanges(localId);
|
||||
assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes());
|
||||
assertEquals(new HashSet<>(Collections.singletonList(new TopicPartition("zoo", 0))), changes.deletes());
|
||||
assertEquals(Collections.emptyMap(), changes.electedLeaders());
|
||||
assertEquals(Collections.emptyMap(), changes.leaders());
|
||||
assertEquals(Collections.emptyMap(), changes.followers());
|
||||
|
|
@ -345,7 +345,7 @@ public class TopicsImageTest {
|
|||
assertEquals(Collections.emptySet(), changes.deletes());
|
||||
assertEquals(Collections.emptyMap(), changes.electedLeaders());
|
||||
assertEquals(
|
||||
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))),
|
||||
new HashSet<>(Collections.singletonList(new TopicPartition("zoo", 0))),
|
||||
changes.leaders().keySet()
|
||||
);
|
||||
assertEquals(Collections.emptyMap(), changes.followers());
|
||||
|
|
|
|||
|
|
@ -52,7 +52,6 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.OptionalLong;
|
||||
|
|
@ -64,6 +63,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
|
||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
|
||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
|
||||
|
|
@ -249,17 +249,17 @@ public class MetadataLoaderTest {
|
|||
setFaultHandler(faultHandler).
|
||||
setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
|
||||
build()) {
|
||||
loader.installPublishers(asList(publisher)).get();
|
||||
loader.installPublishers(singletonList(publisher)).get();
|
||||
if (loadSnapshot) {
|
||||
MockSnapshotReader snapshotReader = new MockSnapshotReader(
|
||||
new MetadataProvenance(200, 100, 4000),
|
||||
asList(
|
||||
singletonList(
|
||||
Batch.control(
|
||||
200,
|
||||
100,
|
||||
4000,
|
||||
10,
|
||||
asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
|
||||
singletonList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
|
||||
)
|
||||
)
|
||||
);
|
||||
|
|
@ -277,13 +277,13 @@ public class MetadataLoaderTest {
|
|||
assertEquals("testPublisherCannotBeInstalledMoreThanOnce: Attempted to install " +
|
||||
"publisher MockPublisher, which is already installed.",
|
||||
assertThrows(ExecutionException.class,
|
||||
() -> loader.installPublishers(asList(publisher)).get()).
|
||||
() -> loader.installPublishers(singletonList(publisher)).get()).
|
||||
getCause().getMessage());
|
||||
} else {
|
||||
assertEquals("testPublisherCannotBeInstalledMoreThanOnce: Attempted to install " +
|
||||
"a new publisher named MockPublisher, but there is already a publisher with that name.",
|
||||
assertThrows(ExecutionException.class,
|
||||
() -> loader.installPublishers(asList(new MockPublisher())).get()).
|
||||
() -> loader.installPublishers(singletonList(new MockPublisher())).get()).
|
||||
getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
|
@ -306,7 +306,7 @@ public class MetadataLoaderTest {
|
|||
loader.removeAndClosePublisher(publishers.get(1)).get();
|
||||
MockSnapshotReader snapshotReader = MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(100, 50, 2000),
|
||||
asList(asList(new ApiMessageAndVersion(
|
||||
singletonList(singletonList(new ApiMessageAndVersion(
|
||||
new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0))));
|
||||
|
|
@ -334,7 +334,7 @@ public class MetadataLoaderTest {
|
|||
public void testLoadEmptySnapshot() throws Exception {
|
||||
MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptySnapshot");
|
||||
MockTime time = new MockTime();
|
||||
List<MockPublisher> publishers = asList(new MockPublisher());
|
||||
List<MockPublisher> publishers = singletonList(new MockPublisher());
|
||||
try (MetadataLoader loader = new MetadataLoader.Builder().
|
||||
setFaultHandler(faultHandler).
|
||||
setTime(time).
|
||||
|
|
@ -364,13 +364,13 @@ public class MetadataLoaderTest {
|
|||
) throws Exception {
|
||||
MockSnapshotReader snapshotReader = new MockSnapshotReader(
|
||||
new MetadataProvenance(offset, 100, 4000),
|
||||
asList(
|
||||
singletonList(
|
||||
Batch.control(
|
||||
200,
|
||||
100,
|
||||
4000,
|
||||
10,
|
||||
asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
|
||||
singletonList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
|
||||
)
|
||||
)
|
||||
);
|
||||
|
|
@ -393,7 +393,7 @@ public class MetadataLoaderTest {
|
|||
List<ApiMessageAndVersion> records
|
||||
) {
|
||||
return new MockBatchReader(batchBaseOffset,
|
||||
Collections.singletonList(newBatch(batchBaseOffset, epoch, records)));
|
||||
singletonList(newBatch(batchBaseOffset, epoch, records)));
|
||||
}
|
||||
|
||||
static Batch<ApiMessageAndVersion> newBatch(
|
||||
|
|
@ -452,7 +452,7 @@ public class MetadataLoaderTest {
|
|||
public void testLoadEmptyBatch() throws Exception {
|
||||
MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptyBatch");
|
||||
MockTime time = new MockTime();
|
||||
List<MockPublisher> publishers = asList(new MockPublisher());
|
||||
List<MockPublisher> publishers = singletonList(new MockPublisher());
|
||||
try (MetadataLoader loader = new MetadataLoader.Builder().
|
||||
setFaultHandler(faultHandler).
|
||||
setTime(time).
|
||||
|
|
@ -463,13 +463,13 @@ public class MetadataLoaderTest {
|
|||
publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
|
||||
MockBatchReader batchReader = new MockBatchReader(
|
||||
300,
|
||||
asList(
|
||||
singletonList(
|
||||
Batch.control(
|
||||
300,
|
||||
100,
|
||||
4000,
|
||||
10,
|
||||
asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
|
||||
singletonList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
|
||||
)
|
||||
)
|
||||
).setTime(time);
|
||||
|
|
@ -508,10 +508,10 @@ public class MetadataLoaderTest {
|
|||
loader.installPublishers(publishers).get();
|
||||
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(200, 100, 4000), asList(
|
||||
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setFeatureLevel(IBP_3_3_IV1.featureLevel()), (short) 0)),
|
||||
asList(new ApiMessageAndVersion(new TopicRecord().
|
||||
singletonList(new ApiMessageAndVersion(new TopicRecord().
|
||||
setName("foo").
|
||||
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
|
||||
)));
|
||||
|
|
@ -520,8 +520,8 @@ public class MetadataLoaderTest {
|
|||
}
|
||||
loader.waitForAllEventsToBeHandled();
|
||||
assertEquals(200L, loader.lastAppliedOffset());
|
||||
loader.handleCommit(new MockBatchReader(201, asList(
|
||||
MockBatchReader.newBatch(201, 100, asList(
|
||||
loader.handleCommit(new MockBatchReader(201, singletonList(
|
||||
MockBatchReader.newBatch(201, 100, singletonList(
|
||||
new ApiMessageAndVersion(new RemoveTopicRecord().
|
||||
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))))));
|
||||
loader.waitForAllEventsToBeHandled();
|
||||
|
|
@ -578,10 +578,10 @@ public class MetadataLoaderTest {
|
|||
) throws Exception {
|
||||
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(offset, 100, 4000), asList(
|
||||
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setFeatureLevel(IBP_3_3_IV1.featureLevel()), (short) 0)),
|
||||
asList(new ApiMessageAndVersion(new TopicRecord().
|
||||
singletonList(new ApiMessageAndVersion(new TopicRecord().
|
||||
setName("foo").
|
||||
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
|
||||
)));
|
||||
|
|
@ -594,10 +594,10 @@ public class MetadataLoaderTest {
|
|||
) throws Exception {
|
||||
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(offset, 100, 4000), asList(
|
||||
asList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0)),
|
||||
asList(new ApiMessageAndVersion(new TopicRecord().
|
||||
singletonList(new ApiMessageAndVersion(new TopicRecord().
|
||||
setName("bar").
|
||||
setTopicId(Uuid.fromString("VcL2Mw-cT4aL6XV9VujzoQ")), (short) 0))
|
||||
)));
|
||||
|
|
@ -610,7 +610,7 @@ public class MetadataLoaderTest {
|
|||
@Test
|
||||
public void testReloadSnapshot() throws Exception {
|
||||
MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
|
||||
List<MockPublisher> publishers = asList(new MockPublisher("a"));
|
||||
List<MockPublisher> publishers = singletonList(new MockPublisher("a"));
|
||||
try (MetadataLoader loader = new MetadataLoader.Builder().
|
||||
setFaultHandler(faultHandler).
|
||||
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
|
||||
|
|
@ -637,8 +637,8 @@ public class MetadataLoaderTest {
|
|||
assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
|
||||
assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
|
||||
|
||||
loader.handleCommit(new MockBatchReader(500, asList(
|
||||
MockBatchReader.newBatch(500, 100, asList(
|
||||
loader.handleCommit(new MockBatchReader(500, singletonList(
|
||||
MockBatchReader.newBatch(500, 100, singletonList(
|
||||
new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setFeatureLevel(IBP_3_5_IV0.featureLevel()), (short) 0))))));
|
||||
|
|
@ -654,7 +654,7 @@ public class MetadataLoaderTest {
|
|||
public void testPublishTransaction(boolean abortTxn) throws Exception {
|
||||
MockFaultHandler faultHandler = new MockFaultHandler("testTransactions");
|
||||
MockPublisher publisher = new MockPublisher("testTransactions");
|
||||
List<MockPublisher> publishers = Collections.singletonList(publisher);
|
||||
List<MockPublisher> publishers = singletonList(publisher);
|
||||
try (MetadataLoader loader = new MetadataLoader.Builder().
|
||||
setFaultHandler(faultHandler).
|
||||
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
|
||||
|
|
@ -690,7 +690,7 @@ public class MetadataLoaderTest {
|
|||
|
||||
if (abortTxn) {
|
||||
loader.handleCommit(
|
||||
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
|
||||
MockBatchReader.newSingleBatchReader(500, 100, singletonList(
|
||||
new ApiMessageAndVersion(new AbortTransactionRecord(), (short) 0)
|
||||
)));
|
||||
loader.waitForAllEventsToBeHandled();
|
||||
|
|
@ -699,7 +699,7 @@ public class MetadataLoaderTest {
|
|||
"Topic should not be visible since the transaction was aborted");
|
||||
} else {
|
||||
loader.handleCommit(
|
||||
MockBatchReader.newSingleBatchReader(500, 100, Arrays.asList(
|
||||
MockBatchReader.newSingleBatchReader(500, 100, singletonList(
|
||||
new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)
|
||||
)));
|
||||
loader.waitForAllEventsToBeHandled();
|
||||
|
|
@ -715,7 +715,7 @@ public class MetadataLoaderTest {
|
|||
public void testPublishTransactionWithinBatch() throws Exception {
|
||||
MockFaultHandler faultHandler = new MockFaultHandler("testPublishTransactionWithinBatch");
|
||||
MockPublisher publisher = new MockPublisher("testPublishTransactionWithinBatch");
|
||||
List<MockPublisher> publishers = Collections.singletonList(publisher);
|
||||
List<MockPublisher> publishers = singletonList(publisher);
|
||||
try (MetadataLoader loader = new MetadataLoader.Builder().
|
||||
setFaultHandler(faultHandler).
|
||||
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
|
||||
|
|
@ -746,7 +746,7 @@ public class MetadataLoaderTest {
|
|||
public void testSnapshotDuringTransaction() throws Exception {
|
||||
MockFaultHandler faultHandler = new MockFaultHandler("testSnapshotDuringTransaction");
|
||||
MockPublisher publisher = new MockPublisher("testSnapshotDuringTransaction");
|
||||
List<MockPublisher> publishers = Collections.singletonList(publisher);
|
||||
List<MockPublisher> publishers = singletonList(publisher);
|
||||
try (MetadataLoader loader = new MetadataLoader.Builder().
|
||||
setFaultHandler(faultHandler).
|
||||
setHighWaterMarkAccessor(() -> OptionalLong.of(0)).
|
||||
|
|
@ -768,8 +768,8 @@ public class MetadataLoaderTest {
|
|||
|
||||
// loading a snapshot discards any in-flight transaction
|
||||
loader.handleLoadSnapshot(MockSnapshotReader.fromRecordLists(
|
||||
new MetadataProvenance(600, 101, 4000), asList(
|
||||
asList(new ApiMessageAndVersion(new TopicRecord().
|
||||
new MetadataProvenance(600, 101, 4000), singletonList(
|
||||
singletonList(new ApiMessageAndVersion(new TopicRecord().
|
||||
setName("foo").
|
||||
setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
|
||||
)));
|
||||
|
|
@ -804,9 +804,9 @@ public class MetadataLoaderTest {
|
|||
setFaultHandler(faultHandler).
|
||||
setHighWaterMarkAccessor(() -> OptionalLong.of(1)).
|
||||
build()) {
|
||||
loader.installPublishers(Collections.singletonList(capturingPublisher)).get();
|
||||
loader.installPublishers(singletonList(capturingPublisher)).get();
|
||||
loader.handleCommit(
|
||||
MockBatchReader.newSingleBatchReader(0, 1, Collections.singletonList(
|
||||
MockBatchReader.newSingleBatchReader(0, 1, singletonList(
|
||||
// Any record will work here
|
||||
new ApiMessageAndVersion(new ConfigRecord()
|
||||
.setResourceType(ConfigResource.Type.BROKER.id())
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ public class ClusterImageBrokersNodeTest {
|
|||
|
||||
@Test
|
||||
public void testChildNames() {
|
||||
assertEquals(Arrays.asList("1"), NODE.childNames());
|
||||
assertEquals(Collections.singletonList("1"), NODE.childNames());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -25,7 +25,6 @@ import org.apache.kafka.server.common.MetadataVersion;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
|
@ -50,7 +49,7 @@ public class ClusterImageControllersNodeTest {
|
|||
|
||||
@Test
|
||||
public void testChildNames() {
|
||||
assertEquals(Arrays.asList("2"), NODE.childNames());
|
||||
assertEquals(Collections.singletonList("2"), NODE.childNames());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
|
@ -118,7 +117,7 @@ public class SnapshotGeneratorTest {
|
|||
assertEquals(Collections.emptyList(), emitter.images());
|
||||
emitter.setReady();
|
||||
}
|
||||
assertEquals(Arrays.asList(TEST_IMAGE), emitter.images());
|
||||
assertEquals(Collections.singletonList(TEST_IMAGE), emitter.images());
|
||||
faultHandler.maybeRethrowFirstException();
|
||||
}
|
||||
|
||||
|
|
@ -163,7 +162,7 @@ public class SnapshotGeneratorTest {
|
|||
// so this does not trigger a new snapshot.
|
||||
generator.publishLogDelta(TEST_DELTA, TEST_IMAGE, logDeltaManifestBuilder().numBytes(150).build());
|
||||
}
|
||||
assertEquals(Arrays.asList(TEST_IMAGE), emitter.images());
|
||||
assertEquals(Collections.singletonList(TEST_IMAGE), emitter.images());
|
||||
faultHandler.maybeRethrowFirstException();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ public class BrokerRegistrationTest {
|
|||
setId(0).
|
||||
setEpoch(0).
|
||||
setIncarnationId(Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw")).
|
||||
setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090))).
|
||||
setListeners(Collections.singletonList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(false).
|
||||
|
|
@ -57,7 +57,7 @@ public class BrokerRegistrationTest {
|
|||
setId(1).
|
||||
setEpoch(0).
|
||||
setIncarnationId(Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg")).
|
||||
setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091))).
|
||||
setListeners(Collections.singletonList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(true).
|
||||
|
|
@ -66,7 +66,7 @@ public class BrokerRegistrationTest {
|
|||
setId(2).
|
||||
setEpoch(0).
|
||||
setIncarnationId(Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g")).
|
||||
setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setListeners(Collections.singletonList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092))).
|
||||
setSupportedFeatures(Stream.of(new SimpleEntry<>("foo", VersionRange.of((short) 2, (short) 3)),
|
||||
new SimpleEntry<>("bar", VersionRange.of((short) 1, (short) 4))).collect(
|
||||
Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue))).
|
||||
|
|
@ -77,14 +77,14 @@ public class BrokerRegistrationTest {
|
|||
setId(3).
|
||||
setEpoch(0).
|
||||
setIncarnationId(Uuid.fromString("1t8VyWx2TCSTpUWuqj-FOw")).
|
||||
setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setListeners(Collections.singletonList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093))).
|
||||
setSupportedFeatures(Stream.of(new SimpleEntry<>("metadata.version", VersionRange.of((short) 7, (short) 7)))
|
||||
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(false).
|
||||
setInControlledShutdown(true).
|
||||
setIsMigratingZkBroker(true).
|
||||
setDirectories(Arrays.asList(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
|
||||
setDirectories(Collections.singletonList(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
|
||||
build());
|
||||
|
||||
@Test
|
||||
|
|
@ -172,7 +172,7 @@ public class BrokerRegistrationTest {
|
|||
setId(0).
|
||||
setEpoch(0).
|
||||
setIncarnationId(Uuid.fromString("ik32HZbLTW6ulw1yyrC8jQ")).
|
||||
setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090))).
|
||||
setListeners(Collections.singletonList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(false).
|
||||
|
|
@ -202,7 +202,7 @@ public class BrokerRegistrationTest {
|
|||
setId(0).
|
||||
setEpoch(0).
|
||||
setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
|
||||
setListeners(Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090))).
|
||||
setListeners(Collections.singletonList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090))).
|
||||
setSupportedFeatures(Collections.singletonMap("foo", VersionRange.of((short) 1, (short) 2))).
|
||||
setRack(Optional.empty()).
|
||||
setFenced(false).
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
|
@ -39,7 +40,7 @@ public class DelegationTokenDataTest {
|
|||
Uuid.randomUuid().toString(),
|
||||
Uuid.randomUuid().toString());
|
||||
|
||||
private static final List<KafkaPrincipal> EMPTYRENEWERS = Arrays.asList();
|
||||
private static final List<KafkaPrincipal> EMPTYRENEWERS = Collections.emptyList();
|
||||
|
||||
private static final List<TokenInformation> TOKENINFORMATION = Arrays.asList(
|
||||
new TokenInformation(
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.junit.jupiter.params.provider.ValueSource;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
|
@ -65,23 +66,23 @@ public class ListenerInfoTest {
|
|||
|
||||
@Test
|
||||
public void testNullHostname() {
|
||||
assertNull(ListenerInfo.create(Arrays.asList(INTERNAL)).firstListener().host());
|
||||
assertNull(ListenerInfo.create(Collections.singletonList(INTERNAL)).firstListener().host());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullHostnameGetsResolved() throws Exception {
|
||||
assertNotNull(ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
assertNotNull(ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
withWildcardHostnamesResolved().firstListener().host());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyHostname() {
|
||||
assertEquals("", ListenerInfo.create(Arrays.asList(SSL)).firstListener().host());
|
||||
assertEquals("", ListenerInfo.create(Collections.singletonList(SSL)).firstListener().host());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyHostnameGetsResolved() throws Exception {
|
||||
assertNotEquals("", ListenerInfo.create(Arrays.asList(SSL)).
|
||||
assertNotEquals("", ListenerInfo.create(Collections.singletonList(SSL)).
|
||||
withWildcardHostnamesResolved().firstListener().host());
|
||||
}
|
||||
|
||||
|
|
@ -118,14 +119,14 @@ public class ListenerInfoTest {
|
|||
@Test
|
||||
public void testToControllerRegistrationRequestFailsOnNullHost() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
toControllerRegistrationRequest());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToControllerRegistrationRequestFailsOnZeroPort() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
withWildcardHostnamesResolved().
|
||||
toControllerRegistrationRequest());
|
||||
}
|
||||
|
|
@ -143,14 +144,14 @@ public class ListenerInfoTest {
|
|||
@Test
|
||||
public void testToControllerRegistrationRecordFailsOnNullHost() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
toControllerRegistrationRecord());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToControllerRegistrationRecordFailsOnZeroPort() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
withWildcardHostnamesResolved().
|
||||
toControllerRegistrationRecord());
|
||||
}
|
||||
|
|
@ -168,14 +169,14 @@ public class ListenerInfoTest {
|
|||
@Test
|
||||
public void testToBrokerRegistrationRequestFailsOnNullHost() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
toBrokerRegistrationRequest());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToBrokerRegistrationRequestFailsOnZeroPort() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
withWildcardHostnamesResolved().
|
||||
toBrokerRegistrationRequest());
|
||||
}
|
||||
|
|
@ -193,14 +194,14 @@ public class ListenerInfoTest {
|
|||
@Test
|
||||
public void testToBrokerRegistrationRecordFailsOnNullHost() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
toBrokerRegistrationRecord());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToBrokerRegistrationRecordFailsOnZeroPort() {
|
||||
assertThrows(RuntimeException.class,
|
||||
() -> ListenerInfo.create(Arrays.asList(INTERNAL)).
|
||||
() -> ListenerInfo.create(Collections.singletonList(INTERNAL)).
|
||||
withWildcardHostnamesResolved().
|
||||
toBrokerRegistrationRecord());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ public class PartitionRegistrationTest {
|
|||
setReplicas(new int[]{1, 2, 3}).setDirectories(DirectoryId.unassignedArray(3)).
|
||||
setIsr(new int[]{1}).setLastKnownElr(new int[]{3}).setElr(new int[]{2}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(1).build();
|
||||
assertEquals(b, a.merge(new PartitionChangeRecord().
|
||||
setLeader(3).setIsr(Arrays.asList(3))));
|
||||
setLeader(3).setIsr(Collections.singletonList(3))));
|
||||
assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, partitionEpoch: 0 -> 1",
|
||||
b.diff(a));
|
||||
assertEquals("isr: [1, 2] -> [1], elr: [] -> [2], lastKnownElr: [] -> [3], partitionEpoch: 0 -> 1",
|
||||
|
|
@ -322,7 +322,7 @@ public class PartitionRegistrationTest {
|
|||
if (metadataVersion.isElrSupported()) {
|
||||
expectRecord.
|
||||
setEligibleLeaderReplicas(Arrays.asList(2, 3)).
|
||||
setLastKnownElr(Arrays.asList(4));
|
||||
setLastKnownElr(Collections.singletonList(4));
|
||||
}
|
||||
if (metadataVersion.isDirectoryAssignmentSupported()) {
|
||||
expectRecord.setDirectories(Arrays.asList(
|
||||
|
|
|
|||
|
|
@ -373,7 +373,7 @@ public class RecordTestUtils {
|
|||
).iterator()
|
||||
)).
|
||||
setFeatures(new RegisterControllerRecord.ControllerFeatureCollection(
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
new RegisterControllerRecord.ControllerFeature().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()).
|
||||
|
|
|
|||
|
|
@ -35,15 +35,15 @@ public class ReplicasTest {
|
|||
@Test
|
||||
public void testToList() {
|
||||
assertEquals(Arrays.asList(1, 2, 3, 4), Replicas.toList(new int[] {1, 2, 3, 4}));
|
||||
assertEquals(Arrays.asList(), Replicas.toList(Replicas.NONE));
|
||||
assertEquals(Arrays.asList(2), Replicas.toList(new int[] {2}));
|
||||
assertEquals(Collections.emptyList(), Replicas.toList(Replicas.NONE));
|
||||
assertEquals(Collections.singletonList(2), Replicas.toList(new int[] {2}));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToArray() {
|
||||
assertArrayEquals(new int[] {3, 2, 1}, Replicas.toArray(Arrays.asList(3, 2, 1)));
|
||||
assertArrayEquals(new int[] {}, Replicas.toArray(Arrays.asList()));
|
||||
assertArrayEquals(new int[] {2}, Replicas.toArray(Arrays.asList(2)));
|
||||
assertArrayEquals(new int[] {}, Replicas.toArray(Collections.emptyList()));
|
||||
assertArrayEquals(new int[] {2}, Replicas.toArray(Collections.singletonList(2)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.junit.jupiter.api.Test;
|
|||
import org.junit.jupiter.api.Timeout;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import static org.apache.kafka.metadata.authorizer.StandardAclWithIdTest.TEST_ACLS;
|
||||
|
|
@ -47,7 +48,7 @@ public class StandardAclRecordIteratorTest {
|
|||
new ApiMessageAndVersion(TEST_ACLS.get(3).toRecord(), (short) 0)),
|
||||
iterator.next());
|
||||
assertTrue(iterator.hasNext());
|
||||
assertEquals(Arrays.asList(
|
||||
assertEquals(Collections.singletonList(
|
||||
new ApiMessageAndVersion(TEST_ACLS.get(4).toRecord(), (short) 0)),
|
||||
iterator.next());
|
||||
assertFalse(iterator.hasNext());
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ public class BootstrapMetadataTest {
|
|||
BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "baz").copyWithOnlyVersion());
|
||||
}
|
||||
|
||||
final static List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION = unmodifiableList(asList(
|
||||
final static List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION = unmodifiableList(Collections.singletonList(
|
||||
new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(FEATURE_NAME).
|
||||
setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0)));
|
||||
|
|
|
|||
|
|
@ -471,9 +471,9 @@ public class KRaftMigrationDriverTest {
|
|||
|
||||
startAndWaitForRecoveringMigrationStateFromZK(driver);
|
||||
if (allNodePresent) {
|
||||
setupDeltaWithControllerRegistrations(delta, Arrays.asList(4, 5, 6), Arrays.asList());
|
||||
setupDeltaWithControllerRegistrations(delta, Arrays.asList(4, 5, 6), Collections.emptyList());
|
||||
} else {
|
||||
setupDeltaWithControllerRegistrations(delta, Arrays.asList(), Arrays.asList(4, 5));
|
||||
setupDeltaWithControllerRegistrations(delta, Collections.emptyList(), Arrays.asList(4, 5));
|
||||
}
|
||||
delta.replay(zkBrokerRecord(1));
|
||||
MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
|
||||
|
|
@ -493,7 +493,7 @@ public class KRaftMigrationDriverTest {
|
|||
|
||||
// Update so that all controller nodes are zkMigrationReady. Now we should be able to move to the next state.
|
||||
delta = new MetadataDelta(image);
|
||||
setupDeltaWithControllerRegistrations(delta, Arrays.asList(), Arrays.asList(4, 5, 6));
|
||||
setupDeltaWithControllerRegistrations(delta, Collections.emptyList(), Arrays.asList(4, 5, 6));
|
||||
image = delta.apply(new MetadataProvenance(200, 1, 2));
|
||||
driver.onMetadataUpdate(delta, image, new LogDeltaManifest.Builder().
|
||||
provenance(image.provenance()).
|
||||
|
|
|
|||
|
|
@ -119,9 +119,9 @@ public class StripedReplicaPlacerTest {
|
|||
public void testMultiPartitionTopicPlacementOnSingleUnfencedBroker() {
|
||||
MockRandom random = new MockRandom();
|
||||
StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
|
||||
assertEquals(new TopicAssignment(Arrays.asList(partitionAssignment(Arrays.asList(0)),
|
||||
partitionAssignment(Arrays.asList(0)),
|
||||
partitionAssignment(Arrays.asList(0)))),
|
||||
assertEquals(new TopicAssignment(Arrays.asList(partitionAssignment(Collections.singletonList(0)),
|
||||
partitionAssignment(Collections.singletonList(0)),
|
||||
partitionAssignment(Collections.singletonList(0)))),
|
||||
place(placer, 0, 3, (short) 1, Arrays.asList(
|
||||
new UsableBroker(0, Optional.empty(), false),
|
||||
new UsableBroker(1, Optional.empty(), true))));
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import org.apache.kafka.common.Uuid;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class TopicAssignmentTest {
|
||||
|
|
@ -44,14 +45,14 @@ public class TopicAssignmentTest {
|
|||
public void testConsistentEqualsAndHashCode() {
|
||||
List<TopicAssignment> topicAssignments = Arrays.asList(
|
||||
new TopicAssignment(
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
partitionAssignment(
|
||||
Arrays.asList(0, 1, 2)
|
||||
)
|
||||
)
|
||||
),
|
||||
new TopicAssignment(
|
||||
Arrays.asList(
|
||||
Collections.singletonList(
|
||||
partitionAssignment(
|
||||
Arrays.asList(1, 2, 0)
|
||||
)
|
||||
|
|
@ -81,7 +82,7 @@ public class TopicAssignmentTest {
|
|||
Uuid.fromString("MvUIAsOiRlSePeiBHdZrSQ"),
|
||||
Uuid.fromString("jUqCchHtTHqMxeVv4dw1RA")
|
||||
);
|
||||
List<PartitionAssignment> partitionAssignments = Arrays.asList(
|
||||
List<PartitionAssignment> partitionAssignments = Collections.singletonList(
|
||||
new PartitionAssignment(replicas, directories::get)
|
||||
);
|
||||
TopicAssignment topicAssignment = new TopicAssignment(partitionAssignments);
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ final public class MetaPropertiesEnsembleTest {
|
|||
private static final MetaPropertiesEnsemble FOO =
|
||||
new MetaPropertiesEnsemble(
|
||||
new HashSet<>(Arrays.asList("/tmp/empty1", "/tmp/empty2")),
|
||||
new HashSet<>(Arrays.asList("/tmp/error3")),
|
||||
new HashSet<>(Collections.singletonList("/tmp/error3")),
|
||||
Stream.of(
|
||||
new SimpleImmutableEntry<>("/tmp/dir4",
|
||||
new MetaProperties.Builder().
|
||||
|
|
@ -104,7 +104,7 @@ final public class MetaPropertiesEnsembleTest {
|
|||
|
||||
@Test
|
||||
public void testErrorLogDirsForFoo() {
|
||||
assertEquals(new HashSet<>(Arrays.asList("/tmp/error3")), FOO.errorLogDirs());
|
||||
assertEquals(new HashSet<>(Collections.singletonList("/tmp/error3")), FOO.errorLogDirs());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -99,8 +99,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
|
|||
public boolean equals(Object o) {
|
||||
if (!(o instanceof LeaderChangeBatch)) return false;
|
||||
LeaderChangeBatch other = (LeaderChangeBatch) o;
|
||||
if (!other.newLeader.equals(newLeader)) return false;
|
||||
return true;
|
||||
return other.newLeader.equals(newLeader);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue