* Merging the refactoring to add standby tasks heartbeat response
added test
Add stability check and refactor partition assignments
Introduces a `membersStable` method in `StreamsGroup` to ensure group members are in a stable state before partition assignments. Refactors `EndpointToPartitionsManager` to streamline input topic handling and updates related tests for clarity and correctness. Adds integration test for validating partition assignment logic with multiple configurations.
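A minimal sketch of what such a check might look like, assuming a simplified member type with a `state()` accessor (the real `StreamsGroup` internals differ):

```java
import java.util.Map;

// Hypothetical, simplified member representation for illustration only.
enum MemberState { STABLE, UNREVOKED_TASKS, UNRELEASED_TASKS }

record Member(String memberId, MemberState state) { }

class StreamsGroupSketch {
    private final Map<String, Member> members;

    StreamsGroupSketch(Map<String, Member> members) {
        this.members = members;
    }

    /**
     * True only if every member of the group is in the STABLE state,
     * i.e. no member is still revoking or waiting for tasks.
     */
    boolean membersStable() {
        return members.values().stream()
                .allMatch(member -> member.state() == MemberState.STABLE);
    }
}
```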
* Refactor IQv2 metadata handling and endpoint partition mapping.
Streamlined metadata validation logic in IQv2 integration tests and removed redundant code. Simplified `maybeBuildEndpointToPartitions` by eliminating unnecessary stability checks, improving clarity and maintainability of endpoint-to-partition mapping.
* Remove unused `membersStable` method from StreamsGroup
* Refactor and enhance IQv2 test metadata validation.
Replaced redundant variable names and improved metadata handling for active and standby tasks in the IQv2 test. Added validations for state store names using `EXPECTED_STORE_NAME` and organized metadata assertions to enhance clarity and testing coverage. These changes improve maintainability and ensure better validation of stream metadata.
* Add inspection of standby topics
* Adding standby tasks heartbeat response
* Refactoring to add standby tasks heartbeat response
added test
* Addressing comments plus some minor checkstyle side cleanup
Implement --describe and its options (--state, --offset, --members, and their combinations with --verbose)
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Replaying records from the offset topic has several corner
cases that we need to handle. In particular, due to
compaction it can happen that we only see the tombstone
record of a member or the group, but not the original record
of the group. Similarly, if the group metadata record is
updated after the last update to the current assignment
record of a member, the records will be replayed out of order,
that is, we first replay the current assignment of the member,
and then create the group.
Therefore, we need to change the code to always create groups
when replaying any kind of records (instead of failing, as before).
Also, tombstones of non-existing groups are ignored.
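The replay pattern this describes, sketched with simplified stand-in types (`ReplaySketch` and its methods are illustrative, not the actual coordinator API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReplaySketch {
    // groupId -> (memberId -> assigned task ids)
    private final Map<String, Map<String, List<Integer>>> groups = new HashMap<>();

    // Replaying a member's current assignment: due to compaction or
    // out-of-order replay, the group record may not have been seen yet,
    // so the group is created on demand instead of failing.
    void replayCurrentAssignment(String groupId, String memberId, List<Integer> tasks) {
        groups.computeIfAbsent(groupId, id -> new HashMap<>())
              .put(memberId, tasks);
    }

    // Replaying a group tombstone: if the group's records were already
    // compacted away, there is nothing to delete, so the tombstone is
    // silently ignored rather than treated as an error.
    void replayGroupTombstone(String groupId) {
        groups.remove(groupId); // no-op for a non-existing group
    }
}
```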
This change also adds unit tests for all replay methods for streams
records.
We also piggyback some changes to streamsGroupHeartbeat:
* the structure of the method is cleaned up a bit
* request validation is extended
* the unit test for request validation is extended
Adding a couple of tests and some side cleanup.
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewers: Bruno Cadonna <cadonna@apache.org>, Lucas Brutschy <lucasbru@apache.org>
We accidentally broke the epoch validation, throwing
on all epochs > 0. This is just the fix. We should improve
unit test coverage though (there is a ticket for it).
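Roughly, the intended check only rejects a heartbeat whose epoch does not match the member's current epoch (a simplified illustration, not the actual coordinator code):

```java
class EpochValidationSketch {
    /**
     * Rejects a heartbeat only when the received member epoch does not
     * match the member's current epoch; the broken version effectively
     * threw for any epoch greater than zero.
     */
    static void validateMemberEpoch(int receivedEpoch, int currentEpoch) {
        if (receivedEpoch != currentEpoch) {
            throw new IllegalStateException("Received member epoch " + receivedEpoch
                    + " does not match current epoch " + currentEpoch + ".");
        }
    }
}
```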
This PR integrates KIP-1082 changes into KIP-1071. The change centers around clients creating the member id on startup, rather than waiting for the GroupCoordinator to create the member id and return it in the response to the initial heartbeat.
The changes were small and included some updates to the streams-group-related tests in GroupMetadataManagerTest.
I'll add comments in-line regarding the updates. Overall, the impact can best be summarized by stating that certain actions will now happen one epoch earlier. For example, task assignment would typically happen on epoch 2 (bumping up from 1), since clients waited for the member id to be returned and the assignment occurred in the subsequent heartbeat request.
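In essence, the client now supplies its own member id in the very first heartbeat (member epoch 0) instead of waiting for the coordinator to assign one. A minimal illustration, using `java.util.UUID` as a stand-in for the client's own Uuid utility:

```java
import java.util.UUID;

class MemberIdOnStartupSketch {
    public static void main(String[] args) {
        // KIP-1082: the client creates its member id itself on startup
        // instead of receiving it from the GroupCoordinator.
        String memberId = UUID.randomUUID().toString();
        int memberEpoch = 0; // the member still joins with epoch 0

        // The very first heartbeat already carries the member id, so the
        // coordinator can assign tasks one epoch earlier than before.
        System.out.printf("first heartbeat: memberId=%s, epoch=%d%n", memberId, memberEpoch);
    }
}
```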
For testing, I updated the GroupMetadataManagerTest to get all tests passing.
The Streams membership manager did not use the LeaveOnClose event
to leave the group faster on close. With that event, the Streams
membership manager leaves the group without invoking any callback
for releasing the active tasks.
With this commit, the Streams membership manager uses the LeaveOnClose
event.
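Conceptually, the handling looks like the following sketch (stand-in types, not the actual client internals):

```java
// Stand-in states for illustration; not the actual client state machine.
enum MemberState { STABLE, LEAVING, UNSUBSCRIBED }

class StreamsMembershipManagerSketch {
    private MemberState state = MemberState.STABLE;

    /**
     * On LeaveOnClose the member transitions straight to LEAVING and sends
     * the leave heartbeat, without first invoking the callback that releases
     * the active tasks: the process is shutting down anyway, so the callback
     * would only delay the departure.
     */
    void onLeaveOnClose() {
        state = MemberState.LEAVING;
        sendLeaveGroupHeartbeat();
        state = MemberState.UNSUBSCRIBED;
    }

    private void sendLeaveGroupHeartbeat() {
        // In the real client this enqueues a final heartbeat that signals
        // the member is leaving the group.
    }
}
```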
This PR implements two changes in the KIP, and related logic on client & server side.
We replace the topology ID with a topology epoch. We only include the basic changes to the schema and basic validation for now (fail if the topology is changed without a topology epoch bump), since topology updating will follow later on.
We add status codes for topic validation, and implement the required logic to validate the internal topics.
We merge the initialization of the Streams group into the heartbeat of the Streams group.
As a result, the first heartbeat of a member (member epoch is zero) contains the topology
metadata required for initialization.
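Schematically, the first heartbeat now doubles as the initialization call; the record shapes below are illustrative, not the exact StreamsGroupHeartbeat schema:

```java
import java.util.List;

// Illustrative record shapes, not the exact StreamsGroupHeartbeat schema.
record Subtopology(String subtopologyId, List<String> sourceTopics) { }

record Topology(int topologyEpoch, List<Subtopology> subtopologies) { }

record HeartbeatRequest(String groupId, String memberId, int memberEpoch,
                        Topology topology /* only set when joining, epoch 0 */) { }

class FirstHeartbeatSketch {
    public static void main(String[] args) {
        HeartbeatRequest first = new HeartbeatRequest(
                "my-streams-app",
                "member-1",
                0, // member epoch 0: joining, so topology metadata is attached
                new Topology(1, List.of(new Subtopology("0", List.of("input-topic")))));
        System.out.println(first);
    }
}
```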
The streams membership manager manages the state of the member
and is responsible for reconciling the received task assignment
with the current task assignment of the member. Additionally,
it requests the call of the callback that installs the
reconciled task assignment in the Streams client.
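At its core, the reconciliation compares the target assignment against the currently owned tasks and hands the difference to the Streams client via a callback. A simplified sketch with stand-in types:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

class ReconciliationSketch {
    private final Set<Integer> ownedTasks = new HashSet<>();

    /**
     * Reconciles the received target assignment with the currently owned
     * tasks: tasks that are not in the target are revoked first, then the
     * newly added tasks are installed via the assignment callback.
     */
    void reconcile(Set<Integer> targetTasks, Consumer<Set<Integer>> assignmentCallback) {
        Set<Integer> toRevoke = new HashSet<>(ownedTasks);
        toRevoke.removeAll(targetTasks);

        Set<Integer> toAssign = new HashSet<>(targetTasks);
        toAssign.removeAll(ownedTasks);

        ownedTasks.removeAll(toRevoke);          // revoke before taking new tasks
        assignmentCallback.accept(toAssign);     // Streams client installs the tasks
        ownedTasks.addAll(toAssign);
    }
}
```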
* KSTREAMS-6456: Port tools for topic configuration
Ports several tools for topic configuration from the client side to the
broker side. Several things are refactored:
- Decoupling. On the client side, for example, RepartitionTopics was
using the CopartitionedTopicEnforcer, the InternalTopicCreator, the
TopologyMetadata and the Cluster objects. All of those are mocked
with Mockito during testing. This points to bad coupling. We
refactored all classes to be mostly self-sufficient, only relying on
themselves and simple interfaces.
- Tests only use JUnit 5, without Hamcrest matchers or other streams
utilities, to avoid polluting the group coordinator module.
- All classes only modify the configurations -- the code does not
actually call into the AdminClient anymore.
- We map all errors to new errors in the broker, in particular,
the error for missing topics, inconsistent internal topics, and
invalid topologies.
We include the internal, mutable data structures, which are set- and
map-based for efficient algorithms. They are distinctly different
from the data represented in `StreamsGroupTopologyValue` and
`StreamsGroupInitializeRequest`, since regular expressions must
be resolved at this point.
Both the topic creation and internal topic validation will be
based on this code; the basics of this are implemented in the
`InternalTopicManager`.
Whenever either the broker-side topology or the topic metadata
on the broker changes, we reconfigure the internal topics, check
consistency with the current topics on the broker, and possibly
trigger creation of the missing internal topics. These changes
will be built on top of this change.
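As a rough sketch of one piece of that flow, computing the internal topics that still need to be created (simplified types; the real `InternalTopicManager` also checks consistency of existing topics):

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class InternalTopicReconfigurationSketch {
    /**
     * Given the internal topics required by the topology and the topics that
     * currently exist on the broker, returns the topics that still need to
     * be created. Existing topics with an inconsistent configuration would
     * be surfaced as a validation error instead.
     */
    static Map<String, Integer> missingInternalTopics(
            Map<String, Integer> requiredTopicToPartitions,
            Set<String> existingTopics) {
        return requiredTopicToPartitions.entrySet().stream()
                .filter(entry -> !existingTopics.contains(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```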
* formatting fixes
* KSTREAMS-6456: Complete topic metadata for automatic topic creation
This change updates the current RPCs and schemas to add the following
metadata:
- copartition groups are added to topology record and initialize RPC
- multiple regexes are added to topology record and initialize RPC
- replication factors are added for each internal topic
We also add code to fill this information correctly from the
`InternalTopologyBuilder` object.
The fields `assignmentConfiguration` and `assignor` in the
`StreamsAssignmentInterface` are removed, because they are not
needed anymore.
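Illustratively, the extended per-subtopology metadata now carries shapes like these (record names and fields are stand-ins, not the exact schema):

```java
import java.util.List;

// Illustrative shapes of the extended metadata.
record InternalTopic(String name, int partitions, short replicationFactor) { }

record SubtopologyMetadata(String subtopologyId,
                           List<String> sourceTopicRegexes,      // multiple regexes
                           List<List<String>> copartitionGroups, // topics that must be copartitioned
                           List<InternalTopic> internalTopics) { }
```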
* fixes
Throughout RPCs, we were using subtopology instead of subtopologyId for
the string ID of a subtopology. We were also using the sub-topology
spelling. Cleaning this up before merging the auto-topic creation code.
This PR implements the new reconciliation logic for Kafka Streams. In
the POC, we used the reconciliation logic for topic partitions on the
broker side, developed as part of KIP-848. This worked for active tasks,
but did not take standby or warm-up tasks into account.
The new reconciliation logic also takes standby and warm-up tasks into
account. Inside the streams group object, we keep track of the process
IDs that own a task, and in which role. This replaces task epochs from
KIP-848. Using the process IDs, we can check the assignment invariants:
* A. A process can own a task only once in any role. For example, if
processX already owns a task as a standby, it cannot own it as an active
as well.
* B. For active tasks, since there can never be two instances owning the
same task in an active role, the invariant is different: An active task
can be assigned only if no other member owns it (as an active task).
The reconciliation logic will make sure that these invariants are
followed. For this, it uses the same states for the member as KIP-848,
just with different state transitions:
A member is in state UNREVOKED whenever we are waiting for the member to
confirm revocation of at least one task. We expect the member to revoke
all tasks that are not in its target assignment before transitioning to
the next epoch. Once the member has revoked all tasks that are not in
its target assignment, it transitions to the new epoch and gets new
tasks assigned. Once all tasks are assigned, we transition to STABLE,
but it may be that some tasks cannot be assigned yet, because of the
above invariants:
* A. For any task, another instance of the same process may own the task
in any role (active/standby/warmup), and we are still awaiting
revocation.
* B. For an active task, an instance of any process may own the same
active task, and we are still awaiting revocation.
As long as tasks cannot be assigned, we remain in state UNRELEASED.
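A compact sketch of the two invariant checks that gate assigning a task, with the process-ID bookkeeping simplified to a plain map (for one particular task, which roles each process owns it in):

```java
import java.util.Map;
import java.util.Set;

enum TaskRole { ACTIVE, STANDBY, WARMUP }

class AssignmentInvariantsSketch {
    // For one particular task: which roles does each process own it in?
    // (The real group object tracks this per task via process IDs.)

    /**
     * Invariant A: a process may own a given task at most once, in any
     * role, so the task may only go to a process that does not already
     * own it as an active, standby, or warm-up task.
     */
    static boolean mayAssignToProcess(Map<String, Set<TaskRole>> ownersOfTask,
                                      String processId) {
        Set<TaskRole> roles = ownersOfTask.get(processId);
        return roles == null || roles.isEmpty();
    }

    /**
     * Invariant B: an active task may only be assigned if no member of
     * any process still owns it in the active role.
     */
    static boolean mayAssignAsActive(Map<String, Set<TaskRole>> ownersOfTask) {
        return ownersOfTask.values().stream()
                .noneMatch(roles -> roles.contains(TaskRole.ACTIVE));
    }
}
```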
The testing required some changes, so that I could parameterize the test
on the task role that causes the state transitions. There are some
utility changes / clean-ups to construct assignments.
This commit:
- schedules a timeout for the initialization call
- requests the initialization from only one member
- initializations of unknown topologies or already initialized topologies
are silently dropped
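A schematic of that gating logic, with stand-in types (the real coordinator uses its own timer and state machine; the timeout scheduling is omitted here):

```java
import java.util.HashSet;
import java.util.Set;

class InitializationGateSketch {
    private final Set<String> knownTopologies = new HashSet<>();
    private final Set<String> initializedTopologies = new HashSet<>();

    /**
     * Accepts an initialization only for a known topology that has not
     * been initialized yet; everything else is silently dropped.
     */
    boolean maybeInitialize(String topologyId) {
        if (!knownTopologies.contains(topologyId)
                || initializedTopologies.contains(topologyId)) {
            return false; // unknown or already initialized: drop silently
        }
        initializedTopologies.add(topologyId);
        return true;
    }
}
```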
Translate the following from the org.apache.kafka.coordinator.group.consumer package:
CurrentAssignmentBuilder
TargetAssignmentBuilder
See https://github.com/lucasbru/kafka/pull/23
Copy unit tests that can be preserved easily.