KAFKA-17922 add helper to ClusterInstance to create client component (#17666)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2024-11-13 09:39:15 +08:00 committed by GitHub
parent edab667a9a
commit 6bc7be70d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 1746 additions and 1531 deletions

View File

@ -40,6 +40,8 @@ import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@ -82,7 +84,7 @@ public class AdminFenceProducersTest {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (KafkaProducer<byte[], byte[]> producer = createProducer();
Admin adminClient = clusterInstance.createAdminClient()) {
Admin adminClient = clusterInstance.admin()) {
producer.initTransactions();
producer.beginTransaction();
producer.send(RECORD).get();
@ -103,10 +105,10 @@ public class AdminFenceProducersTest {
@ClusterTest
void testFenceProducerTimeoutMs() {
Properties config = new Properties();
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + INCORRECT_BROKER_PORT);
try (Admin adminClient = clusterInstance.createAdminClient(config)) {
try (Admin adminClient = clusterInstance.admin(config)) {
ExecutionException exception = assertThrows(
ExecutionException.class, () ->
adminClient.fenceProducers(Collections.singletonList(TXN_ID), new FenceProducersOptions().timeoutMs(0)).all().get());
@ -119,7 +121,7 @@ public class AdminFenceProducersTest {
clusterInstance.createTopic(TOPIC_NAME, 1, (short) 1);
try (KafkaProducer<byte[], byte[]> producer = createProducer();
Admin adminClient = clusterInstance.createAdminClient()) {
Admin adminClient = clusterInstance.admin()) {
producer.initTransactions();
producer.beginTransaction();

View File

@ -132,7 +132,7 @@ public class ClientTelemetryTest {
public void testIntervalMsParser(ClusterInstance clusterInstance) {
List<String> alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
"--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
try (Admin client = clusterInstance.createAdminClient()) {
try (Admin client = clusterInstance.admin()) {
ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts));
Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts));

View File

@ -142,7 +142,7 @@ public class ConfigCommandIntegrationTest {
public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
try (Admin client = cluster.createAdminClient()) {
try (Admin client = cluster.admin()) {
// Add config
alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap(MESSAGE_MAX_BYTES_CONFIG, "110000"), alterOpts);
alterAndVerifyConfig(client, Optional.empty(), singletonMap(MESSAGE_MAX_BYTES_CONFIG, "120000"), alterOpts);
@ -191,7 +191,7 @@ public class ConfigCommandIntegrationTest {
}
private void verifyGroupConfigUpdate(List<String> alterOpts) throws Exception {
try (Admin client = cluster.createAdminClient()) {
try (Admin client = cluster.admin()) {
// Add config
Map<String, String> configs = new HashMap<>();
configs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000");
@ -221,7 +221,7 @@ public class ConfigCommandIntegrationTest {
}
private void verifyClientMetricsConfigUpdate(List<String> alterOpts) throws Exception {
try (Admin client = cluster.createAdminClient()) {
try (Admin client = cluster.admin()) {
// Add config
Map<String, String> configs = new HashMap<>();
configs.put("metrics", "");
@ -240,7 +240,7 @@ public class ConfigCommandIntegrationTest {
public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
try (Admin client = cluster.createAdminClient()) {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
singletonMap(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"), alterOpts));
@ -257,7 +257,7 @@ public class ConfigCommandIntegrationTest {
public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() throws Exception {
List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
try (Admin client = cluster.createAdminClient()) {
try (Admin client = cluster.admin()) {
alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
singletonMap("log.flush.interval.messages", "100"), alterOpts);
alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
@ -272,7 +272,7 @@ public class ConfigCommandIntegrationTest {
List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
String listenerName = "listener.name.internal.";
try (Admin client = cluster.createAdminClient()) {
try (Admin client = cluster.admin()) {
alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
singletonMap(listenerName + "ssl.truststore.type", "PKCS12"), alterOpts);
alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
@ -288,7 +288,7 @@ public class ConfigCommandIntegrationTest {
public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
List<String> alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
try (Admin client = cluster.createAdminClient()) {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
() -> alterConfigWithKraft(client, Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12"), alterOpts));

View File

@ -79,7 +79,7 @@ public class DeleteTopicTest {
@ClusterTest
public void testDeleteTopicWithAllAliveReplicas(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
cluster.waitForTopic(DEFAULT_TOPIC, 0);
@ -88,7 +88,7 @@ public class DeleteTopicTest {
@ClusterTest
public void testResumeDeleteTopicWithRecoveredFollower(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
int leaderId = waitUtilLeaderIsKnown(cluster.brokers(), topicPartition);
@ -111,7 +111,7 @@ public class DeleteTopicTest {
@ClusterTest(brokers = 4)
public void testPartitionReassignmentDuringDeleteTopic(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
Map<Integer, KafkaBroker> servers = findPartitionHostingBrokers(cluster.brokers());
@ -137,7 +137,7 @@ public class DeleteTopicTest {
@ClusterTest(brokers = 4)
public void testIncreasePartitionCountDuringDeleteTopic(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
Map<Integer, KafkaBroker> partitionHostingBrokers = findPartitionHostingBrokers(cluster.brokers());
@ -165,7 +165,7 @@ public class DeleteTopicTest {
@ClusterTest
public void testDeleteTopicDuringAddPartition(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
int leaderId = waitUtilLeaderIsKnown(cluster.brokers(), new TopicPartition(DEFAULT_TOPIC, 0));
TopicPartition newTopicPartition = new TopicPartition(DEFAULT_TOPIC, 1);
@ -190,7 +190,7 @@ public class DeleteTopicTest {
@ClusterTest
public void testAddPartitionDuringDeleteTopic(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
// partitions to be added to the topic later
TopicPartition newTopicPartition = new TopicPartition(DEFAULT_TOPIC, 1);
@ -204,7 +204,7 @@ public class DeleteTopicTest {
@ClusterTest
public void testRecreateTopicAfterDeletion(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
@ -216,7 +216,7 @@ public class DeleteTopicTest {
}
@ClusterTest
public void testDeleteNonExistingTopic(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
String topic = "test2";
@ -243,7 +243,7 @@ public class DeleteTopicTest {
@ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size", value = "1048577")
})
public void testDeleteTopicWithCleaner(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
// for simplicity, we are validating cleaner offsets on a single broker
@ -262,7 +262,7 @@ public class DeleteTopicTest {
@ClusterTest
public void testDeleteTopicAlreadyMarkedAsDeleted(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
admin.deleteTopics(List.of(DEFAULT_TOPIC)).all().get();
@ -282,7 +282,7 @@ public class DeleteTopicTest {
@ClusterTest(controllers = 1,
serverProperties = {@ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "false")})
public void testDisableDeleteTopic(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(List.of(new NewTopic(DEFAULT_TOPIC, expectedReplicaAssignment))).all().get();
TopicPartition topicPartition = new TopicPartition(DEFAULT_TOPIC, 0);
TestUtils.waitForCondition(() -> {

View File

@ -70,7 +70,7 @@ public class LogManagerIntegrationTest {
RaftClusterInvocationContext.RaftClusterInstance raftInstance =
(RaftClusterInvocationContext.RaftClusterInstance) cluster;
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 3))).all().get();
}
cluster.waitForTopic("foo", 1);
@ -82,7 +82,7 @@ public class LogManagerIntegrationTest {
assertTrue(partitionMetadataFile.isPresent());
raftInstance.getUnderlying().brokers().get(0).shutdown();
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
TestUtils.waitForCondition(() -> {
List<TopicPartitionInfo> partitionInfos = admin.describeTopics(Collections.singletonList("foo"))
.topicNameValues().get("foo").get().partitions();
@ -96,7 +96,7 @@ public class LogManagerIntegrationTest {
raftInstance.getUnderlying().brokers().get(0).startup();
// make sure there is no error during load logs
assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException());
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
TestUtils.waitForCondition(() -> {
List<TopicPartitionInfo> partitionInfos = admin.describeTopics(Collections.singletonList("foo"))
.topicNameValues().get("foo").get().partitions();

View File

@ -289,7 +289,7 @@ class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
}
private def withAdmin(f: Admin => Unit): Unit = {
val admin: Admin = cluster.createAdminClient()
val admin: Admin = cluster.admin()
try {
f(admin)
} finally {

View File

@ -40,42 +40,54 @@ class MetadataVersionIntegrationTest {
new ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_4_IV0)
))
def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
val admin = clusterInstance.admin()
try {
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
// Update to new version
val updateVersion = MetadataVersion.IBP_3_5_IV1.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
// Update to new version
val updateVersion = MetadataVersion.IBP_3_5_IV1.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
// Verify that new version is visible on broker
TestUtils.waitUntilTrue(() => {
val describeResult2 = admin.describeFeatures()
val ff2 = describeResult2.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
ff2.minVersionLevel() == updateVersion && ff2.maxVersionLevel() == updateVersion
}, "Never saw metadata.version increase on broker")
// Verify that new version is visible on broker
TestUtils.waitUntilTrue(() => {
val describeResult2 = admin.describeFeatures()
val ff2 = describeResult2.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
ff2.minVersionLevel() == updateVersion && ff2.maxVersionLevel() == updateVersion
}, "Never saw metadata.version increase on broker")
} finally {
admin.close()
}
}
@ClusterTest(types = Array(Type.KRAFT), metadataVersion = MetadataVersion.IBP_3_3_IV0)
def testUpgradeSameVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
val admin = clusterInstance.admin()
try {
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
val updateResult = admin.updateFeatures(
Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
updateResult.all().get()
} finally {
admin.close()
}
}
@ClusterTest(types = Array(Type.KRAFT))
def testDefaultIsLatestVersion(clusterInstance: ClusterInstance): Unit = {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), MetadataVersion.latestTesting().featureLevel(),
"If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
assertEquals(ff.maxVersionLevel(), MetadataVersion.latestTesting().featureLevel())
val admin = clusterInstance.admin()
try {
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
assertEquals(ff.minVersionLevel(), MetadataVersion.latestTesting().featureLevel(),
"If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
assertEquals(ff.maxVersionLevel(), MetadataVersion.latestTesting().featureLevel())
} finally {
admin.close()
}
}
}

View File

@ -171,23 +171,28 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
def testClientQuotasForScramUsers(): Unit = {
val userName = "user"
val results = cluster.createAdminClient().alterUserScramCredentials(util.Arrays.asList(
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
results.all.get
val admin = cluster.admin()
try {
val results = admin.alterUserScramCredentials(util.Arrays.asList(
new UserScramCredentialUpsertion(userName, new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
results.all.get
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)
val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava)
verifyDescribeEntityQuotas(entity, Map.empty)
verifyDescribeEntityQuotas(entity, Map.empty)
alterEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
), validateOnly = false)
alterEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
), validateOnly = false)
verifyDescribeEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
))
verifyDescribeEntityQuotas(entity, Map(
QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
))
} finally {
admin.close()
}
}
@ClusterTest

View File

@ -83,130 +83,134 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
val admin = cluster.createAdminClient()
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = consumerGroupHeartbeat(
groupId = "grp-1",
memberId = Uuid.randomUuid().toString,
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("bar"),
topicPartitions = List.empty
val admin = cluster.admin()
try {
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")
// Add second group with two members. For the first member, we
// wait until it receives an assignment. We use 'range` in this
// case to validate the assignor selection logic.
var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp2Member1Response = consumerGroupHeartbeat(
memberId = "member-1",
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = consumerGroupHeartbeat(
groupId = "grp-1",
memberId = Uuid.randomUuid().toString,
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("bar"),
topicPartitions = List.empty
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")
// Add second group with two members. For the first member, we
// wait until it receives an assignment. We use 'range` in this
// case to validate the assignor selection logic.
var grp2Member1Response: ConsumerGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp2Member1Response = consumerGroupHeartbeat(
memberId = "member-1",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty
}, msg = s"Could not join the group successfully. Last response $grp2Member1Response.")
val grp2Member2Response = consumerGroupHeartbeat(
memberId = "member-2",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
grp2Member1Response.assignment != null && !grp2Member1Response.assignment.topicPartitions.isEmpty
}, msg = s"Could not join the group successfully. Last response $grp2Member1Response.")
val grp2Member2Response = consumerGroupHeartbeat(
memberId = "member-2",
groupId = "grp-2",
serverAssignor = "range",
rebalanceTimeoutMs = timeoutMs,
subscribedTopicNames = List("foo"),
topicPartitions = List.empty
)
for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
.setGroupState(ConsumerGroupState.RECONCILING.toString)
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
.setMemberEpoch(grp2Member2Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment())
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
).asJava)),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1, 2).asJava)
).asJava))
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
).asJava)),
).asJava),
)
for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
.setGroupState(ConsumerGroupState.RECONCILING.toString)
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
.setMemberEpoch(grp2Member2Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment())
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
).asJava)),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("foo").asJava)
.setAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1, 2).asJava)
).asJava))
.setTargetAssignment(new Assignment()
.setTopicPartitions(List(
new TopicPartitions()
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
).asJava)),
).asJava),
)
val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
includeAuthorizedOperations = true,
version = version.toShort,
)
val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(expected, actual)
assertEquals(expected, actual)
}
} finally {
admin.close()
}
}
}

View File

@ -79,283 +79,299 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
@ClusterTest
def testConsumerGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// Leave the group.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(-1),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// Leave the group.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(-1),
true
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
// Verify the response.
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
// Verify the response.
assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
} finally {
admin.close()
}
}
@ClusterTest
def testConsumerGroupHeartbeatWithRegularExpression(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid().toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicRegex("foo")
.setTopicPartitions(List.empty.asJava),
true
).build()
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid().toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicRegex("foo")
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()
}
}
@ClusterTest
def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid().toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicRegex("[")
.setTopicPartitions(List.empty.asJava),
true
).build()
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid().toString)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicRegex("[")
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code
}, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REGULAR_EXPRESSION.code
}, msg = s"Did not receive the expected error. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, consumerGroupHeartbeatResponse.data.errorCode)
// Verify the response.
assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, consumerGroupHeartbeatResponse.data.errorCode)
} finally {
admin.close()
}
}
@ClusterTest
def testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val instanceId = "instanceId"
val admin = cluster.admin()
try {
val instanceId = "instanceId"
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request so that a static member joins the group
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Heartbeat request so that a static member joins the group
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Static member could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Static member could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
// Leave the group temporarily
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(-2),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Static member could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// Verify the response.
assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch)
// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
// Another static member replaces the above member. It gets the same assignments back
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Static member could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
// Leave the group temporarily
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(-2),
true
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
// Verify the response.
assertEquals(-2, consumerGroupHeartbeatResponse.data.memberEpoch)
// Another static member replaces the above member. It gets the same assignments back
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// The 2 member IDs should be different
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// The 2 member IDs should be different
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
} finally {
admin.close()
}
}
@ClusterTest(
@ -366,108 +382,112 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
)
def testStaticMemberRemovedAfterSessionTimeoutExpiryWhenNewGroupCoordinatorIsEnabled(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val instanceId = "instanceId"
val admin = cluster.admin()
try {
val instanceId = "instanceId"
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// A new static member tries to join the group with an inuse instanceid.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Validating that trying to join with an in-use instanceId would throw an UnreleasedInstanceIdException.
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, consumerGroupHeartbeatResponse.data.errorCode)
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)
// The new static member join group will keep failing with an UnreleasedInstanceIdException
// until eventually it gets through because the existing member will be kicked out
// because of not sending a heartbeat till session timeout expiry.
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not re-join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Create the topic.
val topicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
// Heartbeats until the partitions are assigned.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// A new static member tries to join the group with an inuse instanceid.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Validating that trying to join with an in-use instanceId would throw an UnreleasedInstanceIdException.
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
assertEquals(Errors.UNRELEASED_INSTANCE_ID.code, consumerGroupHeartbeatResponse.data.errorCode)
// The new static member join group will keep failing with an UnreleasedInstanceIdException
// until eventually it gets through because the existing member will be kicked out
// because of not sending a heartbeat till session timeout expiry.
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not re-join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response. The group epoch bumps upto 4 which eventually reflects in the new member epoch.
assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// Verify the response. The group epoch bumps upto 4 which eventually reflects in the new member epoch.
assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()
}
}
@ClusterTest(
@ -477,106 +497,114 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
)
def testUpdateConsumerGroupHeartbeatConfigSuccessful(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val newHeartbeatIntervalMs = 10000
val instanceId = "instanceId"
val consumerGroupId = "grp"
val admin = cluster.admin()
try {
val newHeartbeatIntervalMs = 10000
val instanceId = "instanceId"
val consumerGroupId = "grp"
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(consumerGroupId)
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(consumerGroupId)
.setMemberId(Uuid.randomUuid.toString)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(5000, consumerGroupHeartbeatResponse.data.heartbeatIntervalMs)
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(5000, consumerGroupHeartbeatResponse.data.heartbeatIntervalMs)
// Alter consumer heartbeat interval config
val resource = new ConfigResource(ConfigResource.Type.GROUP, consumerGroupId)
val op = new AlterConfigOp(
new ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, newHeartbeatIntervalMs.toString),
OpType.SET
)
admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
// Alter consumer heartbeat interval config
val resource = new ConfigResource(ConfigResource.Type.GROUP, consumerGroupId)
val op = new AlterConfigOp(
new ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, newHeartbeatIntervalMs.toString),
OpType.SET
)
admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(consumerGroupId)
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(consumerGroupId)
.setInstanceId(instanceId)
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
true
).build()
// Verify the response. The heartbeat interval was updated.
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
newHeartbeatIntervalMs == consumerGroupHeartbeatResponse.data.heartbeatIntervalMs
}, msg = s"Dynamic update consumer group config failed. Last response $consumerGroupHeartbeatResponse.")
// Verify the response. The heartbeat interval was updated.
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
newHeartbeatIntervalMs == consumerGroupHeartbeatResponse.data.heartbeatIntervalMs
}, msg = s"Dynamic update consumer group config failed. Last response $consumerGroupHeartbeatResponse.")
} finally {
admin.close()
}
}
@ClusterTest
def testConsumerGroupHeartbeatFailureIfMemberIdMissingForVersionsAbove0(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.values().asScala.toSeq,
controllers = raftCluster.controllers().values().asScala.toSeq
)
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
.setTopicPartitions(List.empty.asJava),
true
).build()
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code
}, msg = "Should fail due to invalid member id.")
var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.INVALID_REQUEST.code
}, msg = "Should fail due to invalid member id.")
} finally {
admin.close()
}
}
@ClusterTest
def testMemberIdGeneratedOnServerWhenApiVersionIs0(): Unit = {
val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
val admin = cluster.createAdminClient()
val admin = cluster.admin()
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
@ -605,6 +633,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
val memberId = consumerGroupHeartbeatResponse.data().memberId()
assertNotNull(memberId)
assertFalse(memberId.isEmpty)
admin.close()
}
private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {

View File

@ -47,24 +47,34 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
protected var producer: KafkaProducer[String, String] = _
protected def createOffsetsTopic(): Unit = {
TestUtils.createOffsetsTopicWithAdmin(
admin = cluster.createAdminClient(),
brokers = brokers(),
controllers = controllerServers()
)
val admin = cluster.admin()
try {
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = brokers(),
controllers = controllerServers()
)
} finally {
admin.close()
}
}
protected def createTopic(
topic: String,
numPartitions: Int
): Unit = {
TestUtils.createTopicWithAdmin(
admin = cluster.createAdminClient(),
brokers = brokers(),
controllers = controllerServers(),
topic = topic,
numPartitions = numPartitions
)
val admin = cluster.admin()
try {
TestUtils.createTopicWithAdmin(
admin = admin,
brokers = brokers(),
controllers = controllerServers(),
topic = topic,
numPartitions = numPartitions
)
} finally {
admin.close()
}
}
protected def createTopicAndReturnLeaders(
@ -73,16 +83,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
replicationFactor: Int = 1,
topicConfig: Properties = new Properties
): Map[TopicIdPartition, Int] = {
val partitionToLeader = TestUtils.createTopicWithAdmin(
admin = cluster.createAdminClient(),
topic = topic,
brokers = brokers(),
controllers = controllerServers(),
numPartitions = numPartitions,
replicationFactor = replicationFactor,
topicConfig = topicConfig
)
partitionToLeader.map { case (partition, leader) => new TopicIdPartition(getTopicIds(topic), new TopicPartition(topic, partition)) -> leader }
val admin = cluster.admin()
try {
val partitionToLeader = TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
brokers = brokers(),
controllers = controllerServers(),
numPartitions = numPartitions,
replicationFactor = replicationFactor,
topicConfig = topicConfig
)
partitionToLeader.map { case (partition, leader) => new TopicIdPartition(getTopicIds(topic), new TopicPartition(topic, partition)) -> leader }
} finally {
admin.close()
}
}
protected def isUnstableApiEnabled: Boolean = {

View File

@ -86,55 +86,59 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoord
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
val admin = cluster.createAdminClient()
TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
// Add first group with one member.
var grp1Member1Response: ShareGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = shareGroupHeartbeat(
groupId = "grp-1",
subscribedTopicNames = List("bar"),
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")
for (version <- ApiKeys.SHARE_GROUP_DESCRIBE.oldestVersion() to ApiKeys.SHARE_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ShareGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("simple")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ShareGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicNames(List("bar").asJava)
).asJava),
val admin = cluster.admin()
try {
TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
val actual = shareGroupDescribe(
groupIds = List("grp-1"),
includeAuthorizedOperations = true,
version = version.toShort,
)
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
assertEquals(expected, actual)
// Add first group with one member.
var grp1Member1Response: ShareGroupHeartbeatResponseData = null
TestUtils.waitUntilTrue(() => {
grp1Member1Response = shareGroupHeartbeat(
groupId = "grp-1",
subscribedTopicNames = List("bar"),
)
grp1Member1Response.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response $grp1Member1Response.")
for (version <- ApiKeys.SHARE_GROUP_DESCRIBE.oldestVersion() to ApiKeys.SHARE_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ShareGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("simple")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ShareGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
.setMemberEpoch(grp1Member1Response.memberEpoch)
.setClientId(clientId)
.setClientHost(clientHost)
.setSubscribedTopicNames(List("bar").asJava)
).asJava),
)
val actual = shareGroupDescribe(
groupIds = List("grp-1"),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(expected, actual)
}
} finally {
admin.close()
}
}
}

View File

@ -288,13 +288,17 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
private def createTopic(topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]]): Unit = {
Using(createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) { admin =>
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
replicaAssignment = partitionReplicaAssignment,
brokers = brokers,
controllers = controllerServers
)
try {
TestUtils.createTopicWithAdmin(
admin = admin,
topic = topic,
replicaAssignment = partitionReplicaAssignment,
brokers = brokers,
controllers = controllerServers
)
} finally {
admin.close()
}
}
}

View File

@ -62,7 +62,7 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
// Create topics.
String leaderTopic = "new-leader";
String followerTopic = "new-follower";
try (Admin admin = clusterInstance.createAdminClient()) {
try (Admin admin = clusterInstance.admin()) {
// Set broker id 0 as the first entry which is taken as the leader.
NewTopic newLeaderTopic = new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
// Set broker id 1 as the first entry which is taken as the leader.

View File

@ -80,7 +80,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
@ClusterTest
public void testDoesTopicExist() throws ExecutionException, InterruptedException {
try (Admin admin = clusterInstance.createAdminClient()) {
try (Admin admin = clusterInstance.admin()) {
String topic = "test-topic-exist";
admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get();
clusterInstance.waitForTopic(topic, 1);
@ -91,7 +91,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
@ClusterTest
public void testTopicDoesNotExist() {
try (Admin admin = clusterInstance.createAdminClient()) {
try (Admin admin = clusterInstance.admin()) {
String topic = "dummy-test-topic";
boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic);
assertFalse(doesTopicExist);
@ -110,7 +110,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
// Create topics.
String leaderTopic = "new-leader";
String followerTopic = "new-follower";
try (Admin admin = clusterInstance.createAdminClient()) {
try (Admin admin = clusterInstance.admin()) {
// Set broker id 0 as the first entry which is taken as the leader.
admin.createTopics(Collections.singletonList(new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2))))).all().get();
clusterInstance.waitForTopic(leaderTopic, 1);

View File

@ -32,6 +32,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Optional;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@ -43,6 +44,13 @@ import static java.lang.String.format;
public class TestUtils {
private static final Logger log = LoggerFactory.getLogger(TestUtils.class);
/* A consistent random number generator to make tests repeatable */
public static final Random SEEDED_RANDOM = new Random(192348092834L);
public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
public static final String DIGITS = "0123456789";
public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
private static final long DEFAULT_POLL_INTERVAL_MS = 100;
private static final long DEFAULT_MAX_WAIT_MS = 15_000;
private static final long DEFAULT_TIMEOUT_MS = 60_000;
@ -57,6 +65,19 @@ public class TestUtils {
return file;
}
/**
* Generate a random string of letters and digits of the given length
*
* @param len The length of the string
* @return The random string
*/
public static String randomString(final int len) {
final StringBuilder b = new StringBuilder();
for (int i = 0; i < len; i++)
b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length())));
return b.toString();
}
/**
* Create a temporary relative directory in the specified parent directory with the given prefix.
*

View File

@ -23,13 +23,23 @@ import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
@ -40,12 +50,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -157,10 +167,52 @@ public interface ClusterInstance {
return asClass.cast(getUnderlying());
}
Admin createAdminClient(Properties configOverrides);
//---------------------------[producer/consumer/admin]---------------------------//
default Admin createAdminClient() {
return createAdminClient(new Properties());
default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
Map<String, Object> props = new HashMap<>(configs);
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName());
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaProducer<>(props);
}
default <K, V> Producer<K, V> producer() {
return new KafkaProducer<>(Map.of());
}
default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
Map<String, Object> props = new HashMap<>(configs);
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaConsumer<>(props);
}
default <K, V> Consumer<K, V> consumer() {
return new KafkaConsumer<>(Map.of());
}
default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
Map<String, Object> props = new HashMap<>(configs);
if (usingBootstrapControllers) {
props.putIfAbsent(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, bootstrapControllers());
props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
} else {
props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
}
return Admin.create(props);
}
default Admin admin(Map<String, Object> configs) {
return admin(configs, false);
}
default Admin admin() {
return admin(Map.of(), false);
}
default Set<GroupProtocol> supportedGroupProtocols() {
@ -188,7 +240,7 @@ public interface ClusterInstance {
}
default void createTopic(String topicName, int partitions, short replicas) throws InterruptedException {
try (Admin admin = createAdminClient()) {
try (Admin admin = admin()) {
admin.createTopics(Collections.singletonList(new NewTopic(topicName, partitions, replicas)));
waitForTopic(topicName, partitions);
}

View File

@ -21,7 +21,6 @@ import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
@ -45,10 +44,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -106,7 +103,6 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
final AtomicBoolean formated = new AtomicBoolean(false);
private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
private KafkaClusterTestKit clusterTestKit;
private final boolean isCombined;
private final ListenerName listenerName;
@ -174,13 +170,6 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
return clusterTestKit;
}
@Override
public Admin createAdminClient(Properties configOverrides) {
Admin admin = Admin.create(clusterTestKit.newClientPropertiesBuilder(configOverrides).build());
admins.add(admin);
return admin;
}
@Override
public void start() {
try {
@ -200,8 +189,6 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));
admins.clear();
Utils.closeQuietly(clusterTestKit, "cluster");
}
}

View File

@ -22,14 +22,23 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -39,10 +48,19 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ -61,10 +79,10 @@ public class ClusterTestExtensionsTest {
static List<ClusterConfig> generate1() {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar");
return Collections.singletonList(ClusterConfig.defaultBuilder()
.setTypes(Collections.singleton(Type.KRAFT))
return singletonList(ClusterConfig.defaultBuilder()
.setTypes(singleton(Type.KRAFT))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("Generated Test"))
.setTags(singletonList("Generated Test"))
.build());
}
@ -72,17 +90,17 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testClusterTest(ClusterInstance clusterInstance) {
Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected objects should be the same");
Assertions.assertEquals(Type.KRAFT, clusterInstance.type()); // From the class level default
Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
assertEquals(Type.KRAFT, clusterInstance.type()); // From the class level default
assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
}
// generate1 is a template method which generates any number of cluster configs
@ClusterTemplate("generate1")
public void testClusterTemplate() {
Assertions.assertEquals(Type.KRAFT, clusterInstance.type(),
assertEquals(Type.KRAFT, clusterInstance.type(),
"generate1 provided a KRAFT cluster, so we should see that here");
Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals(Collections.singletonList("Generated Test"), clusterInstance.config().tags());
assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
assertEquals(singletonList("Generated Test"), clusterInstance.config().tags());
}
// Multiple @ClusterTest can be used with @ClusterTests
@ -110,27 +128,27 @@ public class ClusterTestExtensionsTest {
})
})
public void testClusterTests() throws ExecutionException, InterruptedException {
Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
Assertions.assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
// assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides
// the value 100 in server property in ClusterTestDefaults
try (Admin admin = clusterInstance.createAdminClient()) {
try (Admin admin = clusterInstance.admin()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
Map<ConfigResource, Config> configs = admin.describeConfigs(singletonList(configResource)).all().get();
assertEquals(1, configs.size());
assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
}
// In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
if (clusterInstance.type() == Type.KRAFT) {
try (Admin admin = Admin.create(Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
Map<ConfigResource, Config> configs = admin.describeConfigs(singletonList(configResource)).all().get();
assertEquals(1, configs.size());
assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
}
}
}
@ -140,24 +158,24 @@ public class ClusterTestExtensionsTest {
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
})
public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
Admin admin = clusterInstance.createAdminClient();
DescribeLogDirsResult result = admin.describeLogDirs(clusterInstance.brokerIds());
result.allDescriptions().get().forEach((brokerId, logDirDescriptionMap) -> {
Assertions.assertEquals(clusterInstance.config().numDisksPerBroker(), logDirDescriptionMap.size());
});
try (Admin admin = clusterInstance.admin()) {
DescribeLogDirsResult result = admin.describeLogDirs(clusterInstance.brokerIds());
result.allDescriptions().get().forEach((brokerId, logDirDescriptionMap) -> {
assertEquals(clusterInstance.config().numDisksPerBroker(), logDirDescriptionMap.size());
});
}
}
@ClusterTest(autoStart = AutoStart.NO)
public void testNoAutoStart() {
Assertions.assertThrows(RuntimeException.class, clusterInstance::anyBrokerSocketServer);
clusterInstance.start();
Assertions.assertNotNull(clusterInstance.anyBrokerSocketServer());
assertNotNull(clusterInstance.anyBrokerSocketServer());
}
@ClusterTest
public void testDefaults(ClusterInstance clusterInstance) {
Assertions.assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion());
assertEquals(MetadataVersion.latestTesting(), clusterInstance.config().metadataVersion());
}
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
@ -165,7 +183,7 @@ public class ClusterTestExtensionsTest {
Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
supportedGroupProtocols.add(CLASSIC);
supportedGroupProtocols.add(CONSUMER);
Assertions.assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols());
assertEquals(supportedGroupProtocols, clusterInstance.supportedGroupProtocols());
}
@ClusterTests({
@ -177,7 +195,7 @@ public class ClusterTestExtensionsTest {
})
})
public void testNotSupportedNewGroupProtocols(ClusterInstance clusterInstance) {
Assertions.assertEquals(Collections.singleton(CLASSIC), clusterInstance.supportedGroupProtocols());
assertEquals(singleton(CLASSIC), clusterInstance.supportedGroupProtocols());
}
@ -189,11 +207,11 @@ public class ClusterTestExtensionsTest {
short numReplicas = 3;
clusterInstance.createTopic(topicName, numPartition, numReplicas);
try (Admin admin = clusterInstance.createAdminClient()) {
try (Admin admin = clusterInstance.admin()) {
Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s -> s.name().equals(topicName)));
List<TopicPartitionInfo> partitions = admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get()
List<TopicPartitionInfo> partitions = admin.describeTopics(singleton(topicName)).allTopicNames().get()
.get(topicName).partitions();
Assertions.assertEquals(numPartition, partitions.size());
assertEquals(numPartition, partitions.size());
Assertions.assertTrue(partitions.stream().allMatch(partition -> partition.replicas().size() == numReplicas));
}
}
@ -233,15 +251,45 @@ public class ClusterTestExtensionsTest {
}
)
public void testVerifyTopicDeletion(ClusterInstance clusterInstance) throws Exception {
try (Admin admin = clusterInstance.createAdminClient()) {
try (Admin admin = clusterInstance.admin()) {
String testTopic = "testTopic";
admin.createTopics(Collections.singletonList(new NewTopic(testTopic, 1, (short) 1)));
admin.createTopics(singletonList(new NewTopic(testTopic, 1, (short) 1)));
clusterInstance.waitForTopic(testTopic, 1);
admin.deleteTopics(Collections.singletonList(testTopic));
admin.deleteTopics(singletonList(testTopic));
clusterInstance.waitTopicDeletion(testTopic);
Assertions.assertTrue(admin.listTopics().listings().get().stream().noneMatch(
topic -> topic.name().equals(testTopic)
));
}
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
public void testCreateProducerAndConsumer(ClusterInstance cluster) throws InterruptedException {
String topic = "topic";
String key = "key";
String value = "value";
try (Admin adminClient = cluster.admin();
Producer<String, String> producer = cluster.producer(Map.of(
ACKS_CONFIG, "all",
KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
Consumer<String, String> consumer = cluster.consumer(Map.of(
KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))
) {
adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1)));
assertNotNull(producer);
assertNotNull(consumer);
producer.send(new ProducerRecord<>(topic, key, value));
producer.flush();
consumer.subscribe(singletonList(topic));
List<ConsumerRecord<String, String>> records = new ArrayList<>();
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)).forEach(records::add);
return records.size() == 1;
}, "Failed to receive message");
assertEquals(key, records.get(0).key());
assertEquals(value, records.get(0).value());
}
}
}

View File

@ -39,6 +39,7 @@ import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -52,11 +53,11 @@ public class DeleteRecordsCommandTest {
@ClusterTest
public void testCommand(ClusterInstance cluster) throws Exception {
Properties adminProps = new Properties();
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);
try (Admin admin = cluster.createAdminClient(adminProps)) {
try (Admin admin = cluster.admin(adminProps)) {
assertThrows(
AdminCommandFailedException.class,
() -> DeleteRecordsCommand.execute(admin, "{\"partitions\":[" +

View File

@ -107,7 +107,7 @@ public class GetOffsetShellTest {
}
private void setupTopics(Function<Integer, String> topicName, Map<String, String> configs) {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
List<NewTopic> topics = new ArrayList<>();
IntStream.range(0, topicCount + 1).forEach(i ->

View File

@ -86,30 +86,31 @@ public class LeaderElectionCommandTest {
List<Integer> assignment = asList(broker2, broker3);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
try (Admin client = cluster.admin()) {
createTopic(client, topic, Collections.singletonMap(partition, assignment));
createTopic(client, topic, Collections.singletonMap(partition, assignment));
TopicPartition topicPartition = new TopicPartition(topic, partition);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--all-topic-partitions"
));
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--all-topic-partitions"
));
TestUtils.assertLeader(client, topicPartition, broker3);
TestUtils.assertLeader(client, topicPartition, broker3);
}
}
@ClusterTest
@ -142,31 +143,32 @@ public class LeaderElectionCommandTest {
List<Integer> assignment = asList(broker2, broker3);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
createTopic(client, topic, Collections.singletonMap(partition, assignment));
try (Admin client = cluster.admin()) {
createTopic(client, topic, Collections.singletonMap(partition, assignment));
TopicPartition topicPartition = new TopicPartition(topic, partition);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--topic", topic,
"--partition", Integer.toString(partition)
));
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--topic", topic,
"--partition", Integer.toString(partition)
));
TestUtils.assertLeader(client, topicPartition, broker3);
TestUtils.assertLeader(client, topicPartition, broker3);
}
}
@ClusterTest
@ -179,32 +181,33 @@ public class LeaderElectionCommandTest {
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition, assignment);
Admin client = cluster.createAdminClient();
createTopic(client, topic, partitionAssignment);
try (Admin client = cluster.admin()) {
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
cluster.shutdownBroker(broker3);
TestUtils.waitForBrokersOutOfIsr(client,
CollectionConverters.asScala(singletonList(topicPartition)).toSet(),
CollectionConverters.asScala(singletonList(broker3)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertNoLeader(client, topicPartition);
cluster.startBroker(broker3);
TestUtils.waitForOnlineBroker(client, broker3);
Path topicPartitionPath = tempTopicPartitionFile(singletonList(topicPartition));
Path topicPartitionPath = tempTopicPartitionFile(singletonList(topicPartition));
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--path-to-json-file", topicPartitionPath.toString()
));
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean",
"--path-to-json-file", topicPartitionPath.toString()
));
TestUtils.assertLeader(client, topicPartition, broker3);
TestUtils.assertLeader(client, topicPartition, broker3);
}
}
@ClusterTest
@ -214,30 +217,31 @@ public class LeaderElectionCommandTest {
List<Integer> assignment = asList(broker2, broker3);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition, assignment);
try (Admin client = cluster.admin()) {
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition, assignment);
createTopic(client, topic, partitionAssignment);
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TopicPartition topicPartition = new TopicPartition(topic, partition);
TestUtils.assertLeader(client, topicPartition, broker2);
TestUtils.assertLeader(client, topicPartition, broker2);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--all-topic-partitions"
));
assertEquals(0, LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--all-topic-partitions"
));
TestUtils.assertLeader(client, topicPartition, broker2);
TestUtils.assertLeader(client, topicPartition, broker2);
}
}
@ClusterTest
@ -261,28 +265,31 @@ public class LeaderElectionCommandTest {
List<Integer> assignment1 = asList(broker3, broker2);
cluster.waitForReadyBrokers();
Admin client = cluster.createAdminClient();
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition0, assignment0);
partitionAssignment.put(partition1, assignment1);
TopicPartition topicPartition0;
TopicPartition topicPartition1;
try (Admin client = cluster.admin()) {
Map<Integer, List<Integer>> partitionAssignment = new HashMap<>();
partitionAssignment.put(partition0, assignment0);
partitionAssignment.put(partition1, assignment1);
createTopic(client, topic, partitionAssignment);
createTopic(client, topic, partitionAssignment);
TopicPartition topicPartition0 = new TopicPartition(topic, partition0);
TopicPartition topicPartition1 = new TopicPartition(topic, partition1);
topicPartition0 = new TopicPartition(topic, partition0);
topicPartition1 = new TopicPartition(topic, partition1);
TestUtils.assertLeader(client, topicPartition0, broker2);
TestUtils.assertLeader(client, topicPartition1, broker3);
TestUtils.assertLeader(client, topicPartition0, broker2);
TestUtils.assertLeader(client, topicPartition1, broker3);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition0, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition0,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
TestUtils.waitForBrokersInIsr(client, topicPartition1,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
cluster.shutdownBroker(broker2);
TestUtils.assertLeader(client, topicPartition0, broker3);
cluster.startBroker(broker2);
TestUtils.waitForBrokersInIsr(client, topicPartition0,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
TestUtils.waitForBrokersInIsr(client, topicPartition1,
CollectionConverters.asScala(singletonList(broker2)).toSet()
);
}
Path topicPartitionPath = tempTopicPartitionFile(asList(topicPartition0, topicPartition1));
String output = ToolsTestUtils.captureStandardOut(() ->

View File

@ -377,7 +377,7 @@ public class TopicCommandTest {
public void testCreate(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
@ -395,7 +395,7 @@ public class TopicCommandTest {
public void testCreateWithDefaults(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
@ -422,7 +422,7 @@ public class TopicCommandTest {
public void testCreateWithDefaultReplication(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, 2, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, 2);
List<TopicPartitionInfo> partitions = adminClient
@ -440,7 +440,7 @@ public class TopicCommandTest {
public void testCreateWithDefaultPartitions(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, (short) 2)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
List<TopicPartitionInfo> partitions = adminClient
@ -459,7 +459,7 @@ public class TopicCommandTest {
public void testCreateWithConfigs(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000");
@ -477,7 +477,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateWhenAlreadyExists(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap(
clusterInstance, "--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1",
@ -495,7 +495,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateWhenAlreadyExistsWithIfNotExists(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
@ -513,7 +513,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateWithReplicaAssignment(ClusterInstance clusterInstance) throws Exception {
Map<Integer, List<Integer>> replicaAssignmentMap = new HashMap<>();
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
String testTopicName = TestUtils.randomString(10);
replicaAssignmentMap.put(0, Arrays.asList(5, 4));
@ -545,7 +545,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateWithInvalidReplicationFactor(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
@ -557,7 +557,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateWithNegativeReplicationFactor(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create",
"--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName);
@ -568,7 +568,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateWithNegativePartitionCount(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName);
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
@ -578,7 +578,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testInvalidTopicLevelConfig(ClusterInstance clusterInstance) {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--create",
@ -591,7 +591,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testListTopics(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
@ -602,7 +602,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testListTopicsWithIncludeList(ClusterInstance clusterInstance) throws InterruptedException {
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
String topic1 = "kafka.testTopic1";
String topic2 = "kafka.testTopic2";
String topic3 = "oooof.testTopic1";
@ -624,7 +624,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testListTopicsWithExcludeInternal(ClusterInstance clusterInstance) throws InterruptedException {
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
String topic1 = "kafka.testTopic1";
String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
int partition = 2;
@ -641,7 +641,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testAlterPartitionCount(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
int partition = 2;
short replicationFactor = 2;
@ -666,7 +666,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testAlterAssignment(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
int partition = 2;
short replicationFactor = 2;
@ -698,7 +698,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testAlterAssignmentWithMoreAssignmentThanPartitions(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
int partition = 2;
@ -717,7 +717,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testAlterAssignmentWithMorePartitionsThanAssignment(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
int partition = 2;
short replicationFactor = 2;
@ -736,7 +736,7 @@ public class TopicCommandTest {
public void testAlterWithInvalidPartitionCount(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
@ -751,7 +751,7 @@ public class TopicCommandTest {
public void testAlterWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
// alter a topic that does not exist without --if-exists
TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "1");
@ -763,7 +763,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testAlterWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
Admin adminClient = clusterInstance.createAdminClient();
Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--alter", "--topic", testTopicName, "--partitions", "1", "--if-exists"));
@ -774,7 +774,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateAlterTopicWithRackAware(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
Map<Integer, String> rackInfo = new HashMap<>();
@ -826,7 +826,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testConfigPreservationAcrossPartitionAlteration(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
String cleanUpPolicy = "compact";
@ -859,7 +859,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testTopicDeletion(ClusterInstance clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
String testTopicName = TestUtils.randomString(10);
@ -879,7 +879,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testTopicWithCollidingCharDeletionAndCreateAgain(ClusterInstance clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
// create the topic with colliding chars
String topicWithCollidingChar = "test.a";
@ -904,7 +904,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDeleteInternalTopic(ClusterInstance clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
// create the offset topic
@ -932,7 +932,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDeleteWhenTopicDoesntExist(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
// delete a topic that does not exist
TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName);
@ -944,7 +944,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDeleteWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--delete", "--topic", testTopicName, "--if-exists"));
}
@ -953,7 +953,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDescribe(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
int partition = 2;
short replicationFactor = 2;
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partition, replicationFactor)));
@ -970,7 +970,7 @@ public class TopicCommandTest {
public void testDescribeWithDescribeTopicPartitionsApi(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(testTopicName, 20, (short) 2));
@ -1000,7 +1000,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDescribeWhenTopicDoesntExist(ClusterInstance clusterInstance) {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
assertThrows(IllegalArgumentException.class,
@ -1013,7 +1013,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDescribeWhenTopicDoesntExistWithIfExists(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient);
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap(clusterInstance, "--describe", "--topic", testTopicName, "--if-exists"));
@ -1027,7 +1027,7 @@ public class TopicCommandTest {
public void testDescribeUnavailablePartitions(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
int partitions = 6;
short replicationFactor = 1;
@ -1055,7 +1055,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDescribeUnderReplicatedPartitions(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
int partitions = 1;
short replicationFactor = 6;
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor)));
@ -1084,7 +1084,7 @@ public class TopicCommandTest {
public void testDescribeUnderMinIsrPartitions(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
int partitions = 1;
@ -1111,8 +1111,8 @@ public class TopicCommandTest {
public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient();
KafkaProducer<String, String> producer = createProducer(clusterInstance)) {
try (Admin adminClient = clusterInstance.admin();
KafkaProducer<String, String> producer = createProducer(clusterInstance)) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
@ -1184,7 +1184,7 @@ public class TopicCommandTest {
public void testDescribeAtMinIsrPartitions(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4");
@ -1223,7 +1223,7 @@ public class TopicCommandTest {
*/
@ClusterTemplate("generate")
public void testDescribeUnderMinIsrPartitionsMixed(ClusterInstance clusterInstance) throws InterruptedException {
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
String underMinIsrTopic = "under-min-isr-topic";
String notUnderMinIsrTopic = "not-under-min-isr-topic";
String offlineTopic = "offline-topic";
@ -1281,7 +1281,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDescribeReportOverriddenConfigs(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
String config = "file.delete.delay.ms=1000";
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1000");
@ -1300,7 +1300,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDescribeAndListTopicsWithoutInternalTopics(ClusterInstance clusterInstance) throws InterruptedException {
String testTopicName = TestUtils.randomString(10);
try (Admin adminClient = clusterInstance.createAdminClient()) {
try (Admin adminClient = clusterInstance.admin()) {
adminClient.createTopics(Collections.singletonList(new NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
@ -1322,7 +1322,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(ClusterInstance clusterInstance) throws Exception {
String testTopicName = TestUtils.randomString(10);
Admin adminClient = clusterInstance.createAdminClient();
Admin adminClient = clusterInstance.admin();
adminClient = spy(adminClient);
@ -1345,7 +1345,7 @@ public class TopicCommandTest {
@ClusterTemplate("generate")
public void testCreateWithTopicNameCollision(ClusterInstance clusterInstance) throws Exception {
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
String topic = "foo_bar";
@ -1402,7 +1402,7 @@ public class TopicCommandTest {
private String captureDescribeTopicStandardOut(ClusterInstance clusterInstance, TopicCommand.TopicCommandOptions opts) {
Runnable runnable = () -> {
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
topicService.describeTopic(opts);
} catch (Exception e) {
@ -1414,7 +1414,7 @@ public class TopicCommandTest {
private String captureListTopicStandardOut(ClusterInstance clusterInstance, TopicCommand.TopicCommandOptions opts) {
Runnable runnable = () -> {
try (Admin adminClient = clusterInstance.createAdminClient();
try (Admin adminClient = clusterInstance.admin();
TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient)) {
topicService.listTopics(opts);
} catch (Exception e) {

View File

@ -295,7 +295,7 @@ public class ConsoleConsumerTest {
@ClusterTest(brokers = 3)
public void testTransactionLogMessageFormatter(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
admin.createTopics(singleton(newTopic));
@ -334,7 +334,7 @@ public class ConsoleConsumerTest {
@ClusterTest(brokers = 3)
public void testOffsetsMessageFormatter(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
admin.createTopics(singleton(newTopic));
@ -376,7 +376,7 @@ public class ConsoleConsumerTest {
@ClusterTest(brokers = 3)
public void testGroupMetadataMessageFormatter(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
admin.createTopics(singleton(newTopic));

View File

@ -231,7 +231,7 @@ public class ResetConsumerGroupOffsetTest {
resetAndAssertOffsets(cluster, addTo(args, "--execute"),
50, false, topics);
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.deleteConsumerGroups(groups).all().get();
}
}
@ -315,7 +315,7 @@ public class ResetConsumerGroupOffsetTest {
String[] args = buildArgsForGroup(cluster, group, "--topic", topic, "--by-duration", "PT1M", "--execute");
try (Admin admin = cluster.createAdminClient()) {
try (Admin admin = cluster.admin()) {
admin.createTopics(singleton(new NewTopic(topic, 1, (short) 1))).all().get();
resetAndAssertOffsets(cluster, args, 0, false, singletonList(topic));
admin.deleteTopics(singleton(topic)).all().get();
@ -444,7 +444,7 @@ public class ResetConsumerGroupOffsetTest {
String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":1",
"--to-earliest", "--execute");
try (Admin admin = cluster.createAdminClient();
try (Admin admin = cluster.admin();
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
admin.createTopics(singleton(new NewTopic(topic, 2, (short) 1))).all().get();
@ -473,7 +473,7 @@ public class ResetConsumerGroupOffsetTest {
"--topic", topic2,
"--to-earliest", "--execute");
try (Admin admin = cluster.createAdminClient();
try (Admin admin = cluster.admin();
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
admin.createTopics(asList(new NewTopic(topic1, 1, (short) 1),
new NewTopic(topic2, 1, (short) 1))).all().get();
@ -508,7 +508,7 @@ public class ResetConsumerGroupOffsetTest {
"--topic", topic2 + ":1",
"--to-earliest", "--execute");
try (Admin admin = cluster.createAdminClient();
try (Admin admin = cluster.admin();
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),
new NewTopic(topic2, 2, (short) 1))).all().get();
@ -551,7 +551,7 @@ public class ResetConsumerGroupOffsetTest {
String[] cgcArgs = buildArgsForGroup(cluster, group, "--all-topics", "--to-offset", "2", "--export");
File file = TestUtils.tempFile("reset", ".csv");
try (Admin admin = cluster.createAdminClient();
try (Admin admin = cluster.admin();
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
admin.createTopics(singleton(new NewTopic(topic, 2, (short) 1))).all().get();
@ -596,7 +596,7 @@ public class ResetConsumerGroupOffsetTest {
"--all-topics", "--to-offset", "2", "--export");
File file = TestUtils.tempFile("reset", ".csv");
try (Admin admin = cluster.createAdminClient();
try (Admin admin = cluster.admin();
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),