This patch is the first of a series of patches to introduce support for server side regular expression. It introduces the re2j dependency.
Co-authored-by: Lianet Magrans <lmagrans@confluent.io>
Reviewers: Lianet Magrans <lmagrans@confluent.io>
Instead of waiting until Tasks are assigned to us, we pre-emptively
create a StandbyTask for each non-empty Task directory found on-disk.
We do this before starting any StreamThreads, and on our first
assignment (after joining the consumer group), we recycle any of these
StandbyTasks that were assigned to us, either as an Active or a
Standby.
We can't just use these "initial Standbys" as-is, because they were
constructed outside the context of a StreamThread, so we first have to
update them with the context (log context, ChangelogReader, and source
topics) of the thread that it has been assigned to.
The motivation for this is to (in a later commit) read StateStore
offsets for unowned Tasks from the StateStore itself, rather than the
.checkpoint file, which we plan to deprecate and remove.
There are a few additional benefits:
Initializing these Tasks on start-up, instead of on-assignment, will
reduce the time between a member joining the consumer group and beginning
processing. This is especially important when active tasks are being moved
over, for example, as part of a rolling restart.
If a Task has corrupt data on-disk, it will be discovered on startup and
wiped under EOS. This is preferable to wiping the state after being
assigned the Task, because another instance may have non-corrupt data and
would not need to restore (as much).
There is a potential performance impact: we open all on-disk Task
StateStores, and keep them all open until we have our first assignment.
This could require large amounts of memory, in particular when there are
a large number of local state stores on-disk.
However, since old local state for Tasks we don't own is automatically
cleaned up after a period of time, in practice, we will almost always
only be dealing with the state that was last assigned to the local
instance.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>, Matthias Sax <mjsax@apache.org>
This PR augments Streams messages with leader epoch. In case of empty buffer queues, the last offset and leader epoch are retrieved from the streams task 's cache of nextOffsets.
Co-authored-by: Lucas Brutschy <lbrutschy@confluent.io>
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Our "validate" job was running on JDK 21 while the "test" job was running 11 and 23. This patch updates the validate job to 23 and fixes the test catalog step to only run on JDK 23 (instead of 21)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Adds the DefaultStatePersister and other supporting classes for managing share state.
* Added DefaultStatePersister implementation. This is the entry point for callers who wish to invoke the share state RPC API.
* Added PersisterStateManager which is used by DefaultStatePersister to manage and send the RPCs over the network.
* Added code to BrokerServer and BrokerMetadataPublisher to instantiate the appropriate persister based on the config value for group.share.persister.class.name. If this is not specified, the DefaultStatePersister will be used. To force use of NoOpStatePersister, set the config to empty. This is an internal config, not to be exposed to the end user. This will be used to factory plug the appropriate persister.
* Using this persister, the internal __share_group_state topic will come to life and will be used for persistence of share group info.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
As described in KIP-500, the Kafka controller monitors the liveness of each broker in the cluster. It gathers this information from heartbeats sent from the brokers themselves.
In some rare cases, the main controller thread may get blocked for several seconds at a time. In the current code, this will result in the controller being unable to update the last contact times for the brokers during this time.
This PR changes the controller heartbeat handling to be partially lockless. Specifically, the last contact time for each broker will be updated locklessly prior to the rest of the heartbeat handling. This will ensure that heartbeats always get through.
Additionally, this PR adds a PeriodicTaskControlManager to better manage periodic tasks. This should help handle the very common pattern where we want to schedule a background task at some frequency. We also want the background task to be immediately rescheduled if there is too much work to be done in one event.
Reviewers: Liu Zeyu <zeyu.luke@gmail.com>, David Arthur <mumrah@gmail.com>
For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
Make max.poll.records a soft limit so that record batch boundaries can be respected in records returned by ShareConsumer.poll. This gives a significant performance gain because the broker is much more efficient at handling batches which have not been split.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This allows shutting down a KafkaClusterTestKit from a JVM shutdown hook without risking error logs because the base directory has already been deleted by the shutdown hook TestUtils.tempDirectory sets up.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
A tiny fix to a single protocol JSON which was recently updated under KAFKA-14562. It seems to erroneously use strings where bools appear to be the right type.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch fixes the valid values generated doc of remote.log.manager.copier.thread.pool.size and remote.log.manager.expiration.thread.pool.size.
Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
The PR adds capability to restrict the messages in Share Fetch. The max fetch records will be an additional way to limit the number of records sent from broker to client.
In Share Fetch, with min and mx bytes, there exists 3 problems:
1. The max.poll.records client config sends the max number of records defined to application but might have fetched extra becuase of higher max bytes. But the timeout for the sent records has started on the broker.
2. As the application processes records as per max.poll.records, hence those number of records are sent in every acknowledgement. This causes the cache data to be tracked per offset as the batch is broken.
3. The client has to sent the partial acknoledgment batch and cannot piggyback on fetch requests.
To handle the above scenario max fetch records has been added. Once this PR is merged and we define the right methodolgy then KIP will be updated to have max fetch records in share fetch RPC rather as broker config.
Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
This PR implements KIP-1094.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Kirk True <ktrue@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>