Migrates Streams sustem tests to either use kraft brokers or to use both kraft and zk in a testing matrix.
This skips tests which use various forms of Kafka versioning since those seem to have issues with KRaft at the moment. Running these tests with KRaft will require a followup PR.
Reviewers: Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
The main changes here are ensuring that we always have a metadata.version record in the log, making
˘sure that the bootstrap file can be used for records other than the metadata.version record (for
example, we will want to put SCRAM initialization records there), and fixing some bugs.
If no feature level record is in the log and the IBP is less than 3.3IV0, then we assume the minimum KRaft
version for all records in the log.
Fix some issues related to initializing new clusters. If there are no records in the log at all,
then insert the bootstrap records in a single batch. If there are records, but no metadata version,
process the existing records as though they were metadata.version 3.3IV0 and then append a metadata
version record setting version 3.3IV0. Previously, we were not clearly distinguishing between the
case where the metadata log was empty, and the case where we just needed to add a metadata.version
record.
Refactor BootstrapMetadata into an immutable class which contains a 3-tuple of metadata version,
record list, and source. The source field is used to log where the bootstrap metadata was obtained
from. This could be a bootstrap file, the static configuration, or just the software defaults.
Move the logic for reading and writing bootstrap files into BootstrapDirectory.java.
Add LogReplayTracker, which tracks whether the log is empty.
Fix a bug in FeatureControlManager where it was possible to use a "downgrade" operation to
transition to a newer version. Do not store whether we have seen a metadata version or not in
FeatureControlManager, since that is now handled by LogReplayTracker.
Introduce BatchFileReader, which is a simple way of reading a file containing batches of snapshots
that does not require spawning a thread. Rename SnapshotFileWriter to BatchFileWriter to be
consistent, and to reflect the fact that bootstrap files aren't snapshots.
QuorumController#processBrokerHeartbeat: add an explanatory comment.
Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
Our docs for offset retention has been outdated and hence needs to be updated. Also I think it's better to indicate how we handle offsets when delete-topics and delete-groups.
Reviewers: Victoria Xia <victoria.f.xia281@gmail.com>, Luke Chen <showuon@gmail.com>
AccessControlEntryRecord and RemoveAccessControlEntryRecord are added in KIP-801, FeatureLevelRecord was added in KIP-778, and BrokerRegistrationChangeRecord was added in KIP-841, and NoOpRecord was added in KIP-835, I added these 5 record types in MetadataShell.
Reviewers: Luke Chen <showuon@gmail.com>
A few small cleanups in the `DescribeQuorum` API and handling logic:
- Change field types in `QuorumInfo`:
- `leaderId`: `Integer` -> `int`
- `leaderEpoch`: `Integer` -> `long` (to allow for type expansion in the future)
- `highWatermark`: `Long` -> `long`
- Use field names `lastFetchTimestamp` and `lastCaughtUpTimestamp` consistently
- Move construction of `DescribeQuorumResponseData.PartitionData` into `LeaderState`
- Consolidate fetch time/offset update logic into `LeaderState.ReplicaState.updateFollowerState`
Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
It adds a new configuration `auto.include.jmx.reporter` that can be set to false to disable the JMX Reporter. This configuration is deprecated and will be removed in the next major version.
Reviewers: Tom Bentley <tbentley@redhat.com>, Christo Lolov <christo_lolov@yahoo.com>
This PR is a mirror of apache/kafka-site#433 which used placeholder images for the Kafka Streams that users need to click in order to load the iframe with the corresponding video.
Reviewers: Mickael Maison <mimaison@apache.org>
In Scala it's standard practice to use `_` whenever you are initializing variables. In regard to implementation, for object references, `_` initialization maps to `null` so there is no change in behavior.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Currently the task manager stores the tasks it manages in an
internally. We recently extracted the code to store and retrieve
tasks into its own class Tasks. However, the task manager creates
the Tasks object internally and during testing of the task
manager we do not have access to it which makes testing the task
manager quite complex.
This commit externalizes the data structure that the task manager
uses to store and rerieve tasks. It introduces the TasksRegistry
interface and lets the Tasks object implementing TasksRegistry.
The Tasks object is passed into the task manager via its
constructor. Passing the TasksRegistry dependency to the task
manager from outside faciliates simpler testing of the task
manager.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Cache the Kafka cluster Id once it has been retrieved to avoid creating many Admin clients at startup.
Reviewers: Chris Egerton <fearthecellos@gmail.com>
Catch InvocationTargetException explicitly and propagate underlying cause
Reviewers: Ismael Juma <mlists@juma.me.uk>, Matthew de Detrich <mdedetrich@gmail.com>, Kvicii, Luke Chen <showuon@gmail.com>
Removes tasks from the state updater when the input partitions of the tasks are revoked or partitions are lost during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Add `MetadataQuorumCommand` to describe quorum status, I'm trying to use arg4j style command format, currently, we only support one sub-command which is "describe" and we can specify 2 arguments which are --status and --replication.
```
# describe quorum status
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication
ReplicaId LogEndOffset Lag LastFetchTimeMs LastCaughtUpTimeMs Status
0 10 0 -1 -1 Leader
1 10 0 -1 -1 Follower
2 10 0 -1 -1 Follower
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [3000,3001,3002]
CurrentObservers: [0,1,2]
# specify AdminClient properties
kafka-metadata-quorum.sh --bootstrap-server localhost:9092 --command-config config.properties describe --status
```
Reviewers: Jason Gustafson <jason@confluent.io>
We have been seeing a few exceptions like the following when running integration tests:
```
[2022-08-18 13:02:59,470] ERROR [ControllerApis nodeId=3000] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7) -- FetchRequestData(clusterId='txpo87ZUSbGSeV2v7H0n_w', replicaId=0, maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='__cluster_metadata', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=1, fetchOffset=6, lastFetchedEpoch=1, logStartOffset=-1, partitionMaxBytes=0)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=13, clientId=raft-client-0, correlationId=7), connectionId='127.0.0.1:63113-127.0.0.1:63114-0', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=unknown), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@201038c3]) (kafka.server.ControllerApis:76)
java.util.concurrent.CompletionException: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at org.apache.kafka.raft.KafkaRaftClient.lambda$handleRequest$19(KafkaRaftClient.java:1666)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at kafka.raft.TimingWheelExpirationService$TimerTaskCompletableFuture.run(TimingWheelExpirationService.scala:32)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.NoSuchElementException: key not found: BROKER_NOT_AVAILABLE
```
There are two causes for this error that I found. First, we were not shutting down the timer services in `RaftManager` which are used in the purgatory implementation. This meant that operations remaining in purgatory could be completed even after `RaftManager` was shutdown. Second, the shutdown order in `KafkaClusterTestKit` was problematic. The `RaftManager` instance depends on the `SocketServer` in `ControllerServer`, but it was the latter that was shutdown first. Instead, we should shutdown `RaftManager` first as we do in `KafkaRaftServer`.
Reviewers: Ismael Juma <ismael@juma.me.uk>
While reviewing KIP-588 and KIP-691 I went through the exception throwing behavior and wanted to improve the related javadocs a little bit.
Reviewers: John Roesler <vvcephei@apache.org>
Fix a bug in ReplicationControlManager where we got a NullPointerException when removing a topic
with no offline replicas, and there were other topics that did have offline replicas.
Fix an issue in MetadataDelta#replay where we were replaying RemoveTopicRecord twice.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
This patch fixes another incorrect version check in the FK code and adds unit tests that would have caught this bug.
Reviewers: John Roesler <vvcephei@apache.org>
In Scala its standard practice to use _ whenever you are initializing variables. In regards to implementation, for object references _ initialization maps to null so there is no change in behaviour.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
There are a few cases in `ControllerApis` where we may see an `ApiException` wrapped as a `CompletionException`. This can happen in `QuorumController.allocateProducerIds` where the returned future is the result of calling `thenApply` on the future passed to the controller. The danger when this happens is that the `CompletionException` gets passed to `Errors.forException`, which translates it to an `UNKNOWN_SERVER_ERROR`. At a minimum, I found that the `AllocateProducerIds` and `UpdateFeatures` APIs were affected by this bug, but it is difficult to root out all cases.
Interestingly, `DeleteTopics` is not affected by this bug as I originally suspected. This is because we have logic in `ApiError.fromThrowable` to check for both `CompletionException` and `ExecutionException` and to pull out the underlying cause. This patch duplicates this logic from `ApiError.fromThrowable` into `Errors.forException` to be sure that we handle all cases where exceptions are converted to error codes.
Reviewers: David Arthur <mumrah@gmail.com>
Currently the server will return `INVALID_REQUEST` if a `DescribeQuorum` request is sent to a node that is not the current leader. In addition to being inconsistent with all of the other leader APIs in the raft layer, this error is treated as fatal by both the forwarding manager and the admin client. Instead, we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This error is retriable and we can rely on the admin client to retry it after seeing this error.
Reviewers: David Jacot <djacot@confluent.io>
Compilation is failing after these two commits:
```
> Task :streams:compileJava
/Users/jgustafson/Projects/kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:852: error: cannot find symbol
tasks.addPendingTaskToClose(restoringTask.id());
^
symbol: method addPendingTaskToClose(org.apache.kafka.streams.processor.TaskId)
location: variable tasks of type org.apache.kafka.streams.processor.internals.Tasks
1 error
```
Also here:
```
[2022-08-17T20:58:20.912Z] > Task :streams:compileTestJava
[2022-08-17T20:58:20.912Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12530/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:822: error: method setupForRevocation(Set<Task>,Set<Task>) is already defined in class TaskManagerTest
[2022-08-17T20:58:20.912Z] private TaskManager setupForRevocation(final Set<Task> tasksInStateUpdater,
```
This patch reverts them.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Removes tasks from the state updater when the input partitions of the tasks are revoked during a rebalance.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
1. Within the tryCompleteRestore function of the thread, try to drain the removed tasks from state updater and handle accordingly: 1) for recycle, 2) for closure, 3) for update input partitions.
2. Catch up on some unit test coverage from previous PRs.
3. Some minor cleanups around exception handling.
Reviewers: Bruno Cadonna <cadonna@apache.org>
After 520f72995d, the subsequent checks are ensuring that
the leader and partition epochs are not less than. So,
make that explicit.
Reviewers: Jason Gustafson <jason@confluent.io>
Similar to https://github.com/apache/kafka/pull/12506. For the Kraft controller, we should return NOT_CONTROLLER if the leader/partition epoch in the request is ahead of the controller.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>
We recently added a builder to create task mocks for unit
tests. This PR adds the functionality to add input partitions
to task mocks when the builder is used.
Reviewers: Walker Carlson <wcarlson@confluent.io>, A. Sophie Blee-Goldman <ableegoldman@apache.org>
Ensures that SSL buffered data is processed by server immediately on the next poll when channel is unmuted after processing previous request. Poll timeout is reset to zero for this case to avoid 300ms delay in poll() if no new data arrives on the sockets.
Reviewers: David Mao <dmao@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>