1. In the state updater, when handling a task-corrupted exception due to an invalid restoring offset, first delete the affected partitions from the checkpoint before reporting the exception back to the stream thread. This mimics the behavior of the stream thread's StateManager#handleCorruption#closeDirtyAndRevive. Doing it inside the restore thread is cleaner, and it also lets us optimize by deleting only the corrupted partitions rather than all of them.
2. In the state manager, handle the drained exceptions as follows (the same way all exceptions from handleAssignment are handled): 1) task-migrated: throw all the way to the stream thread as handleTaskMigrated; 2) any fatal Streams exception: throw all the way to the stream thread to trigger the exception handler; 3) task-corrupted: throw to the stream thread as handleCorruption. Note that for 3), we specially distinguish whether the corrupted tasks are already closed (when they are thrown from handleAssignment) or not (when they are thrown from the state updater). A sketch of this dispatch follows.
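As an illustration only, here is a minimal sketch of that dispatch; the class and method names and the way exceptions are drained are hypothetical stand-ins, not the actual Streams internals:
```
import java.util.List;

import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;

class DrainedExceptionDispatch {
    // Hypothetical sketch: `drainedExceptions` stands in for however the
    // state manager drains failures from the state updater's output queue.
    void handleDrainedExceptions(final List<RuntimeException> drainedExceptions) {
        for (final RuntimeException exception : drainedExceptions) {
            if (exception instanceof TaskMigratedException) {
                // 1) Rethrow so the stream thread reacts via handleTaskMigrated().
                throw exception;
            } else if (exception instanceof TaskCorruptedException) {
                // 3) Rethrow so the stream thread reacts via handleCorruption();
                //    tasks surfacing here are not yet closed, unlike corrupted
                //    tasks thrown from handleAssignment().
                throw exception;
            } else {
                // 2) Any other fatal Streams exception: rethrow to trigger the
                //    stream thread's exception handler.
                throw exception;
            }
        }
    }
}
```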
Reviewers: Bruno Cadonna <cadonna@apache.org>
Based on a patch submitted to the confluentinc fork and then abandoned. It needed some updates and minor expansion, but this more or less just re-applies the changes proposed in confluentinc#697.
The original PR has a very detailed justification for these changes, but the tl;dr is that `PriorityQueue`'s iterator does not actually guarantee to return elements in priority order.
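To see the underlying `java.util.PriorityQueue` behavior, here is a standalone example (not code from this patch):
```
import java.util.PriorityQueue;

public class PriorityQueueOrder {
    public static void main(String[] args) {
        PriorityQueue<Integer> queue = new PriorityQueue<>();
        for (int i : new int[] {5, 1, 4, 2, 3}) {
            queue.add(i);
        }

        // toString() goes through the iterator, which walks the backing heap
        // array in arbitrary order, NOT priority order.
        System.out.println("iteration order: " + queue);

        // Only poll() is guaranteed to return elements in priority order.
        while (!queue.isEmpty()) {
            System.out.print(queue.poll() + " "); // prints: 1 2 3 4 5
        }
        System.out.println();
    }
}
```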
Reviewer: Luke Chen <showuon@gmail.com>
#12374 adjusted the invocation of the alter configs policy check in KRaft to match the behavior in ZooKeeper mode, which is to only provide the configs that were explicitly sent in the request. While the code was correct for the incremental alter configs case, it actually included the implicit deletions for the legacy/non-incremental alter configs case, and those implicit deletions are not included in the ZooKeeper-based invocation. This patch adds a test to check for this and adjusts the ConfigurationControlManager code so that the test passes -- the adjusted test is confirmed to fail locally otherwise. We also add a log statement to emit any unexpected stack traces in the alter config code path.
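A minimal sketch of the intended invocation, with hypothetical names (`checkPolicy`, `implicitDeletions`) standing in for the actual ConfigurationControlManager internals:
```
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.server.policy.AlterConfigPolicy;

class LegacyAlterConfigsPolicyCheck {
    // Hypothetical sketch of the fix: the policy must only see the configs
    // explicitly sent in the request, never the implicit deletions that the
    // legacy (non-incremental) path computes for omitted keys.
    static void checkPolicy(final Optional<AlterConfigPolicy> policy,
                            final ConfigResource resource,
                            final Map<String, String> requestedConfigs,
                            final Set<String> implicitDeletions) {
        final Map<String, String> explicitlySent = new HashMap<>(requestedConfigs);
        explicitlySent.keySet().removeAll(implicitDeletions);
        policy.ifPresent(p -> p.validate(
            new AlterConfigPolicy.RequestMetadata(resource, explicitlySent)));
    }
}
```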
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Changelogs are already unregistered when tasks are closed.
There is no need to also unregister them in the state
updater.
In the future, when we only have the state updater and the old
code path is gone, we should consider registering and
unregistering the changelogs within the state updater.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
When the task manager is shut down, the state updater should also
shut down. After the shutdown of the state updater, the tasks
in its output queues should be closed.
Reviewer: Guozhang Wang <wangguoz@gmail.com>
Previously, the KRaft controller was incorrectly reporting an empty feature set in
ApiVersionResponse. This was preventing any multi-node clusters from being upgraded via
kafka-features.sh, since they would incorrectly believe that metadata.version was not a supported
feature. This PR adds a regression test for this bug, KRaftClusterTest.testUpdateMetadataVersion.
Reviewers: José Armando García Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
This test was removed in #11667 since UpdateFeatures was not properly handled in KRaft mode. Now that UpdateFeatures is properly handled after #12036, we can bring the test back.
Reviewers: Luke Chen <showuon@gmail.com>
This PR adds support to kafka-features.sh for the --metadata flag, as specified in KIP-778. This
flag makes it possible to upgrade to a new metadata version without consulting a table mapping
version names to short integers. It also changes --feature to use a key=value format.
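For reference, the tool drives this through the Admin API; the programmatic equivalent (standard `Admin#updateFeatures` usage, not code from this PR, with an arbitrary example feature level) looks roughly like:
```
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class MetadataUpgradeExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Bump the "metadata.version" feature; level 4 here is an
            // arbitrary example value, not a recommendation.
            FeatureUpdate update = new FeatureUpdate(
                (short) 4, FeatureUpdate.UpgradeType.UPGRADE);
            admin.updateFeatures(
                Collections.singletonMap("metadata.version", update),
                new UpdateFeaturesOptions()).all().get();
        }
    }
}
```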
FeatureCommandTest.scala: make most tests here true unit tests (that don't start brokers) in order
to improve test run time, and allow us to test more cases. For the integration test part, test both
KRaft and ZK-based clusters. Add support for mocking feature operations in MockAdminClient.java.
upgrade.html: add a section describing how the metadata.version should be upgraded in KRaft
clusters.
Add kraft_upgrade_test.py to test upgrades between KRaft versions.
Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>, José Armando García Sancio <jsancio@gmail.com>
Changes:
- Migrate to Mockito
- Add more assertive checks using verify
- Minor indentation fixes
Reviewers: Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
When debugging some config-related bugs, I found we are unable to show the default broker/topic entity name, since resourceName="". Changed it to be similar to how we treat default client quotas.
Reviewers: Luke Chen <showuon@gmail.com>
The state updater removes its updating and paused tasks on shutdown.
The removed tasks are added to the output queue for removed tasks.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Once the state updater has restored an active task, it puts the
task into an output queue. The stream thread reads the restored
active task from the output queue and, after verifying that the
task is still owned by the stream thread, transitions the task
to RUNNING.
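A rough sketch of that handoff on the stream thread side; `drainRestoredActiveTasks()`, `transitionToRunning()`, `closeDirty()`, and `ownedTaskIds` are hypothetical stand-ins for the actual internals:
```
import org.apache.kafka.streams.processor.internals.StreamTask;

class RestoredTaskHandoff {
    // Hypothetical sketch of the stream thread side of the handoff.
    void handleRestoredTasks() {
        for (final StreamTask task : stateUpdater.drainRestoredActiveTasks()) {
            if (ownedTaskIds.contains(task.id())) {
                // Still owned by this stream thread: transition to RUNNING.
                transitionToRunning(task);
            } else {
                // Ownership changed (e.g. via a rebalance) while restoring;
                // the task must not be transitioned to RUNNING.
                closeDirty(task);
            }
        }
    }
}
```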
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
The bootstrap.checkpoint files should include a control record batch for
the SnapshotHeaderRecord at the start of the file. It should also
include a control record batch for the SnapshotFooterRecord at the end
of the file.
The snapshot header record is important because it versions the rest of
the bootstrap file.
Reviewers: David Arthur <mumrah@gmail.com>
Migrates Streams system tests to either use KRaft brokers or to use both KRaft and ZK in a testing matrix.
This skips tests which use various forms of Kafka versioning, since those seem to have issues with KRaft at the moment. Running these tests with KRaft will require a follow-up PR.
Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
The main changes here are ensuring that we always have a metadata.version record in the log, making
sure that the bootstrap file can be used for records other than the metadata.version record (for
example, we will want to put SCRAM initialization records there), and fixing some bugs.
If no feature level record is in the log and the IBP is less than 3.3IV0, then we assume the minimum KRaft
version for all records in the log.
Fix some issues related to initializing new clusters. If there are no records in the log at all,
then insert the bootstrap records in a single batch. If there are records, but no metadata version,
process the existing records as though they were metadata.version 3.3IV0 and then append a metadata
version record setting version 3.3IV0. Previously, we were not clearly distinguishing between the
case where the metadata log was empty, and the case where we just needed to add a metadata.version
record.
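A sketch of this decision logic under the stated rules (illustrative only; `appendBatch()`, `replayAssuming()`, and `featureLevelRecord()` are hypothetical stand-ins, and the real code lives across the controller and log replay paths):
```
import org.apache.kafka.server.common.MetadataVersion;

class MetadataVersionBootstrapSketch {
    // Illustrative only: how an empty log is distinguished from a log that
    // merely lacks a metadata.version record.
    void initialize(final boolean logIsEmpty, final boolean metadataVersionSeen) {
        if (logIsEmpty) {
            // Brand-new cluster: insert all bootstrap records in one batch,
            // including the metadata.version record.
            appendBatch(bootstrapMetadata.records());
        } else if (!metadataVersionSeen) {
            // Existing records but no metadata.version record: replay them
            // as 3.3IV0, then pin that version with an explicit record.
            replayAssuming(MetadataVersion.IBP_3_3_IV0);
            appendBatch(featureLevelRecord(MetadataVersion.FEATURE_NAME,
                                           MetadataVersion.IBP_3_3_IV0.featureLevel()));
        }
    }
}
```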
Refactor BootstrapMetadata into an immutable class which contains a 3-tuple of metadata version,
record list, and source. The source field is used to log where the bootstrap metadata was obtained
from. This could be a bootstrap file, the static configuration, or just the software defaults.
Move the logic for reading and writing bootstrap files into BootstrapDirectory.java.
Add LogReplayTracker, which tracks whether the log is empty.
Fix a bug in FeatureControlManager where it was possible to use a "downgrade" operation to
transition to a newer version. Do not store whether we have seen a metadata version or not in
FeatureControlManager, since that is now handled by LogReplayTracker.
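A hypothetical sketch of the fixed check (not the literal FeatureControlManager code): a downgrade operation must never move the feature level upward.
```
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;

class DowngradeCheckSketch {
    // Reject "downgrade" operations that would actually raise the level.
    static void checkDowngrade(final FeatureUpdate.UpgradeType upgradeType,
                               final short currentLevel,
                               final short newLevel) {
        if (upgradeType != FeatureUpdate.UpgradeType.UPGRADE && newLevel > currentLevel) {
            throw new InvalidUpdateVersionException(
                "Can't use a downgrade operation to move to a newer version.");
        }
    }
}
```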
Introduce BatchFileReader, which is a simple way of reading a file containing batches of snapshots
that does not require spawning a thread. Rename SnapshotFileWriter to BatchFileWriter to be
consistent, and to reflect the fact that bootstrap files aren't snapshots.
QuorumController#processBrokerHeartbeat: add an explanatory comment.
Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
Our docs for offset retention are outdated and need to be updated. It is also better to indicate how we handle offsets when topics and groups are deleted.
Reviewers: Victoria Xia <victoria.f.xia281@gmail.com>, Luke Chen <showuon@gmail.com>
AccessControlEntryRecord and RemoveAccessControlEntryRecord were added in KIP-801, FeatureLevelRecord was added in KIP-778, BrokerRegistrationChangeRecord was added in KIP-841, and NoOpRecord was added in KIP-835. This adds these 5 record types to MetadataShell.
Reviewers: Luke Chen <showuon@gmail.com>
A few small cleanups in the `DescribeQuorum` API and handling logic:
- Change field types in `QuorumInfo`:
- `leaderId`: `Integer` -> `int`
- `leaderEpoch`: `Integer` -> `long` (to allow for type expansion in the future)
- `highWatermark`: `Long` -> `long`
- Use field names `lastFetchTimestamp` and `lastCaughtUpTimestamp` consistently
- Move construction of `DescribeQuorumResponseData.PartitionData` into `LeaderState`
- Consolidate fetch time/offset update logic into `LeaderState.ReplicaState.updateFollowerState`
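Sketched from the list above, the `QuorumInfo` accessors now look roughly like this (the real org.apache.kafka.clients.admin class has additional fields, e.g. voters and observers, omitted here):
```
// Illustrative sketch of the changed QuorumInfo field types.
public class QuorumInfo {
    private final int leaderId;
    private final long leaderEpoch;   // long rather than int, to allow type expansion
    private final long highWatermark;

    QuorumInfo(int leaderId, long leaderEpoch, long highWatermark) {
        this.leaderId = leaderId;
        this.leaderEpoch = leaderEpoch;
        this.highWatermark = highWatermark;
    }

    public int leaderId() { return leaderId; }             // was Integer
    public long leaderEpoch() { return leaderEpoch; }      // was Integer
    public long highWatermark() { return highWatermark; }  // was Long
}
```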
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
It adds a new configuration `auto.include.jmx.reporter` that can be set to false to disable the JMX Reporter. This configuration is deprecated and will be removed in the next major version.
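For example, to opt out of the JMX reporter on a producer (using the configuration name from KIP-830; brokers and other clients work the same way):
```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class NoJmxReporterExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // KIP-830: opt out of the automatically registered JmxReporter.
        props.put("auto.include.jmx.reporter", "false");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... produce records; no JMX metrics will be registered.
        }
    }
}
```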
Reviewers: Tom Bentley <tbentley@redhat.com>, Christo Lolov <christo_lolov@yahoo.com>
This PR is a mirror of apache/kafka-site#433, which used placeholder images for the Kafka Streams videos that users need to click in order to load the iframe with the corresponding video.
Reviewers: Mickael Maison <mimaison@apache.org>
In Scala it's standard practice to use `_` whenever you are initializing variables. In regard to implementation, for object references, `_` initialization maps to `null` so there is no change in behavior.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Currently, the task manager stores the tasks it manages
internally. We recently extracted the code to store and retrieve
tasks into its own class Tasks. However, the task manager creates
the Tasks object internally, and during testing of the task
manager we do not have access to it, which makes testing the task
manager quite complex.
This commit externalizes the data structure that the task manager
uses to store and retrieve tasks. It introduces the TasksRegistry
interface and lets the Tasks object implement TasksRegistry.
The Tasks object is passed into the task manager via its
constructor. Passing the TasksRegistry dependency to the task
manager from outside facilitates simpler testing of the task
manager.
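A minimal sketch of the testing this enables (the TaskManager constructor arguments are elided/hypothetical here; Mockito is already used in the code base):
```
import static org.mockito.Mockito.mock;

import org.junit.jupiter.api.Test;

class TaskManagerTest {
    @Test
    void shouldUseInjectedTasksRegistry() {
        // With TasksRegistry passed in via the constructor, tests can hand
        // the task manager a mock instead of the concrete Tasks object.
        final TasksRegistry tasksRegistry = mock(TasksRegistry.class);
        final TaskManager taskManager = new TaskManager(tasksRegistry /* , other deps elided */);

        // ... drive the task manager, then verify interactions, e.g.:
        // verify(tasksRegistry).addTask(task);
    }
}
```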
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Cache the Kafka cluster Id once it has been retrieved to avoid creating many Admin clients at startup.
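A minimal sketch of such caching, assuming a short-lived Admin client is used for the lookup (the class and method names are hypothetical):
```
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;

// Fetch the cluster id once with a short-lived Admin client and reuse the
// cached value on subsequent calls, instead of creating a client each time.
final class ClusterIdCache {
    private String clusterId;

    synchronized String clusterId(final Map<String, Object> adminConfig)
            throws ExecutionException, InterruptedException {
        if (clusterId == null) {
            try (Admin admin = Admin.create(adminConfig)) {
                clusterId = admin.describeCluster().clusterId().get();
            }
        }
        return clusterId;
    }
}
```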
Reviewers: Chris Egerton <fearthecellos@gmail.com>
Catch InvocationTargetException explicitly and propagate underlying cause
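Illustratively (a generic sketch of the pattern, not the exact patched code):
```
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ReflectionUnwrapSketch {
    // Unwrap InvocationTargetException so callers see the underlying cause
    // instead of the reflection wrapper.
    static Object invokeUnwrapped(final Method method, final Object target,
                                  final Object... args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (final InvocationTargetException e) {
            // Propagate the real failure thrown by the invoked method.
            throw e.getCause();
        }
    }
}
```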
Reviewers: Ismael Juma <mlists@juma.me.uk>, Matthew de Detrich <mdedetrich@gmail.com>, Kvicii, Luke Chen <showuon@gmail.com>
Removes tasks from the state updater when the input partitions of the tasks are revoked or partitions are lost during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Add `MetadataQuorumCommand` to describe quorum status. I'm trying to use the argparse4j-style command format; currently we only support one sub-command, "describe", and we can specify two arguments, --status and --replication.
```
# describe quorum status
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
ReplicaId   LogEndOffset   Lag   LastFetchTimeMs   LastCaughtUpTimeMs   Status
0           10             0     -1                -1                   Leader
1           10             0     -1                -1                   Follower
2           10             0     -1                -1                   Follower
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]
# specify AdminClient properties
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config config.properties describe --status
```
Reviewers: Jason Gustafson <jason@confluent.io>
We have been seeing a few exceptions like the following when running integration tests:
```
[2022-08-18 13:02:59,470] ERROR [ControllerApis nodeId=3000] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7) -- FetchRequestData(clusterId='txpo87ZUSbGSeV2v7H0n_w', replicaId=0, maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='__cluster_metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=1, fetchOffset=6, lastFetchedEpoch=1, logStartOffset=-1, partitionMaxBytes=0)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7), connectionId='127.0.0.1:63113-127.0.0.1:63114-0', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@201038c3]) (kafka.server.ControllerApis:76)
java.util.concurrent.CompletionException: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.kafka.raft.KafkaRaftClient.lambda$handleRequest$19(KafkaRaftClient.java:1666)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at kafka.raft.TimingWheelExpirationService$TimerTaskCompletableFuture.run(TimingWheelExpirationService.scala:32)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
```
I found two causes for this error. First, we were not shutting down the timer services in `RaftManager` which are used in the purgatory implementation. This meant that operations remaining in purgatory could be completed even after `RaftManager` was shut down. Second, the shutdown order in `KafkaClusterTestKit` was problematic. The `RaftManager` instance depends on the `SocketServer` in `ControllerServer`, but it was the latter that was shut down first. Instead, we should shut down `RaftManager` first, as we do in `KafkaRaftServer`. A sketch of the corrected order follows.
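Illustrative only (not the actual `KafkaClusterTestKit` code; the fields are stand-ins):
```
class ShutdownOrderSketch {
    // Shut down RaftManager before the ControllerServer whose SocketServer
    // it depends on, mirroring KafkaRaftServer. RaftManager's shutdown now
    // also stops the timer services that back its purgatory.
    void close() {
        raftManager.shutdown();
        controllerServer.shutdown();
    }
}
```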
Reviewers: Ismael Juma <ismael@juma.me.uk>
While reviewing KIP-588 and KIP-691 I went through the exception throwing behavior and wanted to improve the related javadocs a little bit.
Reviewers: John Roesler <vvcephei@apache.org>
Fix a bug in ReplicationControlManager where we got a NullPointerException when removing a topic
that had no offline replicas while other topics did have offline replicas.
Fix an issue in MetadataDelta#replay where we were replaying RemoveTopicRecord twice.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>