Compare commits

...

9 Commits

Author SHA1 Message Date
Maros Orsak 0e68b769ad
Merge 989a78e747 into 2938c4242e 2025-10-07 18:28:22 +02:00
lucliu1108 2938c4242e
KAFKA-19754: Add RPC-level integration test for StreamsGroupDescribeRequest (#20632)
CI / build (push) Waiting to run Details
Test the `StreamsGroupDescribeRequest` RPC and corresponding responses
for situations where
- `streams.version` not upgraded to 1
- `streams.version` enabled, multiple groups listening to the same
topic.

Reviewers: Lucas Brutschy <lucasbru@apache.org>
2025-10-07 15:47:32 +02:00
Ken Huang ebae768bd8
KAFKA-18193 Refactor Kafka Streams CloseOptions to Fluent API Style (#19955)
In Kafka Streams, configuration classes typically follow a fluent API
pattern to ensure a consistent and intuitive developer experience.
However, the current implementation of
`org.apache.kafka.streams.KafkaStreams$CloseOptions` deviates from this
convention by exposing a public constructor, breaking the uniformity
expected across the API.

To address this inconsistency, we propose introducing a new
`CloseOptions` class that adheres to the fluent API style, replacing the
existing implementation. The new class will retain the existing
`timeout(Duration)` and `leaveGroup(boolean)` methods but will enforce
fluent instantiation and configuration. Given the design shift, we will
not maintain backward compatibility with the current class.

This change aligns with the goal of standardizing configuration objects
across Kafka Streams, offering developers a more cohesive and
predictable API.

Reviewers: Bill Bejeck<bbejeck@apache.org>
2025-10-07 08:50:18 -04:00
Jhen-Yung Hsu fa2496bb91
MINOR: skip ci-complete on 4.0 (#20644)
Since that 4.0 branch does not include
[KAFKA-18748](https://issues.apache.org/jira/browse/KAFKA-18748), it is
unable to find the related scan reports, but the ci-complete workflow is
still being triggered on the 4.0 branch. Disable this on the 4.0 branch,
as its reports can be safely ignored.

See https://github.com/apache/kafka/pull/20616#issuecomment-3370876779.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-07 20:27:29 +08:00
see-quick 989a78e747 update logic inside detect container runtime
Signed-off-by: see-quick <maros.orsak159@gmail.com>
2025-10-02 11:47:09 +02:00
see-quick 405ab31a4c update review
Signed-off-by: see-quick <maros.orsak159@gmail.com>
2025-09-23 16:13:31 +02:00
see-quick 14628697c7 nit
Signed-off-by: see-quick <maros.orsak159@gmail.com>
2025-09-17 14:43:02 +02:00
see-quick cdc4e86e11 a few nits
Signed-off-by: see-quick <maros.orsak159@gmail.com>
2025-09-17 14:42:05 +02:00
see-quick 3c7ac2fb3a MINOR: Add support for podman when running system tests
Signed-off-by: see-quick <maros.orsak159@gmail.com>
2025-09-17 14:38:03 +02:00
15 changed files with 798 additions and 212 deletions

View File

@ -38,7 +38,7 @@ run-name: Build Scans for ${{ github.event.workflow_run.display_title}}
jobs:
upload-build-scan:
# Skip this workflow if the CI run was skipped or cancelled
if: (github.event.workflow_run.conclusion == 'success' || github.event.workflow_run.conclusion == 'failure')
if: (github.event.workflow_run.conclusion == 'success' || github.event.workflow_run.conclusion == 'failure') && github.event.workflow_run.head_branch != '4.0'
runs-on: ubuntu-latest
strategy:
fail-fast: false

View File

@ -26,9 +26,9 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, StreamsGroupDescribeRequest, StreamsGroupDescribeResponse, StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@ -768,6 +768,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupDescribeResponse.data.groups.asScala.toList
}
protected def streamsGroupDescribe(
groupIds: List[String],
includeAuthorizedOperations: Boolean = false,
version: Short = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[StreamsGroupDescribeResponseData.DescribedGroup] = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData()
.setGroupIds(groupIds.asJava)
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
).build(version)
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
streamsGroupDescribeResponse.data.groups.asScala.toList
}
protected def heartbeat(
groupId: String,
generationId: Int,
@ -855,6 +870,41 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data
}
protected def streamsGroupHeartbeat(
groupId: String,
memberId: String = "",
memberEpoch: Int = 0,
rebalanceTimeoutMs: Int = -1,
activeTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
standbyTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
topology: StreamsGroupHeartbeatRequestData.Topology = null,
expectedError: Errors = Errors.NONE,
version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
): StreamsGroupHeartbeatResponseData = {
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setActiveTasks(activeTasks.asJava)
.setStandbyTasks(standbyTasks.asJava)
.setWarmupTasks(warmupTasks.asJava)
.setTopology(topology)
).build(version)
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
streamsGroupHeartbeatResponse.data.errorCode == expectedError.code
}, msg = s"Could not heartbeat successfully. Last response $streamsGroupHeartbeatResponse.")
streamsGroupHeartbeatResponse.data
}
protected def leaveGroupWithNewProtocol(
groupId: String,
memberId: String

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.message.{StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{StreamsGroupDescribeRequest, StreamsGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api._
import scala.jdk.CollectionConverters._
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Feature
import org.junit.Assert.{assertEquals, assertTrue}
import java.lang.{Byte => JByte}
@ClusterTestDefaults(
types = Array(Type.KRAFT),
brokers = 1,
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class StreamsGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
)
)
def testStreamsGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData().setGroupIds(List("grp-mock-1", "grp-mock-2").asJava)
).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
val expectedResponse = new StreamsGroupDescribeResponseData()
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-1")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-2")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
assertEquals(expectedResponse, streamsGroupDescribeResponse.data)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,streams"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
def testStreamsGroupDescribeGroupsWithNewGroupCoordinator(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
val admin = cluster.admin()
val topicName = "foo"
try {
TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = topicName,
numPartitions = 3
)
TestUtils.waitUntilTrue(() => {
admin.listTopics().names().get().contains(topicName)
}, msg = s"Topic $topicName is not available to the group coordinator")
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
var grp1Member1Response: StreamsGroupHeartbeatResponseData = null
var grp1Member2Response: StreamsGroupHeartbeatResponseData = null
var grp2Member1Response: StreamsGroupHeartbeatResponseData = null
var grp2Member2Response: StreamsGroupHeartbeatResponseData = null
// grp-1 with 2 members
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-1",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-2",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription1 = streamsGroupDescribe(
groupIds = List("grp-1"),
includeAuthorizedOperations = true
)
grp1Member1Response.errorCode == Errors.NONE.code && grp1Member2Response.errorCode == Errors.NONE.code &&
groupsDescription1.size == 1 && groupsDescription1.head.members.size == 2
}, msg = s"Could not create grp-1 with 2 members successfully")
// grp-2 with 2 members
TestUtils.waitUntilTrue(() => {
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-3",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-4",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription2 = streamsGroupDescribe(
groupIds = List("grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
grp2Member1Response.errorCode == Errors.NONE.code && grp2Member2Response.errorCode == Errors.NONE.code &&
groupsDescription2.size == 1 && groupsDescription2.head.members.size == 2
}, msg = s"Could not create grp-2 with 2 members successfully")
// Send follow-up heartbeats until both groups are stable
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member1Response.memberId,
memberEpoch = grp1Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member1Response.warmupTasks),
topology = null
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member2Response.memberId,
memberEpoch = grp1Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member2Response.warmupTasks),
topology = null
)
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member1Response.memberId,
memberEpoch = grp2Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member1Response.warmupTasks),
topology = null
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member2Response.memberId,
memberEpoch = grp2Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member2Response.warmupTasks),
topology = null
)
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
actual.head.groupState() == "Stable" && actual(1).groupState() == "Stable" &&
actual.head.members.size == 2 && actual(1).members.size == 2
}, "Two groups did not stabilize with 2 members each in time")
// Test the describe request for both groups in stable state
for (version <- ApiKeys.STREAMS_GROUP_DESCRIBE.oldestVersion() to ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = version.toShort
)
assertEquals(2, actual.size)
assertEquals(actual.map(_.groupId).toSet, Set("grp-1", "grp-2"))
for (describedGroup <- actual) {
assertEquals("Stable", describedGroup.groupState)
assertTrue("Group epoch is not equal to the assignment epoch", describedGroup.groupEpoch == describedGroup.assignmentEpoch)
// Verify topology
assertEquals(1, describedGroup.topology.epoch)
assertEquals(1, describedGroup.topology.subtopologies.size)
assertEquals("subtopology-1", describedGroup.topology.subtopologies.get(0).subtopologyId)
assertEquals(List(topicName).asJava, describedGroup.topology.subtopologies.get(0).sourceTopics)
// Verify members
assertEquals(2, describedGroup.members.size)
val expectedMemberIds = describedGroup.groupId match {
case "grp-1" => Set(grp1Member1Response.memberId, grp1Member2Response.memberId)
case "grp-2" => Set(grp2Member1Response.memberId, grp2Member2Response.memberId)
case unexpected => throw new AssertionError(s"Unexpected group ID: $unexpected")
}
val actualMemberIds = describedGroup.members.asScala.map(_.memberId).toSet
assertEquals(expectedMemberIds, actualMemberIds)
assertEquals(authorizedOperationsInt, describedGroup.authorizedOperations)
describedGroup.members.asScala.foreach { member =>
assertTrue("Group epoch is not equal to the member epoch", member.memberEpoch == describedGroup.assignmentEpoch)
assertEquals(1, member.topologyEpoch)
assertEquals(member.targetAssignment, member.assignment)
assertEquals(clientId, member.clientId())
assertEquals(clientHost, member.clientHost())
}
// Verify all partitions 0, 1, 2 are assigned exactly once
val allAssignedPartitions = describedGroup.members.asScala.flatMap { member =>
member.assignment.activeTasks.asScala.flatMap(_.partitions.asScala)
}.toList
assertEquals(List(0, 1, 2).sorted, allAssignedPartitions.sorted)
}
}
} finally{
admin.close()
}
}
private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = {
if (responseTasks == null) {
List.empty
} else {
responseTasks.asScala.map { responseTask =>
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(responseTask.subtopologyId)
.setPartitions(responseTask.partitions)
}.toList
}
}
}

View File

@ -189,6 +189,13 @@
A new metric <code>AvgIdleRatio</code> has been added to the <code>ControllerEventManager</code> group. This metric measures the average idle ratio of the controller event queue thread,
providing visibility into how much time the controller spends waiting for events versus processing them. The metric value ranges from 0.0 (always busy) to 1.0 (always idle).
</li>
<li>
Deprecated <code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related methods, such as
<code>KafkaStreams#close(org.apache.kafka.streams.KafkaStreams$CloseOptions)</code>.
As a replacement, please use <code>org.apache.kafka.streams.CloseOptions</code> and
<code>KafkaStreams#close(org.apache.kafka.streams.CloseOptions)</code>.
For further details, please refer to <a href="https://cwiki.apache.org/confluence/x/QAq9F">KIP-1153</a>.
</li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

View File

@ -28,8 +28,8 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.CloseOptions;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ -159,7 +159,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP).withTimeout(Duration.ofSeconds(30)));
waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
public class CloseOptions {
/**
* Enum to specify the group membership operation upon closing the Kafka Streams application.
*
* <ul>
* <li><b>{@code LEAVE_GROUP}</b>: means the consumer leave the group.</li>
* <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in the group.</li>
* </ul>
*/
public enum GroupMembershipOperation {
LEAVE_GROUP,
REMAIN_IN_GROUP
}
/**
* Specifies the group membership operation upon shutdown.
* By default, {@code GroupMembershipOperation.REMAIN_IN_GROUP} will be applied, which follows the KafkaStreams default behavior.
*/
protected GroupMembershipOperation operation = GroupMembershipOperation.REMAIN_IN_GROUP;
/**
* Specifies the maximum amount of time to wait for the close process to complete.
* This allows users to define a custom timeout for gracefully stopping the KafkaStreams.
*/
protected Optional<Duration> timeout = Optional.of(Duration.ofMillis(Long.MAX_VALUE));
private CloseOptions() {
}
protected CloseOptions(final CloseOptions closeOptions) {
this.operation = closeOptions.operation;
this.timeout = closeOptions.timeout;
}
/**
* Static method to create a {@code CloseOptions} with a custom timeout.
*
* @param timeout the maximum time to wait for the KafkaStreams to close.
* @return a new {@code CloseOptions} instance with the specified timeout.
*/
public static CloseOptions timeout(final Duration timeout) {
return new CloseOptions().withTimeout(timeout);
}
/**
* Static method to create a {@code CloseOptions} with a specified group membership operation.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return a new {@code CloseOptions} instance with the specified group membership operation.
*/
public static CloseOptions groupMembershipOperation(final GroupMembershipOperation operation) {
return new CloseOptions().withGroupMembershipOperation(operation);
}
/**
* Fluent method to set the timeout for the close process.
*
* @param timeout the maximum time to wait for the KafkaStreams to close. If {@code null}, the default timeout will be used.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withTimeout(final Duration timeout) {
this.timeout = Optional.ofNullable(timeout);
return this;
}
/**
* Fluent method to set the group membership operation upon shutdown.
*
* @param operation the group membership operation to apply. Must be one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}.
* @return this {@code CloseOptions} instance.
*/
public CloseOptions withGroupMembershipOperation(final GroupMembershipOperation operation) {
this.operation = Objects.requireNonNull(operation, "operation should not be null");
return this;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.kafka.streams.errors.StreamsStoppedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
import org.apache.kafka.streams.internals.CloseOptionsInternal;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.internals.metrics.StreamsClientMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
@ -488,7 +489,7 @@ public class KafkaStreams implements AutoCloseable {
closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
deadThread.shutdown(false);
deadThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
addStreamThread();
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
@ -765,7 +766,7 @@ public class KafkaStreams implements AutoCloseable {
@Override
public void onUpdateStart(final TopicPartition topicPartition,
final String storeName,
final String storeName,
final long startingOffset) {
if (userStandbyListener != null) {
try {
@ -1136,7 +1137,7 @@ public class KafkaStreams implements AutoCloseable {
return Optional.of(streamThread.getName());
} else {
log.warn("Terminating the new thread because the Kafka Streams client is in state {}", state);
streamThread.shutdown(true);
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
threads.remove(streamThread);
final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
@ -1200,7 +1201,7 @@ public class KafkaStreams implements AutoCloseable {
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
log.info("Removing StreamThread {}", streamThread.getName());
streamThread.shutdown(true);
streamThread.shutdown(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
if (callingThreadIsNotCurrentStreamThread) {
final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
if (remainingTimeMs <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, remainingTimeMs)) {
@ -1418,15 +1419,18 @@ public class KafkaStreams implements AutoCloseable {
/**
* Class that handles options passed in case of {@code KafkaStreams} instance scale down
*/
@Deprecated(since = "4.2")
public static class CloseOptions {
private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
private boolean leaveGroup = false;
@Deprecated(since = "4.2")
public CloseOptions timeout(final Duration timeout) {
this.timeout = timeout;
return this;
}
@Deprecated(since = "4.2")
public CloseOptions leaveGroup(final boolean leaveGroup) {
this.leaveGroup = leaveGroup;
return this;
@ -1438,10 +1442,14 @@ public class KafkaStreams implements AutoCloseable {
* This will block until all threads have stopped.
*/
public void close() {
close(Optional.empty(), false);
close(Optional.empty(), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
private Thread shutdownHelper(
final boolean error,
final long timeoutMs,
final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation
) {
stateDirCleaner.shutdownNow();
if (rocksDBMetricsRecordingService != null) {
rocksDBMetricsRecordingService.shutdownNow();
@ -1453,7 +1461,9 @@ public class KafkaStreams implements AutoCloseable {
return new Thread(() -> {
// notify all the threads to stop; avoid deadlocks by stopping any
// further state reports from the thread since we're shutting down
int numStreamThreads = processStreamThread(streamThread -> streamThread.shutdown(leaveGroup));
int numStreamThreads = processStreamThread(
streamThread -> streamThread.shutdown(operation)
);
log.info("Shutting down {} stream threads", numStreamThreads);
@ -1513,7 +1523,7 @@ public class KafkaStreams implements AutoCloseable {
}, clientId + "-CloseThread");
}
private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
final long timeoutMs;
if (timeout.isPresent()) {
timeoutMs = timeout.get();
@ -1544,7 +1554,7 @@ public class KafkaStreams implements AutoCloseable {
+ "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
}
final Thread shutdownThread = shutdownHelper(false, timeoutMs, leaveGroup);
final Thread shutdownThread = shutdownHelper(false, timeoutMs, operation);
shutdownThread.setDaemon(true);
shutdownThread.start();
@ -1562,7 +1572,7 @@ public class KafkaStreams implements AutoCloseable {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in {}", state());
} else {
final Thread shutdownThread = shutdownHelper(true, -1, false);
final Thread shutdownThread = shutdownHelper(true, -1, org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
shutdownThread.setDaemon(true);
shutdownThread.start();
@ -1588,12 +1598,13 @@ public class KafkaStreams implements AutoCloseable {
throw new IllegalArgumentException("Timeout can't be negative.");
}
return close(Optional.of(timeoutMs), false);
return close(Optional.of(timeoutMs), org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* This method is deprecated and replaced by {@link #close(org.apache.kafka.streams.CloseOptions)}.
* @param options contains timeout to specify how long to wait for the threads to shut down, and a flag leaveGroup to
* trigger consumer leave call
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
@ -1601,15 +1612,36 @@ public class KafkaStreams implements AutoCloseable {
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
@Deprecated(since = "4.2")
public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
final org.apache.kafka.streams.CloseOptions closeOptions = org.apache.kafka.streams.CloseOptions.timeout(options.timeout)
.withGroupMembershipOperation(options.leaveGroup ?
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP :
org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
return close(closeOptions);
}
/**
* Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
* threads to join.
* @param options contains timeout to specify how long to wait for the threads to shut down,
* and a {@link org.apache.kafka.streams.CloseOptions.GroupMembershipOperation}
* to trigger consumer leave call or remain in the group
* @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
* @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
*/
public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
Objects.requireNonNull(options, "options cannot be null");
final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
return close(Optional.of(timeoutMs), options.leaveGroup);
return close(Optional.of(timeoutMs), optionsInternal.operation());
}
/**

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.streams.CloseOptions;
import java.time.Duration;
import java.util.Optional;
public class CloseOptionsInternal extends CloseOptions {
public CloseOptionsInternal(final CloseOptions options) {
super(options);
}
public GroupMembershipOperation operation() {
return operation;
}
public Optional<Duration> timeout() {
return timeout;
}
}

View File

@ -91,9 +91,9 @@ import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -367,7 +367,8 @@ public class StreamThread extends Thread implements ProcessingThread {
// These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
private final AtomicReference<org.apache.kafka.streams.CloseOptions.GroupMembershipOperation> leaveGroupRequested =
new AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
private final boolean eosEnabled;
private final boolean stateUpdaterEnabled;
@ -898,7 +899,7 @@ public class StreamThread extends Thread implements ProcessingThread {
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
leaveGroupRequested.set(true);
leaveGroupRequested.set(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streamsUncaughtExceptionHandler.accept(e, false);
// Note: the above call currently rethrows the exception, so nothing below this line will be executed
} finally {
@ -1547,7 +1548,7 @@ public class StreamThread extends Thread implements ProcessingThread {
if (streamsRebalanceData.isPresent()) {
boolean hasMissingSourceTopics = false;
String missingTopicsDetail = null;
for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
shutdownErrorHook.run();
@ -1560,7 +1561,7 @@ public class StreamThread extends Thread implements ProcessingThread {
throw new TopologyException(errorMsg);
}
}
if (hasMissingSourceTopics) {
handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
} else {
@ -1589,25 +1590,25 @@ public class StreamThread extends Thread implements ProcessingThread {
// Start timeout tracking on first encounter with missing topics
if (topicsReadyTimer == null) {
topicsReadyTimer = time.timer(maxPollTimeMs);
log.info("Missing source topics detected: {}. Will wait up to {}ms before failing.",
log.info("Missing source topics detected: {}. Will wait up to {}ms before failing.",
missingTopicsDetail, maxPollTimeMs);
} else {
topicsReadyTimer.update();
}
if (topicsReadyTimer.isExpired()) {
final long elapsedTime = topicsReadyTimer.elapsedMs();
final String errorMsg = String.format("Missing source topics: %s. Timeout exceeded after %dms.",
final String errorMsg = String.format("Missing source topics: %s. Timeout exceeded after %dms.",
missingTopicsDetail, elapsedTime);
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
} else {
log.debug("Missing source topics: {}. Elapsed time: {}ms, timeout in: {}ms",
log.debug("Missing source topics: {}. Elapsed time: {}ms, timeout in: {}ms",
missingTopicsDetail, topicsReadyTimer.elapsedMs(), topicsReadyTimer.remainingMs());
}
}
static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
@ -1879,12 +1880,12 @@ public class StreamThread extends Thread implements ProcessingThread {
* Note that there is nothing to prevent this function from being called multiple times
* (e.g., in testing), hence the state is set only the first time
*
* @param leaveGroup this flag will control whether the consumer will leave the group on close or not
* @param operation the group membership operation to apply on shutdown. Must be one of LEAVE_GROUP or REMAIN_IN_GROUP.
*/
public void shutdown(final boolean leaveGroup) {
public void shutdown(final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
log.info("Informed to shut down");
final State oldState = setState(State.PENDING_SHUTDOWN);
leaveGroupRequested.set(leaveGroup);
leaveGroupRequested.set(operation);
if (oldState == State.CREATED) {
// The thread may not have been started. Take responsibility for shutting down
completeShutdown(true);
@ -1917,7 +1918,8 @@ public class StreamThread extends Thread implements ProcessingThread {
log.error("Failed to close changelog reader due to the following error:", e);
}
try {
final GroupMembershipOperation membershipOperation = leaveGroupRequested.get() ? LEAVE_GROUP : REMAIN_IN_GROUP;
final GroupMembershipOperation membershipOperation =
leaveGroupRequested.get() == org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.LEAVE_GROUP ? LEAVE_GROUP : REMAIN_IN_GROUP;
mainConsumer.close(CloseOptions.groupMembershipOperation(membershipOperation));
} catch (final Throwable e) {
log.error("Failed to close consumer due to the following error:", e);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
@ -310,8 +309,12 @@ public class KafkaStreamsTest {
private void prepareConsumer(final StreamThread thread, final AtomicReference<StreamThread.State> state) {
doAnswer(invocation -> {
supplier.consumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
supplier.restoreConsumer.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP));
supplier.consumer.close(
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
);
supplier.restoreConsumer.close(
org.apache.kafka.clients.consumer.CloseOptions.groupMembershipOperation(org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
);
for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
producer.close();
}
@ -320,7 +323,7 @@ public class KafkaStreamsTest {
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
return null;
}).when(thread).shutdown(false);
}).when(thread).shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
}
private void prepareThreadLock(final StreamThread thread) {
@ -571,7 +574,7 @@ public class KafkaStreamsTest {
for (int i = 0; i < NUM_THREADS; i++) {
final StreamThread tmpThread = streams.threads.get(i);
tmpThread.shutdown(false);
tmpThread.shutdown(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD,
"Thread never stopped.");
streams.threads.get(i).join();
@ -790,7 +793,7 @@ public class KafkaStreamsTest {
prepareThreadLock(streamThreadTwo);
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
streamThreadOne.shutdown(true);
streamThreadOne.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
final Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
assertThat(threads.size(), equalTo(1));
assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
@ -1016,9 +1019,8 @@ public class KafkaStreamsTest {
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@ -1041,8 +1043,7 @@ public class KafkaStreamsTest {
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
@ -1229,8 +1230,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L));
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1243,8 +1243,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L));
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
}
@ -1257,8 +1256,7 @@ public class KafkaStreamsTest {
prepareStreamThread(streamThreadTwo, 2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1275,9 +1273,8 @@ public class KafkaStreamsTest {
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(10L));
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(10L))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1293,9 +1290,8 @@ public class KafkaStreamsTest {
final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofMillis(-1L));
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(-1L))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier, time)) {
assertThrows(IllegalArgumentException.class, () -> streams.close(closeOptions));
}
@ -1312,9 +1308,8 @@ public class KafkaStreamsTest {
when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
closeOptions.leaveGroup(true);
final CloseOptions closeOptions = CloseOptions.timeout(Duration.ZERO)
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier)) {
assertFalse(streams.close(closeOptions));
}
@ -1720,7 +1715,7 @@ public class KafkaStreamsTest {
producerFuture.complete(producerInstanceId);
final Uuid adminInstanceId = Uuid.randomUuid();
adminClient.setClientInstanceId(adminInstanceId);
final Map<String, KafkaFuture<Uuid>> expectedClientIds = Map.of("main-consumer", consumerFuture, "some-thread-producer", producerFuture);
when(streamThreadOne.clientInstanceIds(any())).thenReturn(expectedClientIds);

View File

@ -60,6 +60,7 @@ import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@ -247,7 +248,7 @@ public class StreamThreadTest {
if (thread.state() != State.CREATED) {
thread.taskManager().shutdown(false);
}
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
thread = null;
}
final Set<Thread> t = Collections.unmodifiableSet(Thread.getAllStackTraces().keySet());
@ -409,12 +410,12 @@ public class StreamThreadTest {
assertEquals(4, stateListener.numChanges);
assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState);
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
assertSame(StreamThread.State.PENDING_SHUTDOWN, thread.state());
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldChangeStateAtStartClose(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
thread = createStreamThread(CLIENT_ID, new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled);
@ -427,18 +428,18 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
"Thread never shut down.");
thread.shutdown(true);
assertEquals(thread.state(), StreamThread.State.DEAD);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
assertEquals(State.DEAD, thread.state());
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldCreateMetricsAtStartup(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread(CLIENT_ID, new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled);
final String defaultGroupName = "stream-thread-metrics";
@ -538,7 +539,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCommitBeforeTheCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final long commitInterval = 1000L;
final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@ -565,7 +566,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotPurgeBeforeThePurgeInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final long commitInterval = 1000L;
final long purgeInterval = 2000L;
@ -593,7 +594,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final long purgeInterval = 1000L;
final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@ -658,7 +659,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeFalse(processingThreadsEnabled);
@ -682,7 +683,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldProcessWhenRunning(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeFalse(processingThreadsEnabled);
final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@ -707,7 +708,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldProcessWhenPartitionAssigned(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeTrue(stateUpdaterEnabled);
assumeFalse(processingThreadsEnabled);
@ -732,7 +733,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldProcessWhenStarting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeTrue(stateUpdaterEnabled);
assumeFalse(processingThreadsEnabled);
@ -757,7 +758,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException {
final Time mockTime = new MockTime(1);
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
@ -812,7 +813,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
@ -822,7 +823,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException {
final Time mockTime = new MockTime(1);
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
@ -880,7 +881,7 @@ public class StreamThreadTest {
() -> { }
);
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// Validate that the scheduled rebalance wasn't reset then set to MAX_VALUE so we
// don't trigger one before we can shut down, since the rebalance must be ended
@ -918,7 +919,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
// With processing threads, there is no guarantee how many iterations will be performed
assumeFalse(processingThreadsEnabled);
@ -1047,7 +1048,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCauseExceptionIfNothingCommitted(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final long commitInterval = 1000L;
final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@ -1076,7 +1077,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldCommitAfterCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final long commitInterval = 100L;
final long commitLatency = 10L;
@ -1137,7 +1138,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldPurgeAfterPurgeInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final long commitInterval = 100L;
final long purgeInterval = 200L;
@ -1170,7 +1171,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@ -1279,7 +1280,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
internalStreamsBuilder.buildAndOptimizeTopology();
@ -1319,7 +1320,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
@ -1357,7 +1358,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException {
// The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume()
// is not called. This is not true when the state updater is enabled which leads to
@ -1390,7 +1391,7 @@ public class StreamThreadTest {
10 * 1000,
"Thread never started.");
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// even if thread is no longer running, it should still be polling
// as long as the rebalance is still ongoing
@ -1411,7 +1412,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldShutdownTaskManagerOnClose(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@ -1426,7 +1427,7 @@ public class StreamThreadTest {
thread.setStateListener(
(t, newState, oldState) -> {
if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
}
});
thread.run();
@ -1435,7 +1436,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final TaskManager taskManager = mock(TaskManager.class);
final InternalTopologyBuilder internalTopologyBuilder = mock(InternalTopologyBuilder.class);
@ -1512,7 +1513,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@ -1524,13 +1525,13 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
verify(taskManager).shutdown(true);
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldOnlyShutdownOnce(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
@ -1542,7 +1543,7 @@ public class StreamThreadTest {
topologyMetadata.buildAndRewriteTopology();
thread = buildStreamThread(consumer, taskManager, config, topologyMetadata)
.updateThreadMetadata(adminClientId(CLIENT_ID));
thread.shutdown(true);
thread.shutdown(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
// Execute the run method. Verification of the mock will check that shutdown was only done once
thread.run();
@ -1550,7 +1551,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
@ -1572,7 +1573,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
@ -1688,18 +1689,18 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
testThrowingDurringCommitTransactionException(new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled);
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
testThrowingDurringCommitTransactionException(new InvalidPidMappingException("PidMapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled);
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
thread = createStreamThread(CLIENT_ID, config, new MockTime(1));
@ -1873,19 +1874,19 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled);
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new InvalidPidMappingException("PID Mapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled);
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCloseTaskProducerWhenSuspending(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
thread = createStreamThread(CLIENT_ID, config);
@ -1933,7 +1934,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);
@ -2011,7 +2012,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
@ -2059,7 +2060,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldUpdateStandbyTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
// Updating standby tasks on the stream thread only happens when the state updater is disabled
assumeFalse(stateUpdaterEnabled);
@ -2183,7 +2184,7 @@ public class StreamThreadTest {
@SuppressWarnings("unchecked")
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotUpdateStandbyTaskWhenPaused(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
// Updating standby tasks on the stream thread only happens when the state updater is disabled
assumeFalse(stateUpdaterEnabled);
@ -2243,7 +2244,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldCreateStandbyTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
setupInternalTopologyWithoutState(config);
@ -2253,7 +2254,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCreateStandbyTaskWithoutStateStores(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
setupInternalTopologyWithoutState(config);
@ -2262,7 +2263,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
setupInternalTopologyWithoutState(config);
@ -2275,7 +2276,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
@SuppressWarnings("deprecation")
public void shouldPunctuateActiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeFalse(processingThreadsEnabled);
@ -2426,7 +2427,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
thread = createStreamThread(CLIENT_ID, config);
@ -2442,7 +2443,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
.groupByKey()
@ -2632,7 +2633,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
@ -2660,7 +2661,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
@ -2688,7 +2689,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
@SuppressWarnings("unchecked")
public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
@ -2749,7 +2750,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
@SuppressWarnings("unchecked")
public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
@ -2815,7 +2816,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
@SuppressWarnings("unchecked")
public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
@ -2881,7 +2882,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
@SuppressWarnings("unchecked")
public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
@ -2946,7 +2947,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
@SuppressWarnings("unchecked")
public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled));
@ -3009,7 +3010,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotCommitNonRunningNonRestoringTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
final TaskManager taskManager = mock(TaskManager.class);
@ -3048,7 +3049,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(
final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled
@ -3155,7 +3156,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
@ -3182,7 +3183,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled));
final Node broker1 = new Node(0, "dummyHost-1", 1234);
@ -3239,13 +3240,13 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotRecordFailedStreamThread(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
runAndVerifyFailedStreamThreadRecording(false, stateUpdaterEnabled, processingThreadsEnabled);
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldRecordFailedStreamThread(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
runAndVerifyFailedStreamThreadRecording(true, stateUpdaterEnabled, processingThreadsEnabled);
}
@ -3308,7 +3309,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldCheckStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeTrue(stateUpdaterEnabled);
final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@ -3326,7 +3327,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldCheckStateUpdaterInBetweenProcessCalls(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeTrue(stateUpdaterEnabled);
assumeFalse(processingThreadsEnabled);
@ -3344,7 +3345,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldUpdateLagsAfterPolling(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
thread = setUpThread(streamsConfigProps);
@ -3362,7 +3363,7 @@ public class StreamThreadTest {
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
thread = setUpThread(streamsConfigProps);
@ -3377,7 +3378,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeTrue(stateUpdaterEnabled);
final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@ -3393,7 +3394,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
assumeFalse(stateUpdaterEnabled);
final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled);
@ -3407,13 +3408,13 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldGetMainAndRestoreConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
getClientInstanceId(false, stateUpdaterEnabled, processingThreadsEnabled);
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
getClientInstanceId(true, stateUpdaterEnabled, processingThreadsEnabled);
}
@ -3460,7 +3461,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
@ -3477,7 +3478,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
@ -3494,7 +3495,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
@ -3511,7 +3512,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
clientSupplier.consumer.disableTelemetry();
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
@ -3528,7 +3529,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
clientSupplier.restoreConsumer.disableTelemetry();
@ -3546,7 +3547,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
final MockProducer<byte[], byte[]> producer = new MockProducer<>();
producer.disableTelemetry();
@ -3566,7 +3567,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
clientSupplier.consumer.setClientInstanceId(Uuid.randomUuid());
clientSupplier.consumer.injectTimeoutException(-1);
@ -3591,7 +3592,7 @@ public class StreamThreadTest {
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid());
clientSupplier.restoreConsumer.injectTimeoutException(-1);
@ -3616,7 +3617,7 @@ public class StreamThreadTest {
}
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final MockProducer<byte[], byte[]> producer = new MockProducer<>();
producer.setClientInstanceId(Uuid.randomUuid());
@ -3964,13 +3965,13 @@ public class StreamThreadTest {
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();
// Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
mockTime.sleep(300001);
final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithoutProcessingThreads());
assertTrue(exception.getMessage().contains("Missing source topics"));
assertTrue(exception.getMessage().contains("Timeout exceeded"));
@ -4032,7 +4033,7 @@ public class StreamThreadTest {
));
// Should immediately throw TopologyException (no timeout like MISSING_SOURCE_TOPICS)
final TopologyException exception = assertThrows(TopologyException.class,
final TopologyException exception = assertThrows(TopologyException.class,
() -> thread.runOnceWithoutProcessingThreads());
assertTrue(exception.getMessage().contains("Topics are incorrectly partitioned"));
}
@ -4151,13 +4152,13 @@ public class StreamThreadTest {
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
// First call should not throw exception (within timeout)
thread.runOnceWithProcessingThreads();
// Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
mockTime.sleep(300001);
final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithProcessingThreads());
assertTrue(exception.getMessage().contains("Missing source topics"));
assertTrue(exception.getMessage().contains("Timeout exceeded"));
@ -4219,35 +4220,35 @@ public class StreamThreadTest {
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Missing source topics")
));
// First call should not throw exception (within timeout)
thread.runOnceWithoutProcessingThreads();
// Advance time but not beyond timeout
mockTime.sleep(150000); // Half of max.poll.interval.ms
// Should still not throw exception
thread.runOnceWithoutProcessingThreads();
// Clear the missing source topics (simulate recovery)
streamsRebalanceData.setStatuses(List.of());
// Should complete without exception (recovery successful)
assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
// Set missing topics again - should reset the timeout
streamsRebalanceData.setStatuses(List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Different missing topics")
));
// Advance time by 250 seconds to test if timer was reset
// Total time from beginning: 150000 + 250000 = 400000ms (400s)
// If timer was NOT reset: elapsed time = 400s > 300s should throw
// If timer WAS reset: elapsed time = 250s < 300s should NOT throw
mockTime.sleep(250000); // Advance by 250 seconds
// Should not throw because timer was reset - only 250s elapsed from reset point
assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
}
@ -4427,7 +4428,7 @@ public class StreamThreadTest {
null
);
}
private void runOnce(final boolean processingThreadsEnabled) {
if (processingThreadsEnabled) {
thread.runOnceWithProcessingThreads();

View File

@ -55,6 +55,9 @@ default_image_name="ducker-ak"
# The default kafka server mode.
default_kafka_mode="jvm"
# Container runtime command (docker or podman)
container_runtime=""
# Port to listen on when debugging
debugpy_port=5678
@ -66,6 +69,9 @@ usage() {
cat <<EOF
ducker-ak: a tool for running Apache Kafka tests inside Docker images.
Supports Docker and Podman container runtimes. Auto-detects available runtime
or can be overridden with CONTAINER_RUNTIME environment variable.
Usage: ${script_path} [command] [options]
help|-h|--help
@ -142,6 +148,30 @@ require_commands() {
done
}
# Detect and set the container runtime (docker or podman).
# Sets the global variable container_runtime.
# Can be overridden by setting CONTAINER_RUNTIME environment variable.
detect_container_runtime() {
if [[ -n "${container_runtime}" ]]; then
return # Already set
fi
# Allow override via environment variable
if [[ -n "${CONTAINER_RUNTIME}" ]]; then
container_runtime="${CONTAINER_RUNTIME}"
which "${container_runtime}" &> /dev/null || die "Specified container runtime '${container_runtime}' not found."
return
fi
if command -v docker &> /dev/null; then
container_runtime="docker"
elif command -v podman &> /dev/null; then
container_runtime="podman"
else
die "No supported container runtime found. Please install docker or podman, or set CONTAINER_RUNTIME environment variable."
fi
}
# Set a global variable to a value.
#
# $1: The variable name to set. This function will die if the variable already has a value. The
@ -256,7 +286,7 @@ ducker_build() {
must_pushd "${ducker_dir}"
# Tip: if you are scratching your head for some dependency problems that are referring to an old code version
# (for example java.lang.NoClassDefFoundError), add --no-cache flag to the build shall give you a clean start.
echo_and_do docker build --memory="${docker_build_memory_limit}" \
echo_and_do ${container_runtime} build --memory="${docker_build_memory_limit}" \
--build-arg "ducker_creator=${user_name}" \
--build-arg "jdk_version=${jdk_version}" \
--build-arg "UID=${UID}" \
@ -298,9 +328,14 @@ docker_run() {
# Invoke docker-run. We need privileged mode to be able to run iptables
# and mount FUSE filesystems inside the container. We also need it to
# run iptables inside the container.
must_do -v docker run --privileged \
local memory_swappiness_option=""
if [[ "${container_runtime}" == "docker" ]]; then
# Only add memory-swappiness for Docker (not supported in podman/cgroups v2)
memory_swappiness_option="--memory-swappiness=1"
fi
must_do -v ${container_runtime} run --privileged \
-d -t -h "${node}" --network ducknet "${expose_ports}" \
--memory=${docker_run_memory_limit} --memory-swappiness=1 \
--memory=${docker_run_memory_limit} ${memory_swappiness_option} \
-v "${kafka_dir}:/opt/kafka-dev" --name "${node}" -- "${image_name}"
}
@ -311,13 +346,13 @@ setup_custom_ducktape() {
[[ -f "${custom_ducktape}/ducktape/__init__.py" ]] || \
die "You must supply a valid ducktape directory to --custom-ducktape"
docker_run ducker01 "${image_name}"
local running_container="$(docker ps -f=network=ducknet -q)"
must_do -v -o docker cp "${custom_ducktape}" "${running_container}:/opt/ducktape"
docker exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python3 ./setup.py develop install && cd /opt/ducktape && sudo python3 ./setup.py develop install'
local running_container="$(${container_runtime} ps -f=network=ducknet -q)"
must_do -v -o ${container_runtime} cp "${custom_ducktape}" "${running_container}:/opt/ducktape"
${container_runtime} exec --user=root ducker01 bash -c 'set -x && cd /opt/kafka-dev/tests && sudo python3 ./setup.py develop install && cd /opt/ducktape && sudo python3 ./setup.py develop install'
[[ $? -ne 0 ]] && die "failed to install the new ducktape."
must_do -v -o docker commit ducker01 "${image_name}"
must_do -v docker kill "${running_container}"
must_do -v docker rm ducker01
must_do -v -o ${container_runtime} commit ducker01 "${image_name}"
must_do -v ${container_runtime} kill "${running_container}"
must_do -v ${container_runtime} rm ducker01
}
cleanup_native_dir() {
@ -347,7 +382,8 @@ prepare_native_dir() {
}
ducker_up() {
require_commands docker
detect_container_runtime
require_commands ${container_runtime}
while [[ $# -ge 1 ]]; do
case "${1}" in
-C|--custom-ducktape) set_once custom_ducktape "${2}" "the custom ducktape directory"; shift 2;;
@ -357,7 +393,7 @@ ducker_up() {
-e|--expose-ports) set_once expose_ports "${2}" "the ports to expose"; shift 2;;
-m|--kafka_mode) set_once kafka_mode "${2}" "the mode in which kafka will run"; shift 2;;
--ipv6) set_once ipv6 "true" "enable IPv6"; shift;;
*) set_once image_name "${1}" "docker image name"; shift;;
*) set_once image_name "${1}" "container image name"; shift;;
esac
done
[[ -n "${num_nodes}" ]] || num_nodes="${default_num_nodes}"
@ -377,13 +413,13 @@ use only ${num_nodes}."
fi
fi
docker ps >/dev/null || die "ducker_up: failed to run docker. Please check that the daemon is started."
${container_runtime} ps >/dev/null || die "ducker_up: failed to run ${container_runtime}. Please check that the daemon is started."
prepare_native_dir
ducker_build "${image_name}"
cleanup_native_dir
docker inspect --format='{{.Config.Labels}}' --type=image "${image_name}" | grep -q 'ducker.type'
${container_runtime} inspect --format='{{.Config.Labels}}' --type=image "${image_name}" | grep -q 'ducker.type'
local docker_status=${PIPESTATUS[0]}
local grep_status=${PIPESTATUS[1]}
[[ "${docker_status}" -eq 0 ]] || die "ducker_up: failed to inspect image ${image_name}. \
@ -396,7 +432,7 @@ it up anyway."
exit 1
fi
fi
local running_containers="$(docker ps -f=network=ducknet -q)"
local running_containers="$(${container_runtime} ps -f=network=ducknet -q)"
local num_running_containers=$(count ${running_containers})
if [[ ${num_running_containers} -gt 0 ]]; then
die "ducker_up: there are ${num_running_containers} ducker containers \
@ -405,15 +441,15 @@ attempting to start new ones."
fi
echo "ducker_up: Bringing up ${image_name} with ${num_nodes} nodes..."
if docker network inspect ducknet &>/dev/null; then
must_do -v docker network rm ducknet
if ${container_runtime} network inspect ducknet &>/dev/null; then
must_do -v ${container_runtime} network rm ducknet
fi
network_create_args=""
if [[ "${ipv6}" == "true" ]]; then
subnet_cidr_prefix="${DUCKER_SUBNET_CIDR:-"fc00:cf17"}"
network_create_args="--ipv6 --subnet ${subnet_cidr_prefix}::/64"
fi
must_do -v docker network create ${network_create_args} ducknet
must_do -v ${container_runtime} network create ${network_create_args} ducknet
if [[ -n "${custom_ducktape}" ]]; then
setup_custom_ducktape "${custom_ducktape}" "${image_name}"
fi
@ -427,28 +463,28 @@ attempting to start new ones."
for n in $(seq -f %02g 1 ${num_nodes}); do
local node="ducker${n}"
if [[ "${ipv6}" == "true" ]]; then
docker exec --user=root "${node}" grep "${node}" /etc/hosts | grep "${subnet_cidr_prefix}" >&3
${container_runtime} exec --user=root "${node}" grep "${node}" /etc/hosts | grep "${subnet_cidr_prefix}" >&3
else
docker exec --user=root "${node}" grep "${node}" /etc/hosts >&3
${container_runtime} exec --user=root "${node}" grep "${node}" /etc/hosts >&3
fi
[[ $? -ne 0 ]] && die "failed to find the /etc/hosts entry for ${node}"
done
exec 3>&-
for n in $(seq -f %02g 1 ${num_nodes}); do
local node="ducker${n}"
docker exec --user=root "${node}" \
${container_runtime} exec --user=root "${node}" \
bash -c "grep -v ${node} /opt/kafka-dev/tests/docker/build/node_hosts >> /etc/hosts"
[[ $? -ne 0 ]] && die "failed to append to the /etc/hosts file on ${node}"
# Filter out ipv4 addresses if ipv6
if [[ "${ipv6}" == "true" ]]; then
docker exec --user=root "${node}" \
${container_runtime} exec --user=root "${node}" \
bash -c "grep -v -E '([0-9]{1,3}\.){3}[0-9]{1,3}' /opt/kafka-dev/tests/docker/build/node_hosts >> /etc/hosts"
[[ $? -ne 0 ]] && die "failed to append to the /etc/hosts file on ${node}"
fi
done
if [ "$kafka_mode" == "native" ]; then
docker exec --user=root ducker01 bash -c 'cp /opt/kafka-binary/kafka.Kafka /opt/kafka-dev/kafka.Kafka'
${container_runtime} exec --user=root ducker01 bash -c 'cp /opt/kafka-binary/kafka.Kafka /opt/kafka-dev/kafka.Kafka'
fi
echo "ducker_up: added the latest entries to /etc/hosts on each node."
@ -526,8 +562,9 @@ correct_latest_link() {
}
ducker_test() {
require_commands docker
docker inspect ducker01 &>/dev/null || \
detect_container_runtime
require_commands ${container_runtime}
${container_runtime} inspect ducker01 &>/dev/null || \
die "ducker_test: the ducker01 instance appears to be down. Did you run 'ducker up'?"
declare -a test_name_args=()
local debug=0
@ -564,15 +601,16 @@ ducker_test() {
fi
cmd="cd /opt/kafka-dev && ${ducktape_cmd} --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json $test_names $ducktape_args"
echo "docker exec ducker01 bash -c \"${cmd}\""
docker exec --user=ducker ducker01 bash -c "${cmd}"
echo "${container_runtime} exec ducker01 bash -c \"${cmd}\""
${container_runtime} exec --user=ducker ducker01 bash -c "${cmd}"
docker_status=$?
correct_latest_link
exit "${docker_status}"
}
ducker_ssh() {
require_commands docker
detect_container_runtime
require_commands ${container_runtime}
[[ $# -eq 0 ]] && die "ducker_ssh: Please specify a container name to log into. \
Currently active containers: $(echo_running_container_names)"
local node_info="${1}"
@ -597,21 +635,21 @@ Currently active containers: $(echo_running_container_names)"
local nodes=$(echo_running_container_names)
[[ "${nodes}" == "(none)" ]] && die "ducker_ssh: can't locate any running ducker nodes."
for node in ${nodes}; do
docker exec --user=${user_name} -i ${docker_flags} "${node}" \
${guest_command_prefix} "${guest_command}" || die "docker exec ${node} failed"
${container_runtime} exec --user=${user_name} -i ${docker_flags} "${node}" \
${guest_command_prefix} "${guest_command}" || die "${container_runtime} exec ${node} failed"
done
else
docker inspect --type=container -- "${node_name}" &>/dev/null || \
${container_runtime} inspect --type=container -- "${node_name}" &>/dev/null || \
die "ducker_ssh: can't locate node ${node_name}. Currently running nodes: \
$(echo_running_container_names)"
exec docker exec --user=${user_name} -i ${docker_flags} "${node_name}" \
exec ${container_runtime} exec --user=${user_name} -i ${docker_flags} "${node_name}" \
${guest_command_prefix} "${guest_command}"
fi
}
# Echo all the running Ducker container names, or (none) if there are no running Ducker containers.
echo_running_container_names() {
node_names="$(docker ps -f=network=ducknet -q --format '{{.Names}}' | sort)"
node_names="$(${container_runtime} ps -f=network=ducknet -q --format '{{.Names}}' | sort)"
if [[ -z "${node_names}" ]]; then
echo "(none)"
else
@ -620,7 +658,8 @@ echo_running_container_names() {
}
ducker_down() {
require_commands docker
detect_container_runtime
require_commands ${container_runtime}
local verbose=1
local force_str=""
while [[ $# -ge 1 ]]; do
@ -631,10 +670,10 @@ ducker_down() {
esac
done
local running_containers
running_containers="$(docker ps -f=network=ducknet -q)"
[[ $? -eq 0 ]] || die "ducker_down: docker command failed. Is the docker daemon running?"
running_containers="$(${container_runtime} ps -f=network=ducknet -q)"
[[ $? -eq 0 ]] || die "ducker_down: ${container_runtime} command failed. Is the ${container_runtime} daemon running?"
running_containers=${running_containers//$'\n'/ }
local all_containers="$(docker ps -a -f=network=ducknet -q)"
local all_containers="$(${container_runtime} ps -a -f=network=ducknet -q)"
all_containers=${all_containers//$'\n'/ }
if [[ -z "${all_containers}" ]]; then
maybe_echo "${verbose}" "No ducker containers found."
@ -645,18 +684,19 @@ ducker_down() {
verbose_flag="-v"
fi
if [[ -n "${running_containers}" ]]; then
must_do ${verbose_flag} docker kill "${running_containers}"
must_do ${verbose_flag} ${container_runtime} kill "${running_containers}"
fi
must_do ${verbose_flag} docker rm ${force_str} "${all_containers}"
must_do ${verbose_flag} ${container_runtime} rm ${force_str} "${all_containers}"
must_do ${verbose_flag} -o rm -f -- "${ducker_dir}/build/node_hosts" "${ducker_dir}/build/cluster.json"
if docker network inspect ducknet &>/dev/null; then
must_do -v docker network rm ducknet
if ${container_runtime} network inspect ducknet &>/dev/null; then
must_do -v ${container_runtime} network rm ducknet
fi
maybe_echo "${verbose}" "ducker_down: removed $(count ${all_containers}) containers."
}
ducker_purge() {
require_commands docker
detect_container_runtime
require_commands ${container_runtime}
local force_str=""
while [[ $# -ge 1 ]]; do
case "${1}" in
@ -666,8 +706,8 @@ ducker_purge() {
done
echo "** ducker_purge: attempting to locate ducker images to purge"
local images
images=$(docker images -q -a -f label=ducker.creator)
[[ $? -ne 0 ]] && die "docker images command failed"
images=$(${container_runtime} images -q -a -f label=ducker.creator)
[[ $? -ne 0 ]] && die "${container_runtime} images command failed"
images=${images//$'\n'/ }
declare -a purge_images=()
if [[ -z "${images}" ]]; then
@ -677,12 +717,12 @@ ducker_purge() {
echo "** ducker_purge: images to delete:"
for image in ${images}; do
echo -n "${image} "
docker inspect --format='{{.Config.Labels}} {{.Created}}' --type=image "${image}"
[[ $? -ne 0 ]] && die "docker inspect ${image} failed"
${container_runtime} inspect --format='{{.Config.Labels}} {{.Created}}' --type=image "${image}"
[[ $? -ne 0 ]] && die "${container_runtime} inspect ${image} failed"
done
ask_yes_no "Delete these docker images? [y/n]"
ask_yes_no "Delete these container images? [y/n]"
[[ "${_return}" -eq 0 ]] && exit 0
must_do -v -o docker rmi ${force_str} ${images}
must_do -v -o ${container_runtime} rmi ${force_str} ${images}
}
# Parse command-line arguments

View File

@ -20,6 +20,14 @@ KAFKA_NUM_CONTAINERS=${KAFKA_NUM_CONTAINERS:-14}
TC_PATHS=${TC_PATHS:-./kafkatest/}
REBUILD=${REBUILD:f}
# Auto-detect container runtime if not set
if [[ -z "${CONTAINER_RUNTIME}" ]]; then
export CONTAINER_RUNTIME="docker"
if command -v podman &> /dev/null; then
export CONTAINER_RUNTIME="podman"
fi
fi
die() {
echo $@
exit 1

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -341,9 +342,8 @@ public class DeleteStreamsGroupOffsetTest {
private void stopKSApp(String appId, String topic, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
if (streams != null) {
KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofSeconds(30));
closeOptions.leaveGroup(true);
CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
streams.cleanUp();

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.CloseOptions;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
@ -512,9 +513,8 @@ public class DeleteStreamsGroupTest {
private void stopKSApp(String appId, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
if (streams != null) {
KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ofSeconds(30));
closeOptions.leaveGroup(true);
CloseOptions closeOptions = CloseOptions.timeout(Duration.ofSeconds(30))
.withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
streams.close(closeOptions);
streams.cleanUp();