Adds an integration test for the manual upgrade scenario of converting a non-versioned store to a versioned store. The procedure is outlined in KIP-889 and also in the docs.
Reviewers: Matthias J. Sax <matthias@confluent.io>
The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record, which is not accurate: that is the time when the snapshot was created, not the log append time of the last record included in the snapshot.
The log append time of the last record included in the snapshot is stored in the header control record of the snapshot, which is the first record of the snapshot.
To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.
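A minimal sketch of the cleanup-side effect of this change (illustrative names only, not the actual raft internals): cleanup consults the timestamp stored in the header control record rather than the snapshot record's own append time.
```java
// Hypothetical model: the header control record (the first record of the
// snapshot) carries the log append time of the last record the snapshot contains.
record HeaderRecord(long lastContainedLogTimestamp) {}

record Snapshot(HeaderRecord header, long snapshotAppendTime) {
    // Correct source of "last contained log time": the header control record,
    // not the append time of the snapshot record itself.
    long lastContainedLogTime() {
        return header.lastContainedLogTimestamp();
    }
}

class SnapshotCleanup {
    // Delete snapshots whose last contained log time falls outside retention.
    static boolean shouldDelete(Snapshot snapshot, long retentionMs, long nowMs) {
        return nowMs - snapshot.lastContainedLogTime() > retentionMs;
    }
}
```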
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
As the third part of KIP-903, this fills the broker epochs from the Fetch request into the AlterPartitionRequest. Also, before generating the AlterPartition request, the partition checks whether the broker epoch from the FetchRequest matches the broker epoch recorded in the metadata cache. If not, the ISR change is delayed.
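A hedged sketch of that check (class and method names are illustrative, not the actual Partition code):
```java
import java.util.Map;

// Hypothetical illustration of the broker-epoch validation described above.
class IsrExpansionGate {
    private final Map<Integer, Long> brokerEpochsInMetadataCache;

    IsrExpansionGate(Map<Integer, Long> brokerEpochsInMetadataCache) {
        this.brokerEpochsInMetadataCache = brokerEpochsInMetadataCache;
    }

    // Only include the follower in an AlterPartition request if the broker
    // epoch it sent in its Fetch request matches the metadata cache;
    // otherwise the ISR change is delayed until the cache catches up.
    boolean canExpandIsrWith(int brokerId, long brokerEpochFromFetch) {
        Long cachedEpoch = brokerEpochsInMetadataCache.get(brokerId);
        return cachedEpoch != null && cachedEpoch == brokerEpochFromFetch;
    }
}
```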
Reviewers: Jun Rao <junrao@gmail.com>
Under at-least-once, we want to checkpoint progress after completing restoration, to avoid losing that progress and having to restore from scratch.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Jordan Moore <crikket.007@gmail.com>, Chris Egerton <fearthecellos@gmail.com>
Whenever there are changes to the ISR, add an extra string to the existing log message when the partition is under min ISR. This makes it easier to search the logs when partitions go under min ISR.
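Illustratively (not the actual Partition logging code), the change amounts to something like:
```java
// Hypothetical sketch: tag the existing ISR-change log line with a
// searchable marker when the partition is below min ISR.
class IsrLogging {
    static String isrChangeMessage(String baseMessage, int isrSize, int minIsr) {
        return isrSize < minIsr ? baseMessage + " (under-min-isr)" : baseMessage;
    }
}
```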
Reviewers: Luke Chen <showuon@gmail.com>, Colin Patrick McCabe <colin@cmccabe.xyz>
We have seen the following error in logs:
```
"Mar 22, 2019 @ 21:57:56.655",Error,"kafka-0-0","transaction-log-manager-0","Uncaught exception in scheduled task 'transactionalId-expiration'","java.lang.IllegalArgumentException: Illegal new producer epoch -1
```
Investigations showed that it is actually possible for a transaction metadata object to still have -1 as producer epoch when it transitions to Dead.
When transaction metadata is created for the first time (in handleInitProducerId), it has -1 as its producer epoch. A producer epoch is then attributed, and the transaction coordinator tries to persist the change. If the write fails, for instance because the partition is under min ISR, the transaction metadata keeps -1 as its producer epoch forever, or until InitProducerId is retried.
This means that transaction metadata can remain with -1 as its producer epoch until it gets expired. At the moment, this is not allowed, because we enforce a producer epoch greater than or equal to 0 in prepareTransitionTo.
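A hedged sketch of the invariant involved (heavily simplified; illustrative names, and the relaxation shown is what the description implies rather than a verbatim copy of the patch):
```java
// Hypothetical simplification of the epoch check in prepareTransitionTo:
// metadata stuck at epoch -1 (failed InitProducerId write) used to throw
// when the expiration task transitioned it to Dead.
enum TxnState { EMPTY, ONGOING, DEAD }

class TxnMetadata {
    static final short NO_PRODUCER_EPOCH = -1;
    short producerEpoch = NO_PRODUCER_EPOCH;

    void prepareTransitionTo(TxnState target, short newEpoch) {
        // Relaxed check: a transition to Dead may carry epoch -1, since the
        // epoch may never have been successfully persisted.
        if (newEpoch < 0 && target != TxnState.DEAD) {
            throw new IllegalArgumentException("Illegal new producer epoch " + newEpoch);
        }
        this.producerEpoch = newEpoch;
    }
}
```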
Reviewers: Luke Chen <showuon@gmail.com>, Justine Olshan <jolshan@confluent.io>
This fix is inspired by #12540.
1. Added a clearCache function to CachedStateStore, which is triggered upon recycling a state manager.
2. Added the integration test inherited from #12540.
3. Improved some log4j entries.
4. Found and fixed a minor issue with log4j prefix.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Until this PR, all the code added for KIP-889, which introduces versioned stores to Kafka Streams, had been accessible from internal packages only. This PR exposes the stores via public Stores.java methods, and also updates the TopologyTestDriver.
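For example, usage along these lines should now be possible (method names as added by KIP-889; treat the exact signatures as an assumption):
```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;

public class VersionedStoreExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // A persistent versioned store retaining one hour of record history.
        builder.addStateStore(
            Stores.versionedKeyValueStoreBuilder(
                Stores.persistentVersionedKeyValueStore("my-versioned-store", Duration.ofHours(1)),
                Serdes.String(),
                Serdes.String()));
        builder.build();
    }
}
```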
Reviewers: Matthias J. Sax <matthias@confluent.io>
The motivation for introducing InMemoryLeaderEpochCheckpoint is to allow the remote log manager to create the RemoteLogSegmentMetadata (RLSM) with the correct leader epoch info for a specific segment. To do that, we need to rely on the LeaderEpochCheckpointCache to truncate from start and end to get the epoch info. However, we don't really want to truncate the epochs in the cache (and, in the end, write to the checkpoint file). So we introduce InMemoryLeaderEpochCheckpoint to feed into LeaderEpochCheckpointCache; when we truncate the epochs for the RLSM, we can do so in memory, without affecting the checkpoint file and without interacting with the file system.
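A minimal sketch of the concept (illustrative types, simplified from the real internals): an in-memory implementation of the checkpoint abstraction lets the cache truncate epochs without ever flushing to disk.
```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the checkpoint abstraction.
record EpochEntry(int epoch, long startOffset) {}

interface LeaderEpochCheckpoint {
    void write(List<EpochEntry> epochs);
    List<EpochEntry> read();
}

// Keeps epochs purely in memory, so truncation performed while building the
// RLSM never rewrites the on-disk checkpoint file.
class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
    private List<EpochEntry> epochs = new ArrayList<>();

    @Override
    public void write(List<EpochEntry> epochs) {
        this.epochs = new ArrayList<>(epochs);
    }

    @Override
    public List<EpochEntry> read() {
        return new ArrayList<>(epochs);
    }
}
```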
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>
1. Fix the StateUpdater shutdown procedure: (a) in shutdown, first set the running flag, then notify the condition; (b) in the thread's waitIfAllChangelogsCompletelyRead block, collapse the if condition into the while condition so that all four conditions are re-checked whenever the thread is notified inside the while loop. As a result, the shutdown procedure no longer involves any thread interruptions (see the sketch after this list).
2. Print a fine-grained Streams exception when list-offset fails. This is a byproduct of the debugging procedure, but I think it's worth keeping since it improves operational visibility.
3. Some nit logging improvements (including moving logger from the inner thread into the outer class to also add some more logging).
4. Re-enable state-updater in SmokeTestDriverIntegrationTest.
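As referenced in item 1, here is a hedged sketch of the revised wait/notify structure (simplified; names are illustrative):
```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical condensed form of the wait loop and shutdown handshake.
class UpdaterLoop {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private volatile boolean running = true;

    void waitIfAllChangelogsCompletelyRead(BooleanSupplier allChangelogsRead,
                                           BooleanSupplier noUpdatingTasks,
                                           BooleanSupplier noPausedTasks) throws InterruptedException {
        lock.lock();
        try {
            // The former "if" guard is collapsed into the while condition, so
            // every wake-up re-checks all four conditions, including the flag.
            while (running && allChangelogsRead.getAsBoolean()
                    && noUpdatingTasks.getAsBoolean() && noPausedTasks.getAsBoolean()) {
                wakeUp.await();
            }
        } finally {
            lock.unlock();
        }
    }

    void shutdown() {
        running = false;        // (a) first set the running flag...
        lock.lock();
        try {
            wakeUp.signalAll(); // ...then notify the condition; no interrupt needed
        } finally {
            lock.unlock();
        }
    }
}
```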
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Implement KIP-900
Update kafka-storage to be able to add SCRAM records to the bootstrap metadata file at format time, so that SCRAM is enabled at the initial start (bootstrap) of a KRaft cluster. Includes unit tests.
Update ./core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala to use bootstrap and enable the test to run with both ZK and KRaft quorums.
Moved the one test from ScramServerStartupTest.scala into SaslScramSslEndToEndAuthorizationTest.scala. This test is really small, so there was no point in recreating all the bootstrap startup just for a 5-line test when it could easily be run elsewhere.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Manikumar Reddy <manikumar.reddy@gmail.com>
The RocksDB-based versioned store implementation introduced in KIP-889 currently uses two physical RocksDB instances per store instance: one for the "latest value store" and another for the "segments store." This PR combines those two RocksDB instances into one by representing the latest value store as a special "reserved" segment within the segments store. This reserved segment has segment ID -1, is never expired, and is not included in the regular Segments methods for getting or creating segments, but is represented in the physical RocksDB instance the same way as any other segment.
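Roughly, and only as an illustration of the layout (not the actual RocksDBVersionedStore code):
```java
// Hypothetical sketch of the shared-instance segment layout.
class SegmentIds {
    // The latest-value data lives in a reserved segment that is never expired
    // and is skipped by the regular get/create segment paths.
    static final long RESERVED_LATEST_VALUE_SEGMENT_ID = -1L;

    // Regular segments partition the timestamp space as before.
    static long segmentId(long recordTimestampMs, long segmentIntervalMs) {
        return recordTimestampMs / segmentIntervalMs;
    }

    static boolean isReservedSegment(long segmentId) {
        return segmentId == RESERVED_LATEST_VALUE_SEGMENT_ID;
    }
}
```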
Reviewers: Matthias J. Sax <matthias@confluent.io>
The RocksDB-based implementation of versioned stores introduced via KIP-889 consists of a "latest value store" and separate (logical) "segments stores." A single put operation may need to modify multiple (two) segments, or both a segment and the latest value store, which opens the possibility of store inconsistencies if the first write succeeds while the later one fails. When this happens, Streams will error out, but the store still needs to be able to recover upon restart. This PR adds the necessary repair logic to RocksDBVersionedStore to effectively undo the earlier failed write when a store inconsistency is encountered.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Best-effort rack alignment for sticky assignors when both consumer racks and partition racks are available with the protocol changes introduced in KIP-881. Rack-aware assignment is enabled by configuring client.rack for consumers. The assignment builders attempt to align on racks on a best-effort basis, but prioritize balanced assignment over rack alignment.
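On the client side, nothing new is required beyond the existing config; for example (broker address and names here are placeholders):
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class RackAwareConsumerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // The consumer's rack; with KIP-881 this is propagated through the
        // group protocol so sticky assignors can align partitions to racks.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a");
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }
}
```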
Reviewers: David Jacot <djacot@confluent.io>
Fix a case where we were getting an exception because we removed a publisher but left it in BrokerServer.metadataPublishers (resulting in us trying to remove it again during broker shutdown).
When `client.rack` is configured for consumers, we perform rack-aware consumer partition assignment to improve locality. During and after reassignments, replica racks may change, so to ensure optimal consumer assignment, we trigger a rebalance from the leader when the set of racks of any partition changes.
Reviewers: David Jacot <djacot@confluent.io>
Why:
Using java.util.Random to generate every byte sent from the ProducerPerformance
appears to be a limiting factor. Throughput of the ProducerPerformance script is
higher with a file of records as compared to randomly generated records.
On my machine a single thread can generate ~100MB/second of uppercase letters using
java.util.Random and ~300MB/sec using java.util.SplittableRandom. This is a limit on
throughput.
Note: you can optimise further by expanding the alphabet from 26 letters to 32 letters,
as it is more efficient to generate a uniformly distributed int when the bound is a
power of two.
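A rough way to reproduce the comparison (hypothetical micro-benchmark; a single timed pass like this is only indicative, and numbers vary by machine):
```java
import java.util.Random;
import java.util.SplittableRandom;

public class RandomPayloadBench {
    static final int SIZE = 100 * 1024 * 1024; // 100 MB of uppercase letters

    public static void main(String[] args) {
        byte[] payload = new byte[SIZE];

        long t0 = System.nanoTime();
        Random random = new Random();
        for (int i = 0; i < SIZE; i++)
            payload[i] = (byte) ('A' + random.nextInt(26));
        System.out.printf("java.util.Random:           %.2fs%n", (System.nanoTime() - t0) / 1e9);

        long t1 = System.nanoTime();
        SplittableRandom splittable = new SplittableRandom();
        for (int i = 0; i < SIZE; i++)
            payload[i] = (byte) ('A' + splittable.nextInt(26));
        System.out.printf("java.util.SplittableRandom: %.2fs%n", (System.nanoTime() - t1) / 1e9);
        // A bound of 32 (a power of two) avoids the rejection sampling that
        // non-power-of-two bounds require, so a 32-symbol alphabet is faster still.
    }
}
```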
Reviewers: Luke Chen <showuon@gmail.com>
A couple of tests in TopicCommandIntegrationTest look flaky, such as testTopicDeletion and testTopicWithCollidingCharDeletionAndCreateAgain.
I also updated part of a comment that implied the code only runs in ZK mode; that's not the case, so I removed that part.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Justine Olshan <jolshan@confluent.io>
On startup, we always update the metadata, and the topic ID also goes from null to defined. Move the epoch-is-null check before the topic ID check to prevent log spam.
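Illustratively (a simplified sketch, not the actual metadata code), the reordering looks like:
```java
import java.util.Objects;

// Hypothetical sketch of the reordered checks.
class MetadataUpdateCheck {
    static boolean accept(Integer currentEpoch, String currentTopicId, String newTopicId,
                          StringBuilder log) {
        // Epoch-is-null check first: on startup there is no previous epoch,
        // so the routine null -> defined topic ID transition is not logged.
        if (currentEpoch == null) {
            return true;
        }
        if (!Objects.equals(currentTopicId, newTopicId)) {
            log.append("Topic ID changed from ").append(currentTopicId)
               .append(" to ").append(newTopicId).append('\n');
        }
        return true;
    }
}
```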
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
This is a small patch to make it so we only create one FeatureControlManager instance in ReplicationControlManagerTest. Currently we create two, which isn't needed. It's also a bit confusing, because the ReplicationControlTestContext object ends up holding a different FeatureControlManager reference than the one its own ReplicationControlManager instance references.
Reviewers: José Armando García Sancio <jsancio@apache.org>, dengziming <dengziming1993@gmail.com>
In this PR, I implemented the committed API. Here are the specifics:
* The CommitRequestManager handles the committed() request.
* I implemented UnsentOffsetFetchRequestState to dedupe requests, because we don't want to send identical requests repeatedly (see the sketch after this list).
* I implemented the retry mechanism: some retriable errors are retried automatically.
* ClientResponse errors are handled in the handlers.
* Some of the top-level APIs were refactored lightly.
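A hedged sketch of the deduplication idea from the second bullet (illustrative types; not the actual CommitRequestManager):
```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical dedup of in-flight OffsetFetch requests, keyed by the
// requested partition set: concurrent identical lookups share one future.
class OffsetFetchDeduper<P, R> {
    private final Map<Set<P>, CompletableFuture<R>> inflight = new ConcurrentHashMap<>();

    CompletableFuture<R> fetch(Set<P> partitions, Function<Set<P>, CompletableFuture<R>> send) {
        CompletableFuture<R> result = new CompletableFuture<>();
        CompletableFuture<R> existing = inflight.putIfAbsent(partitions, result);
        if (existing != null) {
            return existing; // an identical request is already on the wire
        }
        send.apply(partitions).whenComplete((response, error) -> {
            inflight.remove(partitions);
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(response);
            }
        });
        return result;
    }
}
```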
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Removed the addOne method that broke the Scala 2.12 build.
Co-authored-by: David Arthur <mumrah@gmail.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Jason Gustafson <jason@confluent.io>
The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high
water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes
that and adds a junit test.
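Conceptually (a simplified sketch, not the actual MetadataLoader logic), publishing is gated on having caught up:
```java
import java.util.OptionalLong;

// Hypothetical gating check: hold back publishing until the loader has
// replayed the log up to the last committed offset (the high water mark).
class CatchUpGate {
    private boolean caughtUp = false;

    boolean maybeCatchUp(OptionalLong highWaterMark, long lastAppliedOffset) {
        if (!caughtUp
                && highWaterMark.isPresent()
                && lastAppliedOffset >= highWaterMark.getAsLong() - 1) {
            caughtUp = true; // from this point on, metadata updates are published
        }
        return caughtUp;
    }
}
```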
Another issue is that the MetadataLoader previously assumed that we would periodically get
callbacks from the Raft layer even if nothing had happened. We relied on this to install new
publishers in a timely fashion, for example. However, in older MetadataVersions that don't include
NoOpRecord, this is not a safe assumption.
Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest, fix the log prefix for
BrokerLifecycleManager, and remove metadata publishers on BrokerServer shutdown (like we do for
controllers).
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
Currently, we have various bits of reassignment logic spread across different classes. For example, ReplicationControlManager contains logic for determining when a reassignment is in progress, which is duplicated in PartitionChangeBuilder. Another example is PartitionReassignmentRevert, which contains logic for how to undo/revert a reassignment. The idea here is to move this logic into PartitionReassignmentReplicas so it's more testable and easier to reason about.
Reviewers: José Armando García Sancio <jsancio@apache.org>
This patch refactors the loadCache method in AclAuthorizer to make it reusable by ZkMigrationClient.
The loaded ACLs are converted to AccessControlEntryRecord. I noticed we still have the defunct
AccessControlRecord, so I've deleted it.
Also included here are the methods to write ACL changes back to ZK while in dual-write mode.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Colin P. McCabe <cmccabe@apache.org>
Fix for an NPE bug caused by referring to a local variable instead of the instance variable of the deserializers.
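The bug pattern, as a hedged illustration (not the actual classes involved):
```java
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical illustration: a parameter shadows the field, so the
// (possibly null) local is used instead of the initialized instance state.
class RecordParser<V> {
    private final Deserializer<V> valueDeserializer;

    RecordParser(Deserializer<V> valueDeserializer) {
        this.valueDeserializer = valueDeserializer;
    }

    V parse(String topic, byte[] bytes, Deserializer<V> valueDeserializer) {
        // Buggy: valueDeserializer.deserialize(topic, bytes) would NPE when
        // the caller passes null. Fixed: use the instance variable.
        return this.valueDeserializer.deserialize(topic, bytes);
    }
}
```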
Co-authored-by: Robert Yokota <1761488+rayokota@users.noreply.github.com>
Reviewers: Robert Yokota <1761488+rayokota@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>