https://issues.apache.org/jira/browse/KAFKA-18575 solved a critical race condition by returning with CONCURRENT_TRANSACTIONS early when the transaction was still completing.
In testing, it was discovered that this early return could cause performance regressions.
Prior to KIP-890 the addpartitions call was a separate call from the producer. There was a previous change https://issues.apache.org/jira/browse/KAFKA-5477 that decreased the retry backoff to 20ms. With KIP-890 and making the call through the produce path, we go back to the default retry backoff which takes longer. Prior to 18575 we introduce a slight delay when sending to the coordinator, so prior to 18575, we are less likely to return quickly and get stuck in this backoff. However, based on results from produce benchmarks, we can still run into the default backoff in some scenarios.
This PR reverts KAFKA-18575, and doesn't return early and wait until the coordinator for checking if a transaction is ongoing. Instead, it will fix the handling with the verification guard so we don't hit the edge condition.
Also cleans up some of the verification text that was unclear.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Artem Livshits <alivshits@confluent.io>
CoordinatorRecordSerde does not validate the version of the value to check whether the version is supported by the current version of the software. This is problematic if a future and unsupported version of the record is read by an older version of the software because it would misinterpret the bytes. Hence CoordinatorRecordSerde must throw an error if the version is unknown. This is also consistent with the handling in the old coordinator.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
During testing we discovered that the empty group count is not updated in group conversion, but when the new group is transition to other state, the empty group count is decremented. This could result in negative empty group count.
We can have a new consumer group count implementation that follows the pattern we did for the classic group count. The timeout task periodically refreshes the metrics based on the current groups soft state.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
Return produce v0-v2 as supported versions in `ApiVersionsResponse`, but disable support
for it everywhere else.
Since clients pick the highest supported version by both client and broker during version
negotiation, this solves the problem with minimal tech debt (even though it's not ideal that
`ApiVersionsResponse` becomes inconsistent with the actual protocol support).
Add one test for the socket server handling (in `ProcessorTest`) and one test for the
client behavior (in `ProduceRequestTest`). Adjust a couple of api versions tests to verify
the new behavior.
Finally, include a few clean-ups in `ApiKeys`, `Protocol`, `ProduceRequest`,
`ProduceRequestTest` and `BrokerApiVersionsCommandTest`.
Reference to related librdkafka issue:
https://github.com/confluentinc/librdkafka/issues/4956
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
Fixed the typo that used the wrong producer ID and epoch when returning so that we handle epoch overflow correctly.
We also had to rearrange the concurrent transaction handling so that we don't self-fence when we start the new transaction with the new producer ID.
I also tested this with a modified version of the code where epoch overflow happens on the first epoch bump (every request has a new producer id)
Reviewers: Artem Livshits <alivshits@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
The full class name of the assignors if part of our public api. Hence, we should ensure that they are not changed by mistake. This patch adds a unit test verifying them.
Reviewers: Sean Quah <squah@confluent.io>, Jeff Kim <jeff.kim@confluent.io>
This change reduces fetch session cache evictions on the broker for AsyncKafkaConsumer by altering its logic to determine which partitions it includes in fetch requests.
Background
Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.
The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally-buffered data, the ClassicKafkaConsumer does not fetch any new data and simply returns the buffered data to the user from poll().
On the other hand, the AsyncKafkaConsumer consumer splits its logic and network I/O between two threads, which results in a potential race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread. If it finds there is none, it signals the background thread to create a fetch request. However, it's possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, as the background thread creates a new fetch request, it skips any buffered data, which has the unintended result that those partitions get added to the fetch request's "to remove" set. This signals to the broker to remove those partitions from its internal cache.
This issue is technically possible in the ClassicKafkaConsumer too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the AsyncKafkaConsumer's background thread runs, it is ~100x more likely to happen.
Options
The core decision is: what should the background thread do if it is asked to create a fetch request and it discovers there's buffered data. There were multiple proposals to address this issue in the AsyncKafkaConsumer. Among them are:
The background thread should omit buffered partitions from the fetch request as before (this is the existing behavior)
The background thread should skip the fetch request generation entirely if there are any buffered partitions
The background thread should include buffered partitions in the fetch request, but use a small “max bytes” value
The background thread should skip fetching from the nodes that have buffered partitions
Option 4 won out. The change is localized to AbstractFetch where the basic idea is to skip fetch requests to a given node if that node is the leader for buffered data. By preventing a fetch request from being sent to that node, it won't have any "holes" where the buffered partitions should be.
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jun Rao <junrao@gmail.com>
This patch renames kraft_upgrade_test.py to upgrade_test.py. This is enough to cover the old upgrade/downgrade tests.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
HdrHistogram can throw an exception if the recorded value is greater than a configured limit. Expand the ceiling from per-metric to all invocations.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
While testing, it was found that the not_enough_replicas error was super common and could be easily confused. Since we are already bumping the request, we can signify that the produce request may return this error and new clients can handle it
(Note, the java client should be able to handle this already as a retriable error, but other client libraries may need to implement this change)
Reviewers: Justine Olshan <jolshan@confluent.io>
Ensure we always return empty records (including cases where an error is returned).
We also remove `nullable` from `records` since it is effectively expected to be
non-null by a large percentage of clients in the wild.
This behavior regressed in fe56fc9 (KAFKA-18269). Empty records were
previously set via `FetchResponse.recordsOrFail(partitionData)` in the
now-removed `maybeConvertFetchedData` method.
Added an integration test that fails without this fix and also update many
tests to set `records` to `empty` instead of leaving them as `null`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
During testing, we identified that kafka-python (and aiokafka) relies on metadata request v0 and
hence we need to add these back to comply with the premise of KIP-896 - i.e. it should not
break the clients listed within it.
I reverted the changes from #18218 related to the removal of metadata versions 0-3.
I will submit a separate PR to undeprecate these API versions on the relevant 3.x branches.
kafka-python (and aiokafka) work correctly (produce & consume) with this change on
top of the 4.0 branch.
Reviewers: David Arthur <mumrah@gmail.com>
This patch reorganizes our test infrastructure into three Gradle modules:
":test-common:test-common-internal-api" is now a minimal dependency which exposes interfaces and annotations only. It has one project dependency on server-common to expose commonly used data classes (MetadataVersion, Feature, etc). Since this pulls in server-common, this module is Java 17+. It cannot be used by ":clients" or other Java 11 modules.
":test-common:test-common-util" includes the auto-quarantined JUnit extension. The @Flaky annotation has been moved here. Since this module has no project dependencies, we can add it to the Java 11 list so that ":clients" and others can utilize the @Flaky annotation
":test-common:test-common-runtime" now includes all of the test infrastructure code (TestKitNodes, etc). This module carries heavy dependencies (core, etc) and so it should not normally be included as a compile-time dependency.
In addition to this reorganization, this patch leverages JUnit SPI service discovery so that modules can utilize the integration test framework without depending on ":core". This will allow us to start moving integration tests out of core and into the appropriate sub-module. This is done by adding ":test-common:test-common-runtime" as a testRuntimeOnly dependency rather than as a testImplementation dependency. A trivial example was added to QuorumControllerTest to illustrate this.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
This patch includes some maintenance updates for Develocity.
* Publish build scans to develocity.apache.org
* Update Develocity Gradle plugin to to 3.19
* Use `DEVELOCITY_ACCESS_KEY` to authenticate to `develocity.apache.org`
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
Ensure that unloading a coordinator always succeeds. Previously, we have
guarded against exceptions from DeferredEvent completions. All that
remains is handling exceptions from the onUnloaded() method of the
coordinator state machine.
Reviewers: David Jacot <djacot@confluent.io>
This patch is a first step towards removing `ReplicaManager#becomeLeaderOrFollower`. It updates the `LocalLeaderEndPointTest` tests.
Reviewers: Christo Lolov <lolovc@amazon.com>, Ismael Juma <ismael@juma.me.uk>