TimestampExtractor allows dropping records by returning a timestamp of -1. In this case, we still need to update the consumed offsets so that progress can be committed.
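For reference, a minimal sketch of an extractor that uses this mechanism (the class name is illustrative, not part of Kafka):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Illustrative extractor: returning a negative timestamp tells Streams to drop
// the record. With this fix, the offset of such a record is still tracked so
// that progress can be committed.
public class DropOnInvalidTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        final long eventTime = record.timestamp();
        return eventTime >= 0 ? eventTime : -1L;
    }
}
```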
Reviewers: Bill Bejeck <bill@confluent.io>
As part of PR https://github.com/apache/kafka/pull/17636, the purgatory was moved from core to server-common; hence, this change moves some existing classes used in share fetch to the share module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Fixes an issue with the TopologyTestDriver (TTD) in the specific case where users don't specify an initial time for the driver, also don't specify a start timestamp for the TestInputTopic, and then pipe input records without timestamps. This combination results in a slight mismatch in the expected timestamps for the piped records, which can be noticeable when writing tests with very small time deltas.
The problem is that, while both the TTD and the TestInputTopic are initialized to the "current time" when not otherwise specified, it's possible for some milliseconds to pass between the creation of the TTD and the creation of the TestInputTopic. This can result in a TestInputTopic getting a start timestamp that's several ms larger than the driver's time, ultimately causing the piped input records to have timestamps slightly in the future relative to the driver.
In practice, even those who hit this issue might not notice it if they aren't manipulating time in their tests, or are advancing time by enough to mask the several milliseconds of difference. However, we noticed a test failure due to this because we were testing a TTL-based processor and had advanced the driver time by only 1 millisecond past the TTL. The piped record should have been expired, but because its timestamp was a few milliseconds later than the driver's start time, the test ended up failing.
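A rough sketch of the pattern that can trigger the mismatch, assuming a topology with a single input topic named "input" (class name and configs are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class DriverStartTimeExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // No initial time passed to the driver: it samples "current time" here.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            // A few milliseconds may pass before the input topic is created, so
            // its default start timestamp can be slightly ahead of the driver's.
            final TestInputTopic<String, String> topic = driver.createInputTopic(
                "input", Serdes.String().serializer(), Serdes.String().serializer());
            // Piped without an explicit timestamp: the record takes the topic's
            // start time, which may be slightly in the future relative to the driver.
            topic.pipeInput("key", "value");
        }
    }
}
```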
Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
Currently, we are using the String representation of the shareCoordinator/sharePartition key (groupId:topicId:partition), as defined in KIP-932, in a few methods like ShareCoordinator.partitionFor and ShareCoordinatorMetadataCacheHelper.getShareCoordinator.
This has the potential to introduce subtle bugs when incorrect strings are used to invoke these methods. What makes it worse is that the failures might be intermittent.
This PR aims to remedy the situation by changing the type to the concrete SharePartitionKey. This way, callers need not worry about a specific encoding or format of the coordinator key, as long as the SharePartitionKey has the correct fields set.
There is one issue: the FIND_COORDINATOR RPC requires the coordinator key to be set as a String in the request body. We can't get around this and have to set the value as a String. However, on the KafkaApis handler side we parse this key back into a SharePartitionKey and regain compile-time safety. One downside is that we need to split and parse the incoming coordinator key from the request, but that can be encapsulated in a single location in SharePartitionKey.
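As a rough illustration of the shape of such a typed key (field and method names here are hypothetical and do not mirror the actual SharePartitionKey API):

```java
import java.util.Objects;
import org.apache.kafka.common.Uuid;

// Hypothetical typed key: callers pass this object around instead of a raw
// "groupId:topicId:partition" string, so the encoding lives in exactly one place.
public final class TypedShareKey {
    private final String groupId;
    private final Uuid topicId;
    private final int partition;

    public TypedShareKey(final String groupId, final Uuid topicId, final int partition) {
        this.groupId = Objects.requireNonNull(groupId);
        this.topicId = Objects.requireNonNull(topicId);
        this.partition = partition;
    }

    // String form still needed for the FIND_COORDINATOR request body.
    public String asCoordinatorKey() {
        return groupId + ":" + topicId + ":" + partition;
    }

    // Parsing of the incoming coordinator key is encapsulated here (simplified:
    // assumes the groupId itself contains no ':').
    public static TypedShareKey fromCoordinatorKey(final String key) {
        final String[] parts = key.split(":");
        return new TypedShareKey(parts[0], Uuid.fromString(parts[1]), Integer.parseInt(parts[2]));
    }
}
```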
Added tests for partitionFor.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This patch introduces the ConsumerGroupRegularExpression record (key + value) and updates the `GroupMetadataManager` and the `ConsumerGroup` to bookkeep it appropriately. Note that with this change, regular expressions are counted as subscribers in the `subscribedTopicNames` data structure. This is important because the topic metadata of the group is computed based on it.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
Added ShareRoundTripWorker.java similar to RoundTripWorker.java. This will start a producer and a share consumer on a single node. The share consumer reads back the messages produced by the producer.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Kafka Streams actively purges records from repartition topics. Prior to this PR, Kafka Streams would retrieve the offset from the consumedOffsets map, but there are a couple of edge cases where the consumedOffsets map can get ahead of the committedOffsets map. In those cases, Kafka Streams could potentially purge a repartition record before it has been committed.
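A minimal sketch of the intended behavior (names are hypothetical, not the actual StreamTask code): the purgeable offset for a partition should never exceed what has been committed.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

final class PurgeOffsetExample {
    // Cap the purgeable offset at the committed offset so a repartition record
    // can never be deleted before the offset covering it has been committed.
    static Map<TopicPartition, Long> purgeableOffsets(
            final Map<TopicPartition, Long> consumedOffsets,
            final Map<TopicPartition, Long> committedOffsets) {
        final Map<TopicPartition, Long> purgeable = new HashMap<>();
        for (final Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
            final Long consumed = consumedOffsets.get(entry.getKey());
            if (consumed != null) {
                purgeable.put(entry.getKey(), Math.min(consumed, entry.getValue()));
            }
        }
        return purgeable;
    }
}
```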
Updated the current StreamTask test to cover this case.
Reviewers: Matthias Sax <mjsax@apache.org>
These metrics were deprecated in KIP-773 and are being removed in Kafka 4.0.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Divij Vaidya <diviv@amazon.com>
Migrates existing Connect tests that were using ZooKeeper to use KRaft
instead, and cleans up some dead ZK code. For broker compatibility tests,
tests for versions 2.1-2.3 still need to use ZK.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
When loading transaction metadata from a transaction log partition, if the partition contains a segment ending with an empty batch, the "currOffset" update logic will be skipped for the last batch. Since "currOffset" is not properly advanced to the offset following the last batch, the TransactionStateManager.loadTransactionMetadata method gets stuck in the "while" loop.
This change fixes the issue by updating "currOffset" after processing each batch, whether the batch is empty or not.
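A minimal sketch of the intended loop shape (in Java for illustration; the actual code lives in the Scala TransactionStateManager):

```java
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

final class BatchOffsetAdvance {
    // Advance the read position after every batch, including empty ones, so the
    // read loop always makes progress and cannot spin on a trailing empty batch.
    static long advancePastBatches(final MemoryRecords records, long currOffset) {
        for (final RecordBatch batch : records.batches()) {
            // ... process any records contained in the batch ...
            currOffset = batch.nextOffset();   // moves past empty batches as well
        }
        return currOffset;
    }
}
```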
Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>
Implementation of KIP-1076 to allow adding client application metrics to the KIP-714 framework.
Reviewers: Apoorv Mittal <amittal@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias Sax <mjsax@apache.org>
This is part one of a multi-PR effort to convert the Kafka Streams system tests to KRaft. I decided to break down the changes into multiple PRs to reduce the review load.
Reviewers: Matthias Sax <mjsax@apache.org>
We should call `maybeSendResponseCallback` only once for each marker during WriteTxnMarkersRequest handling.
Consider the following two cases:
First
We have two markers to append: one for producer-0 and one for producer-1.
When we first process producer-0, it appends a marker to __consumer_offsets.
The __consumer_offsets append finishes very quickly because the group coordinator is no longer the leader, so the coordinator directly returns NOT_LEADER_OR_FOLLOWER. In its callback, it calls maybeComplete() for the first time, and because there is only one partition to append, it goes on to call maybeSendResponseCallback() and decrement numAppends.
It then still calls the replica manager append even though there is nothing left to append; in that callback, it calls maybeComplete() a second time, which decrements numAppends again.
Second
We have two markers to append: one for producer-0 and one for producer-1.
When we first process producer-0, it appends a marker to __consumer_offsets and to a data topic foo.
The two appends are handled asynchronously by the group coordinator and the replica manager.
There can be a race in which both appends finish at the same time: they both fill `markerResults` and then call `maybeComplete`. Because the `partitionsWithCompatibleMessageFormat.size == markerResults.size` condition is satisfied for both, both `maybeComplete` calls go on to decrement `numAppends` and cause a premature response.
Note: the problem only happens with KIP-848 coordinator enabled.
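A small sketch of the kind of guard that prevents the double completion (class and method names are hypothetical, not the actual KafkaApis code):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Each append callback (group coordinator or replica manager) calls
// onAppendResult(); the compare-and-set ensures the response callback fires at
// most once per marker even if callbacks race or fire more often than expected.
final class MarkerCompletionGuard {
    private final AtomicInteger pendingAppends;
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final Runnable sendResponseCallback;

    MarkerCompletionGuard(final int expectedAppends, final Runnable sendResponseCallback) {
        this.pendingAppends = new AtomicInteger(expectedAppends);
        this.sendResponseCallback = sendResponseCallback;
    }

    void onAppendResult() {
        if (pendingAppends.decrementAndGet() <= 0 && completed.compareAndSet(false, true)) {
            sendResponseCallback.run();
        }
    }
}
```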
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>, David Jacot <djacot@confluent.io>
Currently in ShareConsumeRequestManager, after we receive a ShareFetchResponse, if the subscription changes before we acknowledge (via ShareFetch), we do not acknowledge the records that are no longer part of the updated subscription. Instead, we must acknowledge all the records that we received, irrespective of the current subscription.
This bug only occurs when acknowledging via ShareFetch, where we use SubscriptionState::fetchablePartitions to obtain the partitions to fetch. With ShareAcknowledge, because we get the partitions from the active share sessions, the session remains active even if the subscription changed.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>