Compare commits

...

8 Commits

Author SHA1 Message Date
Anshul Bisht bd9faaeb04
Merge fa4b631061 into 4a5aa37169 2025-10-07 23:22:10 +00:00
Anshul Bisht fa4b631061
Merge branch 'apache:trunk' into KAFKA-14249-flaky-test-fix 2025-10-07 18:22:07 -05:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
CI / build (push) Waiting to run Details
It moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
lucliu1108 2938c4242e
KAFKA-19754: Add RPC-level integration test for StreamsGroupDescribeRequest (#20632)
CI / build (push) Waiting to run Details
Test the `StreamsGroupDescribeRequest` RPC and corresponding responses
for situations where
- `streams.version` not upgraded to 1
- `streams.version` enabled, multiple groups listening to the same
topic.

Reviewers: Lucas Brutschy <lucasbru@apache.org>
2025-10-07 15:47:32 +02:00
Ken Huang ebae768bd8
KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style (#19955)
In Kafka Streams, configuration classes typically follow a fluent API
pattern to ensure a consistent and intuitive developer experience.
However, the current implementation of
`org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this
convention by exposing a public constructor, breaking the uniformity
expected across the API.

To address this inconsistency, we propose introducing a new
`CloseOptions` class that adheres to the fluent API style, replacing the
existing implementation. The new class will retain the existing
`timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce
fluent instantiation and configuration. Given the design shift, we will
not maintain backward compatibility with the current class.

This change aligns with the goal of standardizing configuration objects
across Kafka Streams, offering developers a more cohesive and
predictable API.

Reviewers: Bill Bejeck<bbejeck@apache.org>
2025-10-07 08:50:18 -04:00
Jhen-Yung Hsu fa2496bb91
MINOR: skip ci-complete on 4.0 (#20644)
Since that 4.0 branch does not include
[KAFKA-18748](https://issues.apache.org/jira/browse/KAFKA-18748), it is
unable to find the related scan reports, but the ci-complete workflow is
still being triggered on the 4.0 branch. Disable this on the 4.0 branch,
as its reports can be safely ignored.

See https://github.com/apache/kafka/pull/20616#issuecomment-3370876779.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-07 20:27:29 +08:00
Dejan Stojadinović c6bbbbe24d
KAFKA-19174 Gradle version upgrade 8 -->> 9 (#19513)
List of changes:
- prerequisite Jira ticket:
  - [KAFKA-19591](https://issues.apache.org/jira/browse/KAFKA-19591)
- mandatory version upgrades:
  - Gradle version: 8.14.3 -->> 9.1.0
  - Gradle Shadow plugin: 8.3.6 -->> 8.3.9
  - Gradle dependencycheck plugin: 8.2.1 -->> 12.1.3
  - Gradle spotbugs plugin: 6.2.3 -->> 6.2.5
  - Gradle spotless plugin: 6.25.0 -->> 7.2.1
- build logic will be refactored to accommodate Gradle 9 breaking
changes
- (optional): a dozen of Gradle plugins versions will also be upgraded
- other JIRA tickets that had to be solved all along:
  - [KAFKA-16801](https://issues.apache.org/jira/browse/KAFKA-16801)
  - [KAFKA-19654](https://issues.apache.org/jira/browse/KAFKA-19654)

 **Related links:**
- https://gradle.org/whats-new/gradle-9
- https://github.com/gradle/gradle/releases/tag/v9.0.0
- https://docs.gradle.org/9.0.0/release-notes.html
- https://docs.gradle.org/9.0.0/userguide/upgrading_major_version_9.html
- https://docs.gradle.org/9.1.0/release-notes.html

Notes:
- new Gradle version brings up some breaking changes, as always 😃
- Kafka build with Gradle 9 has same issues as other projects:
  - https://github.com/redhat-developer/vscode-java/issues/4018
  - https://github.com/gradle/gradle/pull/32597

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-07 18:20:04 +08:00
Anshul Bisht cb60bbc583 KAFKA-14249: Stabilize TLSv1.3 idle expiry test in Tls13SelectorTest 2025-09-30 22:47:12 -05:00
22 changed files with 873 additions and 345 deletions

View File

@ -42,7 +42,7 @@ runs:
distribution: temurin
java-version: ${{ inputs.java-version }}
- name: Setup Gradle
uses: gradle/actions/setup-gradle@94baf225fe0a508e581a564467443d0e2379123b # v4.3.0
uses: gradle/actions/setup-gradle@748248ddd2a24f49513d8f472f81c3a07d4d50e1 # v4.4.4
env:
GRADLE_BUILD_ACTION_CACHE_DEBUG_ENABLED: true
with:

View File

@ -38,7 +38,7 @@ run-name: Build Scans for ${{ github.event.workflow_run.display_title}}
jobs:
upload-build-scan:
# Skip this workflow if the CI run was skipped or cancelled
if: (github.event.workflow_run.conclusion == 'success' || github.event.workflow_run.conclusion == 'failure')
if: (github.event.workflow_run.conclusion == 'success' || github.event.workflow_run.conclusion == 'failure') && github.event.workflow_run.head_branch != '4.0'
runs-on: ubuntu-latest
strategy:
fail-fast: false

View File

@ -29,22 +29,21 @@ buildscript {
}
plugins {
id 'com.github.ben-manes.versions' version '0.48.0'
id 'com.github.ben-manes.versions' version '0.52.0'
id 'idea'
id 'jacoco'
id 'java-library'
id 'org.owasp.dependencycheck' version '8.2.1'
id 'org.owasp.dependencycheck' version '12.1.3'
id 'org.nosphere.apache.rat' version "0.8.1"
id "io.swagger.core.v3.swagger-gradle-plugin" version "${swaggerVersion}"
id "com.github.spotbugs" version '6.2.3' apply false
id "com.github.spotbugs" version '6.2.5' apply false
id 'org.scoverage' version '8.0.3' apply false
id 'com.gradleup.shadow' version '8.3.6' apply false
id 'com.diffplug.spotless' version "6.25.0"
id 'com.gradleup.shadow' version '8.3.9' apply false
id 'com.diffplug.spotless' version "7.2.1"
}
ext {
gradleVersion = versions.gradle
minClientJavaVersion = 11
minNonClientJavaVersion = 17
modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams:examples", ":streams-scala", ":test-common:test-common-util"]
@ -297,7 +296,7 @@ if (repo != null) {
} else {
rat.enabled = false
}
println("Starting build with version $version (commit id ${commitId == null ? "null" : commitId.take(8)}) using Gradle $gradleVersion, Java ${JavaVersion.current()} and Scala ${versions.scala}")
println("Starting build with version $version (commit id ${commitId == null ? "null" : commitId.take(8)}) using Gradle $versions.gradle, Java ${JavaVersion.current()} and Scala ${versions.scala}")
println("Build properties: ignoreFailures=$userIgnoreFailures, maxParallelForks=$maxTestForks, maxScalacThreads=$maxScalacThreads, maxTestRetries=$userMaxTestRetries")
subprojects {
@ -328,6 +327,16 @@ subprojects {
tasks.register('uploadArchives').configure { dependsOn(publish) }
}
tasks.withType(AbstractArchiveTask).configureEach {
reproducibleFileOrder = false
preserveFileTimestamps = true
useFileSystemPermissions()
}
tasks.withType(AbstractTestTask).configureEach {
failOnNoDiscoveredTests = false
}
// apply the eclipse plugin only to subprojects that hold code. 'connect' is just a folder.
if (!project.name.equals('connect')) {
apply plugin: 'eclipse'

View File

@ -1,132 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.waitForCondition(() -> {
Map<Integer, Uuid> voters = descVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@ -36,6 +36,7 @@ import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class Tls13SelectorTest extends SslSelectorTest {
@ -47,11 +48,27 @@ public class Tls13SelectorTest extends SslSelectorTest {
return configs;
}
@Flaky(value = "KAFKA-14249", comment = "Copied from base class. Remove this override once the flakiness has been resolved.")
@Flaky(value = "KAFKA-14249", comment = "Stabilize TLSv1.3 idle expiry by waiting for READY and allowing a short observe window.")
@Test
@Override
public void testCloseOldestConnection() throws Exception {
super.testCloseOldestConnection();
String id = "0";
selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.waitForChannelReady(selector, id);
selector.poll(50);
time.sleep(10_000L);
TestUtils.waitForCondition(() -> {
try {
selector.poll(50);
} catch (IOException e) {
throw new RuntimeException(e);
}
ChannelState state = selector.disconnected().get(id);
return state == ChannelState.EXPIRED;
}, 30_000L, "Expected idle connection to expire");
assertTrue(selector.disconnected().containsKey(id), "The idle connection should have been closed");
assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
}
/**

View File

@ -26,9 +26,9 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, StreamsGroupDescribeRequest, StreamsGroupDescribeResponse, StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@ -768,6 +768,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupDescribeResponse.data.groups.asScala.toList
}
protected def streamsGroupDescribe(
groupIds: List[String],
includeAuthorizedOperations: Boolean = false,
version: Short = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[StreamsGroupDescribeResponseData.DescribedGroup] = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData()
.setGroupIds(groupIds.asJava)
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
).build(version)
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
streamsGroupDescribeResponse.data.groups.asScala.toList
}
protected def heartbeat(
groupId: String,
generationId: Int,
@ -855,6 +870,41 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data
}
protected def streamsGroupHeartbeat(
groupId: String,
memberId: String = "",
memberEpoch: Int = 0,
rebalanceTimeoutMs: Int = -1,
activeTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
standbyTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
topology: StreamsGroupHeartbeatRequestData.Topology = null,
expectedError: Errors = Errors.NONE,
version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
): StreamsGroupHeartbeatResponseData = {
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setActiveTasks(activeTasks.asJava)
.setStandbyTasks(standbyTasks.asJava)
.setWarmupTasks(warmupTasks.asJava)
.setTopology(topology)
).build(version)
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
streamsGroupHeartbeatResponse.data.errorCode == expectedError.code
}, msg = s"Could not heartbeat successfully. Last response $streamsGroupHeartbeatResponse.")
streamsGroupHeartbeatResponse.data
}
protected def leaveGroupWithNewProtocol(
groupId: String,
memberId: String

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.message.{StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{StreamsGroupDescribeRequest, StreamsGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api._
import scala.jdk.CollectionConverters._
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Feature
import org.junit.Assert.{assertEquals, assertTrue}
import java.lang.{Byte => JByte}
@ClusterTestDefaults(
types = Array(Type.KRAFT),
brokers = 1,
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class StreamsGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
)
)
def testStreamsGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData().setGroupIds(List("grp-mock-1", "grp-mock-2").asJava)
).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
val expectedResponse = new StreamsGroupDescribeResponseData()
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-1")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-2")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
assertEquals(expectedResponse, streamsGroupDescribeResponse.data)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,streams"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
def testStreamsGroupDescribeGroupsWithNewGroupCoordinator(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
val admin = cluster.admin()
val topicName = "foo"
try {
TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = topicName,
numPartitions = 3
)
TestUtils.waitUntilTrue(() => {
admin.listTopics().names().get().contains(topicName)
}, msg = s"Topic $topicName is not available to the group coordinator")
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
var grp1Member1Response: StreamsGroupHeartbeatResponseData = null
var grp1Member2Response: StreamsGroupHeartbeatResponseData = null
var grp2Member1Response: StreamsGroupHeartbeatResponseData = null
var grp2Member2Response: StreamsGroupHeartbeatResponseData = null
// grp-1 with 2 members
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-1",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-2",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription1 = streamsGroupDescribe(
groupIds = List("grp-1"),
includeAuthorizedOperations = true
)
grp1Member1Response.errorCode == Errors.NONE.code && grp1Member2Response.errorCode == Errors.NONE.code &&
groupsDescription1.size == 1 && groupsDescription1.head.members.size == 2
}, msg = s"Could not create grp-1 with 2 members successfully")
// grp-2 with 2 members
TestUtils.waitUntilTrue(() => {
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-3",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-4",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription2 = streamsGroupDescribe(
groupIds = List("grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
grp2Member1Response.errorCode == Errors.NONE.code && grp2Member2Response.errorCode == Errors.NONE.code &&
groupsDescription2.size == 1 && groupsDescription2.head.members.size == 2
}, msg = s"Could not create grp-2 with 2 members successfully")
// Send follow-up heartbeats until both groups are stable
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member1Response.memberId,
memberEpoch = grp1Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member1Response.warmupTasks),
topology = null
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member2Response.memberId,
memberEpoch = grp1Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member2Response.warmupTasks),
topology = null
)
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member1Response.memberId,
memberEpoch = grp2Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member1Response.warmupTasks),
topology = null
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member2Response.memberId,
memberEpoch = grp2Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member2Response.warmupTasks),
topology = null
)
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
actual.head.groupState() == "Stable" && actual(1).groupState() == "Stable" &&
actual.head.members.size == 2 && actual(1).members.size == 2
}, "Two groups did not stabilize with 2 members each in time")
// Test the describe request for both groups in stable state
for (version <- ApiKeys.STREAMS_GROUP_DESCRIBE.oldestVersion() to ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = version.toShort
)
assertEquals(2, actual.size)
assertEquals(actual.map(_.groupId).toSet, Set("grp-1", "grp-2"))
for (describedGroup <- actual) {
assertEquals("Stable", describedGroup.groupState)
assertTrue("Group epoch is not equal to the assignment epoch", describedGroup.groupEpoch == describedGroup.assignmentEpoch)
// Verify topology
assertEquals(1, describedGroup.topology.epoch)
assertEquals(1, describedGroup.topology.subtopologies.size)
assertEquals("subtopology-1", describedGroup.topology.subtopologies.get(0).subtopologyId)
assertEquals(List(topicName).asJava, describedGroup.topology.subtopologies.get(0).sourceTopics)
// Verify members
assertEquals(2, describedGroup.members.size)
val expectedMemberIds = describedGroup.groupId match {
case "grp-1" => Set(grp1Member1Response.memberId, grp1Member2Response.memberId)
case "grp-2" => Set(grp2Member1Response.memberId, grp2Member2Response.memberId)
case unexpected => throw new AssertionError(s"Unexpected group ID: $unexpected")
}
val actualMemberIds = describedGroup.members.asScala.map(_.memberId).toSet
assertEquals(expectedMemberIds, actualMemberIds)
assertEquals(authorizedOperationsInt, describedGroup.authorizedOperations)
describedGroup.members.asScala.foreach { member =>
assertTrue("Group epoch is not equal to the member epoch", member.memberEpoch == describedGroup.assignmentEpoch)
assertEquals(1, member.topologyEpoch)
assertEquals(member.targetAssignment, member.assignment)
assertEquals(clientId, member.clientId())
assertEquals(clientHost, member.clientHost())
}
// Verify all partitions 0, 1, 2 are assigned exactly once
val allAssignedPartitions = describedGroup.members.asScala.flatMap { member =>
member.assignment.activeTasks.asScala.flatMap(_.partitions.asScala)
}.toList
assertEquals(List(0, 1, 2).sorted, allAssignedPartitions.sorted)
}
}
} finally{
admin.close()
}
}
private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = {
if (responseTasks == null) {
List.empty
} else {
responseTasks.asScala.map { responseTask =>
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(responseTask.subtopologyId)
.setPartitions(responseTask.partitions)
}.toList
}
}
}

View File

@ -189,6 +189,13 @@
A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
</li>
<li>
Deprecated <code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related methods, such as
<code>KafkaStreams#close(org.apache.kafka.streams.KafkaStreams$CloseOptions)</code>.
As a replacement, please use <code>org.apache.kafka.streams.CloseOptions</code> and
<code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
</li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

View File

@ -60,7 +60,7 @@ versions += [
commonsLang: "3.18.0",
commonsValidator: "1.10.0",
classgraph: "4.8.179",
gradle: "8.14.3",
gradle: "9.1.0",
grgit: "4.1.1",
httpclient: "4.5.14",
jackson: "2.19.0",
@ -125,7 +125,7 @@ versions += [
snappy: "1.1.10.7",
spotbugs: "4.9.4",
mockOAuth2Server: "2.2.1",
zinc: "1.9.2",
zinc: "1.10.8",
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
zstd: "1.5.6-10",

View File

@ -1,7 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionSha256Sum=bd71102213493060956ec229d946beee57158dbd89d0e62b91bca0fa2c5f3531
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
distributionSha256Sum=a17ddd85a26b6a7f5ddb71ff8b05fc5104c0202c6e64782429790c933686c806
distributionUrl=https\://services.gradle.org/distributions/gradle-9.1.0-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME

41
gradlew vendored
View File

@ -1,7 +1,7 @@
#!/bin/sh
#
# Copyright © 2015-2021 the original authors.
# Copyright © 2015 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#
##############################################################################
#
@ -55,7 +57,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
@ -84,7 +86,7 @@ done
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
@ -113,20 +115,6 @@ case "$( uname )" in #(
esac
# Loop in case we encounter an error.
for attempt in 1 2 3; do
if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then
if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar"; then
rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
# Pause for a bit before looping in case the server throttled us.
sleep 5
continue
fi
fi
done
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
@ -183,7 +171,6 @@ fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if "$cygwin" || "$msys" ; then
APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
JAVACMD=$( cygpath --unix "$JAVACMD" )
@ -212,19 +199,31 @@ if "$cygwin" || "$msys" ; then
fi
# Loop in case we encounter an error.
for attempt in 1 2 3; do
if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then
if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v9.1.0/gradle/wrapper/gradle-wrapper.jar"; then
rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
# Pause for a bit before looping in case the server throttled us.
sleep 5
continue
fi
fi
done
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.
set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
org.gradle.wrapper.GradleWrapperMain \
-jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
"$@"
# Stop when "xargs" is not available.

View File

@ -15,13 +15,16 @@
* limitations under the License.
*/
package kafka.server;
package org.apache.kafka.server;
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
});
@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
});
@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
}
}
}
@Test
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
for (int replicaId : new int[] {3001, 3002}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
}
}
}
@Test
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
final var nodes = new TestKitNodes.Builder()
.setClusterId("test-cluster")
.setNumBrokerNodes(1)
.setNumControllerNodes(3)
.build();
final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
try (var admin = Admin.create(cluster.clientProperties())) {
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
dirId,
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
}
}
}
}

View File

@ -28,8 +28,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.CloseOptions;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -159,7 +159,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP).withTimeout(Duration.ofSeconds(30)));
waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
public class CloseOptions {
/**
* Enum to specify the group membership operation upon closing the Kafka Streams application.
*
* <ul>
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the group.</li>
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
* </ul>
*/
public enum GroupMembershipOperation {
LEAVE_GROUP,
REMAIN_IN_GROUP
}
/**
* Specifies the group membership operation upon shutdown.
* By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
*/
protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
/**
* Specifies the maximum amount of time to wait for the close process to complete.
* This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
*/
protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));
private CloseOptions() {
}
protected CloseOptions(final CloseOptions closeOptions) {
this.operation = closeOptions.operation;
this.timeout = closeOptions.timeout;
}
/**
* Static method to create a {@code CloseOptions} with a custom timeout.
*
* @param timeout the maximum time to wait for the KafkaStreams to close.
* @return a new {@code CloseOptions} instance with the specified timeout.
*/
public static CloseOptions timeout(final Duration timeout) {
return new CloseOptions().withTimeout(timeout);
}
/**
* Static method to create a {@code CloseOptions} with a specified group membership operation.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return a new {@code CloseOptions} instance with the specified group membership operation.
*/
public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
return new CloseOptions().withGroupMembershipOperation(operation);
}
/**
* Fluent method to set the timeout for the close process.
*
* @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
return this;
}
/**
* Fluent method to set the group membership operation upon shutdown.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
this.operation = Objects.requireNonNull(operation, "operation should not be null");
return this;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.kafka.streams.errors.StreamsStoppedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
import org.apache.kafka.streams.internals.CloseOptionsInternal;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
@ -488,7 +489,7 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
deadThread.shutdown(false);
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
addStreamThread();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
@ -1136,7 +1137,7 @@ public class KafkaStreams implements AutoCloseable {
return Optional.of(streamThread.getName());
} else {
log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
streamThread.shutdown(true);
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
threads.remove(streamThread);
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
@ -1200,7 +1201,7 @@ public class KafkaStreams implements AutoCloseable {
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
log.info("Removing StreamThread {}", streamThread.getName());
streamThread.shutdown(true);
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
if (callingThreadIsNotCurrentStreamThread) {
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
@ -1418,15 +1419,18 @@ public class KafkaStreams implements AutoCloseable {
/**
* Class that handles options passed in case of {@code KafkaStreams} instance scale down
*/
@Deprecated(since = "4.2")
public static class CloseOptions {
private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
private boolean leaveGroup = false;
@Deprecated(since = "4.2")
public CloseOptions timeout(final Duration timeout) {
this.timeout = timeout;
return this;
}
@Deprecated(since = "4.2")
public CloseOptions leaveGroup(final boolean leaveGroup) {
this.leaveGroup = leaveGroup;
return this;
@ -1438,10 +1442,14 @@ public class KafkaStreams implements AutoCloseable {
* This will block until all threads have stopped.
*/
public void close() {
close(Optional.empty(), false);
close(Optional.empty(), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
private Thread shutdownHelper(
final boolean error,
final long timeoutMs,
final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation
) {
stateDirCleaner.shutdownNow();
if (rocksDBMetricsRecordingService != null) {
rocksDBMetricsRecordingService.shutdownNow();
@ -1453,7 +1461,9 @@ public class KafkaStreams implements AutoCloseable {
return new Thread(() -> {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
int numStreamThreads = processStreamThread(
streamThread -> streamThread.shutdown(operation)
);
log.info("Shutting down {} stream threads", numStreamThreads);
@ -1513,7 +1523,7 @@ public class KafkaStreams implements AutoCloseable {
}, clientId + "-CloseThread");
}
private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
final long timeoutMs;
if (timeout.isPresent()) {
timeoutMs = timeout.get();
@ -1544,7 +1554,7 @@ public class KafkaStreams implements AutoCloseable {
+ "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
}
final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
final Thread shutdownThread = shutdownHelper(false, timeoutMs, operation);
shutdownThread.setDaemon(true);
shutdownThread.start();
@ -1562,7 +1572,7 @@ public class KafkaStreams implements AutoCloseable {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in {}", state());
} else {
final Thread shutdownThread = shutdownHelper(true, -1, false);
final Thread shutdownThread = shutdownHelper(true, -1, org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
shutdownThread.setDaemon(true);
shutdownThread.start();
@ -1588,12 +1598,13 @@ public class KafkaStreams implements AutoCloseable {
throw new IllegalArgumentException("Timeout can't be negative.");
}
return close(Optional.of(timeoutMs), false);
return close(Optional.of(timeoutMs), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* This method is deprecated and replaced by {@link #close(org.apache.kafka.streams.CloseOptions)}.
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
* trigger consumer leave call
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
@ -1601,15 +1612,36 @@ public class KafkaStreams implements AutoCloseable {
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
@Deprecated(since = "4.2")
public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
final org.apache.kafka.streams.CloseOptions closeOptions = org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
.withGroupMembershipOperation(options.leaveGroup ?
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
return close(closeOptions);
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* @param options contains timeout to specify how long to wait for the threads to shut down,
* and a {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
* to trigger consumer leave call or remain in the group
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
Objects.requireNonNull(options, "options cannot be null");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
return close(Optional.of(timeoutMs), options.leaveGroup);
return close(Optional.of(timeoutMs), optionsInternal.operation());
}
/**

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.streams.CloseOptions;
import java.time.Duration;
import java.util.Optional;
public class CloseOptionsInternal extends CloseOptions {
public CloseOptionsInternal(final CloseOptions options) {
super(options);
}
public GroupMembershipOperation operation() {
return operation;
}
public Optional<Duration> timeout() {
return timeout;
}
}

View File

@ -91,9 +91,9 @@ import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -367,7 +367,8 @@ public class StreamThread extends Thread implements ProcessingThread {
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
private final AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation> leaveGroupRequested =
new AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
private final boolean eosEnabled;
private final boolean stateUpdaterEnabled;
@ -898,7 +899,7 @@ public class StreamThread extends Thread implements ProcessingThread {
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
leaveGroupRequested.set(true);
leaveGroupRequested.set(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streamsUncaughtExceptionHandler.accept(e, false);
// Note: the above call currently rethrows the exception, so nothing below this line will be executed
} finally {
@ -1879,12 +1880,12 @@ public class StreamThread extends Thread implements ProcessingThread {
* Note that there is nothing to prevent this function from being called multiple times
* (e.g., in testing), hence the state is set only the first time
*
* @param leaveGroup this flag will control whether the consumer will leave the group on close or not
* @param operation the group membership operation to apply on shutdown. Must be one of LEAVE_GROUP or REMAIN_IN_GROUP.
*/
public void shutdown(final boolean leaveGroup) {
public void shutdown(final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
log.info("Informed to shut down");
final State oldState = setState(State.PENDING_SHUTDOWN);
leaveGroupRequested.set(leaveGroup);
leaveGroupRequested.set(operation);
if (oldState == State.CREATED) {
// The thread may not have been started. Take responsibility for shutting down
completeShutdown(true);
@ -1917,7 +1918,8 @@ public class StreamThread extends Thread implements ProcessingThread {
log.error("Failed to close changelog reader due to the following error:", e);
}
try {
final GroupMembershipOperation membershipOperation = leaveGroupRequested.get() ? LEAVE_GROUP : REMAIN_IN_GROUP;
final GroupMembershipOperation membershipOperation =
leaveGroupRequested.get() == org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP ? LEAVE_GROUP : REMAIN_IN_GROUP;
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
} catch (final Throwable e) {
log.error("Failed to close consumer due to the following error:", e);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
@ -310,8 +309,12 @@ public class KafkaStreamsTest {
private void prepareConsumer(final StreamThread thread, final AtomicReference<StreamThread.State> state) {
doAnswer(invocation -> {
supplier.consumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
supplier.restoreConsumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
supplier.consumer.close(
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
);
supplier.restoreConsumer.close(
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
);
for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
producer.close();
}
@ -320,7 +323,7 @@ public class KafkaStreamsTest {
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
return null;
}).when(thread).shutdown(false);
}).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
private void prepareThreadLock(final StreamThread thread) {
@ -571,7 +574,7 @@ public class KafkaStreamsTest {
for (int i = 0; i < NUM_THREADS; i++) {
final StreamThread tmpThread = streams.threads.get(i);
tmpThread.shutdown(false);
tmpThread.shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
"Thread never stopped.");
streams.threads.get(i).join();
@ -790,7 +793,7 @@ public class KafkaStreamsTest {
prepareThreadLock(streamThreadTwo);
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
streamThreadOne.shutdown(true);
streamThreadOne.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
assertThat(threads.size(), equalTo(1));
assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
@ -1016,9 +1019,8 @@ public class KafkaStreamsTest {
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@ -1041,8 +1043,7 @@ public class KafkaStreamsTest {
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@ -1229,8 +1230,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L));
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1243,8 +1243,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L));
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
}
@ -1257,8 +1256,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1275,9 +1273,8 @@ public class KafkaStreamsTest {
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1293,9 +1290,8 @@ public class KafkaStreamsTest {
final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
}
@ -1312,9 +1308,8 @@ public class KafkaStreamsTest {
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}

View File

@ -60,6 +60,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@ -247,7 +248,7 @@ public class StreamThreadTest {
if (thread.state() != State.CREATED) {
thread.taskManager().shutdown(false);
}
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
thread = null;
}
final Set<Thread> t = Collections.unmodifiableSet(Thread.getAllStackTraces().keySet());
@ -409,7 +410,7 @@ public class StreamThreadTest {
assertEquals(4, stateListener.numChanges);
assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState);
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
assertSame(StreamThread.State.PENDING_SHUTDOWN, thread.state());
}
@ -427,14 +428,14 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
"Thread never shut down.");
thread.shutdown(true);
assertEquals(thread.state(), StreamThread.State.DEAD);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
assertEquals(State.DEAD, thread.state());
}
@ParameterizedTest
@ -812,7 +813,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
@ -880,7 +881,7 @@ public class StreamThreadTest {
() -> { }
);
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// Validate that the scheduled rebalance wasn't reset then set to MAX_VALUE so we
// don't trigger one before we can shut down, since the rebalance must be ended
@ -1390,7 +1391,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// even if thread is no longer running, it should still be polling
// as long as the rebalance is still ongoing
@ -1426,7 +1427,7 @@ public class StreamThreadTest {
thread.setStateListener(
(t, newState, oldState) -> {
if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
}
});
thread.run();
@ -1524,7 +1525,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
verify(taskManager).shutdown(true);
}
@ -1542,7 +1543,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// Execute the run method. Verification of the mock will check that shutdown was only done once
thread.run();

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -341,9 +342,8 @@ public class DeleteStreamsGroupOffsetTest {
private void stopKSApp(String appId, String topic, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
if (streams != null) {
KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofSeconds(30));
closeOptions.leaveGroup(true);
CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
streams.cleanUp();

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -512,9 +513,8 @@ public class DeleteStreamsGroupTest {
private void stopKSApp(String appId, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
if (streams != null) {
KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofSeconds(30));
closeOptions.leaveGroup(true);
CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
streams.cleanUp();

View File

@ -23,7 +23,7 @@
// and not the version installed on the machine running the task.
// Read more about the wrapper here: https://docs.gradle.org/current/userguide/gradle_wrapper.html
wrapper {
gradleVersion = project.gradleVersion
gradleVersion = versions.gradle
}
// Custom task to inject support for downloading the gradle wrapper jar if it doesn't exist.
@ -35,14 +35,12 @@ task bootstrapWrapper() {
def wrapperBasePath = "\$APP_HOME/gradle/wrapper"
def wrapperJarPath = wrapperBasePath + "/gradle-wrapper.jar"
// Add a trailing zero to the version if needed.
def fullVersion = project.gradleVersion.count(".") == 1 ? "${project.gradleVersion}.0" : versions.gradle
// Leverages the wrapper jar checked into the gradle project on github because the jar isn't
// available elsewhere. Using raw.githubusercontent.com instead of github.com because
// github.com servers deprecated TLSv1/TLSv1.1 support some time ago, so older versions
// of curl (built against OpenSSL library that doesn't support TLSv1.2) would fail to
// fetch the jar.
def wrapperBaseUrl = "https://raw.githubusercontent.com/gradle/gradle/v$fullVersion/gradle/wrapper"
def wrapperBaseUrl = "https://raw.githubusercontent.com/gradle/gradle/v$versions.gradle/gradle/wrapper"
def wrapperJarUrl = wrapperBaseUrl + "/gradle-wrapper.jar"
def bootstrapString = """
@ -59,13 +57,15 @@ task bootstrapWrapper() {
done
""".stripIndent()
String putBootstrapStringAbove = "# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script."
def wrapperScript = wrapper.scriptFile
def wrapperLines = wrapperScript.readLines()
wrapperScript.withPrintWriter { out ->
def bootstrapWritten = false
wrapperLines.each { line ->
// Print the wrapper bootstrap before the first usage of the wrapper jar.
if (!bootstrapWritten && line.contains("gradle-wrapper.jar")) {
if (!bootstrapWritten && line.contains(putBootstrapStringAbove)) {
out.println(bootstrapString)
bootstrapWritten = true
}