Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
KRaftMetadataCache#topicIdsToNames by returning a map subclass that
exposes the TopicsImage data structures without copying them.
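
The idea of exposing an existing structure through a map subclass, rather than copying it, can be sketched roughly as follows. This is an illustration only: `TranslatedValueView` and its translator function are hypothetical names, not the classes used in this change.

```java
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical read-only view: translates values lazily instead of copying the backing map. */
final class TranslatedValueView<K, B, V> extends AbstractMap<K, V> {
    private final Map<K, B> underlying;
    private final Function<B, V> translator;

    TranslatedValueView(Map<K, B> underlying, Function<B, V> translator) {
        this.underlying = underlying;
        this.translator = translator;
    }

    // Point lookups delegate to the backing map, so they cost whatever its lookup costs.
    @Override
    public boolean containsKey(Object key) {
        return underlying.containsKey(key);
    }

    @Override
    public V get(Object key) {
        B value = underlying.get(key);
        return value == null ? null : translator.apply(value);
    }

    @Override
    public int size() {
        return underlying.size();
    }

    // Iteration is still O(N), but no intermediate copy is built.
    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return new AbstractSet<Map.Entry<K, V>>() {
            @Override
            public Iterator<Map.Entry<K, V>> iterator() {
                final Iterator<Map.Entry<K, B>> it = underlying.entrySet().iterator();
                return new Iterator<Map.Entry<K, V>>() {
                    @Override
                    public boolean hasNext() {
                        return it.hasNext();
                    }

                    @Override
                    public Map.Entry<K, V> next() {
                        Map.Entry<K, B> e = it.next();
                        return new AbstractMap.SimpleImmutableEntry<>(
                            e.getKey(), translator.apply(e.getValue()));
                    }
                };
            }

            @Override
            public int size() {
                return underlying.size();
            }
        };
    }
}
```

Because `AbstractMap` leaves `put` unsupported by default, the view is read-only, and later changes to the backing map are visible through it.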
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
In 11231 we fixed a bug in which the consumer would reset its state unnecessarily, and fixed up the tests accordingly. Unfortunately this also wiped out the test coverage for https://issues.apache.org/jira/browse/KAFKA-12983 that was added in 10986. This test coverage was re-added during a cherry-pick to the 2.7 branch; this PR ports that up to trunk. This test has been verified to fail without the corresponding fix, i.e. resetting the `needsJoinPrepare` flag.
Reviewers: Ismael Juma <ismael@juma.me.uk>
1. It should not require a TopicPartition during construction and normal
usage.
2. Simplify `equals` since `topicId` and `topicPartition` are never
null.
3. Inline `Objects.hash` to avoid array allocation.
4. Make `toString` more concise using a similar approach as
`TopicPartition` since this `TopicIdPartition` will replace
`TopicPartition` in many places in the future.
5. Add unit tests for `TopicIdPartition`; it seems we had none.
6. Minor clean-up in calling/called classes.
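
Points 1 to 4 can be illustrated with a rough sketch. `SketchTopicIdPartition` is a hypothetical stand-in, it uses `java.util.UUID` in place of Kafka's own ID type, and the constructor argument order and exact `toString` layout are assumptions rather than the actual implementation.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for a topic-id/partition pair; not the real Kafka class. */
final class SketchTopicIdPartition {
    private final UUID topicId;
    private final String topic;
    private final int partition;

    // Built from plain fields, so callers do not need a TopicPartition instance.
    SketchTopicIdPartition(UUID topicId, String topic, int partition) {
        this.topicId = Objects.requireNonNull(topicId, "topicId");
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
    }

    // Fields are never null, so equals can call equals() on them directly.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SketchTopicIdPartition that = (SketchTopicIdPartition) o;
        return partition == that.partition
            && topicId.equals(that.topicId)
            && topic.equals(that.topic);
    }

    // Hash combination written out by hand: Objects.hash(...) would allocate a varargs array.
    @Override
    public int hashCode() {
        int result = topicId.hashCode();
        result = 31 * result + topic.hashCode();
        result = 31 * result + partition;
        return result;
    }

    // Concise form, assumed here to be "<topicId>:<topic>-<partition>".
    @Override
    public String toString() {
        return topicId + ":" + topic + "-" + partition;
    }
}
```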
Reviewers: David Jacot <djacot@confluent.io>, Satish Duggana <satishd@apache.org>
As part of the migration of KStream/KTable operations to the new
Processor API https://issues.apache.org/jira/browse/KAFKA-8410,
this PR includes the migration of KStream aggregate/reduce operations.
Reviewers: John Roesler <vvcephei@apache.org>
Recently a user hit this TaskAssignmentException due to a bug in their regex that meant no topics matched the pattern subscription, which in turn meant that it was impossible to resolve the number of partitions of the downstream repartition since there was no upstream topic to get the partition count for. Debugging this was pretty difficult and ultimately came down to stepping through the code line by line, since even with TRACE logging we only got a partial picture.
We should expand the logging to make sure the TRACE logging hits both conditional branches, and improve the error message with a suggestion for what to look for should someone hit this in the future.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
In #9139, we added a backward iterator to SessionStore. However, there is a bug: when calling fetch/backwardFetch on a key range, if there are multiple records in the same session window, the data is not returned in the correct order.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Found this while reading the code. After each assignment we performed a somewhat heavy check: comparing the "assigned topics" set with the "subscribed topics" set to see whether either contains topics missing from the other. In addition, the "assigned topics" set is built by traversing all assigned partitions, which is costly when the number of partitions is large.
However, as the comments describe, this is a safeguard for user-customized assignors, which might produce assignments we don't expect. In most cases users just use the in-product assignors, which guarantee that only subscribed topics are assigned. Therefore the check is unnecessary for in-product assignors.
In this PR, I added a list of "in-product assignor names"; ConsumerCoordinator checks whether the assignor is one of them to decide whether the additional check is needed. Also added a test for it.
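
A minimal sketch of the name-based gate, under stated assumptions: `AssignmentValidationSketch` and its methods are hypothetical, the name set lists the standard consumer assignor names, and only one direction of the comparison (assigned but not subscribed) is shown for brevity.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of skipping the topic comparison for known assignors. */
final class AssignmentValidationSketch {
    // Assumed list of in-product assignor names.
    static final Set<String> IN_PRODUCT_ASSIGNOR_NAMES = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList("range", "roundrobin", "sticky", "cooperative-sticky")));

    // Only assignors outside the known list need the extra safeguard.
    static boolean needsTopicCheck(String assignorName) {
        return !IN_PRODUCT_ASSIGNOR_NAMES.contains(assignorName);
    }

    // Returns assigned topics that were never subscribed; empty when the check is skipped.
    static Set<String> unexpectedTopics(String assignorName,
                                        Set<String> subscribedTopics,
                                        Set<String> assignedTopics) {
        if (!needsTopicCheck(assignorName)) {
            return Collections.emptySet();
        }
        Set<String> extra = new HashSet<>(assignedTopics);
        extra.removeAll(subscribedTopics);
        return extra;
    }
}
```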
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>
This PR aims to utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted tasks. The idea is that, when we hit TaskCorruptedException on an active task, a rebalance is triggered after we've wiped out the corrupted state stores. This will allow the assignor to temporarily redirect this task to another client who can resume work on the task while the original owner works on restoring the state from scratch.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
As raised in KAFKA-12994, all tests that use the old API should be either eliminated or migrated to the new API in order to remove the @SuppressWarnings("deprecation") annotations. This PR migrates all the relevant tests in TimeWindowsTests.java.
Reviewers: Anna Sophie Blee-Goldman
Fix a bug where the validateOnly flag for createTopics was being ignored.
Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, singingMan <3schwartz@users.noreply.github.com>
Replace deprecated exactly_once_beta with exactly_once_v2 in system tests.
Follow-up to #10870: some system tests were still using the deprecated exactly_once_beta. This PR updates them.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Fix a ClusterControlManager log message that should have distinguished between
newly registered and re-registered brokers, but did not due to a bug.
Reviewers: Ismael Juma <ismael@juma.me.uk>, José Armando García Sancio <jsancio@gmail.com>
Previously, we used the metadata cache to determine whether or not to use topic IDs. Unfortunately, metadata cache updates with ZK controllers are in a separate request and may be too slow for the fetcher thread. This results in switching between topic names and topic IDs for topics that could just use IDs.
This patch adds topic IDs to FetcherState created in LeaderAndIsr requests. It also supports updating this state for follower threads as soon as a LeaderAndIsr request provides a topic ID.
We've opted to only update replica fetcher threads. AlterLogDir threads will use either topic name or topic ID depending on what was present when they were created.
Reviewers: David Jacot <djacot@confluent.io>
This also fixes KAFKA-13070.
We have seen a problem caused by shutting down the scheduler before shutting down LogManager.
When LogManager was closing partitions one by one, the scheduler called to delete old segments due to retention. However, the old segments could have been closed by the LogManager, which caused an exception and subsequently marked logdir as offline. As a result, the broker didn't flush the remaining partitions and didn't write the clean shutdown marker. Ultimately the broker took hours to recover the log during restart.
This PR essentially reverts #10538
Reviewers: Ismael Juma <ismael@juma.me.uk>, Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
The `LastTimestamp` field is useful because its value is present even when there are no data batches written by a given producerId.
Reviewers: David Jacot <djacot@confluent.io>
Add support for CreateTopicsPolicy and AlterConfigsPolicy when running in KRaft mode.
Reviewers: David Arthur <mumrah@gmail.com>, Niket Goel <ngoel@confluent.io>
As part of the migration of KStream/KTable operations to the new Processor API https://issues.apache.org/jira/browse/KAFKA-8410, this PR includes the migration of KTable aggregate/reduce operations.
Reviewers: John Roesler <vvcephei@apache.org>
Add support for infinite range query for WindowStore. Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13210
Reviewers: Patrick Stuedi <pstuedi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Internal topic configs with default value are not included in the response of CreateTopic/DescribeTopic. However, if they are explicitly set, they will be included in the response.
Reviewers: Jun Rao <junrao@gmail.com>
KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread does not gate on stream state well
The test now waits for the client to transition to REBALANCING/RUNNING after adding/removing a thread as well as to transition to RUNNING before querying the state store.
Reviewers: singingMan <@3schwartz>, Walker Carlson <wcarlson@confluent.io>, John Roesler <vvcephei@apache.org>
This patch fixes a deadlock when incrementing the high watermark after the synchronous zk ISR modification happens. The main difference is that we prevent the callback from executing while under the leader and ISR lock. The deadlock bug was introduced in https://github.com/apache/kafka/pull/11245.
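
The general technique of keeping a callback out of a locked region can be sketched as below. This is not the actual partition code: `DeferredCallbackSketch`, its field names, and the simplified high watermark update are all assumptions made for illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: decide under the lock, run the callback only after releasing it. */
final class DeferredCallbackSketch {
    private final ReentrantReadWriteLock leaderIsrLock = new ReentrantReadWriteLock();
    private long highWatermark = 0L;

    void maybeIncrementHighWatermark(long candidate, Runnable onIncremented) {
        boolean incremented;
        leaderIsrLock.writeLock().lock();
        try {
            // Only state changes happen while the lock is held.
            incremented = candidate > highWatermark;
            if (incremented) {
                highWatermark = candidate;
            }
        } finally {
            leaderIsrLock.writeLock().unlock();
        }
        // The callback may take other locks, so it runs after this lock is released.
        if (incremented) {
            onIncremented.run();
        }
    }

    boolean isLockHeldByCurrentThread() {
        return leaderIsrLock.isWriteLockedByCurrentThread();
    }
}
```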
Reviewers: David Jacot <djacot@confluent.io>
Our soak test application ran into an error constructing the ListDeserializer and crashed, but unfortunately the underlying exception was not logged anywhere. This PR adds logging for this case, as well as a few others around this class and the corresponding serializer where additional logging would be helpful.
Reviewers: Guozhang Wang
The ReplicaManager, LogManager, and KafkaApis classes all have many
constructor parameters. It is often difficult to add or remove a
parameter, since there are so many locations that need to be updated. In
order to address this problem, we should use named parameters when
constructing these objects from Scala code. This will make it easy to
add new optional parameters without modifying many test cases. It will
also make it easier to read git diffs and PRs, since the parameters will
have names next to them. Since Java does not support named parameters,
this PR adds several Builder classes which can be used to achieve the
same effect.
ReplicaManager also had a secondary constructor, which this PR removes.
The function of the secondary constructor was just to provide some
default parameters for the main constructor. However, it is simpler just
to actually use default parameters.
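
For illustration, a Builder that emulates named and default parameters from Java might look roughly like this. `ManagerSketch`, its fields, and the default values are hypothetical and are not the builders added in this PR.

```java
/** Hypothetical class with several constructor parameters, built via a Builder. */
final class ManagerSketch {
    final int brokerId;
    final String logDir;
    final long flushIntervalMs;

    private ManagerSketch(Builder builder) {
        this.brokerId = builder.brokerId;
        this.logDir = builder.logDir;
        this.flushIntervalMs = builder.flushIntervalMs;
    }

    static final class Builder {
        // Defaults live in one place; adding a field does not break existing callers.
        private int brokerId = 0;
        private String logDir = "/tmp/sketch-logs";
        private long flushIntervalMs = 1000L;

        Builder setBrokerId(int brokerId) {
            this.brokerId = brokerId;
            return this;
        }

        Builder setLogDir(String logDir) {
            this.logDir = logDir;
            return this;
        }

        Builder setFlushIntervalMs(long flushIntervalMs) {
            this.flushIntervalMs = flushIntervalMs;
            return this;
        }

        ManagerSketch build() {
            return new ManagerSketch(this);
        }
    }
}
```

A test would then set only what it cares about, e.g. `new ManagerSketch.Builder().setBrokerId(1).build()`, and each setter name shows up next to its value in diffs.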
Reviewers: David Arthur <mumrah@gmail.com>
This is to make sure that even if logging is disabled, we would still return null in order to work around the deserialization issue for stream-stream left/outer joins.
Reviewers: Matthias J. Sax <mjsax@apache.org>
This is an alternative approach in parallel to #11235. After several unsuccessful attempts to improve its efficiency, I've come up with a larger change: use a kv-store as the shared store instead, storing each value as a list. The benefits of this approach are:
- Serialization happens only once, composing <timestamp, byte, key> at the outer metered stores, with fewer byte array copies.
- Deletes are straightforward with no scan reads: a single call deletes all duplicated <timestamp, byte, key> values.
- Using a KV store has less space amplification than a segmented window store.
The cons, though:
- Each put call becomes a get-then-write to append to the list; we also spend a few more bytes to store the list (most likely a singleton list, and hence just 4 more bytes).
- It's definitely more complicated. :)
The main idea is that since the shared store is actively GC'ed by the expiration logic rather than by time-based retention, and since the key format is <timestamp, byte, key>, the range expiration query is quite efficient as well.
Added test coverage for the list stores (since we still use the kv-store interface, we cannot leverage the get() calls in the stream-stream join; instead we use putIfAbsent and range only). Another minor refactoring piggy-backed here is to let toList always close the iterator to avoid leaks.
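
The list-valued store idea can be sketched in memory as follows. This is a simplified model, not the store added here: `ListValueStoreSketch` and `TimestampedKey` are hypothetical, a `TreeMap` stands in for the underlying kv-store, and plain `put`/`get` are used instead of the putIfAbsent/range access mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of a kv-store whose values are lists. */
final class ListValueStoreSketch {
    /** Composite key ordered by timestamp first, mirroring <timestamp, byte, key>. */
    static final class TimestampedKey implements Comparable<TimestampedKey> {
        final long timestamp;
        final byte side;
        final String key;

        TimestampedKey(long timestamp, byte side, String key) {
            this.timestamp = timestamp;
            this.side = side;
            this.key = key;
        }

        @Override
        public int compareTo(TimestampedKey other) {
            int c = Long.compare(timestamp, other.timestamp);
            if (c != 0) return c;
            c = Byte.compare(side, other.side);
            return c != 0 ? c : key.compareTo(other.key);
        }
    }

    private final NavigableMap<TimestampedKey, List<String>> store = new TreeMap<>();

    // Each put is a get-then-write that appends to the list stored under the key.
    void put(long timestamp, byte side, String key, String value) {
        TimestampedKey k = new TimestampedKey(timestamp, side, key);
        List<String> updated = new ArrayList<>();
        List<String> existing = store.get(k);
        if (existing != null) {
            updated.addAll(existing);
        }
        updated.add(value);
        store.put(k, updated);
    }

    List<String> get(long timestamp, byte side, String key) {
        List<String> values = store.get(new TimestampedKey(timestamp, side, key));
        return values == null ? Collections.<String>emptyList() : values;
    }

    // Timestamp-first ordering makes expiration a single head-range removal.
    int expireBefore(long timestampExclusive) {
        NavigableMap<TimestampedKey, List<String>> expired =
            store.headMap(new TimestampedKey(timestampExclusive, Byte.MIN_VALUE, ""), false);
        int removed = expired.size();
        expired.clear();
        return removed;
    }
}
```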
Reviewers: Sergio Peña <sergio@confluent.io>, Matthias J. Sax <mjsax@apache.org>
This PR aims to remove tombstones that persist indefinitely due to low throughput. Previously, deleteHorizon was calculated from the segment's last modified time.
In this PR, the deleteHorizon will now be tracked in the baseTimestamp of RecordBatches. After the first cleaning pass that finds a record batch with tombstones, the record batch is recopied with deleteHorizon flag and a new baseTimestamp that is the deleteHorizonMs. The records in the batch are rebuilt with relative timestamps based on the deleteHorizonMs that is recorded. Later cleaning passes will be able to remove tombstones more accurately on their deleteHorizon due to the individual time tracking on record batches.
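
The two-pass behaviour described above can be modelled with a small sketch. It rests on assumptions: `DeleteHorizonSketch` and `Batch` are hypothetical, the horizon is taken to be the time of the first cleaning pass plus the delete retention, and the exact comparison used by the cleaner may differ.

```java
import java.util.OptionalLong;

/** Hypothetical model of per-batch delete horizon tracking across cleaning passes. */
final class DeleteHorizonSketch {
    static final class Batch {
        final boolean hasTombstones;
        final OptionalLong deleteHorizonMs;

        Batch(boolean hasTombstones, OptionalLong deleteHorizonMs) {
            this.hasTombstones = hasTombstones;
            this.deleteHorizonMs = deleteHorizonMs;
        }
    }

    static Batch cleaningPass(Batch batch, long nowMs, long deleteRetentionMs) {
        if (!batch.hasTombstones) {
            return batch;
        }
        if (!batch.deleteHorizonMs.isPresent()) {
            // First pass that sees tombstones: stamp the batch with its horizon.
            return new Batch(true, OptionalLong.of(nowMs + deleteRetentionMs));
        }
        if (nowMs > batch.deleteHorizonMs.getAsLong()) {
            // Horizon has passed: a later pass may drop the tombstones.
            return new Batch(false, batch.deleteHorizonMs);
        }
        // Horizon not reached yet: keep the tombstones.
        return batch;
    }
}
```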
KIP 534: https://cwiki.apache.org/confluence/display/KAFKA/KIP-534%3A+Retain+tombstones+and+transaction+markers+for+approximately+delete.retention.ms+milliseconds
Co-authored-by: Ted Yu <yuzhihong@gmail.com>
Co-authored-by: Richard Yu <yohan.richard.yu@gmail.com>
This patch fixes several problems with the `ElectLeaders` API in KRaft:
- `KafkaApis` did not properly forward this request type to the controller.
- `ControllerApis` did not handle the request type.
- `ElectLeadersRequest.getErrorResponse` may raise NPE when `TopicPartitions` is null.
- Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
- Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
- Controller should use proper error codes to handle cases when desired leader is unavailable or when no election is needed because a desired leader is already elected.
- When election for all partitions is requested (indicated with null `TopicPartitions` field), the response should not return partitions for which no election was necessary.
In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft.
Reviewers: dengziming <swzmdeng@163.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, David Arthur <mumrah@gmail.com>
Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead of returning null if there are not enough bytes remaining to read the record. This ensures that the broker can raise a useful exception for malformed record batches.
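
The pattern of failing loudly on a truncated record, instead of returning null, might look like this in a simplified form. The sketch assumes a 4-byte length prefix rather than the real record encoding, and `RecordReadSketch` and `InvalidRecordSketchException` are hypothetical names.

```java
import java.nio.ByteBuffer;

/** Hypothetical reader for a length-prefixed record; not the real record format. */
final class RecordReadSketch {
    static final class InvalidRecordSketchException extends RuntimeException {
        InvalidRecordSketchException(String message) {
            super(message);
        }
    }

    static byte[] readRecord(ByteBuffer buffer) {
        if (buffer.remaining() < Integer.BYTES) {
            throw new InvalidRecordSketchException("Not enough bytes to read the record size");
        }
        int size = buffer.getInt();
        if (size < 0 || buffer.remaining() < size) {
            // Raising here gives the caller a useful error instead of a null to trip over.
            throw new InvalidRecordSketchException(
                "Record size " + size + " exceeds remaining bytes " + buffer.remaining());
        }
        byte[] payload = new byte[size];
        buffer.get(payload);
        return payload;
    }
}
```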
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
When introducing the backward iterator for WindowStore in #9138, we forgot to iterate the records within each segment in reverse order (i.e. via descendingMap) in InMemoryWindowStore. This PR fixes that and adds integration tests.
Currently, in Window store, we store records in [segments -> [records] ].
For example:
window size = 500,
input records:
key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
So, internally, "a" and "b" will be in the same segment, and "c" in another segment.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".
Before this change, we had a reverse iterator for segments but not for the records within each segment. So backwardFetchAll returned the records in the order "c", "a", "b", when it should be "c", "b", "a".
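
The example above can be reproduced with a small in-memory model. `BackwardFetchSketch` is hypothetical and uses nested `TreeMap`s keyed by window start and record key; it is not the InMemoryWindowStore code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model: window start -> (record key -> value). */
final class BackwardFetchSketch {
    private final NavigableMap<Long, NavigableMap<String, String>> segments = new TreeMap<>();

    void put(long windowStart, String key, String value) {
        segments.computeIfAbsent(windowStart, s -> new TreeMap<String, String>()).put(key, value);
    }

    // Models the bug: segments are reversed, but records inside each segment are not.
    List<String> backwardFetchAllSegmentsOnly() {
        List<String> keys = new ArrayList<>();
        for (NavigableMap<String, String> records : segments.descendingMap().values()) {
            keys.addAll(records.keySet());
        }
        return keys;
    }

    // Models the fix: both the segments and the records within them use descendingMap.
    List<String> backwardFetchAll() {
        List<String> keys = new ArrayList<>();
        for (NavigableMap<String, String> records : segments.descendingMap().values()) {
            keys.addAll(records.descendingMap().keySet());
        }
        return keys;
    }
}
```

With "a" and "b" stored under window start 0 and "c" under 500, the segments-only variant yields c, a, b, while the fully reversed variant yields c, b, a.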
Reviewers: Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>
When using EOS, checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change forces the update if enforceCheckpoint=true.
I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shut down.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>