This patch adds `syncGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
* Update EndToEndAuthorizationTest to test both ZK and KRAFT quorum servers
* SCRAM and delegation tokens are not implemented for KRAFT yet, so those cases emit a message to stderr and pass the test.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Clear the Yammer metrics after each test. This will allow the RaftManager and other objects to get properly GC'd.
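Roughly, the kind of per-test cleanup this refers to (a sketch against the Yammer 2.x API; the helper name is made up):
```
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.MetricsRegistry;

import java.util.HashSet;

// Sketch of a per-test cleanup helper (the class name is illustrative).
public class YammerMetricsCleanup {
    public static void clearAll() {
        MetricsRegistry registry = Metrics.defaultRegistry();
        // Copy the metric names first, then remove them from the registry so
        // the metrics no longer pin RaftManager and friends in memory.
        new HashSet<>(registry.allMetrics().keySet())
            .forEach(registry::removeMetric);
    }
}
```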
Reviewers: Jason Gustafson <jason@confluent.io>
Extract jointly owned parts of BrokerServer and ControllerServer into SharedServer. Shut down
SharedServer when the last component using it shuts down. But make sure to stop the raft manager
before closing the ControllerServer's sockets.
This PR also fixes a memory leak where ReplicaManager was not removing some topic metric callbacks
during shutdown. Finally, we now release memory from the BatchMemoryPool in KafkaRaftClient#close.
These changes should reduce memory consumption while running junit tests.
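A rough sketch of the reference-counting idea behind SharedServer (class and method names here are illustrative, not the actual Kafka code):
```
// Illustrative only: a shared component that shuts itself down once the
// last user (broker or controller) releases it.
public class SharedResource implements AutoCloseable {
    private int users = 0;
    private boolean closed = false;

    public synchronized void acquire() {
        if (closed) throw new IllegalStateException("already closed");
        users++;
    }

    public synchronized void release() {
        if (--users == 0) {
            close();
        }
    }

    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            // In SharedServer terms: stop the Raft manager before closing
            // the sockets that the ControllerServer still references.
        }
    }
}
```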
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
With KRaft, the cluster metadata topic (__cluster_metadata) has a different implementation compared to a regular topic. The user should not be allowed to create this topic, as doing so can cause issues if the metadata log dir is the same as one of the log dirs.
This change returns an authorization error if the user tries to create the cluster metadata topic.
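A minimal sketch of such a guard in the create-topics path (the helper is hypothetical; only the topic name and error code come from the change description):
```
import org.apache.kafka.common.protocol.Errors;

// Hypothetical guard: nobody is authorized to create the internal
// cluster metadata topic.
final class ClusterMetadataTopicGuard {
    static Errors validateCreatableTopic(String topicName) {
        if ("__cluster_metadata".equals(topicName)) {
            return Errors.TOPIC_AUTHORIZATION_FAILED;
        }
        return Errors.NONE;
    }
}
```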
Reviewers: David Arthur <mumrah@gmail.com>
This patch adds `heartbeat` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Jason Gustafson <jason@confluent.io>
The IBM Semeru JDK uses the OpenJDK security providers instead of the IBM security providers, so test for the OpenJDK classes first where possible and test for Semeru in the java.runtime.name system property otherwise.
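For illustration, the fallback check might look like this (a sketch, not the actual test utility):
```
import java.util.Locale;

// Sketch: fall back to the java.runtime.name system property when the
// OpenJDK security provider classes cannot be found.
final class JdkDetection {
    static boolean isIbmJdkSemeru() {
        String runtimeName = System.getProperty("java.runtime.name", "");
        return runtimeName.toLowerCase(Locale.ROOT).contains("semeru");
    }
}
```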
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Implementation of KIP-792, which adds a generationId field to the ConsumerProtocolSubscription message. When doing assignment, we take the generationId from the subscription if it is provided under the cooperative rebalance protocol; otherwise, we fall back to the original solution of using userData.
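The decision logic boils down to something like the following sketch (the helper is hypothetical, and it assumes -1 means the member did not send the field):
```
// Illustrative only: prefer the top-level generationId from the subscription
// when the member provided it, otherwise fall back to the generation
// embedded in userData. -1 is assumed to mean "not provided".
final class GenerationResolution {
    static int effectiveGeneration(int subscriptionGenerationId, int userDataGenerationId) {
        return subscriptionGenerationId >= 0 ? subscriptionGenerationId : userDataGenerationId;
    }
}
```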
Reviewers: David Jacot <djacot@confluent.io>
The highlights are:
* Support for Java 19
* Support for incremental compilation following a compilation failure
* Flag for individual task rerun (e.g. `gradle test --rerun`)
* Re-use Scala compiler between runs (will be enabled via #12280)
Release notes: https://docs.gradle.org/7.6/release-notes.html
Also adjusted the directory used by `retry_zinc` for the build output from
`build` to `logs` as `gradlew clean` was causing unintended deletion of
the file used by that tool to decide if a retry is required.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This patch adds `joinGroup` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
For context, I will do the same for all the other interactions with the current group coordinator. In order to limit the changes, I have chosen to introduce the `GroupCoordinatorAdapter` that translates the new interface to the old one. It is basically a wrapper. This allows keeping the current group coordinator untouched for now and focusing on the `KafkaApis` changes. Eventually, we can remove `GroupCoordinatorAdapter`.
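A minimal sketch of the adapter idea, using placeholder types instead of the real request/response classes:
```
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// All types here are illustrative placeholders, not the real Kafka classes.
record JoinGroupRequest(String groupId, String memberId) {}
record JoinGroupResult(short errorCode, String memberId) {}

// Old-style coordinator: reports results through a callback.
interface OldCoordinator {
    void handleJoinGroup(JoinGroupRequest request, Consumer<JoinGroupResult> callback);
}

// New-style interface: returns a future that KafkaApis can compose on.
interface NewCoordinator {
    CompletableFuture<JoinGroupResult> joinGroup(JoinGroupRequest request);
}

// The adapter is "basically a wrapper": it completes the future from the
// old coordinator's callback, leaving the old coordinator untouched.
final class CoordinatorAdapterSketch implements NewCoordinator {
    private final OldCoordinator old;

    CoordinatorAdapterSketch(OldCoordinator old) {
        this.old = old;
    }

    @Override
    public CompletableFuture<JoinGroupResult> joinGroup(JoinGroupRequest request) {
        CompletableFuture<JoinGroupResult> future = new CompletableFuture<>();
        old.handleJoinGroup(request, future::complete);
        return future;
    }
}
```
The point is only that `KafkaApis` can code against the future-based interface today, while the existing callback-based coordinator stays untouched behind the adapter.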
Reviewers: Justine Olshan <jolshan@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
We recently had a bug causing the JoinGroup callback to throw an exception (https://github.com/apache/kafka/pull/12909). When this happens, the exception is propagated to the caller and the JoinGroup callback is never completed. To make it worse, the member whose callback failed becomes a zombie because the group coordinator does not expire members with a pending callback.
This patch catches exceptions for both the JoinGroup and SyncGroup callback invocations and retries completing them with an `UNKNOWN_SERVER_ERROR` error if they fail.
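The pattern is roughly the following (a Java sketch of the idea; the actual change is in the Scala group coordinator):
```
import java.util.function.Consumer;

// Illustrative pattern: if invoking the response callback throws, complete it
// again with an UNKNOWN_SERVER_ERROR response so the member never keeps a
// pending callback forever.
final class CallbackSafety {
    static <T> void completeCallbackSafely(Consumer<T> callback,
                                           T response,
                                           T unknownServerErrorResponse) {
        try {
            callback.accept(response);
        } catch (Throwable t) {
            callback.accept(unknownServerErrorResponse);
        }
    }
}
```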
Reviewers: Jason Gustafson <jason@confluent.io>
When a consumer group on a version prior to 2.3 is upgraded to a newer version and static membership is enabled in the meantime, the consumer group remains stuck, iff the leader is still on the old version.
The issue is that setting `GroupInstanceId` in the response to the leader is only supported from JoinGroup version >= 5 and that `GroupInstanceId` is neither ignorable nor handled anywhere else. Hence, if there is at least one static member in the group, sending the JoinGroup response to the leader fails with a serialization error.
```
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default groupInstanceId at version 2
```
When this happens, the member stays around until the group coordinator is bounced because a member with a non-null `awaitingJoinCallback` is never expired.
This patch fixes the issue by making `GroupInstanceId` ignorable. A unit test has been modified to cover this.
Reviewers: Jason Gustafson <jason@confluent.io>
Now that Kafka is generating a metadata snapshot every hour and
the default metadata retention is to delete snapshots after 7 days,
every cluster metadata partition will have 168 (1 snapshot per hour
* 24 hours per day * 7 days) snapshots. If we assume that in most
cases the size of the snapshot is determined by the number of
partitions in a cluster, a cluster with 100K partitions will have a
snapshot size of roughly 10MB (100 bytes per partition * 100k
partitions). For clusters of this size, the cluster metadata partition
will always consume around 1.7GB.
KIP-876 changed the default value for metadata.max.retention.bytes
to 100MB. This should limit the size of the cluster metadata
partition for large clusters but keep 7 days worth of snapshots for
small clusters.
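For reference, the arithmetic above works out as follows (a throwaway calculation, not code from the patch):
```
// Back-of-the-envelope numbers from the description above.
public class SnapshotRetentionMath {
    public static void main(String[] args) {
        long snapshotsRetained = 24 * 7;              // 1 per hour for 7 days = 168
        long bytesPerPartition = 100;                 // rough estimate
        long partitions = 100_000;
        long snapshotSizeBytes = bytesPerPartition * partitions;        // ~10MB
        long oldRetentionBytes = snapshotsRetained * snapshotSizeBytes; // ~1.7GB
        long newRetentionBytes = 100 * 1024 * 1024;   // KIP-876 default: 100MB

        System.out.printf("snapshot size: ~%d MB%n", snapshotSizeBytes / 1_000_000);
        System.out.printf("old retention footprint: ~%.1f GB%n", oldRetentionBytes / 1e9);
        System.out.printf("new default cap: %d MB%n", newRetentionBytes / (1024 * 1024));
    }
}
```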
Reviewers: Jason Gustafson <jason@confluent.io>
In this change, we enable backing off when the state directory
is still locked during initialization of a task. When the state
directory is locked, the task is reinserted into the
initialization queue. We will reattempt to acquire the lock
after the next round of polling.
Tested through a new unit test.
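Schematically, the backoff looks something like this (illustrative types; the real logic lives in the Streams state updater / task manager):
```
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative loop only: tasks whose state directory is still locked are
// put back on the initialization queue and retried after the next poll.
final class TaskInitQueueSketch<T> {
    interface LockingInitializer<T> {
        boolean tryInitialize(T task); // false => state directory still locked
    }

    private final Queue<T> pendingInit = new ArrayDeque<>();

    void add(T task) {
        pendingInit.add(task);
    }

    // Called once per poll round.
    void initializePending(LockingInitializer<T> initializer) {
        int toAttempt = pendingInit.size();
        for (int i = 0; i < toAttempt; i++) {
            T task = pendingInit.poll();
            if (!initializer.tryInitialize(task)) {
                pendingInit.add(task); // back off: retry after the next poll
            }
        }
    }
}
```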
Reviewer: Bruno Cadonna <cadonna@apache.org>
The original implementation of the state updater could not
handle double rebalances within one poll phase correctly,
because it could create tasks more than once if they hadn't
finished initialization yet.
In a55071a, we
moved initialization to the state updater to fix this. However,
with more testing, I found out that this implementation has
its own problems: there are issues with locking the
state directory (the state updater acquired the lock to the state
directory, so the main thread wouldn't be able to clear the
state directory when closing the task), and benchmarks also
show that this can lead to useless work (tasks are being
initialized, although they will be taken from the thread soon
after in a follow-up rebalance).
In this PR, I propose to revert the original change, and fix
the original problem in a much simpler way: When we
receive an assignment, we simply clear out the
list of tasks pending initialization. This way, no double
task instantiations can happen.
The change was tested in benchmarks, system tests,
and the existing unit & integration tests. We also add
the state updater to the smoke integration test, which
triggered the double task instantiations before.
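The essence of the fix, as a hedged sketch with illustrative names:
```
import java.util.ArrayList;
import java.util.List;

// Illustrative only: when a new assignment arrives, drop the tasks still
// waiting for initialization; the new assignment's tasks are then created
// and queued from scratch, so no task can be instantiated twice.
final class PendingInitSketch<T> {
    private final List<T> tasksPendingInitialization = new ArrayList<>();

    void addPendingTask(T task) {
        tasksPendingInitialization.add(task);
    }

    void onNewAssignment() {
        tasksPendingInitialization.clear();
    }
}
```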
Reviewer: Bruno Cadonna <cadonna@apache.org>
Recently, we have gotten a lot of builds that failed (and were terminated) with a core:unitTest failure. The failure messages look like this:
```
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z]
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1
```
After investigation, I found one reason for it (there may be others). In the BrokerMetadataPublisherTest#testReloadUpdatedFilesWithoutConfigChange test, we created a logManager twice, but during cleanup we only closed one of them. As a result, a log cleaner keeps running. But by then the temp log dirs have been deleted, so it calls Exit.halt(1), producing the error we saw in Gradle, just like this code does when we encounter an IOException in all our log dirs:
fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
Exit.halt(1)
So why does it sometimes pass and sometimes fail? Because during test cluster close, we shut down the broker first and then the other components, and the log cleaner is triggered on an interval. If the cluster can close fast enough to finish this test, it passes. Otherwise, it exits with 1.
Fixed it by mocking the log manager and other managers in the mocked publisher to avoid duplicate resource allocation. This doesn't change the original test goal, since we only want to make sure the publisher invokes reloadUpdatedFilesWithoutConfigChange when necessary.
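Schematically, the fix looks something like this (the interface below is a stand-in for the real LogManager, and the direct call stands in for the publisher invoking it):
```
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

// Hypothetical shape of the fix: hand the publisher a mocked log manager so
// the test does not allocate a second real LogManager (and its log cleaner),
// and only verify that reloadUpdatedFilesWithoutConfigChange is invoked.
interface LogManagerLike {
    void reloadUpdatedFilesWithoutConfigChange();
}

class MockedPublisherSketch {
    void exerciseConfigChange() {
        LogManagerLike logManager = mock(LogManagerLike.class);
        // In the real test the publisher, not the test itself, triggers the
        // reload; the direct call below just stands in for that interaction.
        logManager.reloadUpdatedFilesWithoutConfigChange();
        verify(logManager).reloadUpdatedFilesWithoutConfigChange();
    }
}
```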
Reviewers: dengziming <dengziming1993@gmail.com>
When consumers use the static membership protocol, they cannot update the rebalance timeout because the group coordinator simply ignores any new value. This patch fixes this.
Reviewers: David Jacot <djacot@confluent.io>
The default replica selector chooses a replica based on whether the broker.rack matches the client.rack in the fetch request and whether the offset exists on the follower. If the follower is not in the ISR, we know it's lagging behind, which will also cause the consumer to lag behind. There are two cases:
1. The follower recovers and joins the ISR. The consumer will no longer fall behind.
2. The follower continues to lag behind. After 5 minutes, the consumer will refresh its preferred read replica and the leader will return the same lagging follower, since the offset the consumer fetched up to is capped by the follower's HWM. This can go on indefinitely.
If the replica selector chooses a broker in the ISR, then we can ensure that at least every 5 minutes the consumer will consume from an up-to-date replica.
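A simplified model of the selection rule (this is not the actual ReplicaSelector API, just the idea):
```
import java.util.Collection;
import java.util.Optional;

// Simplified model: prefer a rack-matching replica, but only consider
// replicas that are currently in the ISR so the preferred read replica
// cannot lag behind indefinitely.
record ReplicaInfo(int brokerId, String rack, boolean inIsr) {}

final class RackAwareIsrSelection {
    static Optional<ReplicaInfo> select(String clientRack, Collection<ReplicaInfo> replicas) {
        return replicas.stream()
            .filter(ReplicaInfo::inIsr)                         // only up-to-date replicas
            .filter(r -> r.rack() != null && r.rack().equals(clientRack))
            .findFirst();
    }
}
```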
Reviewers: David Jacot <djacot@confluent.io>
Implement time-based snapshots for the controller. The general strategy for this feature is that the controller will use the record-batch's append time to determine if a snapshot should be generated. If the oldest record that has been committed but is not included in the latest snapshot is older than `metadata.log.max.snapshot.interval.ms`, the controller will trigger a snapshot immediately. This is useful in case the controller was offline for more than `metadata.log.max.snapshot.interval.ms` milliseconds.
If the oldest record that has been committed but is not included in the latest snapshot is NOT older than `metadata.log.max.snapshot.interval.ms`, the controller will schedule a `maybeGenerateSnapshot` deferred task.
It is possible that when the controller wants to generate a new snapshot, either because of time or number of bytes, the controller is currently generating a snapshot. In this case the `SnapshotGeneratorManager` was changed so that it checks and potentially triggers another snapshot when the currently in-progress snapshot finishes.
To better support this feature the following additional changes were made:
1. The configuration `metadata.log.max.snapshot.interval.ms` was added to `KafkaConfig` with a default value of one hour.
2. `RaftClient` was extended to return the latest snapshot id. This snapshot id is used to determine if a given record is included in a snapshot.
3. Improved the `SnapshotReason` type to support the inclusion of values in the message.
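Putting the configuration and the deferred task together, the scheduling decision can be sketched as follows (names are illustrative, not the controller's actual classes):
```
// Sketch: trigger a snapshot immediately if the oldest committed but
// unsnapshotted record is older than metadata.log.max.snapshot.interval.ms,
// otherwise defer a maybeGenerateSnapshot check until that age is reached.
final class SnapshotSchedulerSketch {
    private final long maxSnapshotIntervalMs;

    SnapshotSchedulerSketch(long maxSnapshotIntervalMs) {
        this.maxSnapshotIntervalMs = maxSnapshotIntervalMs; // default: one hour
    }

    /** Returns 0 if a snapshot should be generated now, otherwise the delay
     *  in milliseconds before the next check. */
    long maybeGenerateSnapshot(long nowMs, long oldestUnsnapshottedAppendTimeMs) {
        long ageMs = nowMs - oldestUnsnapshottedAppendTimeMs;
        return ageMs >= maxSnapshotIntervalMs ? 0 : maxSnapshotIntervalMs - ageMs;
    }
}
```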
Reviewers: Jason Gustafson <jason@confluent.io>, Niket Goel <niket-goel@users.noreply.github.com>
Batch 2 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
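For flavor, the typical mechanical translation these migrations involve (a generic example, not one of the actual tests in the batch):
```
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;

// EasyMock style (before):
//   List<String> names = EasyMock.createMock(List.class);
//   EasyMock.expect(names.get(0)).andReturn("kafka");
//   EasyMock.replay(names);
//   ... exercise the code under test ...
//   EasyMock.verify(names);
//
// Mockito style (after): no replay/verify-all phase; stubbing and
// verification happen per interaction.
class MockitoMigrationExample {
    @SuppressWarnings("unchecked")
    void example() {
        List<String> names = mock(List.class);
        when(names.get(0)).thenReturn("kafka");

        names.get(0);            // the code under test would use the mock here

        verify(names).get(0);
    }
}
```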
Reviewers: Matthew de Detrich <matthew.dedetrich@aiven.io>, Dalibor Plavcic <dalibor.os@proton.me>, Bruno Cadonna <cadonna@apache.org>
Fix for the subtle bug described in KAFKA-14382 that was causing rebalancing loops. If we trigger a new rebalance while the current one is still ongoing, it may cause some members to fail the first rebalance if they weren't able to send the SyncGroup request in time (for example due to processing records during the rebalance). This means those consumers never receive their assignment from the original rebalance, and won't revoke any partitions they might have needed to. This can send the group into a loop as each rebalance schedules a new followup cooperative rebalance due to partitions that need to be revoked, and each followup rebalance causes some consumer(s) to miss the SyncGroup and never revoke those partitions.
Reviewers: John Roesler <vvcephei@apache.org>
While restoring a batch of records, RocksDBStore was iterating the ConsumerRecords, building a list of KeyValues, and then iterating that list of KeyValues to add them to the RocksDB batch.
Adding the key and value directly to the RocksDB batch avoids this unnecessary second iteration and the creation of intermediate KeyValue objects, improving the performance of state restoration and reducing unnecessary object allocation.
This also simplifies the API of RocksDBAccessor, as prepareBatchForRestore is no longer needed.
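Schematically, the single-pass restore path (simplified: no column families, and the method name is made up):
```
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

import java.util.List;

// Sketch: write each restored record's key and value straight into the
// RocksDB WriteBatch instead of first building an intermediate list of
// KeyValue objects.
final class DirectRestoreSketch {
    static WriteBatch buildRestoreBatch(List<ConsumerRecord<byte[], byte[]>> records)
            throws RocksDBException {
        WriteBatch batch = new WriteBatch();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.value() == null) {
                batch.delete(record.key());              // tombstone: delete the key
            } else {
                batch.put(record.key(), record.value()); // single pass, no copy
            }
        }
        return batch;
    }
}
```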
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Walker Carlson <wcarlson@confluent.io>
* Updated Jackson to version 2.13.4 to fix CVE-2022-42004 and CVE-2020-36518
* Updated Jackson Databind to version 2.13.4.2 to fix CVE-2022-42004
Co-authored-by: Pratim SC <pratim.sunilkumar.chaudhuri@mercer.com>
Reviewers: Luke Chen <showuon@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>