Configs default.windowed.value.serde.inner and default.windowed.key.serde.inner
were replaced with windowed.inner.class.serde. This PR updates the docs accordingly,
plus a few more side cleanups.
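For illustration, a minimal sketch of the new-style configuration; the property key is taken from this change, while the serde class used as its value is just an example:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;

public class WindowedSerdeConfigExample {
    public static Properties config() {
        Properties props = new Properties();
        // Old (replaced): "default.windowed.key.serde.inner" and
        // "default.windowed.value.serde.inner", each naming an inner serde.
        // New: a single config naming the windowed serde's inner class.
        props.put("windowed.inner.class.serde", Serdes.StringSerde.class.getName());
        return props;
    }
}
```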
Reviewers: Matthias J. Sax <matthias@confluent.io>
MissingSourceTopicException should contain the name of the missing topic.
There is one corner case for which we don't have the topic name at hand; in
that case, we log the topic name elsewhere instead.
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Introduce a new field, id, in the ClusterConfigProperty annotation. Its main purpose is to target a property at a specific broker/controller (KRaft). The default value is -1, which means the ClusterConfigProperty applies to all brokers and controllers.
Note that under Type.KRAFT mode, controller ids start at 3000 and increment by one each time. In all other modes, broker/controller ids start at 0 and increment by one.
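A sketch of the intended usage (the config keys, values, and test body are illustrative; the annotations come from the kafka.test.annotation test framework):

```java
@ClusterTest(
    serverProperties = {
        // id defaults to -1, so this property applies to every broker and controller:
        @ClusterConfigProperty(key = "some.shared.config", value = "shared-value"),
        // Under Type.KRAFT, controller ids start at 3000, so this targets the first controller:
        @ClusterConfigProperty(id = 3000, key = "some.controller.config", value = "controller-value"),
        // Broker ids start at 0, so this targets the first broker:
        @ClusterConfigProperty(id = 0, key = "some.broker.config", value = "broker-value")
    }
)
public void testPerNodeConfigs(ClusterInstance cluster) {
    // test body elided
}
```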
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This is the first PR in a sequence to support custom task assignors in Kafka Streams, as described in KIP-924. It creates and exposes all of the interfaces that will need to be implemented during the refactor of the current task assignment logic.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
As part of 2e8d69b78c, we introduced the TransactionAbortableException in AK. On more detailed analysis we found that the enum SupportedOperation was a bit misleading, so it has been renamed to TransactionSupportedOperation to allow a better and more clearly defined function signature.
Reviewers: Justine Olshan <jolshan@confluent.io>
Uses the new remove operation of the state updater that returns
a future to remove revoked tasks from the state updater.
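A hypothetical sketch of the shape of such a future-based remove (the names are illustrative, not the actual internal interface): the caller reacts to completion instead of polling for removed tasks.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for the state updater's new remove operation.
interface StateUpdaterLike {
    CompletableFuture<String> remove(String taskId);
}

public class RemoveRevokedTasksSketch {
    public static void main(String[] args) {
        StateUpdaterLike updater = taskId ->
            CompletableFuture.completedFuture(taskId); // stand-in behavior
        // remove() returns a future that completes once the state updater
        // has actually given up the task.
        updater.remove("0_1").whenComplete((removedTask, error) -> {
            if (error == null) {
                System.out.println("Revoked task removed: " + removedTask);
            }
        });
    }
}
```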
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
In two tests, we use the current snapshot version as the test parameter
`to_version`, but it is the only option, so we can hardcode it. This
simplifies testing downstream, since the test parameters do not change
with every version. In particular, some tests downstream are blacklisted
because they do not work with ARM. These lists need to be updated every
time `DEV_VERSION` is bumped.
Reviewers: Matthias J. Sax <matthias@confluent.io>
When there are overlapping segments in remote storage, deletion may fail to remove the segments due to the isRemoteSegmentWithinLeaderEpochs check. Once deletion starts to fail for a partition, its segments won't be eligible for cleanup. The one workaround we have is to move the log-start-offset using the kafka-delete-records script.
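For reference, the equivalent of that workaround via the Java Admin client, as a sketch; the bootstrap server, topic, and target offset are illustrative:

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class AdvanceLogStartOffset {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Advancing the log-start-offset past the stuck segments makes
            // them eligible for cleanup again.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(12345L)))
                 .all()
                 .get();
        }
    }
}
```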
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
Uses the new remove operation of the state updater that returns a future to remove lost tasks from the state updater.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Fix to allow initializing positions for newly assigned partitions while the onPartitionsAssigned callback is running, even though the partitions remain non-fetchable until the callback completes.
Before this PR, we were not allowing initialization or fetching while the callback was running. The fix only allows initializing the newly assigned partitions' positions, and keeps the existing logic that ensures the partitions remain non-fetchable until the callback completes.
The need for this fix came out of one of the Connect system tests, which attempts to retrieve a newly assigned partition's position with a call to consumer.position from within the onPartitionsAssigned callback (WorkerSinkTask). With this PR, such calls are allowed (test added), matching the behaviour of the legacy consumer.
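A sketch of the scenario this fix enables (bootstrap server, group, and topic names are illustrative):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PositionInCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // With this fix, position() can initialize the newly assigned
                    // partitions' positions from within the callback, even though
                    // they stay non-fetchable until the callback completes.
                    for (TopicPartition tp : partitions) {
                        System.out.println(tp + " starts at " + consumer.position(tp));
                    }
                }
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            });
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
```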
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The patch implements the JoinGroup API for the new consumer groups. It allows members using the classic rebalance protocol with the consumer embedded protocol to join a new consumer group.
Reviewers: David Jacot <djacot@confluent.io>
When the consumer group protocol is used in a cluster, it is, at the moment, impossible to see all the records stored in the __consumer_offsets topic with kafka-dump-log --offsets-decoder, because the tool does not know how to handle all the new records.
This patch refactors the OffsetsMessageParser used internally by kafka-dump-log to use the RecordSerde used by the new group coordinator. It ensures that the tool is always in sync with the coordinator implementation. The patch also changes the format to use the toString'ed representations of the records instead of having custom logic to dump them, which ensures that all the information is always dumped. The downside of the latter is that inner byte arrays (e.g. the assignment in the classic protocol) are no longer deserialized. Personally, I feel that this is acceptable; it is actually better to stay as close as possible to the actual records in this tool. It also avoids issues like https://issues.apache.org/jira/browse/KAFKA-15603.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch fixes two issues with IncrementalAlterConfigs and the ZK migration. First, it changes the handling of IncrementalAlterConfigs to check if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not directly modifying configs in ZK if there is a KRaft controller. This closes the race condition between KRaft taking over as the active controller and the ZK brokers learning about this.
*Forwarding*
During the ZK migration, there is a time when the ZK brokers are running with migrations enabled, but KRaft has yet to take over as the controller. Prior to KRaft taking over as the controller, the ZK brokers in migration mode were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK controller. This works for some config types, but breaks when setting BROKER and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for IAC was to always forward if the forwarding manager was defined. Since ZK brokers in migration mode have forwarding enabled, the forwarding would happen, and the special logic for BROKER and BROKER_LOGGER would be missed, causing the request to fail.
With this fix, the IAC handler will check if the controller is KRaft or ZK and only forward for KRaft.
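An illustrative sketch of the fixed decision only; the real handler lives in KafkaApis (which is Scala), and these names are hypothetical:

```java
public class IacForwardingSketch {
    static void handleIncrementalAlterConfigs(Object request,
                                              boolean forwardingEnabled,
                                              boolean controllerIsKRaft) {
        if (forwardingEnabled && controllerIsKRaft) {
            // KRaft controller: forward as before.
            forwardToController(request);
        } else {
            // ZK controller: handle locally so the BROKER / BROKER_LOGGER
            // special cases are applied instead of being skipped.
            handleLocally(request);
        }
    }
    static void forwardToController(Object request) { /* stub */ }
    static void handleLocally(Object request) { /* stub */ }
}
```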
*Protected ZK Writes*
As part of KIP-500, we moved most (but not all) ZK mutations to the ZK controller. One of the things we did not move fully to the controller was entity configs. This is because there was some special logic that needed to run on the broker for certain config updates. If a broker-specific config was set, AdminClient would route the request to the proper broker. In KRaft, we have a different mechanism for handling broker-specific config updates.
Leaving this ZK update on the broker side would be okay if we were guarding writes on the controller epoch, but it turns out KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" updates to ZK. This means a ZK broker could update the contents of ZK after the metadata had been migrated to KRaft. No good! To fix this, this patch adds a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but also adds logic to fail the update if the controller is a KRaft controller.
The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a new exception that can be thrown while updating configs.
Reviewers: Luke Chen <showuon@gmail.com>, Akhilesh Chaganti <akhileshchg@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
The contract of KafkaConsumer.poll(Duration) says that it throws InterruptException "if the calling thread is interrupted before or while this function is called". The new KafkaConsumer implementation was not doing this when the thread was interrupted before poll was called, specifically with a very short timeout: if the consumer ever waited for records, it did check the thread's interrupt state, but if it did not wait because of a short timeout, it did not.
Some of the log messages in the code erroneously mentioned timeouts, when they really meant interruption.
Also adds a test for this specific scenario.
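The scenario, sketched against the documented poll(Duration) contract (bootstrap server, group, and deserializers are illustrative):

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;

public class PollInterruptExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Interrupt the thread *before* polling with a very short timeout:
            // per the contract, this must throw InterruptException even though
            // the consumer never waits for records.
            Thread.currentThread().interrupt();
            try {
                consumer.poll(Duration.ZERO);
            } catch (InterruptException expected) {
                System.out.println("InterruptException thrown as documented");
            }
        }
    }
}
```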
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
ConsumerGroupPartitionMetadataValue.Epoch is not used anywhere, so we can remove it. Note that we already have non-backward-compatible changes lined up for 3.8, so it is fine to do this now.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>