AdminClient will throw IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how Consumer and Producer clients handle new calls after closed down.
Reviewers: Luke Chen <showuon@gmail.com>, Kirk True <kirk@kirktrue.pro>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, vamossagar12 <sagarmeansocean@gmail.com>
est Cases Covered
1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it instead cache it from the local index file present.
2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptedIndexException instead of successfull execution.
3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow , then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
HeartbeatRequestManager is responsible for handling the ConsumerGroupHeartbeat request and response. The manager has the following responsibilities:
1. Sending the request to the GroupCoordinator when it is possible and necessary
2. Handling the response and update the `MembershipManagerImpl` based on the error/response it receives.
3. Handles request retries and fatal failures
For Successful heartbeat response:
- Updates the MembershipManager
For Failures handling:
- Retriables Errors: backoff and retries
- Fenced: Transition to a fenced state and reset the epoch, and retry in the next poll
- Fatal: Propagate the error to the user and fail the state machine
Reviewers: Kirk True <ktrue@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, David Jacot <djacot@confluent.io>
This patch fixes a problem where we migrate the current producer ID batch to KRaft instead of the next producer ID batch. Since KRaft stores the next batch in the log, we end up serving up a duplicate batch to the first caller of AllocateProducerIds once the KRaft controller has taken over.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
When producer-batch is being retried, new-leader is known for the partition Vs the leader used in last attempt, then it is worthwhile to retry immediately to this new leader. A partition-leader is considered to be newer, if the epoch has advanced.
Reviewers: Walker Carlson <wcarlson@apache.org>, Kirk True <kirk@kirktrue.pro>, Andrew Schofield <andrew_schofield@uk.ibm.com
I've added details for VerificationFailureRate and VerificationTimeMs.
I considered adding the documentation for the AddPartitionsToTxnVerification metrics, but I noticed that all the request metrics simply listed Produce|FetchConsumer|FetchFollower. If we don't already report the AddPartitionsToTxn request metrics in this file, it doesn't make sense to add the verification variant. (As well as all the other APIs we report)
Filed a followup jira if we want to redo that whole section.
Reviewers: Reviewers: Divij Vaidya <diviv@amazon.com>
Now that the implementation for the state updater is done, we can enable it by default.
This PR enables the state updater by default and fixes code that made assumptions that are not true when the state updater is enabled (mainly tests).
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Replacing the use a hardcoded -1 with a constant (`LEAVE_GROUP_EPOCH`) that provides more clarity. Since static members also have a magic number (-2), this will motivate future commits to use constants instead of hardcoded values.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch implements DeleteGroups and OffsetDelete API in the new group coordinator.
Reviewers: yangy0000, Ritika Reddy <rreddy@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
The LogAppendInfo class is a bit bloated in terms of class fields. That's because it is used as an umbrella class for both leader log appends and follower log appends and needs to carry fields for both. This makes the constructor for the class a bit cludgy to use. It also ends up being a bit confusing when fields are important and when they aren't. I noticed there were a few fields that didn't seem necessary.
Below is a description of changes:
firstOffset is a LogOffsetMetadata but there are no readers of the field that use anything but the messageOffset field - simplified to a long.
LogAppendInfo.errorMessage is only set in one context - when calling LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo. When we use this constructor, we pass up the original exception in LogAppendResult anyway, so the field is redundant with the LogAppendResult.exception field. This allows us to simplify the handling in KAFKA-15459: Convert coordinator retriable errors to a known producer response error #14378 since there are no custom error messages we just return whatever is in the exception message.
We only use targetCompressionType when constructing the LogValidator - just inline the call instead of including it in the LogAppendInfo.
offsetsMonotonic is only used when not assigning offsets to throw an exception - just throw the exception instead of setting a field to throw later.
shallowCount is only there to determine whether there are any messages in the append. Instead, we can just check validBytes which is incremented with a non-zero value every time we increment shallowCount.
Reviewers: Justine Olshan <jolshan@confluent.io>
This PR is part of #13247
It contains ReassignPartitionsIntegrationTest rewritten in java.
Goal of PR is reduce changes size in main PR.
Reviewers: Taras Ledkov <tledkov@apache.org>, Justine Olshan <jolshan@confluent.io>
Previous commits left out TxnOffsetCommits which go through the group coordinator (not directly from the producer).
I've wired up the methods to include the transactional id and state partition to do the verification.
I've also updated UnifiedLog to verify on client and coordinator requests that are transactional.
I've not updated any sequence check logic since the sequence is always 0 on group coordinator initiated writes.
Added returned errors to Response files. Both InvalidPidMapping and InvalidTxnState will be returned and be fatal for the transactional OffsetCommit requests.
Reviewers: David Jacot <david.jacot@gmail.com>, Artem Livshits <alivshits@confluent.io>
* Implements start and stop of task executors
* Introduce flush operation to keep consumer operations out of the processing threads
* Fixes corner case: handle requested unassignment during shutdown
* Fixes corner case: handle race between voluntary unassignment and requested unassigment
* Fixes corner case: task locking future completes for the empty set
* Fixes corner case: we should not reassign a task with an uncaught exception to a task executor
* Improved logging
* Number of threads controlled from outside, of the TaskManager
Reviewers: Bruno Cadonna <bruno@confluent.io>
The Javadoc for `GroupMetadataManager#throwIfMemberEpochIsInvalid` suggests that it throws a `NotCoordinatorException` exception when the member epoch in the consumer heartbeat request is greater than the one known by this coordinator for the given member. This could happen if the heartbeat-ing member got a higher epoch from another coordinator. However the method throws `FencedMemberEpochException` even in this case. This PR corrects the Javadocs to reflect the same.
Reviewers: David Jacot <djacot@confluent.io>
As a first step, we plan to release a preview of the new consumer group rebalance protocol without the client side assignor. This patch removes all the related fields from the ConsumerGroupHeartbeat API for now. Removing fields is fine here because this API is not released yet and not exposed by default. We will add them back while bumping the version of the request when we release this part in the future.
Reviewers: Justine Olshan <jolshan@confluent.io>
A bug in the RemoteIndexCache leads to a situation where the cache does not replace the corrupted index with a new index instance fetched from remote storage. This commit fixes the bug by adding correct handling for `CorruptIndexException`.
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Alexandre Dupriez <duprie@amazon.com>
Currently, when consumer startup, there will be a log message said:
[2023-08-11 15:47:17,713] INFO [Consumer clientId=console-consumer, groupId=console-consumer-28605] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
It confused users and make them think there's something wrong in the consumer application. Since we already log need to re-join with the given member-id logs in the joinGroupResponseHandler and already requestRejoined. So, we can skip this confusion logs.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Paolo Patierno <ppatiern@redhat.com>, vamossagar12 <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch moves the TopicIdPartition from the metadata module to the server-common module so it can be used by the group-coordinator module as well.
Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, David Jacot <djacot@confluent.io>
This patch fixes the version of the `AuthorizedOperations` field. The schema is not used yet so the bug had no impact.
Reviewers: Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
When a fetch response has no record for a partition, validBytes is 0. We shouldn't set the last fetched epoch to logAppendInfo.lastLeaderEpoch.asScala since there is no record and it is Optional.empty. We should use currentFetchState.lastFetchedEpoch instead.
Reviewers: Divij Vaidya <diviv@amazon.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
DeleteSegmentsDueToLogStartOffsetBreach configures the segment such that it can hold at-most 2 record-batches. And, it asserts that the local-log-start-offset based on the assumption that each segment will contain exactly two messages.
During leader switch, the segment can get rotated and may not always contain two records. Previously, we were checking whether the expected local-log-start-offset is equal to the base-offset-of-the-first-local-log-segment. With this patch, we will scan the first local-log-segment for the expected offset.
Reviewers: Divij Vaidya <diviv@amazon.com>
When Streams completes a rebalance, it unlocks state directories
all unassigned tasks. Unfortunately, when the state updater is enabled,
Streams does not look into the state updater to determine the
unassigned tasks.
This commit corrects this by adding the check.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
bump snappy-java version to 1.1.10.4, and add more tests to verify the compressed data can be correctly decompressed and read.
For LogCleanerParameterizedIntegrationTest, we increased the message size for snappy decompression since in the new version of snappy, the decompressed size is increasing compared with the previous version. But since the compression algorithm is not kafka's scope, all we need to do is to make sure the compressed data can be successfully decompressed and parsed/read.
Reviewers: Divij Vaidya <diviv@amazon.com>, Ismael Juma <ismael@juma.me.uk>, Josep Prat <josep.prat@aiven.io>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
Add support for --bootstrap-controller in the following command-line tools:
- kafka-cluster.sh
- kafka-configs.sh
- kafka-features.sh
- kafka-metadata-quorum.sh
To implement this, the following AdminClient APIs now support the new bootstrap.controllers
configuration:
- Admin.alterConfigs
- Admin.describeCluster
- Admin.describeConfigs
- Admin.describeFeatures
- Admin.describeMetadataQuorum
- Admin.incrementalAlterConfigs
- Admin.updateFeatures
Command-line tool changes:
- Add CommandLineUtils.initializeBootstrapProperties to handle parsing --bootstrap-controller
in addition to --bootstrap-server.
- Add --bootstrap-controller to ConfigCommand.scala, ClusterTool.java, FeatureCommand.java, and
MetadataQuorumCommand.java.
KafkaAdminClient changes:
- Add the AdminBootstrapAddresses class to handle extracting bootstrap.servers or
bootstrap.controllers from the config map for KafkaAdminClient.
- In AdminMetadataManager, store the new usingBootstrapControllers boolean. Generalize
authException to encompass the concept of fatal exceptions in general. (For example, the
fatal exception where we talked to the wrong node type.) Treat
MismatchedEndpointTypeException and UnsupportedEndpointTypeException as fatal exceptions.
- Extend NodeProvider to include information about whether bootstrap.controllers is supported.
- Modify the APIs described above to support bootstrap.controllers.
Server-side changes:
- Support DescribeConfigsRequest on kcontrollers.
- Add KRaftMetadataCache to the kcontroller to simplify implemeting describeConfigs (and
probably more APIs in the future). It's mainly a wrapper around MetadataImage, so there is
essentially no extra resource consumption.
- Split RuntimeLoggerManager out of ConfigAdminManager to handle the incrementalAlterConfigs
support for BROKER_LOGGER. This is now supported on kcontrollers as well as brokers.
- Fix bug in AuthHelper.computeDescribeClusterResponse that resulted in us always sending back
BROKER as the endpoint type, even on the kcontroller.
Miscellaneous:
- Fix a few places in exceptions and log messages where we wrote "broker" instead of "node".
For example, an exception in NodeApiVersions.java, and a log message in NetworkClient.java.
- Fix the slf4j log prefix used by KafkaRequestHandler logging so that request handlers on a
controller don't look like they're on a broker.
- Make the FinalizedVersionRange constructor public for the sake of a junit test.
- Add unit and integration tests for the above.
Reviewers: David Arthur <mumrah@gmail.com>, Doguscan Namal <namal.doguscan@gmail.com>
The process method inside the tasks needs to be called from within
the processing threads. However, it currently interacts with the
consumer in two ways:
* It resumes processing when the PartitionGroup buffers are empty
* It fetches the lag from the consumer
We introduce updateLags() and
resumePollingForPartitionsWithAvailableSpace() methods that call into
the task from the polling thread, in order to set up the consumer
correctly for the next poll, and extract metadata from the consumer
after the poll.
Reviewer: Bruno Cadonna <bruno@confluent.io>
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.
With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.
This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
State updater can get into a busy loop when all tasks are paused, because changelogReader will never return that all changelogs have been read completely. Fix this, by awaiting if updatingTasks is empty.
Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The state directory throws a lock exception during initialization if a task state directory is still locked by the stream thread that previously owned the task. When this happens, Streams catches the lock exception, ignores the exception, and tries to initialize the task in the next exception.
In the state updater code path, we missed catching the lock exception when Streams recycles a task. That leads to the lock exception thrown to the exception handler, which is unexpected and leads to test failures.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>