Various tests in the streams package were leaking native memory.
Most tests were fixed by closing the corresponding RocksDB resource.
I verified that each leak was gone by using a previous RocksDB
release with finalizers and checking whether the finalizers were
eventually called.
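As an illustration of the pattern, here is a minimal sketch using the RocksDB Java API (the store path and key are made up): every native handle, including the Options object, is closed explicitly instead of relying on finalizers.

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbCloseExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // Options and RocksDB both wrap native memory; try-with-resources
        // releases it deterministically instead of waiting for a finalizer
        // that may never run.
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-close-example")) {
            db.put("key".getBytes(), "value".getBytes());
        } // native handles are freed here
    }
}
```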
Reviewer: Bruno Cadonna <cadonna@apache.org>
StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores and StoreQueryIntegrationTest#shouldQueryOnlyActivePartitionStoresByDefault contain logic that finds active partitions by taking a modulo with 2 and comparing the remainder. This can break when a new test is added: since JUnit chooses an arbitrary order to run the tests, the modulo checks can fail. This PR makes the check deterministic.
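One way to locate a key's active partition deterministically is to ask the running instance via the Interactive Query metadata API. This is a hedged sketch (the store name is assumed), not necessarily the exact approach taken in this PR:

```java
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;

public class ActivePartitionLookup {
    // Returns the partition hosting the key, independent of test order.
    static int activePartitionFor(KafkaStreams streams, int key) {
        KeyQueryMetadata metadata =
            streams.queryMetadataForKey("source-table", key, new IntegerSerializer());
        return metadata.partition();
    }
}
```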
Also, this PR uses JUnit 5 annotations so that the cluster and input topic can be set up and torn down once.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The failure handling code for fetches could run into an IllegalStateException if a fetch response came back with a failure after the corresponding topic partition had already been removed from the assignment.
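A self-contained sketch of the defensive check (all names here are hypothetical; the real fix lives in the consumer's fetch handling): a failure for a partition that is no longer assigned is logged and dropped rather than surfaced.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class FetchErrorGuard {
    private final Set<String> assignment = ConcurrentHashMap.newKeySet();

    public void assign(String topicPartition) {
        assignment.add(topicPartition);
    }

    public void revoke(String topicPartition) {
        assignment.remove(topicPartition);
    }

    public void handleFetchError(String topicPartition, String error) {
        if (!assignment.contains(topicPartition)) {
            // The partition was removed (e.g. by a rebalance) while the
            // fetch was in flight; drop the stale failure instead of
            // throwing IllegalStateException.
            System.out.printf("Ignoring fetch error %s for unassigned partition %s%n",
                error, topicPartition);
            return;
        }
        // ... handle the error for a partition that is still assigned ...
    }
}
```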
Reviewers: David Jacot <djacot@confluent.io>
The call to `interrupt` on the state updater thread during shutdown
could interrupt the thread while it was writing the checkpoint file,
causing a failure to write the checkpoint file and a misleading
stack trace in the logs.
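A minimal sketch of an interrupt-free shutdown handshake (the names are hypothetical; the real state updater is more involved): the shutdown path sets a flag and waits, so the checkpoint write can never be cut short by an interrupt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class StateUpdaterShutdownSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    // Body of the state updater thread.
    public void run() {
        try {
            while (running.get()) {
                // ... process tasks ...
            }
            writeCheckpoint(); // safe: no interrupt can land mid-write
        } finally {
            shutdownComplete.countDown();
        }
    }

    // Called from the thread performing the shutdown.
    public void shutdown() throws InterruptedException {
        running.set(false);       // request shutdown without interrupt()
        shutdownComplete.await(); // wait for the checkpoint to be written
    }

    private void writeCheckpoint() { /* ... */ }
}
```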
Reviewer: Bruno Cadonna <cadonna@apache.org>
In MetadataQuorumCommandTest, we sometimes got the error:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Received a fatal error while waiting for the broker to catch up with the current cluster metadata.
Because we tried to bring up 3 brokers and 3 controllers at the same time, the config initial.broker.registration.timeout.ms (default 1 minute) was sometimes not enough for them to start up. The tests don't actually require that many nodes, so this change reduces the node count to make them reliable.
Reviewers: dengziming <dengziming1993@gmail.com>, Ismael Juma <ismael@juma.me.uk>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
This is a requirement for:
* KAFKA-14477: Move LogValidator to storage module.
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: dengziming <dengziming1993@gmail.com>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
The controller metrics have three problems: 1) the active controller exposes uncommitted data in the metrics; 2) the active controller doesn't update the metrics when the uncommitted data gets aborted; and 3) the controller doesn't update the metrics when the entire state gets reset.
We fix these issues by updating the metrics only when processing committed metadata records and by resetting the metrics when the metadata state is reset.
This change adds a new type, `ControllerMetricsManager`, which processes committed metadata records and updates the metrics accordingly. This change also removes metrics-updating responsibilities from the rest of the controller managers.
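A toy sketch of the principle (record types are reduced to strings here; the real `ControllerMetricsManager` replays actual metadata records): metrics change only on committed records and can be reset as a whole.

```java
public class CommittedMetricsSketch {
    private int topicCount = 0;

    // Called only for records that are known to be committed.
    public void replay(String recordType) {
        if ("TOPIC_RECORD".equals(recordType)) {
            topicCount++;
        } else if ("REMOVE_TOPIC_RECORD".equals(recordType)) {
            topicCount--;
        }
    }

    // Called when the entire metadata state is discarded, e.g. after
    // uncommitted data is aborted and the state machine is rebuilt.
    public void reset() {
        topicCount = 0;
    }

    public int topicCount() {
        return topicCount;
    }
}
```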
Reviewers: Ron Dagostino <rdagostino@confluent.io>
This patch applies the necessary conversion of milliseconds to nanoseconds for SnapshotGenerator.
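The general shape of such a fix, as a minimal example (the variable names are assumed): convert the millisecond interval once before comparing it against nanosecond timestamps.

```java
import java.util.concurrent.TimeUnit;

public class MsToNsExample {
    public static void main(String[] args) {
        long maxSnapshotIntervalMs = 1_000;
        // Comparing a raw millisecond value against System.nanoTime()
        // deltas silently under-counts by a factor of one million.
        long maxSnapshotIntervalNs = TimeUnit.MILLISECONDS.toNanos(maxSnapshotIntervalMs);
        System.out.println(maxSnapshotIntervalNs); // 1000000000
    }
}
```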
Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@gmail.com>
A test that verifies the upgrade from a version of Streams with
the state updater disabled to a version with the state updater
enabled and vice versa, so that we can offer a safe upgrade path:
- upgrade test from a version of Streams with the state updater
disabled to a version with the state updater enabled
- downgrade test from a version of Streams with the state updater
enabled to a version with the state updater disabled
Reviewer: Bruno Cadonna <cadonna@apache.org>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
For broader context on this change, please check:
* KAFKA-14470: Move log layer to storage module
Reviewers: Jun Rao <junrao@gmail.com>, Satish Duggana <satishd@apache.org>
This PR updates StateQueryResult.getOnlyPartitionResult() so that it does not throw an IllegalArgumentException when there are 0 query results.
Added a test that fails without this patch.
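A hedged sketch of the fixed semantics (simplified to a plain map; the real method operates on the query result's per-partition map): only more than one result is an error, and zero results yield null.

```java
import java.util.Map;

public class OnlyPartitionResultSketch {
    public static <R> R getOnlyPartitionResult(Map<Integer, R> partitionResults) {
        if (partitionResults.size() > 1) {
            throw new IllegalArgumentException(
                "Expected at most one partition result, got " + partitionResults.size());
        }
        // Zero results: return null instead of throwing.
        return partitionResults.values().stream().findFirst().orElse(null);
    }
}
```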
Reviewers: John Roesler <vvcephei@apache.org>
[KAFKA-14264](https://issues.apache.org/jira/browse/KAFKA-14264)
In this patch, we refactored the existing FindCoordinator mechanism. In particular, we first centralized all of the network operations (send, poll) in `NetworkClientDelegate`, then we introduced a `RequestManager` interface that is responsible for handling the timing of different kinds of requests, depending on the implementation. In this patch, we implemented a `CoordinatorRequestManager`, which determines when to create an `UnsentRequest` upon polling the request manager.
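A hedged sketch of the shape described above; the type names follow the description, but the signatures are illustrative rather than the actual interface:

```java
import java.util.Collections;
import java.util.List;

interface RequestManager {
    // Called on each background-thread loop iteration; returns the requests
    // that are due to be sent, plus how long the caller may block before
    // polling this manager again.
    PollResult poll(long currentTimeMs);

    final class PollResult {
        final long timeUntilNextPollMs;
        final List<UnsentRequest> unsentRequests;

        PollResult(long timeUntilNextPollMs, List<UnsentRequest> unsentRequests) {
            this.timeUntilNextPollMs = timeUntilNextPollMs;
            this.unsentRequests = unsentRequests;
        }

        static PollResult empty(long timeUntilNextPollMs) {
            return new PollResult(timeUntilNextPollMs, Collections.emptyList());
        }
    }

    final class UnsentRequest {
        // request builder plus target node, handed to NetworkClientDelegate
    }
}
```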
Reviewers: Jason Gustafson <jason@confluent.io>
Older clients cannot handle the `REQUEST_TIMED_OUT` error that is returned from `InitProducerId` when the next producerId block cannot be fetched from the controller. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead, which is retriable.
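The substance of the change, reduced to a sketch (the enum and method here are illustrative, not the broker's actual code):

```java
public class InitProducerIdErrorMapping {
    enum ApiError { REQUEST_TIMED_OUT, COORDINATOR_LOAD_IN_PROGRESS }

    static ApiError errorForUnavailableProducerIdBlock() {
        // COORDINATOR_LOAD_IN_PROGRESS is retriable on all client versions;
        // REQUEST_TIMED_OUT made older clients fail outright.
        return ApiError.COORDINATOR_LOAD_IN_PROGRESS;
    }
}
```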
Reviewers: Jason Gustafson <jason@confluent.io>
Every request used in KafkaApisTest is serialized before hitting KafkaApis. This ensures that backward compatibility is handled correctly, especially for ignorable fields. We don't do the same for the responses, though. This patch fixes this and refactors how we get the responses back in tests.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
#11390 and #12993 were merged in relatively quick succession, which resulted in compiler errors that weren't present when each change was on top of trunk.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This PR implements the follower fetch protocol as mentioned in KIP-405.
Added a new version of the ListOffsets protocol to receive the local log start offset on the leader replica. This is used by follower replicas to find the local log start offset on the leader.
Added a new version of the FetchRequest protocol to receive the OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol as described in KIP-405.
We introduced a new field, localLogStartOffset, to maintain the log start offset in the local logs. The existing logStartOffset will continue to be the log start offset of the effective log, which includes the segments in remote storage.
When a follower receives OffsetMovedToTieredStorage, it tries to build the required state from the leader and remote storage so that it is ready to move to the fetch state (a sketch of this reaction follows below).
Introduced RemoteLogManager, which is responsible for:
- initializing RemoteStorageManager and RemoteLogMetadataManager instances,
- receiving leader and follower replica events and partition stop events and acting on them,
- providing APIs to fetch indexes and metadata about remote log segments.
Follow-up PRs will add more functionality, such as copying segments to tiered storage and retention checks to clean local and remote log segments. These will change the local log start offset and make sure the follower fetch protocol works correctly in several cases.
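A hedged sketch of that follower-side reaction (field and method names are illustrative):

```java
public class FollowerTieredFetchSketch {
    long logStartOffset;      // start of the effective log, including remote segments
    long localLogStartOffset; // start of the segments kept on local disk

    // Invoked when a fetch returns the OffsetMovedToTieredStorage error.
    void onOffsetMovedToTieredStorage(long leaderLocalLogStartOffset) {
        // Rebuild auxiliary state (leader epoch cache, producer state) from
        // the leader and remote storage, then resume fetching from the
        // leader's local log start offset.
        localLogStartOffset = leaderLocalLogStartOffset;
        // ... transition the replica back to the fetch state ...
    }
}
```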
You can look at the detailed protocol changes in KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication
Co-authors: satishd@apache.org, kamal.chandraprakash@gmail.com, yingz@uber.com
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Tirtha Chatterjee <tirtha.p.chatterjee@gmail.com>, Yaodong Yang <yangyaodong88@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>
The OAuth code that generates the Authorization header was incorrectly
using the URL-safe base64 encoder. Client IDs and/or secrets containing
dashes and/or plus signs would not be encoded correctly, leading the
OAuth server to reject the credentials.
This change uses the correct base64 encoder, per RFC 7617.
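To illustrate, the two JDK encoders diverge exactly when the encoded output contains '+' or '/' (the input below is contrived to show the difference):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthEncoding {
    public static void main(String[] args) {
        byte[] credentials = ">>>".getBytes(StandardCharsets.UTF_8);
        // The URL-safe alphabet substitutes '-' and '_' for '+' and '/',
        // which RFC 7617 Basic credentials do not allow.
        System.out.println(Base64.getUrlEncoder().encodeToString(credentials)); // Pj4- (wrong)
        System.out.println(Base64.getEncoder().encodeToString(credentials));    // Pj4+ (correct)
    }
}
```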
Co-authored-by: Endre Vig <vendre@gmail.com>
Add comments to clarify that both offsets and partitions are 0-indexed, and fix a minor typo. Clarify which offset poll() will retrieve after seek() is used in various circumstances. Also added integration tests.
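A minimal example of the documented behavior (topic name, broker address, and offsets are assumed): after seek(tp, 5), the next poll() returns records starting at offset 5, and partition 0 is the first partition.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "seek-example");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // partitions are 0-indexed
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 5); // the next record fetched has offset 5
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
        }
    }
}
```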
Reviewers: Luke Chen <showuon@gmail.com>
This PR introduces the new metadata loader and snapshot generator. For the time being, they are
only used by the controller, but a PR for the broker will come soon.
The new metadata loader supports adding and removing publishers dynamically. (In contrast, the old
loader only supported adding a single publisher.) It also passes along more information about each
new image that is published. This information can be found in the LogDeltaManifest and
SnapshotManifest classes.
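To illustrate the dynamic-publisher model, a toy sketch (the image and its manifest are reduced to a string, and the names are made up):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class LoaderPublishersSketch {
    interface Publisher {
        void publish(String imageAndManifest); // stand-in for image + manifest
    }

    // Publishers can come and go at runtime, unlike the old loader's
    // single fixed publisher.
    private final List<Publisher> publishers = new CopyOnWriteArrayList<>();

    public void installPublisher(Publisher p) { publishers.add(p); }
    public void removePublisher(Publisher p) { publishers.remove(p); }

    void onNewImage(String imageAndManifest) {
        for (Publisher p : publishers) {
            p.publish(imageAndManifest);
        }
    }
}
```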
The new snapshot generator replaces the previous logic for generating snapshots in
QuorumController.java and associated classes. The new generator is intended to be shared between
the broker and the controller, so it is decoupled from both.
There are a few small changes to the old snapshot generator in this PR. Specifically, we move the
batch processing time and batch size metrics out of BrokerMetadataListener.scala and into
BrokerServerMetrics.scala.
Finally, fix a case where we are using 'is' rather than '==' for a numeric comparison in
snapshot_test.py.
Reviewers: David Arthur <mumrah@gmail.com>
Follow-up PR for KIP-837. We don't want to allow multicasting for IQ; this PR imposes that restriction.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR enables brokers which are upgrading from ZK mode to KRaft mode to forward certain metadata
change requests to the controller instead of applying them directly through ZK. To facilitate this,
we now support EnvelopeRequest on zkBrokers (instead of only on KRaft nodes).
In BrokerToControllerChannelManager, we can now reinitialize our NetworkClient. This is needed to
handle the case when we transition from forwarding requests to a ZK-based broker over the
inter-broker listener, to forwarding requests to a quorum node over the controller listener.
In MetadataCache.scala, distinguish between KRaft and ZK controller nodes with a new type,
CachedControllerId.
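A hedged Java rendering of the idea (the real type lives in the Scala MetadataCache): the cached id is tagged with whether it refers to a KRaft or a ZK controller.

```java
public abstract class CachedControllerIdSketch {
    public final int id;

    private CachedControllerIdSketch(int id) {
        this.id = id;
    }

    public static final class ZkController extends CachedControllerIdSketch {
        public ZkController(int id) { super(id); }
    }

    public static final class KRaftController extends CachedControllerIdSketch {
        public KRaftController(int id) { super(id); }
    }
}
```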
In LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest, switch from sending both a
zk and a KRaft controller ID to sending a single controller ID plus a boolean to express whether it
is KRaft. The previous scheme was ambiguous as to whether the system was in KRaft or ZK mode when
both IDs were -1 (although this case is unlikely to come up in practice). The new scheme avoids
this ambiguity and is simpler to understand.
Reviewers: dengziming <dengziming1993@gmail.com>, David Arthur <mumrah@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This patch adds `deleteGroups` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
Increment the value in "/controller_epoch" when registering a KRaft controller as the active controller. Use the "kraftControllerEpoch" stored under "/controller" to ensure we are registering a newer KRaft controller.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
The test was added with a fix for KAFKA-14379; the problem was that the replication factor for the offsets topic was 1, so the consumer group coordinator became unavailable when one of the brokers was shut down.
Reviewers: David Jacot <djacot@confluent.io>
Prior to starting a KIP-866 migration, the ZK brokers must register themselves with the active
KRaft controller. The controller waits for all brokers to register in order to verify that all the
brokers:
A) can communicate with the quorum,
B) have the migration config enabled, and
C) have the proper IBP set.
This patch uses the new isMigratingZkBroker field in BrokerRegistrationRequest and
RegisterBrokerRecord. The type was changed from int8 to bool for BrokerRegistrationRequest (a
mistake from #12860). The ZK brokers use the existing BrokerLifecycleManager class to register and
heartbeat with the controllers.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Let `RaftClient.createSnapshot` take the snapshotId directly instead of the committed offset/epoch (which may not exist).
Reviewers: José Armando García Sancio <jsancio@apache.org>
There is unwanted logging introduced by #12803, as pointed out in this comment: #12803 (comment). This PR removes it.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <cadonna@apache.org>
This PR resolves KAFKA-14285. After changeConfigs, check and clean up quota nodes whose configs are empty, to avoid unbounded growth of quota nodes in ZooKeeper.
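The rule itself is simple; a toy sketch (the ZooKeeper interaction is elided), assuming the entity's properties after the change are at hand:

```java
import java.util.Properties;

public class QuotaNodeCleanupSketch {
    // After changeConfigs, an entity whose configs are now empty should
    // have its quota znode deleted instead of being left behind forever.
    static boolean shouldDeleteQuotaNode(Properties configsAfterChange) {
        return configsAfterChange.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(shouldDeleteQuotaNode(new Properties())); // true -> delete znode
    }
}
```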
Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez <soarez@apple.com>, dengziming <dengziming1993@gmail.com>
This patch adds `describeGroups` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
While debugging KRaft and the metadata state machines it is helpful to always log the first time the replica discovers the high watermark. All other updates to the high watermark are logged at trace because they are more frequent and less useful.
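A sketch of the logging pattern (names hypothetical): the first discovery is logged at info, every later update at trace.

```java
public class HighWatermarkLoggingSketch {
    private Long highWatermark = null; // null until first discovered

    public void update(long newHighWatermark) {
        if (highWatermark == null) {
            // Rare and useful while debugging the state machines.
            System.out.println("INFO: high watermark discovered: " + newHighWatermark);
        } else {
            // Frequent and less useful, so kept at trace.
            System.out.println("TRACE: high watermark updated to " + newHighWatermark);
        }
        highWatermark = newHighWatermark;
    }
}
```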
Reviewers: Luke Chen <showuon@gmail.com>
Introduce MetadataProvenance to encapsulate the three-tuple of (offset, epoch, timestamp) that is
associated with each MetadataImage, as well as each on-disk snapshot. Also introduce a builder
for MetadataDelta.
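A hedged sketch of the three-tuple (the accessor names follow the description above; the real class has more helpers):

```java
public final class MetadataProvenanceSketch {
    private final long lastContainedOffset;
    private final int lastContainedEpoch;
    private final long lastContainedLogTimeMs;

    public MetadataProvenanceSketch(long offset, int epoch, long timeMs) {
        this.lastContainedOffset = offset;
        this.lastContainedEpoch = epoch;
        this.lastContainedLogTimeMs = timeMs;
    }

    public long lastContainedOffset() { return lastContainedOffset; }
    public int lastContainedEpoch() { return lastContainedEpoch; }
    public long lastContainedLogTimeMs() { return lastContainedLogTimeMs; }
}
```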
Remove offset and epoch tracking from MetadataDelta. We do not really need to know this information
until we are creating the final MetadataImage object. Therefore, this bookkeeping should be done by
the metadata loading code, not inside the delta code, like the other bookkeeping. This simplifies a
lot of tests, as well as simplifying RecordTestUtils. It also makes more sense for snapshots, where
the offset and epoch are the same for every record.
Add ImageReWriter, an ImageWriter that applies records to a MetadataDelta. This is useful when you
need to create a MetadataDelta object that holds the contents of a MetadataImage. This will be
used in the new image loader code (coming soon).
Add ImageWriterOptionsTest to test ImageWriterOptions.
Reviewers: David Arthur <mumrah@gmail.com>
The consumer (fetcher) used to refresh the preferred read replica on
three conditions:
1. the consumer receives an OFFSET_OUT_OF_RANGE error
2. the follower does not exist in the client's metadata (i.e., offline)
3. after metadata.max.age.ms (5 min default)
For other errors, it would continue to reach out to the possibly
unavailable follower, and only after 5 minutes would it refresh the
preferred read replica and go back to the leader.
Another problem is that the client might have stale metadata and not
send fetches to the preferred replica, even after the leader redirects
to the preferred replica.
A specific example is when a partition is reassigned: the consumer will
get NOT_LEADER_OR_FOLLOWER, which triggers a metadata update, but the
preferred read replica will not be refreshed as the follower is still
online. It will continue to reach out to the old follower until the
preferred read replica expires.
The consumer can instead refresh its preferred read replica whenever it
makes a metadata update request, so when the consumer receives, e.g.,
NOT_LEADER_OR_FOLLOWER, it can find the new preferred read replica
without waiting for the expiration.
Generally, we will rely on the leader to choose the correct preferred
read replica and have the consumer fail fast (clear preferred read replica
cache) on errors and reach out to the leader.
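A toy sketch of the fail-fast behavior (names hypothetical): on a fetch error the cached preferred read replica is cleared, so the next fetch goes to the leader, which can assign a fresh one.

```java
import java.util.Optional;

public class PreferredReadReplicaSketch {
    private Optional<Integer> preferredReadReplica = Optional.empty();

    // Fail fast: forget the cached replica instead of waiting up to
    // metadata.max.age.ms for it to expire.
    public void onFetchError() {
        preferredReadReplica = Optional.empty();
    }

    // The leader's fetch response may carry a new preferred replica.
    public void onLeaderFetchResponse(Optional<Integer> newPreferredReplica) {
        preferredReadReplica = newPreferredReplica;
    }

    // Fetch from the preferred replica if set, otherwise from the leader.
    public int fetchTarget(int leaderId) {
        return preferredReadReplica.orElse(leaderId);
    }
}
```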
Co-authored-by: Jeff Kim <jeff.kim@confluent.io>
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
The test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, newly added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence was marked as Ignored.
That seems to have been caused by the way StreamsConfig was being initialised: every new test would use the same names, so the second test never got to the desired state. With this PR, every test gets a unique app name, which seems to have fixed the issue. Also includes a couple of cosmetic changes.
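A sketch of the unique-name scheme (the config keys are standard Streams configs; the naming pattern is illustrative):

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqueAppIdPerTest {
    private static final AtomicInteger TEST_NUMBER = new AtomicInteger();

    // Each test gets its own application.id, so internal topic names and
    // state directories never collide across tests in the same class.
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "fk-join-multicast-test-" + TEST_NUMBER.incrementAndGet());
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }
}
```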
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>