The CurrentControllerId metric added by KIP-1001 is unreliable in ZK
mode: when there is no active ZK-based controller, it can still show
the previous controller ID instead of the expected -1.
This PR fixes that by using the controller ID from
KafkaController.scala, which is read directly from the controller
znode. It also adds a new test, ControllerIdMetricTest.scala.
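For illustration, a minimal sketch of the intended gauge behaviour (a Java rendering of the Scala logic; the names are illustrative, not the actual classes):

```java
import java.util.function.IntSupplier;

// The gauge delegates to the id KafkaController tracks from the
// controller znode, which is -1 whenever no controller is elected,
// so the metric can no longer report a stale previous controller id.
final class CurrentControllerIdGauge {
    private final IntSupplier activeControllerId; // backed by the controller znode

    CurrentControllerIdGauge(IntSupplier activeControllerId) {
        this.activeControllerId = activeControllerId;
    }

    int value() {
        return activeControllerId.getAsInt(); // -1 when no active controller
    }
}
```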
Reviewers: David Arthur <mumrah@gmail.com>
This patch adds the `group.consumer.migration.policy` config, which controls how consumer groups can be converted from a classic group to a consumer group and vice versa. The config is kept as an internal one while we develop the feature.
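For reference, a rough sketch of how such an internal config is typically declared via ConfigDef.defineInternal (the default value and importance here are assumptions, not the actual definition):

```java
import org.apache.kafka.common.config.ConfigDef;

// Internal configs are excluded from the generated public documentation.
ConfigDef configDef = new ConfigDef()
    .defineInternal("group.consumer.migration.policy",
        ConfigDef.Type.STRING,
        "disabled",                    // assumed default
        ConfigDef.Importance.MEDIUM);  // assumed importance
```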
Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
The current AssignmentValidationTest only tests the EAGER assignment protocol and does not support incremental assignment, as used by CooperativeStickyAssignor and the consumer protocol. Therefore, in the ConsumerEventHandler, I subclassed the existing handler and overrode the assigned and revoked event handling methods to permit incremental changes to the current assignment.
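For illustration, a sketch of the incremental semantics the handler has to follow (this is not the test code itself, just the shape of the contract):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// With cooperative protocols the callbacks carry incremental deltas,
// so the handler must mutate the current assignment, not replace it.
final Set<TopicPartition> currentAssignment = new HashSet<>();

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> added) {
        currentAssignment.addAll(added);      // only the newly added partitions
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        currentAssignment.removeAll(revoked); // only the revoked subset
    }
};
```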
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>
If a future replica doesn't get promoted, any directory reassignment sent to the controller should be reversed.
The current logic is already addressing the case when a replica hasn't yet been promoted and the controller hasn't yet acknowledged the directory reassignment. However, it doesn't cover the case where the replica does not get promoted due to a directory failure after the controller has acknowledged the reassignment but before the future replica catches up again and is promoted to main replica.
Reviewers: Luke Chen <showuon@gmail.com>
beginningOrEndOffsets and offsetsForTimes should send a ListOffsetEvent and return empty results if the provided timeout is zero.
Also includes a minor change to the mock timer for zero timeouts in AsyncConsumerTest.
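A usage sketch of the expected behaviour (consumer and timestampsToSearch are assumed to be in scope):

```java
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// The ListOffsetEvent is still sent, but with Duration.ZERO the call
// returns immediately with empty results instead of blocking.
Map<TopicPartition, OffsetAndTimestamp> result =
    consumer.offsetsForTimes(timestampsToSearch, Duration.ZERO);
```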
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
When implementing the group type conversion from a classic group to a consumer group, if the replay of the conversion records fails, the group should be reverted, including its timeouts.
A possible solution is to keep all the classic group timeouts and add a type check to the timeout operations. If the group is successfully upgraded, its timeouts won't pass the type check and their operations will execute as no-ops; if the upgrade fails, the group map will be reverted and the timeout operations will execute as-is.
We already have a group type check in the consumer group timeout operations. This patch adds a similar type check to the classic group timeout operations.
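A hedged sketch of the guard (the method and helper names are assumptions, not the real coordinator code):

```java
// The scheduled classic group timeout becomes a no-op once the group
// has been upgraded, and runs normally if the upgrade was reverted.
void onClassicGroupExpiration(String groupId) {
    Group group = groups.get(groupId);
    if (!(group instanceof ClassicGroup)) {
        return; // upgraded to a consumer group: do nothing
    }
    expireClassicGroup((ClassicGroup) group); // assumed expiration helper
}
```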
Reviewers: David Jacot <djacot@confluent.io>
Kafka Streams announced the removal of the forward-rate metric in
KIP-444 and removed it completely in AK 3.0. However, we forgot to
remove some code for this metric.
This commit removes the code that creates the forward-rate metric.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Minor changes for improving the logging and docs related to the auto-commit inflight logic, also adding tests to ensure the expected behaviour:
- auto-commit on the interval does not send a request if another one is in flight, and it sends the next one as soon as a response is received (without waiting for the full interval again)
- auto-commit before revocation always sends a request (even if one from the interval path is in flight), to ensure the latest offsets are committed before revoking partitions.
No changes in logic, just adding tests, docs and minor refactoring.
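A hedged sketch of the two behaviours the tests pin down (the field and method names are assumptions, not the real implementation):

```java
// One interval-driven request at a time; revocation commits always go out.
private boolean autoCommitInflight = false;

void maybeAutoCommitOnInterval() {
    if (autoCommitInflight || !intervalElapsed())
        return;                          // skip: a request is already in flight
    autoCommitInflight = true;
    commitAsync((offsets, error) -> {
        autoCommitInflight = false;      // the next interval commit may be sent
        resetIntervalDeadline();         // ...as soon as this response arrives
    });
}

void autoCommitBeforeRevocation() {
    commitAsync((offsets, error) -> { }); // always sent, even with one in flight
}
```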
Reviewer: Bruno Cadonna <cadonna@apache.org>
Fully implemented legacy subscription using Pattern for AsyncKafkaConsumer.
Enabled related tests for subscription using Pattern in PlaintextConsumerTest.
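For reference, the now-enabled call path looks like this (consumer is assumed in scope):

```java
import java.util.regex.Pattern;

// Subscribing by regex on a consumer backed by AsyncKafkaConsumer:
consumer.subscribe(Pattern.compile("test-topic-.*"));
```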
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>, Bruno Cadonna <cadonna@apache.org>
We now iterate the records to find the offsetOfMaxTimestamp when handling ListOffsetsRequest.MAX_TIMESTAMP, instead of returning the cached one, since it is hard to align all paths to produce a correct offsetOfMaxTimestamp. The known paths are listed below; a sketch of the iteration follows the list.
1. convertAndAssignOffsetsNonCompressed -> we CAN get the correct offsetOfMaxTimestamp when validating all records
2. assignOffsetsNonCompressed -> ditto
3. validateMessagesAndAssignOffsetsCompressed -> ditto
4. validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets -> ditto
5. appendAsFollower#append#analyzeAndValidateRecords -> we CAN'T get the correct offsetOfMaxTimestamp, as iterating all records is expensive when fetching records from the leader
6. LogSegment#recover -> ditto
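A simplified sketch of the iteration (the real code lives in the Scala log layer; this is illustrative only):

```java
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;

// Walk every record and remember the offset of the max timestamp.
static long offsetOfMaxTimestamp(Records records) {
    long maxTimestamp = RecordBatch.NO_TIMESTAMP;
    long offsetOfMax = -1L;
    for (RecordBatch batch : records.batches()) {
        for (Record record : batch) {
            if (record.timestamp() > maxTimestamp) {
                maxTimestamp = record.timestamp();
                offsetOfMax = record.offset();
            }
        }
    }
    return offsetOfMax;
}
```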
Reviewers: Jun Rao <junrao@gmail.com>
Sometimes we want to define server properties for all test cases, and using BeforeEach to modify the ClusterConfig is the only way. The side effect is that the IDE complains about this style, since it can't recognize the custom ParameterResolver for ClusterConfig.
The alternative is to take the ClusterInstance from the constructor first, and then modify the inner ClusterConfig in the BeforeEach phase. However, that may confuse users about the life cycle of "configs".
In short, I prefer to define the server properties via ClusterTestDefaults. It already includes some server-side default properties, and we can enhance it to cover more existing test cases.
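A hedged example of the preferred approach (the serverProperties attribute is assumed per this change; package names may differ):

```java
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTestDefaults;

// Every cluster test in the class inherits these properties,
// with no BeforeEach mutation of ClusterConfig required.
@ClusterTestDefaults(serverProperties = {
    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1")
})
public class SomeClusterTest {
    // @ClusterTest methods here get the property automatically
}
```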
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add a new method, shutdown(Duration), that accepts a timeout argument. We can leverage the new method to run non-graceful shutdowns in testing.
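A usage sketch (broker stands in for the instance being shut down):

```java
import java.time.Duration;

// A zero timeout forces a non-graceful shutdown in tests.
broker.shutdown(Duration.ofMillis(0));
```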
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Updates the broker metrics names to kebab/hyphen case. Also removes the redundant client-metrics- prefix from all metrics, since the group name is client-metrics itself.
Reviewers: David Jacot <djacot@confluent.io>, Jun Rao <junrao@gmail.com>
Currently, when executing kafka-reassign-partitions.sh with the --execute option, if a partition number specified in the JSON file does not exist, this is only checked when the reassignments are submitted to alterPartitionReassignments on the server side.
We can perform this check in advance, before submitting the reassignments to the server.
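A hedged sketch of such a client-side pre-check (variable names are illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// Validate every proposed partition against the described topics
// before calling alterPartitionReassignments.
Map<String, TopicDescription> described =
    admin.describeTopics(topics).allTopicNames().get();
for (TopicPartition tp : proposedPartitions) {
    TopicDescription description = described.get(tp.topic());
    if (description == null || tp.partition() >= description.partitions().size())
        throw new UnknownTopicOrPartitionException("Unknown partition: " + tp);
}
```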
Reviewers: Luke Chen <showuon@gmail.com>
Profiling the Kafka tests shows tons of client-metrics-reaper threads that are not cleaned up after BrokerServer shutdown.
The client-metrics-reaper thread comes from ClientMetricsManager#expirationTimer, and BrokerServer#shutdown doesn't close the ClientMetricsManager, which leaves the thread running in the background.
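A hedged sketch of the fix (the exact call site in BrokerServer#shutdown is simplified here):

```java
// Closing the manager stops its expiration timer, so the
// client-metrics-reaper thread exits along with the broker.
try {
    if (clientMetricsManager != null)
        clientMetricsManager.close();
} catch (Exception swallowed) {
    // shutdown should proceed even if the close fails
}
```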
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
A subtle difference in the behavior of the two APIs causes the failures with "Invalid negative timestamp".
In this PR, the list offsets response is processed differently based on the API. For beginningOffsets/endOffsets, the offset response is returned directly.
For offsetsForTimes, an OffsetAndTimestamp object is constructed for each requested TopicPartition before being returned.
The reason: for beginningOffsets and endOffsets we expect a -1 timestamp in the response, which caused the invalid timestamp exception because the original code tried to construct an OffsetAndTimestamp object upon returning.
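A hedged sketch of the split (names are illustrative, not the actual handler code):

```java
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

// offset/timestamp come from the ListOffsets response for one
// requested partition; requireTimestamps distinguishes the two APIs.
static Object buildResult(long offset, long timestamp, boolean requireTimestamps) {
    if (requireTimestamps) {
        // offsetsForTimes: the timestamp is meaningful, so wrap both
        return new OffsetAndTimestamp(offset, timestamp);
    }
    // beginningOffsets/endOffsets: the response timestamp is -1 and
    // OffsetAndTimestamp rejects negative timestamps, so return the
    // raw offset directly
    return offset;
}
```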
In this PR, the following missing tasks are also added:
- short-circuit both beginningOrEndOffsets paths when the timeout is zero
- test both APIs (beginningOrEndOffsets, offsetsForTimes); it seems we didn't have tests for these APIs - note they are present in other integration tests, but those were added to test the async consumer
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
- Use `Empty` instead of 'none' when referring to `Optional` values.
- `Headers.lastHeader` returns `null` when no header is found.
- Fix minor spelling mistakes.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Enables the log directory failure system test for all KRaft modes in addition to ZK mode.
Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez <soarez@apple.com>, Proven Provenzano <pprovenzano@confluent.io>
This PR fixes the bug introduced by #15263 which caused topic partitions to be re-created whenever the original log dir is offline: "Log directory failure re-creates partitions in another logdir automatically".
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>, Gaurav Narula <gaurav_narula2@apple.com>, Proven Provenzano <pprovenzano@confluent.io>
The following test cases don't really run the KRaft case. The reason is that the test info doesn't contain the parameter names, so TestInfoUtils#isKRaft always returns false.
1) TopicCommandIntegrationTest
2) DeleteConsumerGroupsTest
3) AuthorizerIntegrationTest
4) DeleteOffsetsConsumerGroupCommandIntegrationTest
We can fix it by adding options.compilerArgs << '-parameters' to the compiler options in build.gradle.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
testUnrecoverableErrors was flaky because the injected error either affected the next block request (prefetching) or missed it entirely.
First I tried to wait for the background thread to finish before setting Errors.X. But then the test consistently failed, because the generateProducerId call does prefetching too: after a successful producer id generation we set the error and expected the next call to fail again with a coordinator-load-in-progress exception, but since the next block had been prefetched, it was able to serve a proper producer id.
1. calling generateProducerId -> no current block exists, so a block is requested -> CoordinatorLoadInProgressException; asserting the exception
2. calling generateProducerId again -> prefetching, requesting the next block -> giving back the producer id from the first block; asserting the received producer id; then setting the error, waiting for the background callback(s) to finish first
3. calling generateProducerId, expecting CoordinatorLoadInProgressException, but -> it works like step 2: just the prefetching callback fails due to the error we set before
Note: without waiting for the background thread completions, the error setting could happen before or after step 2's callback; the test was written expecting it to happen before the callback.
This was the point I realised that we need to have a queue to control the responses rather than trying to do it in the middle of the test method.
Errors can be passed in a queue at creation of the mock id manager instead of being modified on the fly.
The queue specifies, via Errors, how the background thread (which imitates the controllerChannel) should behave: return an error or a proper response, and call the callback accordingly.
I was able to simplify the mock id manager class as well; there is no need to override maybeRequestNextBlock if the errors are handled this way via a queue, as sketched below.
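A hedged, heavily simplified sketch of the queue-driven mock (names are illustrative):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.apache.kafka.common.protocol.Errors;

// Each queued element decides how the imitated controller channel
// answers the next block request, so the test never mutates the
// error while the background thread is running.
final Queue<Errors> plannedResponses = new ConcurrentLinkedQueue<>();

void handleNextBlockRequest(Consumer<Errors> callback) {
    Errors planned = plannedResponses.poll();
    if (planned == null || planned == Errors.NONE) {
        callback.accept(Errors.NONE); // hand out the next producer id block
    } else {
        callback.accept(planned);     // surface exactly the queued error
    }
}
```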
Reviewers: Igor Soarez <soarez@apple.com>, Daniel Urban <urb.daniel7@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Invokes `SSLEngine::closeInbound` after we flush the close_notify alert to the socket. This fixes a memory leak in the Netty/OpenSSL-based SSLEngine, which only frees native resources once closeInbound has been invoked.
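A simplified sketch of the new close sequence (buffer handling elided; flushCloseNotify is an assumed helper):

```java
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;

// closeInbound is invoked once the close_notify alert has been
// flushed, letting OpenSSL-based engines free native resources.
sslEngine.closeOutbound();
flushCloseNotify();                // assumed helper: writes the alert to the socket
try {
    sslEngine.closeInbound();      // releases native memory in Netty/OpenSSL engines
} catch (SSLException expected) {
    // thrown when the peer has not sent its own close_notify; harmless here
}
```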
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
If the broker registers with the same broker epoch as in its previous session, it is recognized as a clean shutdown. Otherwise, it is an unclean shutdown, and the replica will be removed from any ELR.
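A hedged sketch of the rule (the field and helper names are assumptions):

```java
// Registering with the previous session's broker epoch implies a
// clean shutdown; anything else is treated as unclean.
boolean cleanShutdown =
    registration.previousBrokerEpoch() == lastSessionBrokerEpoch;
if (!cleanShutdown) {
    // unclean shutdown: this replica must leave every ELR it is in
    removeBrokerFromAllElrs(registration.brokerId());
}
```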
Reviewers: Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
The issue KAFKA-16359 reported the inclusion of kafka-clients runtime dependencies in the MANIFEST.MF Class-Path.
The root cause is the issue described here with the usage of the shadow plugin.
Looking into the specifics of the plugin and its documentation, any dependency marked as shadow is treated as follows by the shadow plugin:
1. Adds the dependency as a runtime dependency in the resultant pom.xml - code here
2. Adds the dependency to the Class-Path in MANIFEST.MF as well - code here
Resolution
We do need the runtime dependencies available in the pom (1 above) but not in the manifest (2 above). There is also no clear way to separate the two behaviours, as both of the above tasks rely on the shadow configuration.
To fix this, I have defined another custom configuration named shadowed, which is later used to populate the correct pom (the code is similar to what the shadow plugin does to populate the pom for runtime dependencies).
Though this might seem like a workaround, I think it's the only way to fix the issue. I have checked other SDKs that suffered from the same issue; they went a similar route to populate the pom.
Reviewers: Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>
Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Walker Carlson <wcarlson@apache.org>
Migrated the following tests for the new consumer:
- test_fencing_static_consumer
- test_static_consumer_bounce
- test_static_consumer_persisted_after_rejoin
Reviewers: Walker Carlson <wcarlson@apache.org>
Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Walker Carlson <wcarlson@apache.org>
Currently, when using kafka-reassign-partitions to move the log directory, the output only indicates which replica's movement has successfully started.
This PR proposes showing more detailed information, helping end users understand that the operation is proceeding as expected.
Reviewers: Luke Chen <showuon@gmail.com>, Andrew Schofield <aschofield@confluent.io>
When moving replicas between directories in the same broker, future replica promotion hinges on acknowledgment from the controller of a change in the directory assignment.
ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion notification of the directory assignment change.
In its current form, under certain assignment scheduling, AssignmentsManager can both miss completion notifications and trigger them prematurely.
Reviewers: Luke Chen <showuon@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>