The original behavior was implemented to match the classic consumer, whose ConsumerCoordinator does the same when handling the OffsetFetchResponse. This behavior is being updated for the legacy coordinator as part of KAFKA-17279, to retry on all retriable errors.
We should review and update the CommitRequestManager to align with this, and retry on all retriable errors, which seems sensible when fetching offsets.
The corresponding PR for the classic consumer is #16826
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Currently there are four handler functions for handling ShareAcknowledge responses: ShareConsumeRequestManager defined an interface and the respective handlers implemented it. Using AcknowledgeRequestType, we can instead merge the code into just two handler functions, one for ShareAcknowledge success and one for ShareAcknowledge failure, eliminating the need for the interface.
This PR also fixes a bug: while handling the ShareAcknowledge response, we were using an outdated time instead of the time at which the response was received.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
In AsyncKafkaConsumer, the FindCoordinatorRequest is sent by the background thread. MockClient#prepareResponseFrom only matches a response to a future request, so under a race condition a FindCoordinatorResponse may not match any FindCoordinatorRequest. It's better to call MockClient#prepareResponseFrom before the request is sent, to avoid a flaky test.
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Per KIP-289, the use of an empty value for group.id configuration was deprecated back in 2.2.0.
In 3.7, the AsyncKafkaConsumer implementation will throw an error (see KAFKA-14438).
This task is to update the LegacyKafkaConsumer implementation to throw an error in 4.0.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
AccessTokenRetrieverFactory uses the value of sasl.oauthbearer.header.urlencode provided by the user, or null if no value was provided for that configuration. When the HttpAccessTokenRetriever is created and the JVM attempts to unbox that value into a boolean, a NullPointerException is thrown.
The fix is to check the Boolean explicitly and, if it's null, use Boolean.FALSE.
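A minimal sketch of the fix, assuming the value arrives through a generic config map (the names here are illustrative, not the exact factory code):

```java
// Hedged sketch: avoid auto-unboxing a possibly-null Boolean.
// `configs` and the way the value is fetched are assumptions for illustration.
Boolean urlencodeValue = (Boolean) configs.get("sasl.oauthbearer.header.urlencode");
boolean urlencodeHeader = urlencodeValue != null ? urlencodeValue : Boolean.FALSE;
```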
Reviewers: bachmanity1 <81428651+bachmanity1@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
Fix to ensure that the HB request to leave the group is generated when closing the HBRequestManager if the state is LEAVING. This is needed because we could end up closing the network thread without giving the HB manager a chance to generate the request. Consider this flow on consumer.close() with a short timeout:
1. app thread triggers releaseAssignmentAndLeaveGroup
2. background thread transitions to LEAVING
2.1 the next run of the background thread should poll the HB manager and generate a request
3. app thread: releaseAssignmentAndLeaveGroup times out and moves on to close the network thread (stops polling managers; calls pollOnClose to gather the final requests and send them along with the unsent ones)
If 3 happens in the app thread before 2.1 happens in the background, the HB manager won't have a chance to generate the leave request. This PR implements pollOnClose to generate the final request if needed, as sketched below.
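A rough sketch of that pollOnClose, with hypothetical names (the real manager API may differ):

```java
// Hypothetical sketch: generate the final leave-group heartbeat when polled on close
public PollResult pollOnClose(long currentTimeMs) {
    if (membershipManager.state() == MemberState.LEAVING) {
        UnsentRequest leaveHeartbeat = makeHeartbeatRequest(currentTimeMs); // hypothetical helper
        return new PollResult(0, Collections.singletonList(leaveHeartbeat));
    }
    return PollResult.EMPTY; // nothing to send
}
```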
Reviewers: Kirk True <kirk@kirktrue.pro>, TaiJuWu <tjwu1217@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The patch adds support for altering and describing group configs in kafka-configs.sh.
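For illustration, a hedged sketch of the equivalent Admin API call; the GROUP resource type is what this support builds on, and `consumer.session.timeout.ms` is an assumed example key:

```java
// Hedged sketch: alter a group config programmatically (example key is an assumption)
static void alterGroupConfig(Admin admin) throws Exception {
    ConfigResource group = new ConfigResource(ConfigResource.Type.GROUP, "my-group");
    AlterConfigOp op = new AlterConfigOp(
        new ConfigEntry("consumer.session.timeout.ms", "45000"), AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
}
```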
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
This is a follow-up after KIP-919, extending support for BOOTSTRAP_CONTROLLERS_CONFIG to both Admin#alterPartitionReassignments and Admin#listPartitionReassignments.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Refactor the heartbeat request managers for consumer groups and share groups so that almost all of the code is shared.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Add a version check on the client side when building a ListOffsetsRequest for a specific timestamp:
1) the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
2) the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)
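A hedged sketch of the two checks above; the timestamp constants exist on ListOffsetsRequest, while the surrounding variable names are assumptions:

```java
// Client-side check before building the request
if (timestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && version < 8)
    throw new UnsupportedVersionException(
        "EARLIEST_LOCAL_TIMESTAMP requires ListOffsets version 8 or above");
if (timestamp == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP && version < 9)
    throw new UnsupportedVersionException(
        "LATEST_TIERED_TIMESTAMP requires ListOffsets version 9 or above");
```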
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Add an integration test for share group list and describe admin operations.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
- Describe the different behavior of nonexistent resources. For example: a nonexistent broker causes a timeout; a nonexistent topic produces an UnknownTopicOrPartitionException; a nonexistent group returns static/default configs; client_metrics returns empty configs.
- Update the out-of-date description of the supported resources ("topic and broker resource types are currently supported").
- Add some JUnit tests.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Currently we were not updating the result count when we merged commitAsync() requests into one batch in ShareConsumeRequestManager, so fewer acknowledgements were sent to the application thread (ShareConsumerImpl) than expected.
Fix: if the acknowledge response came from a commitAsync(), we do not wait for other requests to complete; we always prepare a background event to be sent.
This PR also fixes a bug in ShareConsumeRequestManager so that the final ShareAcknowledge sent during close() also picks up any piggybacked acknowledgements which were waiting to be sent along with a ShareFetch.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Avoids stream allocation on a hot code path in Admin#listOffsets.
This patch avoids allocating the Stream reference pipeline and spliterator by explicitly allocating a pre-sized Node[] and using a for loop with int induction over the given List of IDs.
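The pattern, as a minimal sketch (nodeById is a hypothetical lookup; this is not the actual Admin code):

```java
// Pre-sized array plus int-indexed loop: no Stream pipeline or spliterator allocation
static Node[] toNodeArray(List<Integer> ids) {
    Node[] nodes = new Node[ids.size()];
    for (int i = 0; i < ids.size(); i++) {
        nodes[i] = nodeById(ids.get(i)); // hypothetical per-id lookup
    }
    return nodes;
}
```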
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Kirk True <kirk@kirktrue.pro>, David Arthur <mumrah@gmail.com>
This patch extends the DescribeConfigs API to support group configs.
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
Implemented the handleShareAcknowledge request RPC in KafkaApis.scala. This method is called whenever the client sends a ShareAcknowledge request to the broker. The acknowledge logic is handled asynchronously and the results are handled appropriately.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
This patch adds resources to store and handle consumer group configs.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, David Jacot <djacot@confluent.io>
In MetadataVersion 3.7-IV2 and above, the broker's AssignmentsManager sends an RPC to the
controller informing it about which directory we have chosen to place each new replica on.
Unfortunately, the code does not check to see if the topic still exists in the MetadataImage before
sending the RPC. It will also retry infinitely. Therefore, after a topic is created and deleted in
rapid succession, we can get stuck including the now-defunct replica in our subsequent
AssignReplicasToDirsRequests forever.
In order to prevent this problem, the AssignmentsManager should check if a topic still exists (and
is still present on the broker in question) before sending the RPC. In order to prevent log spam,
we should not log any error messages until several minutes have gone past without success.
Finally, rather than creating a new EventQueue event for each assignment request, we should simply
modify a shared data structure and schedule a deferred event to send the accumulated RPCs. This
will improve efficiency.
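A generic sketch of that accumulate-and-defer pattern (all names here are hypothetical, not the actual AssignmentsManager API):

```java
// Shared data structure holding the pending assignments
private final Map<TopicIdPartition, Uuid> pending = new ConcurrentHashMap<>();

void onAssignment(TopicIdPartition partition, Uuid directoryId) {
    pending.put(partition, directoryId);
    scheduleDeferredFlush(); // hypothetical: re-scheduling the same deferred event coalesces calls
}

void flush() {
    Map<TopicIdPartition, Uuid> batch = new HashMap<>(pending);
    pending.keySet().removeAll(batch.keySet());
    sendAssignReplicasToDirsRequest(batch); // hypothetical sender for the accumulated RPC
}
```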
Reviewers: Igor Soarez <i@soarez.me>, Ron Dagostino <rndgstn@gmail.com>
The initial drop of ShareMembershipManager contained a lot of code duplicated from MembershipManagerImpl. The plan was always to share as much code as possible between the membership managers for consumer groups and share groups. This issue refactors the membership managers so that almost all of the code is in common.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia Chuan Yu <yujuan476@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This change implements the KRaft voter sending UpdateVoter request. The
UpdateVoter RPC is used to update a voter's listeners and supported
kraft versions. The UpdateVoter RPC is sent if the replicated voter set
(VotersRecord in the log) doesn't match the local voter's supported
kraft versions and controller listeners.
To not starve the Fetch request, the UpdateVoter request is sent at most once
every 3 fetch timeouts. This is required to make sure that replication
is making progress and eventually the voter set in the replicated log
matches the local voter configuration.
This change also modifies the semantic for UpdateVoter. Now the
UpdateVoter response is sent right after the leader has created the new
voter set. This is required so that updating voter can transition from
sending UpdateVoter request to sending Fetch request. If the leader
waits for the VotersRecord control record to commit before sending the
UpdateVoter response, it may never send the UpdateVoter response. This
can happen if the leader needs that voter's Fetch request to commit the
control record.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This PR adds support for add-controller and remove-controller in the kafka-metadata-quorum.sh
command-line tool. It also fixes some minor server-side bugs that blocked the tool from working.
In kafka-metadata-quorum.sh, the implementation of remove-controller is fairly straightforward. It
just takes some command-line flags and uses them to invoke AdminClient. The add-controller
implementation is a bit more complex because we have to look at the new controller's configuration
file. The parsing logic for the advertised.listeners and listeners server configurations that we
need was previously implemented in the :core module. However, the gradle module where
kafka-metadata-quorum.sh lives, :tools, cannot depend on :core. Therefore, I moved listener parsing
into SocketServerConfigs.listenerListToEndPoints. This will be a small step forward in our efforts
to move Kafka configuration out of :core.
I also made some minor changes in kafka-metadata-quorum.sh and kafka-storage.sh to handle
--help without displaying a backtrace on the screen, and give slightly better error messages on
stderr. Also, in DynamicVoter.toString, we now enclose the host in brackets if it contains a colon
(as IPv6 addresses can).
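The bracketing rule, as a small sketch (not the exact DynamicVoter code):

```java
// Bracket hosts that contain a colon so IPv6 literals remain parseable in host:port text
static String endpointText(String host, int port) {
    String hostText = host.contains(":") ? "[" + host + "]" : host;
    return hostText + ":" + port;
}
```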
This PR fixes our handling of clusterId in addRaftVoter and removeRaftVoter, in two ways. Firstly,
it marks clusterId as nullable in the AddRaftVoterRequest.json and RemoveRaftVoterRequest.json
schemas, as it was always intended to be. Secondly, it allows AdminClient to optionally send
clusterId, by using AddRaftVoterOptions and RemoveRaftVoterOptions. We now also remember to
properly set timeoutMs in AddRaftVoterRequest. This PR adds unit tests for
KafkaAdminClient#addRaftVoter and KafkaAdminClient#removeRaftVoter, to make sure they are sending
the right things.
Finally, I fixed some minor server-side bugs that were blocking the handling of these RPCs.
Firstly, ApiKeys.ADD_RAFT_VOTER and ApiKeys.REMOVE_RAFT_VOTER are now marked as forwardable so that
forwarding from the broker to the active controller works correctly. Secondly,
org.apache.kafka.raft.KafkaNetworkChannel has now been updated to enable API_VERSIONS_REQUEST and
API_VERSIONS_RESPONSE.
Co-authored-by: Murali Basani <muralidhar.basani@aiven.io>
Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
In ShareConsumeRequestManager, currently every time we perform a commitSync/commitAsync/acknowledgeOnClose we create one AcknowledgeRequestState for each call. But this can be optimised further as we can batch up the acknowledgements to be sent to the same node before the next poll() is invoked.
This will ensure that between two polls, the acknowledgements are accumulated into one request per node and then sent during poll, resulting in fewer RPC calls.
To achieve this, we store a pair of acknowledge request states for every node: the first denotes the request state for commitAsync(), and the second denotes the request state for commitSync() and acknowledgeOnClose(). All acknowledgements destined for a particular node are stored in the corresponding AcknowledgeRequestState based on whether they were synchronous or asynchronous.
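A hedged sketch of that bookkeeping (type and method names assumed):

```java
// Hypothetical sketch: one async and one sync request state per node
record NodeAckStates(AcknowledgeRequestState asyncState, AcknowledgeRequestState syncState) { }

private final Map<Integer, NodeAckStates> statesByNode = new HashMap<>();

void enqueue(int nodeId, Acknowledgements acks, boolean isAsync) {
    NodeAckStates states = statesByNode.computeIfAbsent(nodeId,
        id -> new NodeAckStates(new AcknowledgeRequestState(), new AcknowledgeRequestState()));
    AcknowledgeRequestState target = isAsync ? states.asyncState() : states.syncState();
    target.addAcknowledgements(acks); // accumulated here, sent as one request on the next poll()
}
```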
Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Handle local log deletion when remote.log.copy.disabled=true based on the KIP-950.
When tiered storage is disabled or becomes read-only on a topic, the local retention configuration becomes irrelevant, and all data expiration follows the topic-wide retention configuration exclusively.
- Added a remoteLogEnabledAndRemoteCopyEnabled method to check whether the topic has tiered storage enabled with remote log copy enabled. We should apply local.retention.ms/bytes only when remote.storage.enable=true and remote.log.copy.disable=false.
- Changed to use retention.bytes/retention.ms when remote copy is disabled.
- Added validation to ask users to set local.retention.ms == retention.ms and local.retention.bytes == retention.bytes
- Added tests
Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
The replicaDirectoryId field for FetchRequest and FetchSnapshotRequest should be ignorable. This allows data objects with the directory id to be serialized to any version of the requests.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
Adds the methods to the admin client for listing and describing share groups.
There are some unit tests, but integration tests will be in a follow-on PR.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
The actual type name in the JSON descriptor files is "bool", not "boolean".
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Andrew Schofield <andrew_schofield@live.com>
Add support for handling the update voter RPC. The update voter RPC is used to automatically update
the voter's supported kraft versions and available endpoints as the operator upgrades and
reconfigures the KRaft controllers.
The UpdateVoter RPC is handled as follows:
1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known;
otherwise, return the REQUEST_TIMED_OUT error.
2. Check that the cluster supports kraft.version 1; otherwise, return the UNSUPPORTED_VERSION error.
3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error.
4. Check that the updated voter still supports the currently finalized kraft.version; otherwise
return the INVALID_REQUEST error.
5. Check that the updated voter is still listening on the default listener.
6. Append the updated VotersRecord to the log. The KRaft internal listener will read this
uncommitted record from the log and update the voter in the set of voters.
7. Wait for the VotersRecord to commit using the majority of the voters. Return a REQUEST_TIMED_OUT
error if it doesn't commit in time.
8. Send the UpdateVoter successful response to the voter.
This change also implements the ability for the leader to update its own entry in the voter
set when it becomes leader for an epoch. This is done by updating the voter set and writing a
control batch as the first batch in a new leader epoch.
Finally, fix a bug in KafkaAdminClient's handling of removeRaftVoterResponse where we tried to cast
the response to the wrong type.
Reviewers: Alyssa Huang <ahuang@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
The purpose of this PR is to remove ConsumerTestBuilder.java since it is no longer needed. The following PRs have eliminated the use of ConsumerTestBuilder:
#14930, #16140, #16200, #16312
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR supports EarliestLocalSpec and LatestTierSpec in GetOffsetShell, and adds integration tests.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
Fix the log on consumer fatal error to show the member ID only if present. If there is no member ID, the log now clearly indicates that the member has no member ID (instead of showing an empty string as it used to).
Apply the same fix consistently to all other log lines that include the member ID.
Reviewers: Kirk True <kirk@kirktrue.pro>, PoAn Yang <payang@apache.org>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1. Use oldestAllowedVersion 9 if using ListOffsetsRequest#EARLIEST_LOCAL_TIMESTAMP or ListOffsetsRequest#LATEST_TIERED_TIMESTAMP.
2. Add test cases to ListOffsetsRequestTest#testListOffsetsRequestOldestVersion to make sure requireTieredStorageTimestamp returns 9 as minVersion.
3. Add EarliestLocalSpec and LatestTierSpec to OffsetSpec.
4. Add more cases to KafkaAdminClient#getOffsetFromSpec.
5. Add testListOffsetsEarliestLocalSpecMinVersion and testListOffsetsLatestTierSpecSpecMinVersion to KafkaAdminClientTest to make sure request builder has oldestAllowedVersion as 9.
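For illustration, a hedged sketch of using the new specs through Admin#listOffsets (the factory-method names are assumptions):

```java
// Hedged sketch: query the earliest-local offset for a partition
TopicPartition tp = new TopicPartition("my-topic", 0);
Map<TopicPartition, OffsetSpec> specs = Map.of(tp, OffsetSpec.earliestLocal()); // assumed factory name
ListOffsetsResult result = admin.listOffsets(specs);
long earliestLocalOffset = result.partitionResult(tp).get().offset();
```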
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>
describe --status now includes directory id and endpoint information for voters and observers.
describe --replication now includes directory id.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
The Kafka consumer client registers node/connection latency metrics in Selector.java, but the values for those metrics are never recorded. This appears to have been an issue since inception.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
As mentioned in the ticket, the config property name "rebalanceTimeoutMs" is misleading, since the property only limits the client's commit portion of the process. It is used in MembershipManagerImpl as a means to limit the client's efforts when it is repeatedly trying to commit but failing. Accordingly, the property name has been updated to "commitTimeoutDuringReconciliation" (as suggested in the ticket) in GroupRebalanceConfig and in all other classes where the property/variable is used.
Reviewers: Kirk True <kirk@kirktrue.pro>, Chia-Ping Tsai <chia7712@gmail.com>
As discussed in #16657 (comment), we should make loggers static to avoid creating multiple logger instances.
I used the regex private.*Logger.*LoggerFactory to search for all loggers and check whether each needs to be static.
There are some exceptions where loggers don't need to be static:
1) Loggers in inner classes, since Java 8 doesn't support static fields in inner classes.
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java#L3676
2) Custom loggers for each instance (non-static + non-final). In this case, multiple logger instances are actually needed.
https://github.com/apache/kafka/blob/trunk/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java#L166
3) Loggers initialized in the constructor by LogContext. Many non-static but final loggers fall into this category, which is why I used .*LoggerFactory to only check the loggers that are assigned an initial value at declaration.
4) protected final Logger log = Logger.getLogger(getClass())
This pattern lets a subclass log under its own name instead of the superclass's name.
But if the log's access modifier is private, that purpose cannot be achieved, since the subclass cannot access the log defined in the superclass. So if the access modifier is private, we can replace getClass() with <className>.class.
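For reference, the resulting patterns look roughly like this (class names are illustrative):

```java
// Typical case: one shared logger per class, static and final
private static final Logger LOG = LoggerFactory.getLogger(SomeManager.class);

// Category 3: per-instance logger created from a LogContext (non-static, but final)
private final Logger log;
SomeManager(LogContext logContext) {
    this.log = logContext.logger(SomeManager.class);
}

// Category 4: protected non-static logger so subclasses log under their own name;
// if the field must be private, use the concrete class literal instead of getClass()
protected final Logger logger = LoggerFactory.getLogger(getClass());
```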
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Extend LeaderChangeMessage schema to support version 1 of the message. The leader will continue to write version 0 of the schema. This is needed so that in the future the leader can write version 1 of the message and be guaranteed that all of the replicas in the cluster support version 1 of the schema.
Reviewers: José Armando García Sancio <jsancio@apache.org>
Implement the RemoveVoter RPC. The general algorithm is as follows:
1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known;
otherwise return the REQUEST_TIMED_OUT error.
2. Check that the cluster supports kraft.version 1; otherwise return the UNSUPPORTED_VERSION error.
3. Check that there are no uncommitted voter changes; otherwise return the REQUEST_TIMED_OUT error.
4. Append the updated VotersRecord to the log. The KRaft internal listener will read this uncommitted
record from the log and add the new voter to the set of voters.
5. Wait for the VotersRecord to commit using the majority of the new set of voters. Return a
REQUEST_TIMED_OUT error if it doesn't commit in time.
6. Send the RemoveVoter successful response to the client.
7. Resign the leadership if the leader is not in the new voter set.
Note that now that KRaft supports both the RemoveVoter and AddVoter RPCs, only one change can be
pending at a time. This is achieved in the following ways: the AddVoter RPC checks whether there is
a pending AddVoter or RemoveVoter RPC; the RemoveVoter RPC checks whether there is a pending
AddVoter or RemoveVoter RPC; and both RPCs check that there is no uncommitted VotersRecord.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
In this PR I have completely migrated HeartbeatRequestManagerTest away from ConsumerTestBuilder. I removed all instances of spy objects and used mocks wherever possible. 31/31 tests are passing. I also removed one test that already existed in another test file.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This change implements the AddVoter RPC. The high-level algorithm is as follows:
1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known;
otherwise, return the REQUEST_TIMED_OUT error.
2. Check that the cluster supports kraft.version 1; otherwise return the UNSUPPORTED_VERSION error.
3. Check that there are no uncommitted voter changes; otherwise return the REQUEST_TIMED_OUT error.
4. Check that the new voter's id is not part of the existing voter set, otherwise return the
DUPLICATE_VOTER error.
5. Send an API_VERSIONS RPC to the first (default) listener to discover the supported kraft.version
of the new voter.
6. Check that the new voter supports the current kraft.version, otherwise return the
INVALID_REQUEST error.
7. Check that the new voter is caught up to the log end offset of the leader, otherwise return a
REQUEST_TIMED_OUT error.
8. Append the updated VotersRecord to the log. The KRaft internal listener will read this
uncommitted record from the log and add the new voter to the set of voters.
9. Wait for the VotersRecord to commit using the majority of the new set of voters. Return a
REQUEST_TIMED_OUT error if it doesn't commit in time.
10. Send the AddVoter successful response to the client.
The leader implements the above algorithm by tracking 3 events: the ADD_VOTER request is received,
the API_VERSIONS response is received and finally the HWM is updated.
The state of the ADD_VOTER operation is tracked by LeaderState using the AddVoterHandlerState. The
algorithm is implemented by the AddVoterHandler type.
This change also fixes a small issue introduced by the bootstrap checkpoint (0-0.checkpoint). The
internal partition listener (KRaftControlRecordStateMachine) and the external partition listener
(KafkaRaftClient.ListenerContext) were using "nextOffset = 0" as the initial state of the reading
cursor. This was causing the bootstrap checkpoint to keep getting reloaded until the leader wrote a
record to the log. Changing the initial offset (nextOffset) to -1 allows the listeners to
distinguish between the initial state (nextOffset == -1) and the state where the bootstrap
checkpoint has been loaded but the log segment is empty (nextOffset == 0).
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Improvement to ensure that the consumer unsubscribe operation waits for a response to the leave group request before moving on to close the consumer. This makes it consistent with the behaviour of the legacy consumer.
This will avoid undesired interactions on close, which triggers a leave group and shuts down the network thread when it completes (and which, before this PR, could lead to responses to disconnected clients).
Note that this PR does not change the transitions of the state machine on leave group, only the completion of the leave group future.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>
Introduce the KRaftVersion enum to describe the current value of kraft.version. Change a bunch of places in the code that were using raw shorts over to using this new enum.
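A hedged sketch of what such an enum looks like (values inferred from kraft.version 0 and 1; the actual class may differ):

```java
// Typed replacement for raw shorts
public enum KRaftVersion {
    KRAFT_VERSION_0((short) 0), // static voter set from controller.quorum.voters
    KRAFT_VERSION_1((short) 1); // dynamic voter set (KIP-853)

    private final short featureLevel;

    KRaftVersion(short featureLevel) {
        this.featureLevel = featureLevel;
    }

    public short featureLevel() {
        return featureLevel;
    }
}
```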
In BrokerServer.scala, fix a bug that could cause null pointer exceptions during shutdown if we tried to shut down before fully coming up.
Do not send finalized features that are finalized at level 0, since that is a no-op.
Reviewers: dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@apache.org>
This is the initial version of the share group consumer client code. It implements the complete ShareConsumer interface.
There are unit tests, but not integration tests yet since those would depend upon complete broker code, which is not available at this point.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Lianet Magrans <lianetmr@gmail.com>
As part of KIP-584, brokers expose a range of supported versions for each feature. For example, metadata.version might be supported from 1 to 21. (Note that feature level ranges are always inclusive, so this would include both level 1 and 21.)
These supported ranges are supposed to be able to include 0. For example, it should be possible for a broker to support a kraft.version between 0 and 1. However, in older software versions, there is an assertion in org.apache.kafka.common.feature.SupportedVersionRange that prevents this. This causes problems when the older software attempts to deserialize an ApiVersionsResponse containing such a range.
In order to resolve this dilemma, this PR bumps the version of ApiVersionsRequest from 3 to 4. Clients which send v4 promise to be able to handle ranges including 0. Clients which send v3 will not be exposed to these ranges. The feature will show up as having a minimum version of 1 instead. This work is part
of KIP-1022.
Similarly, this PR also introduces a new version of BrokerRegistrationRequest, and specifies that the
older versions of that RPC cannot handle supported version ranges including 0. Therefore, 0 is translated to 1 in the older requests.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
We recently increased that log's verbosity from DEBUG to INFO, but this was causing some confusion among users when an idle connection is disconnected, which is normal activity. This change makes it clear that the connection being disconnected has expired.
Reviewers: Dimitar Dimitrov <ddimitrov@confluent.io>, Justine Olshan <jolshan@confluent.io>
Add a remote.log.disable.policy config, at the topic level only, as part of KIP-950.
Reviewers: Kamal Chandraprakash <kchandraprakash@uber.com>, Luke Chen <showuon@gmail.com>, Murali Basani <muralidhar.basani@aiven.io>
The response data should change according to the input; however, with the current design it does
not change even if the input changes. We remove this cache logic to avoid returning wrong data.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>, Igor Soarez <soarez@apple.com>
Update the walkFileTree override implementation to handle parallel file deletion, so as to
prevent the Kafka broker process from crashing when logs are deleted by other threads due to
retention expiry.
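A hedged sketch of the tolerant visitor (not the exact Kafka code):

```java
// Keep walking when a file disappears mid-walk instead of crashing the broker
Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
    @Override
    public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
        if (exc instanceof NoSuchFileException)
            return FileVisitResult.CONTINUE; // deleted concurrently by retention; ignore
        throw exc;
    }
});
```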
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Igor Soarez <soarez@apple.com>
This change implements response handling for the new version of Vote, Fetch, FetchSnapshot, BeginQuorumEpoch and EndQuorumEpoch. All of these responses were extended to include the leader's endpoint when the leader is known.
This change also includes sending the new version of the requests for Vote, Fetch, FetchSnapshot, BeginQuorumEpoch and EndQuorumEpoch. The two most notable changes are that:
1. The leader is going to include all of its endpoints in the BeginQuorumEpoch and EndQuorumEpoch requests.
2. The replica is going to include the destination replica key for the Vote and BeginQuorumEpoch request.
QuorumState was extended so that the replica transitions to UnattachedState instead of FollowerState during startup, if the leader is known but the leader's endpoint is not known. This can happen if the known leader is not part of the voter set replicated by the replica. The expectation is that the replica will rediscover the leader from Fetch responses from the bootstrap servers or from the BeginQuorumEpoch request from the leader.
To make sure that replicas never forget the leader of a given epoch the unattached state was extended to allow an optional leader id for when the leader is known but the leader's endpoint is not known.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch is an attempt at simplifying GroupMetadataManager#consumerGroupHeartbeat and GroupMetadataManager#classicGroupJoinToConsumerGroup by sharing more of the common logic. It slightly changes how static members are replaced too: now, we generate the records to replace the member and then update the member if needed.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
Finishes the migration of MembershipManagerImplTest away from ConsumerTestBuilder and removes all spy objects.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Philip Nee <pnee@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Prior to KIP-853, users were not allowed to enumerate listeners specified in `controller.listener.names` in `advertised.listeners`. This decision was made in 3.3 because the `controller.quorum.voters` property is in effect the list of advertised listeners for all of the controllers.
KIP-853 is moving away from `controller.quorum.voters` in favor of a dynamic set of voters. This means that users need a way of specifying the advertised listeners for controllers.
This change allows users to specify listener names from `controller.listener.names` in `advertised.listeners`. To make this change forwards compatible (a valid configuration from 3.8 remains valid in 3.9), the controller's advertised listeners are computed by looking up the endpoint in `advertised.listeners`; if it doesn't exist, the controller looks up the endpoint in the `listeners` configuration.
This change also includes a fix to the BeginQuorumEpoch request, where the default value for VoterId was 0 instead of -1.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Create 3 new metadata versions:
- 3.8-IV0, for the upcoming 3.8 release.
- 3.9-IV0, to add support for KIP-1005.
- 3.9-IV1, as the new release vehicle for KIP-966.
Create ListOffsetsRequest v9, which will be used in 3.9-IV0 to support KIP-1005. v9 is currently an unstable API version.
Reviewers: Jun Rao <junrao@gmail.com>, Justine Olshan <jolshan@confluent.io>
Implement request handling for the updated versions of the KRaft RPCs (Fetch, FetchSnapshot, Vote,
BeginQuorumEpoch and EndQuorumEpoch). This doesn't add support for KRaft replicas to send the new
version of the KRaft RPCs. That will be implemented in KAFKA-16529.
All of the RPCs responses were extended to include the leader's endpoint for the listener of the
channel used in the request. EpochState was extended to include the leader's endpoint information
but only the FollowerState and LeaderState know the leader id and its endpoint(s).
For the Fetch request, the replica directory id was added. The leader now tracks the follower's log
end offset using both the replica id and replica directory id.
For the FetchSnapshot request, the replica directory id was added. This is not used by the KRaft
leader and it is there for consistency with Fetch and for help debugging.
For the Vote request, the replica key for both the voter (destination) and the candidate (source)
were added. The voter key is checked for consistency. The candidate key is persisted when the vote
is granted.
For the BeginQuorumEpoch request, all of the leader's endpoints are included. This is needed so
that the voters can return the leader's endpoint for all of the supported listeners.
For the EndQuorumEpoch request, all of the leader's endpoints are included. This is needed so that
the voters can return the leader's endpoint for all of the supported listeners. The successor list
has been extended to include the directory id. Receiving voters can use the entire replica key when
searching their position in the successor list.
Updated the existing tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest to execute using
both the old version and new version of the RPCs.
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
KIP-932 introduces FindCoordinator v6 for finding share coordinators. The initial implementation:
- Checks that share coordinators are only requested with v6 or above.
- Authorizes share coordinator requests as cluster actions (this is for inter-broker use only).
- Responds with COORDINATOR_NOT_AVAILABLE because share coordinators are not yet available.
When the share coordinator code is delivered, the request handling will be gated by configurations which enable share groups and the share coordinator specifically. If these are not enabled, COORDINATOR_NOT_AVAILABLE is the response.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Add the cause of TimeoutException for Producer send() errors.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread.
The logic around what happens if the unsubscribe fails remains unchanged: close will log, keep the first exception, and carry on.
It also removes the redundant LeaveOnClose event (it used to do the exact same thing as the UnsubscribeEvent, with both calling membershipMgr.leaveGroup).
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
In `GroupMetadataManager#toTopicPartitions`, we generate a list of `ConsumerGroupHeartbeatRequestData.TopicPartitions` from the input deserialized subscription. Currently the input subscription is `ConsumerPartitionAssignor.Subscription`, where the topic partitions are stored as (topic, partition) pairs, whereas `ConsumerGroupHeartbeatRequestData.TopicPartitions` needs the topic partitions stored as (topic, partition list) pairs.
`ConsumerProtocolSubscription` is an intermediate data structure in the deserialization where the topic partitions are stored as (topic, partition list) pairs. This PR uses `ConsumerProtocolSubscription` instead as the input subscription to make `toTopicPartitions` more efficient.
Reviewers: David Jacot <djacot@confluent.io>
We observed that some runs of the test suite caused CI pipelines to stall.
A thread dump revealed that the test runner was blocked trying to read from a
socket, while attempting to close the socket [[0]]. It turns out this is
due to a bug in the JDK which is very similar to JDK-8274524, but it affects
the else branch of `SSLSocketImpl::bruteForceCloseInput` [[1]] which wasn't
fixed in JDK-8274524.
Since the blocking happens in a native call, the test runner's timeouts have
no effect as the blocked test runner thread doesn't seem to respond to
interrupts.
As a mitigation in Kafka's test suite, this change adds `SO_TIMEOUT` of
30 seconds to all the TLS sockets handled by `EchoServer`. The timeout is
reasonably high for tests and a finite upper bound avoids infinite
blocking of the test suite.
[0]: https://issues.apache.org/jira/secure/attachment/13066427/timeout.log
[1]: 890adb6410/src/java.base/share/classes/sun/security/ssl/SSLSocketImpl.java (L808)
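The mitigation itself is essentially one line per accepted socket (EchoServer specifics elided):

```java
// Bound every blocking TLS read so a JDK-level hang cannot stall the suite indefinitely
Socket socket = serverSocket.accept();
socket.setSoTimeout(30_000); // blocked reads now fail with SocketTimeoutException
```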
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Define the interfaces and RPCs for share-group persistence. (KIP-932). This PR is just RPCs and interfaces to allow building of the broker components which depend upon them. The implementation will follow in subsequent PRs.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
Allow the committed offsets fetch to run for as long as needed. This handles the case where a user invokes Consumer.poll() with a very small timeout (including zero).
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
In a previous PR (#16048), I mistakenly excluded the underscore (_) from the set of valid characters for the protocol,
resulting in the inability to correctly parse the connection string for SASL_PLAINTEXT. This bug fix addresses the
issue and includes corresponding tests.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Luke Chen <showuon@gmail.com>
Fixes for the leave group flow (unsubscribe/close):
- Fix to send the heartbeat to leave the group on close, even if the callbacks fail.
- Fix to ensure that if a member gets fenced while blocked on callbacks (e.g. on unsubscribe), it clears its epoch so it is not included in commit requests.
- Fix to avoid a race on the subscription state object on unsubscribe, updating it only on the background thread when the callbacks to leave complete (success or failure).
Also improves logging in this area.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Philip Nee <pnee@confluent.io>
Completely migrates ConsumerNetworkThreadTest away from ConsumerTestBuilder, removing all usages of spy objects and replacing them with mocks. Removes testEnsureMetadataUpdateOnPoll() since it was doing integration testing. Also adds new tests to get more complete test coverage of ConsumerNetworkThread.
Reviewers: Kirk True <kirk@kirktrue.pro>, Lianet Magrans <lianetmr@gmail.com>, Philip Nee <pnee@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit implements KIP-899: Allow producer and consumer clients to rebootstrap. It introduces the new setting `metadata.recovery.strategy`, applicable to all the types of clients.
Reviewers: Greg Harris <gharris1727@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
KAFKA-16570 FenceProducers API returns "unexpected error" when successful
* Client handling of ConcurrentTransactionsException as retriable
* Unit test
* Integration test
Reviewers: Chris Egerton <chrise@aiven.io>, Justine Olshan <jolshan@confluent.io>
This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to choose the linger time or disable it with zero. The new configuration defaults to 10ms.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
Add support for KIP-953 KRaft Quorum reconfiguration in the DescribeQuorum request and response.
Also add support to AdminClient.describeQuorum, so that users will be able to find the current set of
quorum nodes, as well as their directories, via these RPCs.
Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Andrew Schofield <aschofield@confluent.io>
Propagate metadata errors from the background thread to the application thread.
This fix ensures that metadata errors are thrown to the user on consumer.poll().
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Philip Nee <pnee@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
Fix to complete a Future which was stuck because exception.getCause() threw an error.
The fix in #16217 unblocked the blocked thread, but the exception handled in the catch block was wrapped in an ExecutionException only for the blocking get() call, which is not the case in the async workflow; hence getCause() is not needed.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This PR includes changes for AsyncKafkaConsumer to avoid evaluating the subscription regex on every poll if metadata hasn't changed. The metadataVersionSnapshot was introduced to identify whether metadata has changed; if it has, the current subscription regex is re-evaluated.
This is the same mechanism used by the LegacyConsumer.
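A hedged sketch of the check (field and method names assumed):

```java
// Re-evaluate the subscription regex only when metadata has actually changed
if (metadataVersionSnapshot != metadata.updateVersion()) {
    metadataVersionSnapshot = metadata.updateVersion();
    updatePatternSubscription(metadata.fetch()); // evaluate the regex against current topics
}
```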
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Improve consistency and correctness for user-provided timeouts at the Consumer network request layer, per the Java client Consumer timeouts design (https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts). While the changes introduced in KAFKA-15974 enforce timeouts at the Consumer's event layer, this change enforces timeouts at the network request layer.
The changes mostly fit into the following areas:
1. Create shared code and idioms so timeout handling logic is consistent across current and future RequestManager implementations
2. Use deadlineMs instead of expirationMs, expirationTimeoutMs, retryExpirationTimeMs, timeoutMs, etc.
3. Update "preemptive pruning" to remove expired requests that have had at least one attempt
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Remove usage of the partition.assignment.strategy config in the new consumer. This config is deprecated with the new consumer protocol, so the AsyncKafkaConsumer should not use or validate the property.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Implement the add voter, remove voter, and update voter RPCs for
KIP-853. This is just adding the RPC handling; the current
implementation in RaftManager just throws UnsupportedVersionException.
Reviewers: Andrew Schofield <aschofield@confluent.io>, José Armando García Sancio <jsancio@apache.org>
Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers,
or options.timeoutMs if specified, as the transaction timeout.
No transaction will be started with this timeout, but
ReplicaManager.appendRecords uses this value as its timeout.
Use REQUEST_TIMEOUT_MS_CONFIG like a regular producer append
to allow for replication to take place.
Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
This PR introduces ShareConsumer and KafkaShareConsumer. It is focused entirely on the minimal additions required to introduce the external programming interfaces.
Reviewers: Apoorv Mittal <amittal@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This PR defines the initial set of RPCs for KIP-932. The RPCs for the admin client and state management are not in this PR.
Reviewers: Apoorv Mittal <amittal@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
Fix the bug where the heartbeat is not sent when a newly created consumer is immediately closed.
When there is a heartbeat request in flight and the consumer is then closed, the HeartbeatRequestManager does not correctly send the closing heartbeat, because a previous heartbeat request is still in flight. The closing heartbeat is only sent once, so in this situation the broker will not know that the consumer has left the consumer group until the consumer's heartbeat times out.
This situation causes the broker to wait until the consumer's heartbeat times out before triggering a consumer group rebalance, which in turn affects message consumption.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
As part of KIP-1022, I have created an interface for all the new features to be used when parsing the command line arguments, doing validations, getting default versions, etc.
I've also added the --feature flag to the storage tool to show how it will be used.
Created a TestFeatureVersion to show an implementation of the interface (besides MetadataVersion which is unique) and added tests using this new test feature.
I will add the unstable config and tests in a followup.
Reviewers: David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@apache.org>
Rate reports values in the form sumOrCount/monitoredWindowSize. It has a bug in the monitoredWindowSize calculation, which leads to spikes in the resulting values.
Reviewers: Jun Rao <junrao@gmail.com>
Implements KIP-1036.
Adds raw ConsumerRecord data to RecordDeserializationException to make DLQ implementation easier.
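A hedged sketch of a DLQ handler; keyBuffer()/valueBuffer() are assumed KIP-1036-style accessors, and the producer is assumed to use ByteBufferSerializer for key and value:

```java
try {
    process(consumer.poll(Duration.ofSeconds(1)));
} catch (RecordDeserializationException e) {
    // Forward the raw bytes to a dead-letter topic, then skip the poison record
    producer.send(new ProducerRecord<>("orders-dlq", e.keyBuffer(), e.valueBuffer())); // assumed accessors
    consumer.seek(e.topicPartition(), e.offset() + 1);
}
```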
Reviewers: Kirk True <ktrue@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Handle the FencedInstanceIdException that a consumer may receive in the heartbeat response. This will be the case when a static consumer is removed from the group by an admin client and another member joins with the same group.instance.id (which is allowed). The first member will receive a FencedInstanceIdException on its next heartbeat. The expectation is that this should be handled as a fatal error.
There are no actual changes in logic with this PR, given that without being handled, the FencedInstanceId was being treated as an "unexpected error", which are all treated as fatal errors, so the outcome remains the same. But we're introducing this small change just for accuracy in the logic and the logs: FencedInstanceId is expected during heartbeat, a log line is shown describing the situation and why it happened (and it's treated as a fatal error, just like it was before this PR).
This PR also improves the test to ensure that the error propagated to the app thread matches the one received in the HB.
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
The intention of the CompletableApplicationEvent is for a Consumer to enqueue the event and then block, waiting for it to complete. The application thread will block up to the amount of the timeout. This change introduces a consistent manner in which events are expired, by checking their timeout values.
The CompletableEventReaper is a new class that tracks CompletableEvents that are enqueued. Both the application thread and the network I/O thread maintain their own reaper instances. The application thread will track any CompletableBackgroundEvents that it receives and the network I/O thread will do the same with any CompletableApplicationEvents it receives. The application and network I/O threads will check their tracked events, and if any are expired, the reaper will invoke each event's CompletableFuture.completeExceptionally() method with a TimeoutException.
On closing the AsyncKafkaConsumer, both threads will invoke their respective reapers to cancel any unprocessed events in their queues. In this case, the reaper will invoke each event's CompletableFuture.completeExceptionally() method with a CancellationException instead of a TimeoutException to differentiate the two cases.
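A minimal sketch of a reaping pass (names assumed; not the actual CompletableEventReaper):

```java
// Expire any tracked event whose deadline has passed, then drop completed events
void reap(long currentTimeMs) {
    for (CompletableEvent<?> event : tracked) {
        if (event.deadlineMs() <= currentTimeMs)
            event.future().completeExceptionally(
                new TimeoutException("event not completed within its timeout"));
    }
    tracked.removeIf(e -> e.future().isDone());
}
```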
The overall design for the expiration mechanism is captured on the Apache wiki and the original issue (KAFKA-15848) has more background on the cause.
Note: this change only handles the event expiration and does not cover the network request expiration. That is handled in a follow-up Jira (KAFKA-16200) that builds atop this change.
This change also includes some minor refactoring of the EventProcessor and its implementations. This allows the event processor logic to focus on processing individual events rather than also the handling of batches of events.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Philip Nee <pnee@confluent.io>, Bruno Cadonna <cadonna@apache.org>
When client telemetry is configured in a cluster, Kafka producers and consumers push metrics to the brokers periodically. There is a special push of metrics that occurs when the client is terminating. A state machine in the client telemetry reporter controls its behaviour in different states.
Sometimes, when a client was terminating, it attempted an invalid state transition from TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED when it received a response to a PushTelemetry RPC. This was essentially harmless, because the state transition did not occur, but it did cause unsightly log lines to be generated. This PR checks for the terminating states when receiving the response and simply remains in the current state.
I added a test to validate the state management in this case. Actually, the test passes before the code change in the PR, but with unsightly log lines.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Apoorv Mittal <amittal@confluent.io>
The AsyncKafkaConsumer would occasionally fail to stop when wakeup() was invoked. It turns out that there's a race condition between the thread that invokes wakeup() and the thread that is performing an action on the consumer. If the operation's Future is already completed by thread A when thread B invokes completeExceptionally() inside wakeup(), the WakeupException will be ignored. We should use the return value from completeExceptionally() to determine whether that call actually triggered completion of the Future. If that method returns false, it signals that the Future was already completed and the exception we passed to completeExceptionally() was ignored. Therefore, we then need to return a new WakeupFuture instead of null, so that the next call to setActiveTask() will throw the WakeupException.
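A hedged sketch of the race fix (WakeupFuture per the description above; other names assumed):

```java
// completeExceptionally() returns true only if this call transitioned the Future
boolean triggered = activeFuture.completeExceptionally(new WakeupException());
if (triggered)
    return null; // the in-flight operation will observe the WakeupException
// The Future was already completed, so our exception was ignored: remember the wakeup
return new WakeupFuture(); // next setActiveTask() throws the pending WakeupException
```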
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This PR fixes two kinds of bugs in the new(ish) rack-aware part of the sticky assignment algorithm:
First, when reassigning "owned partitions" to their previous owners, we now have to take the rack placement into account and might not immediately assign a previously-owned partition to its old consumer during this phase. There is a small chance this partition will be assigned to its previous owner during a later stage of the assignment, but if it's not then by definition it has been "revoked" and must be removed from the assignment during the adjustment phase of the CooperativeStickyAssignor according to the cooperative protocol. We need to make sure any partitions removed in this way end up in the "partitionsTransferringOwnership".
Second, the sticky algorithm works in part by keeping track of how many consumers are still "unfilled" when they are at the "minQuota", meaning we may need to assign one more partition to get to the expected number of consumers at the "maxQuota". During the rack-aware round-robin assignment phase, we were not properly clearing the set of unfilled & minQuota consumers once we reached the expected number of "maxQuota" consumers (since by definition that means no more minQuota consumers need to or can be given any more partitions, as that would bump them up to maxQuota and exceed the expected count). This bug would result in the entire assignment being failed due to a correctness check at the end which verifies that the "unfilled members" set is empty before returning the assignment. An IllegalStateException would be thrown, failing the rebalancing and sending the group into an endless rebalancing loop until/unless it was lucky enough to produce a new assignment that didn't hit this bug.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Validate that a control batch in the batch accumulator has at least one control record.
Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
Improve the consumer log for an expired poll timer by showing by how much max.poll.interval.ms was exceeded. This should be helpful in guiding the user to tune that config in the common case of long-running processing causing the consumer to leave the group. Inspired by other clients that log this information in the same situation.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>,
Andrew Schofield <andrew_schofield@live.com>, Kirk True <kirk@kirktrue.pro>
Fix to allow initializing positions for newly assigned partitions while the onPartitionsAssigned callback is running, even though the partitions remain non-fetchable until the callback completes.
Before this PR, we were not allowing initialization or fetching while the callback was running. The fix here only allows initializing the newly assigned partition position, and keeps the existing logic for making sure that the partition remains non-fetchable until the callback completes.
The need for this fix came out in one of the Connect system tests, which attempts to retrieve a newly assigned partition position with a call to consumer.position from within the onPartitionsAssigned callback (WorkerSinkTask). With this PR, we allow such calls (test added), matching the behaviour of the legacy consumer.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The contract of KafkaConsumer.poll(Duration) says that it throws InterruptException "if the calling thread is interrupted before or while this function is called". The new KafkaConsumer implementation was not doing this if the thread was interrupted before the poll was called, specifically with a very short timeout. If it ever waited for records, it did check the thread state. If it did not wait for records because of a short timeout, it did not.
Some of the log messages in the code erroneously mentioned timeouts, when they really meant interruption.
Also adds a test for this specific scenario.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>