When using the Kafka consumer in Apache Pinot, we saw a couple of WARN messages because we create Kafka consumer instances with the same name. We currently have to add a suffix to create a new MBean, since each new Kafka consumer in the same process registers an MBean. This adds support for skipping MBean creation if one already exists.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>
Implementation of the required functionality for resetting and validating positions in the new async consumer.
This PR includes:
1. New async application events ResetPositionsApplicationEvent and ValidatePositionsApplicationEvent, both handled by the same OffsetsRequestManager.
2. Integration of the reset/validate functionality in the new async consumer, to update fetch positions using the partitions offsets.
3. Minor refactoring to extract functionality that is reused by both consumer implementations (moving logic without changes from OffsetFetcher into OffsetFetchUtils, and from OffsetsForLeaderEpochClient into OffsetsForLeaderEpochUtils).
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
Initial definition of the core components for maintaining group membership on the client following the new consumer group protocol.
This PR includes:
- Membership management for keeping member state and assignment, based on the heartbeat responses received.
- Assignor selection logic to support server side assignors.
This only includes the basic initial states and transitions, to be extended as we implement the protocol.
This is intended to be used from the heartbeat and assignment requests manager that actually build and process the heartbeat and assignment related requests.
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>
This patch adds integration tests for the OffsetCommit API and the OffsetFetch API. The tests run against both the old and the new group coordinator, and with both the old and the new consumer rebalance protocol.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Justine Olshan <jolshan@confluent.io>
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.
Reviewers: Christo Lolov <lolovc@amazon.com>, Jun Rao <junrao@gmail.com>
Implement KIP-919: Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add
Controller Registration. This KIP adds a new version of DescribeClusterRequest which is supported
by KRaft controllers. It also teaches AdminClient how to use this new DESCRIBE_CLUSTER request to
talk directly with the controller quorum. This is all gated behind a new MetadataVersion,
IBP_3_7_IV0.
In order to share the DESCRIBE_CLUSTER logic between broker and controller, this PR factors it out
into AuthHelper.computeDescribeClusterResponse.
The KIP adds three new error codes: MISMATCHED_ENDPOINT_TYPE, UNSUPPORTED_ENDPOINT_TYPE, and
UNKNOWN_CONTROLLER_ID. The endpoint type errors can be returned from DescribeClusterRequest.
On the controller side, the controllers now try to register themselves with the current active
controller, by sending a CONTROLLER_REGISTRATION request. This, in turn, is converted into a
RegisterControllerRecord by the active controller. ClusterImage, ClusterDelta, and all other
associated classes have been upgraded to propagate the new metadata. In the metadata shell, the
cluster directory now contains both broker and controller subdirectories.
QuorumFeatures previously had a reference to the ApiVersions structure used by the controller's
NetworkClient. Because this PR removes that reference, QuorumFeatures now contains only immutable
data. Specifically, it contains the current node ID, the locally supported features, and the list
of quorum node IDs in the cluster.
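For illustration, a client talking directly to the controller quorum might look like the sketch below. It assumes the `bootstrap.controllers` admin config introduced by KIP-919; the endpoint addresses are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public class DescribeControllerQuorum {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Point the AdminClient at the controller quorum instead of the brokers
        // (assumes the bootstrap.controllers config from KIP-919; addresses are placeholders).
        props.put("bootstrap.controllers", "controller1:9093,controller2:9093,controller3:9093");
        try (Admin admin = Admin.create(props)) {
            // Internally this uses the new DESCRIBE_CLUSTER version supported by KRaft controllers.
            DescribeClusterResult result = admin.describeCluster();
            System.out.println("Cluster id: " + result.clusterId().get());
            result.nodes().get().forEach(node -> System.out.println("Quorum node: " + node));
        }
    }
}
```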
Reviewers: David Arthur <mumrah@gmail.com>, Ziming Deng <dengziming1993@gmail.com>, Luke Chen <showuon@gmail.com>
This restores previous behavior for Admin::listOffsets, which was to fail immediately if topic metadata could not be found, and only retry if metadata for one or more specific partitions could not be found.
There is a subtle difference here: prior to https://github.com/apache/kafka/pull/13432, the operation would be retried if any metadata error was reported for any individual topic partition, even if an error was also reported for the entire topic. With this change, the operation always fails if an error is reported for the entire topic, even if an error is also reported for one or more individual topic partitions.
I am not aware of any cases where brokers might return both topic- and topic partition-level errors for a metadata request, and if there are none, then this change should be safe. However, if there are such cases, we may need to refine this PR to remove the discrepancy in behavior.
Reviewers: Justine Olshan <jolshan@confluent.io>
This patch adds the schemas of the new ConsumerGroupDescribe API (KIP-848) and adds a handler to KafkaApis.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
This patch adds the MemberId and the MemberEpoch fields to the OffsetFetchRequest. Those fields will be populated when the new consumer group protocol is used, to ensure that the member fetching the offsets has the correct member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH is returned to the client.
Our initial idea was to implement the same for the old protocol; the field is called GenerationIdOrMemberEpoch in KIP-848 to reflect this. On second thought, I think we should only do it for the new protocol; the effort to implement it in the old protocol is not worth it in my opinion.
Reviewers: Ritika Reddy <rreddy@confluent.io>, Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
Implementation of KIP-580 to add exponential back-off to situations in which retry.backoff.ms
is used to delay backoff attempts. This KIP adds exponential backoff behavior with a maximum
controlled by a new config retry.backoff.max.ms, together with +/- 20% jitter to spread the
retry attempts of the client fleet.
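For reference, the backoff computation described above can be sketched as follows; this is a minimal illustration of exponential backoff capped at retry.backoff.max.ms with +/- 20% jitter, not the actual client implementation.

```java
import java.util.concurrent.ThreadLocalRandom;

public final class ExponentialBackoffSketch {
    private static final double JITTER = 0.2;     // +/- 20%
    private final long initialBackoffMs;          // retry.backoff.ms
    private final long maxBackoffMs;              // retry.backoff.max.ms

    public ExponentialBackoffSketch(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Backoff for the given attempt (0-based): min(initial * 2^attempt, max), with +/- 20% jitter. */
    public long backoffMs(int attempt) {
        double exp = initialBackoffMs * Math.pow(2, attempt);
        double capped = Math.min(exp, maxBackoffMs);
        double jitter = 1 + JITTER * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        return (long) (capped * jitter);
    }
}
```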
Reviewers: Mayank Shekhar Narula <mayanks.narula@gmail.com>, Milind Luthra <i.milind.luthra@gmail.com>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
Implementation of the OffsetRequestsManager, responsible for building requests and processing responses for requests related to partition offsets.
In this PR, the manager includes support for ListOffset requests, generated when the user makes any of the following consumer API calls:
beginningOffsets
endOffsets
offsetsForTimes
All of the above consumer API calls interact with the OffsetsRequestsManager by generating a ListOffsetsApplicationEvent.
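For context, the consumer-facing calls that now route through this manager are used roughly like this (illustrative usage; the topic name and timeout are made up, and `consumer` is an existing KafkaConsumer):

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookups {
    static void lookups(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
        Duration timeout = Duration.ofSeconds(5);

        // Each of these calls results in a ListOffsetsApplicationEvent handled by the manager.
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Set.of(tp), timeout);
        Map<TopicPartition, Long> latest = consumer.endOffsets(Set.of(tp), timeout);
        Map<TopicPartition, OffsetAndTimestamp> byTime =
            consumer.offsetsForTimes(Map.of(tp, System.currentTimeMillis() - 60_000), timeout);

        System.out.println(earliest + " " + latest + " " + byTime);
    }
}
```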
Includes tests to cover the new functionality and to ensure no API level changes are introduced.
This covers KAFKA-14965 and KAFKA-15081.
Reviewers: Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Jun Rao<junrao@gmail.com>
Summary
Implemented the wakeup() mechanism using a WakeupTrigger class to store the pending wakeup item. When wakeup() is invoked, it checks whether there's an active task or a wakeup task.
If there's an active task: the task will be completed exceptionally and the atomic reference will be freed up.
If there is a wakeup task, it means wakeup() was invoked before a blocking call was issued; therefore, the current task will be completed exceptionally immediately.
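A rough sketch of the idea, with hypothetical names (not the actual WakeupTrigger code): a single atomic slot holds either the future of the currently blocking call or a "wakeup pending" marker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.errors.WakeupException;

// Illustrative sketch only. One atomic slot holds either the future of the blocking
// call in progress or a marker recording that wakeup() already happened.
final class WakeupTriggerSketch {
    private static final Object WAKEUP_PENDING = new Object();
    private final AtomicReference<Object> pending = new AtomicReference<>();

    void wakeup() {
        Object current = pending.getAndSet(WAKEUP_PENDING);
        if (current instanceof CompletableFuture) {
            // An active blocking call: fail it and free the slot.
            ((CompletableFuture<?>) current).completeExceptionally(new WakeupException());
            pending.compareAndSet(WAKEUP_PENDING, null);
        }
    }

    <T> CompletableFuture<T> setActiveTask(CompletableFuture<T> future) {
        Object previous = pending.getAndSet(future);
        if (previous == WAKEUP_PENDING) {
            // wakeup() arrived before the blocking call: fail it immediately.
            future.completeExceptionally(new WakeupException());
            pending.set(null);
        }
        return future;
    }
}
```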
This PR also addressed minor issues such as:
Throwing WakeupException at the right place: as wakeups are raised by completing an active future exceptionally, the WakeupException is wrapped inside the ExecutionException.
mockConstruction is a thread-local mock; therefore, we need to free up the reference before completing the test. Otherwise, other tests will continue using the thread-local mock.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
As described in https://issues.apache.org/jira/browse/KAFKA-15353
When the AlterPartitionRequest version is < 3 and its builder.build() is called multiple times, both newIsrWithEpochs and newIsr will end up empty. This can happen if the sender retries on errors.
Reviewers: Luke Chen <showuon@gmail.com>
This implementation introduces two new configurations `log.message.timestamp.before.max.ms` and `log.message.timestamp.after.max.ms` and deprecates `log.message.timestamp.difference.max.ms`.
The default value for all three of these configs remains Long.MAX_VALUE for backward compatibility, but the newly added configurations give us finer control when validating message timestamps that are in the past or the future relative to the broker's timestamp.
To maintain backward compatibility, if the default value of `log.message.timestamp.before.max.ms` is not changed, we assume users are still using the deprecated config `log.message.timestamp.difference.max.ms` and validation is done using its value. This ensures that existing users who have customized the value of `log.message.timestamp.difference.max.ms` will continue to see no change in behavior.
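For illustration, a broker override that tightens both bounds might look like the sketch below; the values are examples, not defaults.

```java
import java.util.Properties;

public class TimestampValidationConfigSketch {
    public static Properties brokerOverrides() {
        Properties props = new Properties();
        // Example values only: accept records timestamped up to 1 hour in the past
        // and up to 1 minute in the future relative to the broker clock.
        props.put("log.message.timestamp.before.max.ms", "3600000");
        props.put("log.message.timestamp.after.max.ms", "60000");
        // The deprecated config is only consulted when log.message.timestamp.before.max.ms
        // is left at its default, per the compatibility rule described above.
        // props.put("log.message.timestamp.difference.max.ms", "3600000");
        return props;
    }
}
```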
Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
A race condition between async flush and segment rename (for deletion purpose) might cause the entire log directory to be marked offline when we delete a topic. This PR fixes the bug by ignoring NoSuchFileException when we flush a directory.
Reviewers: Divij Vaidya <diviv@amazon.com>
"The test RefreshingHttpsJwksTest#testSecondaryRefreshAfterElapsedDelay relies on the actual system clock, which makes it frequently fail. The fix adds a second constructor that allows for passing a ScheduledExecutorService to manually execute the scheduled tasks before refreshing. The fixed task is much more robust and stable.
Co-authored-by: Fei Xie <feixie@MacBook-Pro.attlocal.net>
Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>
This PR's main refactoring relates to:
1. serializers/deserializers used in clients - unified in a Deserializers class
2. logic for configuring ClusterResourceListeners moved to ClientUtils
3. misc refactoring of the new async consumer in preparation for upcoming Request Managers
Reviewers: Jun Rao <junrao@gmail.com>
Reviewers: David Arthur <mumrah@gmail.com>, Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>
In a non-empty log the KRaft leader only notifies the listener of leadership when it has read to the leader's epoch start offset. This guarantees that the leader epoch has been committed and that the listener has read all committed offsets/records.
Unfortunately, the KRaft leader doesn't do this when the log is empty. When the log is empty, the listener is notified immediately upon becoming leader. This makes the API inconsistent and harder to program against.
This change fixes that by having the KRaft leader wait for the listener's nextOffset to be greater than the leader's epochStartOffset before calling handleLeaderChange.
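Conceptually, the gating condition looks something like the sketch below; the names and signatures are simplified for illustration and are not the actual KafkaRaftClient code.

```java
// Illustrative only. The idea is that the leader defers the handleLeaderChange callback
// until the listener has caught up past the leader's epoch start offset, even when the
// log is empty, so both cases follow the same rule.
final class LeaderNotificationSketch {
    interface Listener {
        long nextOffset();                        // next offset the listener will read
        void handleLeaderChange(int leaderEpoch); // simplified signature for the sketch
    }

    static void maybeFireLeaderChange(Listener listener, long epochStartOffset, int epoch) {
        if (listener.nextOffset() > epochStartOffset) {
            listener.handleLeaderChange(epoch);
        }
    }
}
```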
The RecordsBatchReader implementation is also changed to include control records. This makes it possible for the state machine to learn about committed control records. This additional information can be used to compute the committed offset or to count those bytes when determining when to snapshot the partition.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
As the title says, we discovered a flaky bug during testing: the commit request manager would occasionally throw a NOT_COORDINATOR exception, which means the request was routed to a non-coordinator node. We found that if we don't check the coordinator node in the commitRequestManager, the request manager will pass an empty node to the NetworkClientDelegate, which implies the request can be sent to any node in the cluster. This behavior is incorrect, as commit requests need to be routed to the coordinator node.
Because the timing of coordinator discovery during integration testing isn't entirely deterministic, the test became extremely flaky. After this fix, the coordinator node must be known before these commit requests are enqueued to the NetworkClient.
Reviewers: Jun Rao <junrao@gmail.com>
Move common code from the client implementations to the ClientUtils
class or (consumer) Utils class, where possible.
There are a number of places in the client code where the same basic
calls are made by more than one client implementation. Minor
refactoring will reduce the amount of boilerplate code necessary for
the client to construct its internal state.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Jun Rao <junrao@gmail.com>
Discovered that the committed() API times out during the integration test. After investigation, this is because the future was not completed in the ApplicationEventProcessor. Also added toString() methods to the event classes for debugging purposes.
Reviewers: Jun Rao <junrao@gmail.com>
MINOR: Add test for describe topic with ID
Add a simple test to verify topic description with topic IDs.
Reviewers: Divij Vaidya <diviv@amazon.com>, dengziming <dengziming1993@gmail.com>
In 3.5.0, AbstractStickyAssignor may run a useless loop in performReassignments because isBalanced has a trivial mistake, resulting in rebalance timeouts in some situations.
Co-authored-by: lixy <lixy@tuya.com>
Reviewers: Ritika Reddy <rreddy@confluent.io>, Philip Nee <pnee@confluent.io>, Kirk True <kirk@mustardgrain.com>, Guozhang Wang <wangguoz@gmail.com>
A missing piece from KAFKA-14950. This is to test assign() and assignment() in the integration test.
Also fixed an accidental mistake in the committed API.
Reviewers: Jun Rao <junrao@gmail.com>
This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata.
Co-authored-by: Kirk True <kirk@kirktrue.pro>
Reviewers: Jun Rao <junrao@gmail.com>
This patch does a few things:
1) It introduces version 9 of the OffsetCommit API. This new version has no schema changes but it can return a StaleMemberEpochException if the new consumer group protocol is used. Note the use of `"latestVersionUnstable": true` in the request schema. This means that this new version is not available yet unless activated.
2) It renames the `generationId` field in the request to `GenerationIdOrMemberEpoch`. This is a backward compatible change.
3) It introduces the new StaleMemberEpochException error.
4) It does a minor refactoring in OffsetCommitRequest class.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Arthur <mumrah@gmail.com>, Justine Olshan <jolshan@confluent.io>
This patch implements the existing JoinGroup protocol within the new group coordinator.
Some notable differences:
* Methods return a CoordinatorResult to the runtime framework, which includes records to append to the log as well as a future to complete after the append succeeds/fails.
* The coordinator runtime ensures that only a single thread will be processing a group at any given time, therefore there is no more locking on groups.
* Instead of relying on purgatories, we rely on the Timer interface to schedule/cancel delayed operations.
Reviewers: David Jacot <djacot@confluent.io>
Introduced extra mapping to track verification state.
When verifying, there is a race condition where the add partitions verification response returns that the partition is in the ongoing transaction, but an abort marker is written before we get to append. Therefore, we track any given transaction we are verifying with an object unique to that transaction.
We check this unique state upon the first append to the log. After that, we can rely on currentTransactionFirstOffset. We remove the verification state on appending to the log with a transactional data record or marker.
We will also clean up lingering verification state entries via the producer state entry expiration mechanism. We do not update the timestamp on retrying a verification for a transaction, so each entry must be verified before producer.id.expiration.ms.
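Conceptually, the tracking resembles the sketch below; the names and structure are illustrative, not the actual producer state code.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: one unique object per producer id under verification, compared by
// reference on the first append, so a verification that raced with an abort marker cannot
// be mistaken for the current transaction.
final class VerificationStateSketch {
    static final class VerificationStateEntry {
        final long timestamp; // not refreshed on retry, so entries expire via producer.id.expiration.ms
        VerificationStateEntry(long timestamp) { this.timestamp = timestamp; }
    }

    private final ConcurrentHashMap<Long, VerificationStateEntry> verifying = new ConcurrentHashMap<>();

    VerificationStateEntry maybeStartVerification(long producerId, long nowMs) {
        return verifying.computeIfAbsent(producerId, id -> new VerificationStateEntry(nowMs));
    }

    boolean verifyOnFirstAppend(long producerId, VerificationStateEntry expected) {
        // Reference equality: a new entry created after an abort marker will not match.
        return verifying.get(producerId) == expected;
    }

    void clearOnAppend(long producerId) {
        verifying.remove(producerId); // removed once a transactional record or marker is appended
    }
}
```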
There were a few other fixes:
- Moved the transaction manager handling for failed batch into the future completed exceptionally block to avoid processing it twice (this caused issues in unit tests)
- handle interrupted exceptions encountered on the callback thread
- change handling to throw an error if we try to set verification state and leaderLogIfLocal is None.
Reviewers: David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
Splitting partitions while setting auto.offset.reset to latest may cause message delivery loss, but users might not be aware of that since it currently isn't documented anywhere.
Reviewers: Luke Chen <showuon@gmail.com>
This is a follow up on the initial OffsetFetcher refactoring to extract reusable logic, needed for the new consumer implementation (initial refactoring merged with PR-13815).
Similar to the initial refactoring, this PR brings no changes to the existing logic, just extracting functions or pieces of logic.
There were no individual tests for the extracted functions, so no tests were migrated.
Reviewers: Jun Rao <junrao@gmail.com>
Use AdminApiDriver class to refresh the metadata and retry the request that failed with retriable errors.
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Mickael Maison <mmaison@redhat.com>, Dimitar Dimitrov <30328539+dimitarndimitrov@users.noreply.github.com>
It's good for us to add support for Java 20 in preparation for Java 21 - the next LTS.
Given that Scala 2.12 support has been deprecated, a Scala 2.12 variant is not included.
Also remove some branch builds that add load to the CI, but have
low value: JDK 8 & Scala 2.13 (JDK 8 support has been deprecated),
JDK 11 & Scala 2.12 (Scala 2.12 support has been deprecated) and
JDK 17 & Scala 2.12 (Scala 2.12 support has been deprecated).
A newer version of Mockito (4.9.0 -> 4.11.0) is required for Java 20 support, but we
only use it with Scala 2.13+ since it causes compilation errors with Scala 2.12. Similarly,
we upgrade easymock when the Java version is 16 or newer as it's incompatible
with powermock (which doesn't support Java 16 or newer).
Filed KAFKA-15117 for a test that fails with Java 20 (SslTransportLayerTest.testValidEndpointIdentificationCN).
Finally, fixed some lossy conversions that were added after #13582 was submitted.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Poison the transaction manager if we detect an illegal transition in the Sender thread. A ThreadLocal is stored in the TransactionManager so that the Sender can inform the TransactionManager which thread it's using.
Reviewers: Daniel Urban <durban@cloudera.com>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
Fixed a regression described in KAFKA-15053 where security.protocol only allowed uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With this fix, both lowercase and uppercase values are supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl).
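For example, a client config like the following is now accepted (sketch; the bootstrap address is a placeholder):

```java
import java.util.Properties;

public class SecurityProtocolConfigSketch {
    public static Properties clientProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9093"); // placeholder address
        // After this fix both spellings are accepted; before it, only the upper-case form worked.
        props.put("security.protocol", "sasl_ssl"); // equivalent to "SASL_SSL"
        return props;
    }
}
```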
Reviewers: Chris Egerton <chrise@aiven.io>, Divij Vaidya <diviv@amazon.com>
When Kafka fails to perform an atomic file move the error is getting swallowed. Kafka should log these cases at least at WARN level.
Reviewers: Ron Dagostino <rndgstn@gmail.com>, Kirk True <kirk@kirktrue.pro>
Adding the following metrics as per KIP-890:
VerificationTimeMs – the number of milliseconds from adding partition info to the manager to the time the response is sent. This includes the round trip to the transaction coordinator, if it is called, and also accounts for verifications that fail before the coordinator is called.
VerificationFailureRate – the rate of verifications that returned a failure, either from the AddPartitionsToTxn response or through errors in the manager.
AddPartitionsToTxnVerification metrics – separating the verification request metrics from the typical add-partitions ones, similar to how fetch replication and fetch consumer metrics are separated.
Reviewers: Divij Vaidya <diviv@amazon.com>
RPCProducerIdManager initiates an async request to the controller to grab a block of producer IDs and then blocks waiting for a response from the controller.
This is done in the request handler threads while holding a global lock. This means that if many producers are requesting producer IDs and the controller is slow to respond, many threads can get stuck waiting for the lock.
This patch aims to:
* resolve the deadlock scenario mentioned above by not waiting for a new block and returning an error immediately
* remove synchronization usages in RpcProducerIdManager.generateProducerId()
* handle errors returned from generateProducerId() so that KafkaApis does not log unexpected errors
* confirm producers backoff before retrying
* introduce backoff if manager fails to process AllocateProducerIdsResponse
Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
An example of the warning:
> warning: [lossy-conversions] implicit cast from long to int in compound assignment is possibly lossy
There should be no change in behavior as part of these changes - runtime logic ensured
we didn't run into issues due to the lossy conversions.
Reviewers: Divij Vaidya <diviv@amazon.com>
add "remote.log.metadata.manager.listener.name" config to rlmm to allow producer/consumer to connect to the server. Also add tests.
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
The FileTokenRetriever class is used to read the access_token from a file on the client's system; the token is then passed along with the JAAS config to the OAuthBearerSaslServer. When the token was sent using FileTokenRetriever on the client side, an EOL character was getting appended to the token, causing authentication to fail with the message:
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, validate and reset positions. For the new KafkaConsumer with a refactored threading model, similar functionality will be needed.
This is an initial refactoring for extracting logic from the OffsetFetcher, that will be reused by the new consumer implementation. No changes to the existing logic, just extracting classes, functions or pieces of logic.
All the functionality moved out of the OffsetFetcher is already covered by tests in OffsetFetcherTest and FetcherTest. There were no individual tests for the extracted functions, so no tests were migrated.
Reviewers: Jun Rao <junrao@gmail.com>
This PR fixes three issues:
InvalidProducerEpochException was not handled consistently. It used to be returnable via both transactional responses and produce responses, but as of KIP-588 (2.7+), transactional responses should no longer return InvalidProducerEpochException; only produce responses can. Older brokers may still return InvalidProducerEpochException for transactional responses, and these must be converted to the newer ProducerFencedException. This conversion wasn't done for TxnOffsetCommit (sent to the group coordinator).
InvalidTxnStateException was double-wrapped in KafkaException, whereas other exceptions are usually wrapped only once. Furthermore, InvalidTxnStateException was not handled at all in the AddOffsetsToTxn response, where it should be a possible error as well, according to the API documentation.
According to the API documentation, UNSUPPORTED_FOR_MESSAGE_FORMAT is not possible for TxnOffsetCommit, but it looks like it is, and it is being handled there, so I updated the API documentation.
Reviewers: Justine Olshan <jolshan@confluent.io>
The contract for Consumer#commitSync() guarantees that the callbacks for all prior async commits will be invoked before it returns. Prior to this patch the contract could be violated if an empty offsets map were passed in to Consumer#commitSync().
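In consumer terms, the guarantee being restored is roughly the following (illustrative usage, not a test from the patch):

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitOrdering {
    static void commit(KafkaConsumer<String, String> consumer) {
        // Fire-and-forget async commit with a callback.
        consumer.commitAsync((offsets, exception) ->
            System.out.println("async commit completed: " + offsets + ", error: " + exception));

        // Even with an empty offsets map, commitSync() must not return before the
        // callback above has been invoked -- that is the contract this patch restores.
        consumer.commitSync(Collections.emptyMap());
    }
}
```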
Reviewers: Philip Nee <philipnee@gmail.com>, David Jacot <djacot@confluent.io>
This patch introduces the `PartitionWriter` interface in the `group-coordinator` module. The `ReplicaManager` resides in the `core` module and it is thus not accessible from the `group-coordinator` one. The `CoordinatorPartitionWriter` is basically an implementation of the interface residing in `core` which interfaces with the `ReplicaManager`.
One notable difference from the usual produce path is that the `PartitionWriter` returns the offset following the written records. This is then used by the coordinator runtime to track when the request associated with the write can be completed.
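A rough sketch of the shape of the interface (simplified for illustration; the real PartitionWriter in the `group-coordinator` module is generic over the record type and carries more context):

```java
import java.util.List;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch, not the actual interface definition.
interface PartitionWriterSketch<T> {
    /**
     * Append the records to the partition and return the offset following the written
     * records, which the coordinator runtime uses to know when the write is committed.
     */
    long append(TopicPartition tp, List<T> records);
}
```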
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
After this change,
For broker-side decompression: the JMH benchmark RecordBatchIterationBenchmark demonstrates a 20-70% improvement in throughput (see results for RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize).
For consumer-side decompression: the JMH benchmark RecordBatchIterationBenchmark shows a mixed bag, from single-digit regressions for some compression types to a 10-50% improvement for Zstd (see results for RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize).
Reviewers: Luke Chen <showuon@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Ismael Juma <mail@ismaeljuma.com>
1. Ensures that NPEs are not thrown
2. Ensures that the background thread has been started to avoid flaky
assertion failures on isRunning
3. Adds a check that the thread is not running when closed
Reviewers: Philip Nee <philipnee@gmail.com>, Kirk True <kirk@kirktrue.pro>
The KRaft controller returns empty finalized features in `ApiVersionResponse`. The brokers are not affected by this, so the problem has no impact currently, but it's worth fixing to avoid unexpected problems.
There is also a bunch of confusing methods in `ApiVersionResponse` that are only used in test code. I moved them to TestUtils to make the code clearer and to force everyone to pass in the correct parameters instead of the default zero parameters, for example empty supported features and empty finalized features.
Reviewers: Luke Chen <showuon@gmail.com>
#13557 introduced a utility method to close executors silently. This PR leverages that method to close executors in the Connect runtime, removing the duplicated code that previously existed around closing executors.
Note that there are a few more executors used in the Connect runtime, but their close methods don't follow this pattern of shutdown, await and shutdown. Some of them have additional logic, like the executor in Worker, so those places are not changed.
Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>
Reviewers: Daniel Urban <durban@cloudera.com>, Yash Mayya <yash.mayya@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Motivation
Reading/writing the protocol buffer varInt32 and varInt64 (also called varLong in our code base) is in the hot path of data plane code in Apache Kafka. We read multiple varInts (and varLongs) per record. Hence, even a minor performance change could extrapolate to a larger performance benefit.
In this PR, we only update varInt32 encoding/decoding.
Changes
This change uses loop unrolling and reduces the amount of repeated calculation. Based on the empirical results from the benchmark, the code has been modified to pick the best implementation.
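For reference, the baseline (non-unrolled) varint32 algorithm being optimized looks like the sketch below; the PR's actual change unrolls and restructures this loop.

```java
import java.nio.ByteBuffer;

public final class VarintSketch {
    // Baseline varint32 encoding: 7 data bits per byte, high bit set on all but the last byte.
    static void writeUnsignedVarint(int value, ByteBuffer buffer) {
        while ((value & 0xFFFFFF80) != 0) {
            buffer.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buffer.put((byte) value);
    }

    static int readUnsignedVarint(ByteBuffer buffer) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buffer.get();
            value |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }
}
```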
Results
Performance has been evaluated using JMH benchmarks on JDK 17.0.6. Various implementations were added to the benchmark, and benchmarking was done for different sizes of varints and varlongs. The benchmarks for the various implementations live in ByteUtilsBenchmark.java.
Reviewers: Ismael Juma <mlists@juma.me.uk>, Luke Chen <showuon@gmail.com>, Alexandre Dupriez <alexandre.dupriez@gmail.com>
Also modifies verification to only add a partition for verification if it is transactional.
When verifying, we look at all the transactional producer IDs and throw INVALID_RECORD on the request if one is different.
Reviewers: Kirk True <ktrue@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>
Producers used to throw a fatal error upon failing initProducerId, which can be caused by authorization errors. In this case, the user would need to instantiate a new producer.
This PR makes authorization errors non-fatal so that the user can retry until the permission is fixed by an admin.
Here we first transition the producer to the ABORTABLE state, then to the UNINITIALIZED state (so that the producer is recoverable). Upon the subsequent send, the producer will transition to INITIALIZING and attempt to send another InitProducerIdRequest.
Reviewers: Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>, Justine Olshan <jolshan@confluent.io>
The integration tests seem to create an unnecessarily large number of threads. This reduces the number of threads created per integration test harness broker.
Reviewers: Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>
The generated message types are missing a range check for the case when the tagged version range is a subset of
the flexible version range. This causes the tagged field count, which is computed correctly, to conflict with the
number of tags serialized.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This is a really long story, but the incident started in KAFKA-13419, when we observed a member sending out a topic partition owned from a previous generation after the member missed a rebalance cycle due to REBALANCE_IN_PROGRESS.
This patch changes the AbstractStickyAssignor.AllSubscriptionsEqual method. In short, it should no longer check and validate only the highest generation. Instead, we consider 3 cases (a sketch of the resolution follows the list below):
1. A member will continue to hold on to its partition if there are no other owners.
2. If there are multiple owners of the same partition, the one with the highest generation wins.
3. If two members of the same generation hold on to the same partition, we will log an error but remove both from the assignment (same as the current logic).
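A sketch of that resolution logic, with illustrative names (not the actual AbstractStickyAssignor code):

```java
import java.util.List;
import java.util.Optional;

// Illustrative resolution of partition ownership across generations: the highest
// generation wins; ties within a generation are dropped entirely.
final class OwnershipResolutionSketch {
    record Claim(String memberId, int generation) {}

    static Optional<String> resolveOwner(List<Claim> claims) {
        if (claims.isEmpty()) return Optional.empty();          // case 1: sole/no other owners handled by caller
        int maxGeneration = claims.stream().mapToInt(Claim::generation).max().getAsInt();
        List<Claim> winners = claims.stream()
            .filter(c -> c.generation() == maxGeneration)
            .toList();
        if (winners.size() > 1) {
            // case 3: two members of the same generation claim the partition -> log and assign to neither.
            System.err.println("Conflicting claims at generation " + maxGeneration + ": " + winners);
            return Optional.empty();
        }
        return Optional.of(winners.get(0).memberId());          // case 2: highest generation wins
    }
}
```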
Here are some important notes that lead to the patch:
- If a member is kicked out of the group, `UNKNOWN_MEMBER_ID` will be thrown.
- It seems to be a common situation that members are late to joinGroup and therefore get the `REBALANCE_IN_PROGRESS` error. This is why we don't want to reset the generation: it might cause lots of revocations and can be disruptive.
To summarize the current behavior of different errors:
`REBALANCE_IN_PROGRESS`
- heartbeat: requestRejoin if member state is stable
- joinGroup: rejoin immediately
- syncGroup: rejoin immediately
- commit: requestRejoin and fail the commit. Raise this exception if the generation is stale, i.e. another rebalance is already in progress.
`UNKNOWN_MEMBER_ID`
- heartbeat: resetStateAndRejoin if the generation hasn't changed; otherwise, ignore
- joinGroup: resetStateAndRejoin if generation unchanged, otherwise rejoin immediately
- syncGroup: resetStateAndRejoin if generation unchanged, otherwise rejoin immediately
`ILLEGAL_GENERATION`
- heartbeat: resetStateAndRejoin if the generation hasn't changed; otherwise, ignore
- syncGroup: raise the exception if the generation has been reset or the member hasn't completed rebalancing; then resetStateAndRejoin if the generation is unchanged, otherwise rejoin immediately
Reviewers: David Jacot <djacot@confluent.io>
1. Add ZkMigrationReady to apiVersionsResponse.
2. Check that all nodes are ZkMigrationReady before moving to the next migration state.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
Don't allow setting negative or zero values for quotas. Don't allow SCRAM mechanism names to be
used as client quota names. SCRAM mechanisms are not client quotas. (The confusion arose because of
internal ZK representation details that treated them both as "client configs.")
Add unit tests for ClientQuotaControlManager.isValidIpEntity and
ClientQuotaControlManager.configKeysForEntityType.
This change doesn't affect metadata record application, only input validation. If there are bad
client quotas that are set currently, this change will not alter the current behavior (of throwing
an exception and ignoring the bad quota).
The sslEngine.setUseClientMode(false) call was duplicated when the SSL mode is server during SSLEngine creation in DefaultSslEngineFactory.java. This patch removes the duplicated call.
Reviewers: maulin-vasavada <maulin.vasavada@gmail.com>, Divij Vaidya <diviv@amazon.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
We currently handle the complex ListOffsets workflow by chaining together MetadataCall instances and ListOffsetsCall instances, which involves a lot of complex and error-prone logic. In this PR we rewrote it with the `AdminApiDriver` infra; notable improvements over the old logic:
1. Retry lookup stage on receiving `NOT_LEADER_OR_FOLLOWER` and `LEADER_NOT_AVAILABLE`, whereas in the past we failed the partition directly without retry.
2. Removing class field `supportsMaxTimestamp` and calculating it on the fly to avoid the mutable state, this won't change any behavior of the client.
3. Retry the fulfillment stage on `RetriableException`, whereas in the past we only retried the fulfillment stage on `InvalidMetadataException`; this means we will now retry on `TimeoutException` and other `RetriableException`s.
We also add `handleUnsupportedVersionException` to `AdminApiHandler` and `AdminApiLookupStrategy`; they are used to keep consistency with the old logic, and we can continue to improve them.
Reviewers: Ziming Deng <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
KafkaStatusBackingStore uses an infinite retry logic on producer send, which can lead to a stack overflow.
To avoid the problem, a background thread was added, and the callback submits the retry onto the background thread.
Added a check for an ongoing transaction.
Added a thread to send and receive verify-only AddPartitionsToTxn requests.
Code to send on the request thread courtesy of @artemlivshits.
Reviewers: Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@gmail.com>
The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record, which is not accurate: it is the time when the snapshot was created, not the log append time of the last record included in the snapshot.
The log append time of the last record included in the snapshot is stored in the header control record of the snapshot. The header control record is the first record of the snapshot.
To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>