mirror of https://github.com/apache/kafka.git
Compare commits
8 Commits
fdee9ae97b
...
bd9faaeb04
bd9faaeb04
fa4b631061
4a5aa37169
2938c4242e
ebae768bd8
fa2496bb91
c6bbbbe24d
cb60bbc583
@@ -42,7 +42,7 @@ runs:
         distribution: temurin
         java-version: ${{ inputs.java-version }}
     - name: Setup Gradle
-      uses: gradle/actions/setup-gradle@94baf225fe0a508e581a564467443d0e2379123b # v4.3.0
+      uses: gradle/actions/setup-gradle@748248ddd2a24f49513d8f472f81c3a07d4d50e1 # v4.4.4
       env:
         GRADLE_BUILD_ACTION_CACHE_DEBUG_ENABLED: true
       with:
@@ -38,7 +38,7 @@ run-name: Build Scans for ${{ github.event.workflow_run.display_title}}
 jobs:
   upload-build-scan:
     # Skip this workflow if the CI run was skipped or cancelled
-    if: (github.event.workflow_run.conclusion == 'success' || github.event.workflow_run.conclusion == 'failure')
+    if: (github.event.workflow_run.conclusion == 'success' || github.event.workflow_run.conclusion == 'failure') && github.event.workflow_run.head_branch != '4.0'
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
build.gradle (23 changed lines)
@@ -29,22 +29,21 @@ buildscript {
 }
 
 plugins {
-  id 'com.github.ben-manes.versions' version '0.48.0'
+  id 'com.github.ben-manes.versions' version '0.52.0'
   id 'idea'
   id 'jacoco'
   id 'java-library'
-  id 'org.owasp.dependencycheck' version '8.2.1'
+  id 'org.owasp.dependencycheck' version '12.1.3'
   id 'org.nosphere.apache.rat' version "0.8.1"
   id "io.swagger.core.v3.swagger-gradle-plugin" version "${swaggerVersion}"
 
-  id "com.github.spotbugs" version '6.2.3' apply false
+  id "com.github.spotbugs" version '6.2.5' apply false
   id 'org.scoverage' version '8.0.3' apply false
-  id 'com.gradleup.shadow' version '8.3.6' apply false
+  id 'com.gradleup.shadow' version '8.3.9' apply false
-  id 'com.diffplug.spotless' version "6.25.0"
+  id 'com.diffplug.spotless' version "7.2.1"
 }
 
 ext {
-  gradleVersion = versions.gradle
   minClientJavaVersion = 11
   minNonClientJavaVersion = 17
   modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams:examples", ":streams-scala", ":test-common:test-common-util"]
@@ -297,7 +296,7 @@ if (repo != null) {
 } else {
   rat.enabled = false
 }
-println("Starting build with version $version (commit id ${commitId == null ? "null" : commitId.take(8)}) using Gradle $gradleVersion, Java ${JavaVersion.current()} and Scala ${versions.scala}")
+println("Starting build with version $version (commit id ${commitId == null ? "null" : commitId.take(8)}) using Gradle $versions.gradle, Java ${JavaVersion.current()} and Scala ${versions.scala}")
 println("Build properties: ignoreFailures=$userIgnoreFailures, maxParallelForks=$maxTestForks, maxScalacThreads=$maxScalacThreads, maxTestRetries=$userMaxTestRetries")
 
 subprojects {
@@ -328,6 +327,16 @@ subprojects {
     tasks.register('uploadArchives').configure { dependsOn(publish) }
   }
 
+  tasks.withType(AbstractArchiveTask).configureEach {
+    reproducibleFileOrder = false
+    preserveFileTimestamps = true
+    useFileSystemPermissions()
+  }
+
+  tasks.withType(AbstractTestTask).configureEach {
+    failOnNoDiscoveredTests = false
+  }
+
   // apply the eclipse plugin only to subprojects that hold code. 'connect' is just a folder.
   if (!project.name.equals('connect')) {
     apply plugin: 'eclipse'
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.InconsistentClusterIdException;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("integration")
-public class ReconfigurableQuorumIntegrationTest {
-
-    static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
-        var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
-        return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
-
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
-
-                admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-            }
-        }
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                var removeFuture = admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
-
-                var addFuture = admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
-            }
-        }
-    }
-}
@@ -36,6 +36,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class Tls13SelectorTest extends SslSelectorTest {
 
@@ -47,11 +48,27 @@ public class Tls13SelectorTest extends SslSelectorTest {
         return configs;
     }
 
-    @Flaky(value = "KAFKA-14249", comment = "Copied from base class. Remove this override once the flakiness has been resolved.")
+    @Flaky(value = "KAFKA-14249", comment = "Stabilize TLSv1.3 idle expiry by waiting for READY and allowing a short observe window.")
     @Test
     @Override
     public void testCloseOldestConnection() throws Exception {
-        super.testCloseOldestConnection();
+        String id = "0";
+        selector.connect(id, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.waitForChannelReady(selector, id);
+        selector.poll(50);
+        time.sleep(10_000L);
+        TestUtils.waitForCondition(() -> {
+            try {
+                selector.poll(50);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            ChannelState state = selector.disconnected().get(id);
+            return state == ChannelState.EXPIRED;
+        }, 30_000L, "Expected idle connection to expire");
+
+        assertTrue(selector.disconnected().containsKey(id), "The idle connection should have been closed");
+        assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
     }
 
     /**
@@ -26,9 +26,9 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
 import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
 import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
-import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
+import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, StreamsGroupDescribeRequest, StreamsGroupDescribeResponse, StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
@@ -768,6 +768,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
     shareGroupDescribeResponse.data.groups.asScala.toList
   }
 
+  protected def streamsGroupDescribe(
+    groupIds: List[String],
+    includeAuthorizedOperations: Boolean = false,
+    version: Short = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
+  ): List[StreamsGroupDescribeResponseData.DescribedGroup] = {
+    val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
+      new StreamsGroupDescribeRequestData()
+        .setGroupIds(groupIds.asJava)
+        .setIncludeAuthorizedOperations(includeAuthorizedOperations)
+    ).build(version)
+
+    val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
+    streamsGroupDescribeResponse.data.groups.asScala.toList
+  }
+
   protected def heartbeat(
     groupId: String,
     generationId: Int,
@@ -855,6 +870,41 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
     shareGroupHeartbeatResponse.data
   }
 
+  protected def streamsGroupHeartbeat(
+    groupId: String,
+    memberId: String = "",
+    memberEpoch: Int = 0,
+    rebalanceTimeoutMs: Int = -1,
+    activeTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
+    standbyTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
+    warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
+    topology: StreamsGroupHeartbeatRequestData.Topology = null,
+    expectedError: Errors = Errors.NONE,
+    version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
+  ): StreamsGroupHeartbeatResponseData = {
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
+      new StreamsGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId)
+        .setMemberEpoch(memberEpoch)
+        .setRebalanceTimeoutMs(rebalanceTimeoutMs)
+        .setActiveTasks(activeTasks.asJava)
+        .setStandbyTasks(standbyTasks.asJava)
+        .setWarmupTasks(warmupTasks.asJava)
+        .setTopology(topology)
+    ).build(version)
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
+      streamsGroupHeartbeatResponse.data.errorCode == expectedError.code
+    }, msg = s"Could not heartbeat successfully. Last response $streamsGroupHeartbeatResponse.")
+
+    streamsGroupHeartbeatResponse.data
+  }
+
   protected def leaveGroupWithNewProtocol(
     groupId: String,
     memberId: String
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.message.{StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData}
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{StreamsGroupDescribeRequest, StreamsGroupDescribeResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api._
+
+import scala.jdk.CollectionConverters._
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.security.authorizer.AclEntry
+import org.apache.kafka.server.common.Feature
+import org.junit.Assert.{assertEquals, assertTrue}
+
+import java.lang.{Byte => JByte}
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  brokers = 1,
+  serverProperties = Array(
+    new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+  )
+)
+class StreamsGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest(
+    features = Array(
+      new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
+    )
+  )
+  def testStreamsGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
+    val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
+      new StreamsGroupDescribeRequestData().setGroupIds(List("grp-mock-1", "grp-mock-2").asJava)
+    ).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
+
+    val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
+    val expectedResponse = new StreamsGroupDescribeResponseData()
+    expectedResponse.groups().add(
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId("grp-mock-1")
+        .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+    )
+    expectedResponse.groups().add(
+      new StreamsGroupDescribeResponseData.DescribedGroup()
+        .setGroupId("grp-mock-2")
+        .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+    )
+    assertEquals(expectedResponse, streamsGroupDescribeResponse.data)
+  }
+
+  @ClusterTest(
+    serverProperties = Array(
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,streams"),
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testStreamsGroupDescribeGroupsWithNewGroupCoordinator(): Unit = {
+    // Create the __consumer_offsets topic manually; it is not created automatically
+    // here because this test does not use the FindCoordinator API.
+    createOffsetsTopic()
+
+    val admin = cluster.admin()
+    val topicName = "foo"
+
+    try {
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = topicName,
+        numPartitions = 3
+      )
+
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val timeoutMs = 5 * 60 * 1000
+      val clientId = "client-id"
+      val clientHost = "/127.0.0.1"
+      val authorizedOperationsInt = Utils.to32BitField(
+        AclEntry.supportedOperations(ResourceType.GROUP).asScala
+          .map(_.code.asInstanceOf[JByte]).asJava)
+
+      var grp1Member1Response: StreamsGroupHeartbeatResponseData = null
+      var grp1Member2Response: StreamsGroupHeartbeatResponseData = null
+      var grp2Member1Response: StreamsGroupHeartbeatResponseData = null
+      var grp2Member2Response: StreamsGroupHeartbeatResponseData = null
+
+      // grp-1 with 2 members
+      TestUtils.waitUntilTrue(() => {
+        grp1Member1Response = streamsGroupHeartbeat(
+          groupId = "grp-1",
+          memberId = "member-1",
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = List.empty,
+          standbyTasks = List.empty,
+          warmupTasks = List.empty,
+          topology = new StreamsGroupHeartbeatRequestData.Topology()
+            .setEpoch(1)
+            .setSubtopologies(List(
+              new StreamsGroupHeartbeatRequestData.Subtopology()
+                .setSubtopologyId("subtopology-1")
+                .setSourceTopics(List(topicName).asJava)
+                .setRepartitionSinkTopics(List.empty.asJava)
+                .setRepartitionSourceTopics(List.empty.asJava)
+                .setStateChangelogTopics(List.empty.asJava)
+            ).asJava)
+        )
+        grp1Member2Response = streamsGroupHeartbeat(
+          groupId = "grp-1",
+          memberId = "member-2",
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = List.empty,
+          standbyTasks = List.empty,
+          warmupTasks = List.empty,
+          topology = new StreamsGroupHeartbeatRequestData.Topology()
+            .setEpoch(1)
+            .setSubtopologies(List(
+              new StreamsGroupHeartbeatRequestData.Subtopology()
+                .setSubtopologyId("subtopology-1")
+                .setSourceTopics(List(topicName).asJava)
+                .setRepartitionSinkTopics(List.empty.asJava)
+                .setRepartitionSourceTopics(List.empty.asJava)
+                .setStateChangelogTopics(List.empty.asJava)
+            ).asJava)
+        )
+
+        val groupsDescription1 = streamsGroupDescribe(
+          groupIds = List("grp-1"),
+          includeAuthorizedOperations = true
+        )
+        grp1Member1Response.errorCode == Errors.NONE.code && grp1Member2Response.errorCode == Errors.NONE.code &&
+          groupsDescription1.size == 1 && groupsDescription1.head.members.size == 2
+      }, msg = s"Could not create grp-1 with 2 members successfully")
+
+      // grp-2 with 2 members
+      TestUtils.waitUntilTrue(() => {
+        grp2Member1Response = streamsGroupHeartbeat(
+          groupId = "grp-2",
+          memberId = "member-3",
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = List.empty,
+          standbyTasks = List.empty,
+          warmupTasks = List.empty,
+          topology = new StreamsGroupHeartbeatRequestData.Topology()
+            .setEpoch(1)
+            .setSubtopologies(List(
+              new StreamsGroupHeartbeatRequestData.Subtopology()
+                .setSubtopologyId("subtopology-1")
+                .setSourceTopics(List(topicName).asJava)
+                .setRepartitionSinkTopics(List.empty.asJava)
+                .setRepartitionSourceTopics(List.empty.asJava)
+                .setStateChangelogTopics(List.empty.asJava)
+            ).asJava)
+        )
+        grp2Member2Response = streamsGroupHeartbeat(
+          groupId = "grp-2",
+          memberId = "member-4",
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = List.empty,
+          standbyTasks = List.empty,
+          warmupTasks = List.empty,
+          topology = new StreamsGroupHeartbeatRequestData.Topology()
+            .setEpoch(1)
+            .setSubtopologies(List(
+              new StreamsGroupHeartbeatRequestData.Subtopology()
+                .setSubtopologyId("subtopology-1")
+                .setSourceTopics(List(topicName).asJava)
+                .setRepartitionSinkTopics(List.empty.asJava)
+                .setRepartitionSourceTopics(List.empty.asJava)
+                .setStateChangelogTopics(List.empty.asJava)
+            ).asJava)
+        )
+        val groupsDescription2 = streamsGroupDescribe(
+          groupIds = List("grp-2"),
+          includeAuthorizedOperations = true,
+          version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
+        )
+
+        grp2Member1Response.errorCode == Errors.NONE.code && grp2Member2Response.errorCode == Errors.NONE.code &&
+          groupsDescription2.size == 1 && groupsDescription2.head.members.size == 2
+      }, msg = s"Could not create grp-2 with 2 members successfully")
+
+      // Send follow-up heartbeats until both groups are stable
+      TestUtils.waitUntilTrue(() => {
+        grp1Member1Response = streamsGroupHeartbeat(
+          groupId = "grp-1",
+          memberId = grp1Member1Response.memberId,
+          memberEpoch = grp1Member1Response.memberEpoch,
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = convertTaskIds(grp1Member1Response.activeTasks),
+          standbyTasks = convertTaskIds(grp1Member1Response.standbyTasks),
+          warmupTasks = convertTaskIds(grp1Member1Response.warmupTasks),
+          topology = null
+        )
+        grp1Member2Response = streamsGroupHeartbeat(
+          groupId = "grp-1",
+          memberId = grp1Member2Response.memberId,
+          memberEpoch = grp1Member2Response.memberEpoch,
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = convertTaskIds(grp1Member2Response.activeTasks),
+          standbyTasks = convertTaskIds(grp1Member2Response.standbyTasks),
+          warmupTasks = convertTaskIds(grp1Member2Response.warmupTasks),
+          topology = null
+        )
+        grp2Member1Response = streamsGroupHeartbeat(
+          groupId = "grp-2",
+          memberId = grp2Member1Response.memberId,
+          memberEpoch = grp2Member1Response.memberEpoch,
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = convertTaskIds(grp2Member1Response.activeTasks),
+          standbyTasks = convertTaskIds(grp2Member1Response.standbyTasks),
+          warmupTasks = convertTaskIds(grp2Member1Response.warmupTasks),
+          topology = null
+        )
+        grp2Member2Response = streamsGroupHeartbeat(
+          groupId = "grp-2",
+          memberId = grp2Member2Response.memberId,
+          memberEpoch = grp2Member2Response.memberEpoch,
+          rebalanceTimeoutMs = timeoutMs,
+          activeTasks = convertTaskIds(grp2Member2Response.activeTasks),
+          standbyTasks = convertTaskIds(grp2Member2Response.standbyTasks),
+          warmupTasks = convertTaskIds(grp2Member2Response.warmupTasks),
+          topology = null
+        )
+        val actual = streamsGroupDescribe(
+          groupIds = List("grp-1","grp-2"),
+          includeAuthorizedOperations = true,
+          version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
+        )
+        actual.head.groupState() == "Stable" && actual(1).groupState() == "Stable" &&
+          actual.head.members.size == 2 && actual(1).members.size == 2
+      }, "Two groups did not stabilize with 2 members each in time")
+
+      // Test the describe request for both groups in stable state
+      for (version <- ApiKeys.STREAMS_GROUP_DESCRIBE.oldestVersion() to ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
+        val actual = streamsGroupDescribe(
+          groupIds = List("grp-1","grp-2"),
+          includeAuthorizedOperations = true,
+          version = version.toShort
+        )
+
+        assertEquals(2, actual.size)
+        assertEquals(actual.map(_.groupId).toSet, Set("grp-1", "grp-2"))
+        for (describedGroup <- actual) {
+          assertEquals("Stable", describedGroup.groupState)
+          assertTrue("Group epoch is not equal to the assignment epoch", describedGroup.groupEpoch == describedGroup.assignmentEpoch)
+          // Verify topology
+          assertEquals(1, describedGroup.topology.epoch)
+          assertEquals(1, describedGroup.topology.subtopologies.size)
+          assertEquals("subtopology-1", describedGroup.topology.subtopologies.get(0).subtopologyId)
+          assertEquals(List(topicName).asJava, describedGroup.topology.subtopologies.get(0).sourceTopics)
+
+          // Verify members
+          assertEquals(2, describedGroup.members.size)
+          val expectedMemberIds = describedGroup.groupId match {
+            case "grp-1" => Set(grp1Member1Response.memberId, grp1Member2Response.memberId)
+            case "grp-2" => Set(grp2Member1Response.memberId, grp2Member2Response.memberId)
+            case unexpected => throw new AssertionError(s"Unexpected group ID: $unexpected")
+          }
+
+          val actualMemberIds = describedGroup.members.asScala.map(_.memberId).toSet
+          assertEquals(expectedMemberIds, actualMemberIds)
+          assertEquals(authorizedOperationsInt, describedGroup.authorizedOperations)
+
+          describedGroup.members.asScala.foreach { member =>
+            assertTrue("Group epoch is not equal to the member epoch", member.memberEpoch == describedGroup.assignmentEpoch)
+            assertEquals(1, member.topologyEpoch)
+            assertEquals(member.targetAssignment, member.assignment)
+            assertEquals(clientId, member.clientId())
+            assertEquals(clientHost, member.clientHost())
+          }
+          // Verify all partitions 0, 1, 2 are assigned exactly once
+          val allAssignedPartitions = describedGroup.members.asScala.flatMap { member =>
+            member.assignment.activeTasks.asScala.flatMap(_.partitions.asScala)
+          }.toList
+          assertEquals(List(0, 1, 2).sorted, allAssignedPartitions.sorted)
+        }
+      }
+    } finally {
+      admin.close()
+    }
+  }
+
+  private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = {
+    if (responseTasks == null) {
+      List.empty
+    } else {
+      responseTasks.asScala.map { responseTask =>
+        new StreamsGroupHeartbeatRequestData.TaskIds()
+          .setSubtopologyId(responseTask.subtopologyId)
+          .setPartitions(responseTask.partitions)
+      }.toList
+    }
+  }
+}
@@ -189,6 +189,13 @@
             A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
             providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
         </li>
+        <li>
+            Deprecated <code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related methods, such as
+            <code>KafkaStreams#close(org.apache.kafka.streams.KafkaStreams$CloseOptions)</code>.
+            As a replacement, please use <code>org.apache.kafka.streams.CloseOptions</code> and
+            <code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
+            For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
+        </li>
     </ul>
 
 <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
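A note on the AvgIdleRatio item in the hunk above: a minimal sketch of polling the gauge over JMX follows. The exact ObjectName is an assumption here (it mirrors the kafka.controller:type=ControllerEventManager naming used by the existing event-manager metrics), and "Value" is the attribute Kafka's Yammer gauges normally expose; verify both against a running controller before depending on them, and use a remote JMXConnector instead of the platform MBeanServer if the probe does not run inside the controller JVM.

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class AvgIdleRatioProbe {
    public static void main(String[] args) throws Exception {
        // Assumed ObjectName; the metric group is ControllerEventManager per the upgrade note.
        ObjectName gauge = new ObjectName("kafka.controller:type=ControllerEventManager,name=AvgIdleRatio");
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Kafka's Yammer gauges expose their current reading through the "Value" attribute.
        double avgIdleRatio = (Double) server.getAttribute(gauge, "Value");
        // 0.0 means the controller event queue thread is always busy, 1.0 always idle.
        System.out.printf("AvgIdleRatio = %.3f%n", avgIdleRatio);
    }
}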
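The KIP-1153 item corresponds to the KafkaStreamsCloseOptionsIntegrationTest change near the end of this compare view. As a sketch of the migration, with both calls taken from that diff:

import java.time.Duration;

import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.KafkaStreams;

public class CloseOptionsMigration {
    static void closeAndLeaveGroup(KafkaStreams streams) {
        // Before (deprecated nested class, org.apache.kafka.streams.KafkaStreams$CloseOptions):
        // streams.close(new KafkaStreams.CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));

        // After: the top-level org.apache.kafka.streams.CloseOptions introduced by KIP-1153,
        // as used in KafkaStreamsCloseOptionsIntegrationTest below.
        streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)
            .withTimeout(Duration.ofSeconds(30)));
    }
}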
@@ -60,7 +60,7 @@ versions += [
   commonsLang: "3.18.0",
   commonsValidator: "1.10.0",
   classgraph: "4.8.179",
-  gradle: "8.14.3",
+  gradle: "9.1.0",
   grgit: "4.1.1",
   httpclient: "4.5.14",
   jackson: "2.19.0",
@@ -125,7 +125,7 @@ versions += [
   snappy: "1.1.10.7",
   spotbugs: "4.9.4",
   mockOAuth2Server: "2.2.1",
-  zinc: "1.9.2",
+  zinc: "1.10.8",
   // When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
   // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
   zstd: "1.5.6-10",
@@ -1,7 +1,7 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionSha256Sum=bd71102213493060956ec229d946beee57158dbd89d0e62b91bca0fa2c5f3531
+distributionSha256Sum=a17ddd85a26b6a7f5ddb71ff8b05fc5104c0202c6e64782429790c933686c806
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-9.1.0-bin.zip
 networkTimeout=10000
 validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME
@@ -1,7 +1,7 @@
 #!/bin/sh
 
 #
-# Copyright © 2015-2021 the original authors.
+# Copyright © 2015 the original authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,6 +15,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+# SPDX-License-Identifier: Apache-2.0
+#
 
 ##############################################################################
 #
@@ -55,7 +57,7 @@
 #       Darwin, MinGW, and NonStop.
 #
 #   (3) This script is generated from the Groovy template
-#       https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+#       https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
 #       within the Gradle project.
 #
 #       You can find Gradle at https://github.com/gradle/gradle/.
@@ -84,7 +86,7 @@ done
 # shellcheck disable=SC2034
 APP_BASE_NAME=${0##*/}
 # Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
-APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
+APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD=maximum
@@ -113,20 +115,6 @@ case "$( uname )" in #(
 esac
 
 
-# Loop in case we encounter an error.
-for attempt in 1 2 3; do
-    if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then
-        if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v8.14.3/gradle/wrapper/gradle-wrapper.jar"; then
-            rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
-            # Pause for a bit before looping in case the server throttled us.
-            sleep 5
-            continue
-        fi
-    fi
-done
-
-CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
-
 
 # Determine the Java command to use to start the JVM.
 if [ -n "$JAVA_HOME" ] ; then
@@ -183,7 +171,6 @@ fi
 # For Cygwin or MSYS, switch paths to Windows format before running java
 if "$cygwin" || "$msys" ; then
     APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
-    CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
 
     JAVACMD=$( cygpath --unix "$JAVACMD" )
 
@@ -212,19 +199,31 @@ if "$cygwin" || "$msys" ; then
 fi
 
 
+# Loop in case we encounter an error.
+for attempt in 1 2 3; do
+    if [ ! -e "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" ]; then
+        if ! curl -s -S --retry 3 -L -o "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" "https://raw.githubusercontent.com/gradle/gradle/v9.1.0/gradle/wrapper/gradle-wrapper.jar"; then
+            rm -f "$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
+            # Pause for a bit before looping in case the server throttled us.
+            sleep 5
+            continue
+        fi
+    fi
+done
+
 # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
 DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
 
 # Collect all arguments for the java command:
-#   * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+#   * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
 #     and any embedded shellness will be escaped.
 #   * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
 #     treated as '${Hostname}' itself on the command line.
 
 set -- \
         "-Dorg.gradle.appname=$APP_BASE_NAME" \
-        -classpath "$CLASSPATH" \
-        org.gradle.wrapper.GradleWrapperMain \
+        -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
         "$@"
 
 # Stop when "xargs" is not available.
@@ -15,13 +15,16 @@
  * limitations under the License.
  */
 
-package kafka.server;
+package org.apache.kafka.server;
 
+import org.apache.kafka.clients.admin.AddRaftVoterOptions;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.FeatureMetadata;
 import org.apache.kafka.clients.admin.QuorumInfo;
 import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
@Tag("integration")
|
||||||
public class ReconfigurableQuorumIntegrationTest {
|
public class ReconfigurableQuorumIntegrationTest {
|
||||||
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
|
static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
|
||||||
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
|
FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
|
||||||
|
@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
).build()) {
|
).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
|
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
|
||||||
});
|
});
|
||||||
|
@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
).setStandalone(true).build()) {
|
).setStandalone(true).build()) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
|
||||||
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
|
checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
|
||||||
});
|
});
|
||||||
|
@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
|
||||||
|
@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
) {
|
) {
|
||||||
cluster.format();
|
cluster.format();
|
||||||
cluster.startup();
|
cluster.startup();
|
||||||
try (Admin admin = Admin.create(cluster.clientProperties())) {
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
|
||||||
|
final var nodes = new TestKitNodes.Builder()
|
||||||
|
.setClusterId("test-cluster")
|
||||||
|
.setNumBrokerNodes(1)
|
||||||
|
.setNumControllerNodes(3)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||||
|
cluster.format();
|
||||||
|
cluster.startup();
|
||||||
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
|
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
|
||||||
|
for (int replicaId : new int[] {3000, 3001, 3002}) {
|
||||||
|
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||||
|
admin.removeRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||||
|
).all().get();
|
||||||
|
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
|
||||||
|
Map<Integer, Uuid> voters = findVoterDirs(admin);
|
||||||
|
assertEquals(Set.of(3001, 3002), voters.keySet());
|
||||||
|
for (int replicaId : new int[] {3001, 3002}) {
|
||||||
|
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
admin.addRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||||
|
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
|
||||||
|
).all().get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
|
||||||
|
final var nodes = new TestKitNodes.Builder()
|
||||||
|
.setClusterId("test-cluster")
|
||||||
|
.setNumBrokerNodes(1)
|
||||||
|
.setNumControllerNodes(3)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final Map<Integer, Uuid> initialVoters = new HashMap<>();
|
||||||
|
for (final var controllerNode : nodes.controllerNodes().values()) {
|
||||||
|
initialVoters.put(
|
||||||
|
controllerNode.id(),
|
||||||
|
controllerNode.metadataDirectoryId()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
|
||||||
|
cluster.format();
|
||||||
|
cluster.startup();
|
||||||
|
try (var admin = Admin.create(cluster.clientProperties())) {
|
||||||
|
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
|
||||||
|
var removeFuture = admin.removeRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||||
|
).all();
|
||||||
|
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
|
||||||
|
|
||||||
|
var addFuture = admin.addRaftVoter(
|
||||||
|
3000,
|
||||||
|
dirId,
|
||||||
|
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
|
||||||
|
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
|
||||||
|
).all();
|
||||||
|
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
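Editor's note: the two tests added above exercise the Admin client's dynamic controller-quorum reconfiguration calls. Below is a minimal sketch of that workflow outside the test kit, assuming a reachable KRaft cluster on localhost:9092; the voter id, directory id, endpoint, and cluster id are placeholders that must match the actual voter being moved.

import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;

public class QuorumReconfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local cluster
        try (Admin admin = Admin.create(props)) {
            int voterId = 3000;             // placeholder: id of the controller to move
            Uuid dirId = Uuid.randomUuid(); // placeholder: must be that voter's metadata directory id
            // Passing the cluster id is optional; when set, the controller rejects the
            // request with InconsistentClusterIdException if it does not match.
            admin.removeRaftVoter(voterId, dirId,
                new RemoveRaftVoterOptions().setClusterId(Optional.of("my-cluster-id"))
            ).all().get();
            admin.addRaftVoter(voterId, dirId,
                Set.of(new RaftVoterEndpoint("CONTROLLER", "controller-3000.example.com", 9093)),
                new AddRaftVoterOptions().setClusterId(Optional.of("my-cluster-id"))
            ).all().get();
        }
    }
}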
@@ -28,8 +28,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.CloseOptions;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -159,7 +159,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);

-        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
+        streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP).withTimeout(Duration.ofSeconds(30)));
         waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
     }
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+public class CloseOptions {
+    /**
+     * Enum to specify the group membership operation upon closing the Kafka Streams application.
+     *
+     * <ul>
+     *   <li><b>{@code LEAVE_GROUP}</b>: means the consumer will leave the group.</li>
+     *   <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
+     * </ul>
+     */
+    public enum GroupMembershipOperation {
+        LEAVE_GROUP,
+        REMAIN_IN_GROUP
+    }
+
+    /**
+     * Specifies the group membership operation upon shutdown.
+     * By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
+     */
+    protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
+
+    /**
+     * Specifies the maximum amount of time to wait for the close process to complete.
+     * This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
+     */
+    protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));
+
+    private CloseOptions() {
+    }
+
+    protected CloseOptions(final CloseOptions closeOptions) {
+        this.operation = closeOptions.operation;
+        this.timeout = closeOptions.timeout;
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a custom timeout.
+     *
+     * @param timeout the maximum time to wait for the KafkaStreams to close.
+     * @return a new {@code CloseOptions} instance with the specified timeout.
+     */
+    public static CloseOptions timeout(final Duration timeout) {
+        return new CloseOptions().withTimeout(timeout);
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a specified group membership operation.
+     *
+     * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+     * @return a new {@code CloseOptions} instance with the specified group membership operation.
+     */
+    public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
+        return new CloseOptions().withGroupMembershipOperation(operation);
+    }
+
+    /**
+     * Fluent method to set the timeout for the close process.
+     *
+     * @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
+     * @return this {@code CloseOptions} instance.
+     */
+    public CloseOptions withTimeout(final Duration timeout) {
+        this.timeout = Optional.ofNullable(timeout);
+        return this;
+    }
+
+    /**
+     * Fluent method to set the group membership operation upon shutdown.
+     *
+     * @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
+     * @return this {@code CloseOptions} instance.
+     */
+    public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
+        this.operation = Objects.requireNonNull(operation, "operation should not be null");
+        return this;
+    }
+}
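Editor's note: the new class above is the user-facing half of this change; closing a running application now reads as a fluent chain started from either static factory. A minimal usage sketch, assuming a running KafkaStreams instance:

import java.time.Duration;

import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.KafkaStreams;

public final class CloseOptionsUsageSketch {
    // Wait up to 30 seconds; the membership operation stays at its default, REMAIN_IN_GROUP.
    static boolean closeQuietly(final KafkaStreams streams) {
        return streams.close(CloseOptions.timeout(Duration.ofSeconds(30)));
    }

    // Explicitly leave the group so the member's tasks can be reassigned promptly.
    static boolean closeAndLeave(final KafkaStreams streams) {
        return streams.close(
            CloseOptions.timeout(Duration.ofSeconds(30))
                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
    }
}

Either factory can start the chain; timeout(...) and groupMembershipOperation(...) both return an instance whose remaining field can be set with the with* setters.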
@@ -49,6 +49,7 @@ import org.apache.kafka.streams.errors.StreamsStoppedException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
 import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
+import org.apache.kafka.streams.internals.CloseOptionsInternal;
 import org.apache.kafka.streams.internals.metrics.ClientMetrics;
 import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
 import org.apache.kafka.streams.processor.StandbyUpdateListener;
@@ -488,7 +489,7 @@ public class KafkaStreams implements AutoCloseable {
                 closeToError();
             }
             final StreamThread deadThread = (StreamThread) Thread.currentThread();
-            deadThread.shutdown(false);
+            deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
             addStreamThread();
             if (throwable instanceof RuntimeException) {
                 throw (RuntimeException) throwable;
@@ -1136,7 +1137,7 @@ public class KafkaStreams implements AutoCloseable {
                 return Optional.of(streamThread.getName());
             } else {
                 log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
-                streamThread.shutdown(true);
+                streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
                 threads.remove(streamThread);
                 final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
                 log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
@@ -1200,7 +1201,7 @@ public class KafkaStreams implements AutoCloseable {
         final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
         if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
             log.info("Removing StreamThread {}", streamThread.getName());
-            streamThread.shutdown(true);
+            streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             if (callingThreadIsNotCurrentStreamThread) {
                 final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
                 if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
@@ -1418,15 +1419,18 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * Class that handles options passed in case of {@code KafkaStreams} instance scale down
      */
+    @Deprecated(since = "4.2")
     public static class CloseOptions {
         private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
         private boolean leaveGroup = false;

+        @Deprecated(since = "4.2")
         public CloseOptions timeout(final Duration timeout) {
             this.timeout = timeout;
             return this;
         }

+        @Deprecated(since = "4.2")
         public CloseOptions leaveGroup(final boolean leaveGroup) {
             this.leaveGroup = leaveGroup;
             return this;
@@ -1438,10 +1442,14 @@ public class KafkaStreams implements AutoCloseable {
      * This will block until all threads have stopped.
      */
     public void close() {
-        close(Optional.empty(), false);
+        close(Optional.empty(), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }

-    private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
+    private Thread shutdownHelper(
+        final boolean error,
+        final long timeoutMs,
+        final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation
+    ) {
         stateDirCleaner.shutdownNow();
         if (rocksDBMetricsRecordingService != null) {
             rocksDBMetricsRecordingService.shutdownNow();
@@ -1453,7 +1461,9 @@ public class KafkaStreams implements AutoCloseable {
         return new Thread(() -> {
             // notify all the threads to stop; avoid deadlocks by stopping any
             // further state reports from the thread since we're shutting down
-            int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
+            int numStreamThreads = processStreamThread(
+                streamThread -> streamThread.shutdown(operation)
+            );

             log.info("Shutting down {} stream threads", numStreamThreads);

@@ -1513,7 +1523,7 @@ public class KafkaStreams implements AutoCloseable {
         }, clientId + "-CloseThread");
     }

-    private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
+    private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
         final long timeoutMs;
         if (timeout.isPresent()) {
             timeoutMs = timeout.get();
@@ -1544,7 +1554,7 @@ public class KafkaStreams implements AutoCloseable {
                 + "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
         }

-        final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
+        final Thread shutdownThread = shutdownHelper(false, timeoutMs, operation);

         shutdownThread.setDaemon(true);
         shutdownThread.start();
@@ -1562,7 +1572,7 @@ public class KafkaStreams implements AutoCloseable {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in {}", state());
         } else {
-            final Thread shutdownThread = shutdownHelper(true, -1, false);
+            final Thread shutdownThread = shutdownHelper(true, -1, org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);

            shutdownThread.setDaemon(true);
            shutdownThread.start();
@@ -1588,12 +1598,13 @@ public class KafkaStreams implements AutoCloseable {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }

-        return close(Optional.of(timeoutMs), false);
+        return close(Optional.of(timeoutMs), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }

     /**
      * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
      * threads to join.
+     * This method is deprecated and replaced by {@link #close(org.apache.kafka.streams.CloseOptions)}.
      * @param options contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
      *        trigger consumer leave call
      * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached
@@ -1601,15 +1612,36 @@ public class KafkaStreams implements AutoCloseable {
      * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
      */
+    @Deprecated(since = "4.2")
     public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+        final org.apache.kafka.streams.CloseOptions closeOptions = org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
+            .withGroupMembershipOperation(options.leaveGroup ?
+                org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
+                org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        return close(closeOptions);
+    }
+
+    /**
+     * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
+     * threads to join.
+     * @param options contains timeout to specify how long to wait for the threads to shut down,
+     *        and a {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
+     *        to trigger consumer leave call or remain in the group
+     * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached
+     *         before all threads stopped
+     * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
+     * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
+     */
+    public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
         Objects.requireNonNull(options, "options cannot be null");
-        final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
-        final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
+        final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
+        final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }

-        return close(Optional.of(timeoutMs), options.leaveGroup);
+        return close(Optional.of(timeoutMs), optionsInternal.operation());
     }

     /**
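Editor's note: for applications migrating off the deprecated inner class, the mapping above is mechanical: the timeout stays a Duration, and leaveGroup(true/false) becomes the GroupMembershipOperation enum. A hedged before/after sketch:

import java.time.Duration;

import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.KafkaStreams;

public final class CloseMigrationSketch {

    @SuppressWarnings("deprecation") // the old API, kept only for comparison
    static boolean closeOldStyle(final KafkaStreams streams) {
        final KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ofSeconds(30));
        options.leaveGroup(true);
        return streams.close(options);
    }

    static boolean closeNewStyle(final KafkaStreams streams) {
        return streams.close(
            CloseOptions.timeout(Duration.ofSeconds(30))
                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
    }
}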
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.streams.CloseOptions;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class CloseOptionsInternal extends CloseOptions {
+
+    public CloseOptionsInternal(final CloseOptions options) {
+        super(options);
+    }
+
+    public GroupMembershipOperation operation() {
+        return operation;
+    }
+
+    public Optional<Duration> timeout() {
+        return timeout;
+    }
+}
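Editor's note: this internal subclass is how KafkaStreams reads the protected operation and timeout fields without adding public getters to the user-facing CloseOptions. A minimal, self-contained sketch of the same copy-constructor pattern; all names here are invented for illustration:

public class AccessorPatternSketch {

    // Public-facing options type: state is protected, no getters in the API.
    static class Options {
        protected int level = 0;
        protected Options() { }
        protected Options(final Options other) { this.level = other.level; }
        public Options withLevel(final int level) { this.level = level; return this; }
    }

    // Implementation-side subclass re-exposes the state via the copy constructor.
    static class OptionsInternal extends Options {
        OptionsInternal(final Options other) { super(other); }
        int level() { return level; }
    }

    public static void main(String[] args) {
        Options userOpts = new Options().withLevel(3);
        OptionsInternal internal = new OptionsInternal(userOpts);
        System.out.println(internal.level()); // prints 3
    }
}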
@@ -91,9 +91,9 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;

@@ -367,7 +368,8 @@ public class StreamThread extends Thread implements ProcessingThread {

     // These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
-    private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
+    private final AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation> leaveGroupRequested =
+        new AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
     private final boolean eosEnabled;
     private final boolean stateUpdaterEnabled;
@@ -898,7 +899,7 @@ public class StreamThread extends Thread implements ProcessingThread {
             cleanRun = runLoop();
         } catch (final Throwable e) {
             failedStreamThreadSensor.record();
-            leaveGroupRequested.set(true);
+            leaveGroupRequested.set(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             streamsUncaughtExceptionHandler.accept(e, false);
             // Note: the above call currently rethrows the exception, so nothing below this line will be executed
         } finally {
@@ -1879,12 +1880,12 @@ public class StreamThread extends Thread implements ProcessingThread {
     * Note that there is nothing to prevent this function from being called multiple times
     * (e.g., in testing), hence the state is set only the first time
     *
-    * @param leaveGroup this flag will control whether the consumer will leave the group on close or not
+    * @param operation the group membership operation to apply on shutdown. Must be one of LEAVE_GROUP or REMAIN_IN_GROUP.
     */
-    public void shutdown(final boolean leaveGroup) {
+    public void shutdown(final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
-        leaveGroupRequested.set(leaveGroup);
+        leaveGroupRequested.set(operation);
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1917,7 +1918,8 @@ public class StreamThread extends Thread implements ProcessingThread {
             log.error("Failed to close changelog reader due to the following error:", e);
         }
         try {
-            final GroupMembershipOperation membershipOperation = leaveGroupRequested.get() ? LEAVE_GROUP : REMAIN_IN_GROUP;
+            final GroupMembershipOperation membershipOperation =
+                leaveGroupRequested.get() == org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP ? LEAVE_GROUP : REMAIN_IN_GROUP;
             mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
         } catch (final Throwable e) {
             log.error("Failed to close consumer due to the following error:", e);
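Editor's note: the StreamThread change above swaps an AtomicBoolean for an AtomicReference to an enum, so the requesting thread records which shutdown variant it wants rather than a bare yes/no. A standalone sketch of that signaling pattern, with invented names and no Kafka dependencies:

import java.util.concurrent.atomic.AtomicReference;

public class WorkerSketch extends Thread {
    enum ShutdownMode { REMAIN_IN_GROUP, LEAVE_GROUP }

    // Written by the caller, read once by the worker during shutdown.
    private final AtomicReference<ShutdownMode> requested =
        new AtomicReference<>(ShutdownMode.REMAIN_IN_GROUP);
    private volatile boolean running = true;

    public void shutdown(final ShutdownMode mode) {
        requested.set(mode);   // record *how* to shut down...
        running = false;       // ...and *that* we should shut down
    }

    @Override
    public void run() {
        while (running) {
            Thread.onSpinWait(); // stand-in for the real poll/process loop
        }
        // A single read decides the close behavior, mirroring completeShutdown() above.
        if (requested.get() == ShutdownMode.LEAVE_GROUP) {
            System.out.println("closing consumer with a leave-group request");
        } else {
            System.out.println("closing consumer, staying in the group");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WorkerSketch worker = new WorkerSketch();
        worker.start();
        worker.shutdown(ShutdownMode.LEAVE_GROUP);
        worker.join();
    }
}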
@@ -19,7 +19,6 @@ package org.apache.kafka.streams;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.admin.MockAdminClient;
-import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
@@ -310,8 +309,12 @@ public class KafkaStreamsTest {

     private void prepareConsumer(final StreamThread thread, final AtomicReference<StreamThread.State> state) {
         doAnswer(invocation -> {
-            supplier.consumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
-            supplier.restoreConsumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
+            supplier.consumer.close(
+                org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+            );
+            supplier.restoreConsumer.close(
+                org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+            );
             for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
                 producer.close();
             }
@@ -320,7 +323,7 @@ public class KafkaStreamsTest {
             threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
             threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
             return null;
-        }).when(thread).shutdown(false);
+        }).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     }

     private void prepareThreadLock(final StreamThread thread) {
@@ -571,7 +574,7 @@ public class KafkaStreamsTest {

         for (int i = 0; i < NUM_THREADS; i++) {
             final StreamThread tmpThread = streams.threads.get(i);
-            tmpThread.shutdown(false);
+            tmpThread.shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
             waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
                 "Thread never stopped.");
             streams.threads.get(i).join();
@@ -790,7 +793,7 @@ public class KafkaStreamsTest {
         prepareThreadLock(streamThreadTwo);
         try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            streamThreadOne.shutdown(true);
+            streamThreadOne.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
             assertThat(threads.size(), equalTo(1));
             assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
@@ -1016,9 +1019,8 @@ public class KafkaStreamsTest {
                 () -> streams.state() == KafkaStreams.State.RUNNING,
                 "Streams never started.");

-            final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ZERO);
-            closeOptions.leaveGroup(true);
+            final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);

             streams.close(closeOptions);
             assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@@ -1041,8 +1043,7 @@ public class KafkaStreamsTest {
                 () -> streams.state() == KafkaStreams.State.RUNNING,
                 "Streams never started.");

-            final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ZERO);
+            final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);

             streams.close(closeOptions);
             assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@@ -1229,8 +1230,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);

-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(10L));
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L));
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -1243,8 +1243,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);

-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(-1L));
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L));
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier, time)) {
             assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
         }
@@ -1257,8 +1256,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         prepareTerminableThread(streamThreadOne);

-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ZERO);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -1275,9 +1273,8 @@ public class KafkaStreamsTest {

         when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);

-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(10L));
-        closeOptions.leaveGroup(true);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L))
+            .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -1293,9 +1290,8 @@ public class KafkaStreamsTest {
         final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
         when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);

-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ofMillis(-1L));
-        closeOptions.leaveGroup(true);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L))
+            .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
             assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
         }
@@ -1312,9 +1308,8 @@ public class KafkaStreamsTest {

         when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);

-        final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-        closeOptions.timeout(Duration.ZERO);
-        closeOptions.leaveGroup(true);
+        final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
+            .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
             assertFalse(streams.close(closeOptions));
         }
@@ -60,6 +60,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -247,7 +248,7 @@ public class StreamThreadTest {
             if (thread.state() != State.CREATED) {
                 thread.taskManager().shutdown(false);
             }
-            thread.shutdown(true);
+            thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             thread = null;
         }
         final Set<Thread> t = Collections.unmodifiableSet(Thread.getAllStackTraces().keySet());
@@ -409,7 +410,7 @@ public class StreamThreadTest {
         assertEquals(4, stateListener.numChanges);
         assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState);

-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         assertSame(StreamThread.State.PENDING_SHUTDOWN, thread.state());
     }

@@ -427,14 +428,14 @@ public class StreamThreadTest {
             10 * 1000,
             "Thread never started.");

-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         TestUtils.waitForCondition(
             () -> thread.state() == StreamThread.State.DEAD,
             10 * 1000,
             "Thread never shut down.");

-        thread.shutdown(true);
-        assertEquals(thread.state(), StreamThread.State.DEAD);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        assertEquals(State.DEAD, thread.state());
     }

     @ParameterizedTest
@@ -812,7 +813,7 @@ public class StreamThreadTest {
             10 * 1000,
             "Thread never started.");

-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         TestUtils.waitForCondition(
             () -> thread.state() == StreamThread.State.DEAD,
             10 * 1000,
@@ -880,7 +881,7 @@ public class StreamThreadTest {
             () -> { }
         );

-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);

         // Validate that the scheduled rebalance wasn't reset then set to MAX_VALUE so we
         // don't trigger one before we can shut down, since the rebalance must be ended
@@ -1390,7 +1391,7 @@ public class StreamThreadTest {
             10 * 1000,
             "Thread never started.");

-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);

         // even if thread is no longer running, it should still be polling
         // as long as the rebalance is still ongoing
@@ -1426,7 +1427,7 @@ public class StreamThreadTest {
         thread.setStateListener(
             (t, newState, oldState) -> {
                 if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
-                    thread.shutdown(true);
+                    thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
                 }
             });
         thread.run();
@@ -1524,7 +1525,7 @@ public class StreamThreadTest {
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
             .updateThreadMetadata(adminClientId(CLIENT_ID));
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);

         verify(taskManager).shutdown(true);
     }
@@ -1542,7 +1543,7 @@ public class StreamThreadTest {
         topologyMetadata.buildAndRewriteTopology();
         thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
             .updateThreadMetadata(adminClientId(CLIENT_ID));
-        thread.shutdown(true);
+        thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
         // Execute the run method. Verification of the mock will check that shutdown was only done once
         thread.run();
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -341,9 +342,8 @@ public class DeleteStreamsGroupOffsetTest {

     private void stopKSApp(String appId, String topic, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
         if (streams != null) {
-            KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ofSeconds(30));
-            closeOptions.leaveGroup(true);
+            CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             streams.close(closeOptions);
             streams.cleanUp();
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.CloseOptions;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -512,9 +513,8 @@ public class DeleteStreamsGroupTest {

     private void stopKSApp(String appId, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
         if (streams != null) {
-            KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
-            closeOptions.timeout(Duration.ofSeconds(30));
-            closeOptions.leaveGroup(true);
+            CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
             streams.close(closeOptions);
             streams.cleanUp();
@@ -23,7 +23,7 @@
 // and not the version installed on the machine running the task.
 // Read more about the wrapper here: https://docs.gradle.org/current/userguide/gradle_wrapper.html
 wrapper {
-  gradleVersion = project.gradleVersion
+  gradleVersion = versions.gradle
 }

 // Custom task to inject support for downloading the gradle wrapper jar if it doesn't exist.
@@ -35,14 +35,12 @@ task bootstrapWrapper() {
     def wrapperBasePath = "\$APP_HOME/gradle/wrapper"
     def wrapperJarPath = wrapperBasePath + "/gradle-wrapper.jar"

-    // Add a trailing zero to the version if needed.
-    def fullVersion = project.gradleVersion.count(".") == 1 ? "${project.gradleVersion}.0" : versions.gradle
     // Leverages the wrapper jar checked into the gradle project on github because the jar isn't
     // available elsewhere. Using raw.githubusercontent.com instead of github.com because
     // github.com servers deprecated TLSv1/TLSv1.1 support some time ago, so older versions
     // of curl (built against OpenSSL library that doesn't support TLSv1.2) would fail to
     // fetch the jar.
-    def wrapperBaseUrl = "https://raw.githubusercontent.com/gradle/gradle/v$fullVersion/gradle/wrapper"
+    def wrapperBaseUrl = "https://raw.githubusercontent.com/gradle/gradle/v$versions.gradle/gradle/wrapper"
     def wrapperJarUrl = wrapperBaseUrl + "/gradle-wrapper.jar"

     def bootstrapString = """
@@ -59,13 +57,15 @@ task bootstrapWrapper() {
       done
       """.stripIndent()

+    String putBootstrapStringAbove = "# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script."
+
     def wrapperScript = wrapper.scriptFile
     def wrapperLines = wrapperScript.readLines()
     wrapperScript.withPrintWriter { out ->
       def bootstrapWritten = false
       wrapperLines.each { line ->
         // Print the wrapper bootstrap before the first usage of the wrapper jar.
-        if (!bootstrapWritten && line.contains("gradle-wrapper.jar")) {
+        if (!bootstrapWritten && line.contains(putBootstrapStringAbove)) {
           out.println(bootstrapString)
           bootstrapWritten = true
         }