## Summary
jira: https://issues.apache.org/jira/browse/KAFKA-19517
Ensure `LoadSummary#numRecords` counts all records, including control
batches, to maintain consistency with numBytes.
## Test
`testLoading` now verifies `numRecords`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>
This PR removes the dependencies on `core` and `scala-library` from the
`coordinator-common` module, as a follow-up to
https://github.com/apache/kafka/pull/20089.
These dependencies have been removed from tests, and the previously
added import-control relaxations have been reverted accordingly.
Reviewers: TengYao Chi <frankvicky@apache.org>, Ken Huang
<s7133700@gmail.com>
The MetadataImage has a lot of stuff in it and it gets passed around in
many places in the new GroupCoordinator. This makes it difficult to
understand what metadata the group coordinator actually relies on and
makes it too easy to use metadata in ways it wasn't meant to be used.
This change encapsulate the MetadataImage in an interface
(`CoordinatorMetadataImage`) that indicates and controls what metadata
the group coordinator actually uses. Now it is much easier at a glance
to see what dependencies the GroupCoordinator has on the metadata. Also,
now we have a level of indirection that allows more flexibility in how
the GroupCoordinator is provided the metadata it needs.
### Summary of Changes
- Rewrote both `CoordinatorLoaderImpl` and `CoordinatorLoaderImplTest`
in Java, replacing their original Scala implementations.
- Removed the direct dependency on `ReplicaManager` and replaced it with
functional interfaces for `partitionLogSupplier` and
`partitionLogEndOffsetSupplier`
- Preserved original logic and test coverage during migration.
Reviewers: TaiJuWu <tjwu1217@gmail.com>, Ken Huang <s7133700@gmail.com>,
TengYao Chi <frankvicky@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
* Coordinator starts with a smaller buffer, which can grow as needed.
* In freeCurrentBatch, release the appropriate buffer:
* The Coordinator recycles the expanded buffer
(`currentBatch.builder.buffer()`), not `currentBatch.buffer`, because
`MemoryBuilder` may allocate a new `ByteBuffer` if the existing one
isn't large enough.
* There are two cases that buffer may exceeds `maxMessageSize` 1.
If there's a single record whose size exceeds `maxMessageSize` (which,
so far, is derived from `max.message.bytes`) and the write is in
`non-atomic` mode, it's still possible for the buffer to grow beyond
`maxMessageSize`. In this case, the Coordinator should revert to using a
smaller buffer afterward. 2. Coordinator do not recycles the buffer
that larger than `maxMessageSize`. If the user dynamically reduces
`maxMessageSize` to a value even smaller than `INITIAL_BUFFER_SIZE`, the
Coordinator should avoid recycling any buffer larger than
`maxMessageSize` so that Coordinator can allocate the smaller buffer in
the next round.
* Add tests to verify the above scenarios.
Reviewers: David Jacot <djacot@confluent.io>, Sean Quah
<squah@confluent.io>, Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When sensors are shared between different metric groups, data from all
groups is combined and added to all metrics under each sensor. This
means that different metric groups will report the same values for their
metrics.
Prefix sensor names with metric group names to isolate metric groups.
Reviewers: Yung <yungyung7654321@gmail.com>, Sushant Mahajan
<smahajan@confluent.io>, Dongnuo Lyu <dlyu@confluent.io>, TengYao Chi
<frankvicky@apache.org>
Simplify Set initialization and reduce the overhead of creating extra
collections.
The changes mostly include:
- new HashSet<>(List.of(...))
- new HashSet<>(Arrays.asList(...)) / new HashSet<>(asList(...))
- new HashSet<>(Collections.singletonList()) / new
HashSet<>(singletonList())
- new HashSet<>(Collections.emptyList())
- new HashSet<>(Set.of())
This change takes the following into account, and we will not change to
Set.of in these scenarios:
- Require `mutability` (UnsupportedOperationException).
- Allow `duplicate` elements (IllegalArgumentException).
- Allow `null` elements (NullPointerException).
- Depend on `Ordering`. `Set.of` does not guarantee order, so it could
make tests flaky or break public interfaces.
Reviewers: Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
### About
11 of the test cases in `SharePartitionTest` have failed at least once
in the past 28 days.
https://develocity.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=kafka.server.share.SharePartitionTest
Observing the flakiness, they seem to be caused due to the usage of
`SystemTimer` for various acquisition lock timeout related tests. I have
replaced the usage of `SystemTimer` with `MockTimer` and also improved
the `MockTimer` API with regard to removing the timer task entries that
have already been cancelled.
Also, this has reduced the time taken to run `SharePartitionTest` from
~6 sec to ~1.5 sec
### Testing
The testing has been done with the help of already present unit tests in
Apache Kafka.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch updates the `GroupCoordinator` interface to use
`AuthorizableRequestContext` instead of using `RequestContext`. It makes
the interface more generic. The only downside is that the request
version in `AuthorizableRequestContext` is an `int` instead of a `short`
so we had to adapt it in a few places. We opted for using `int` directly
wherever possible.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>
Previously in KAFKA-18484, we added exception handling for exceptions
coming from batch events. Also handle exceptions from 0-record write
events and transaction completion events.
Reviewers: David Jacot <djacot@confluent.io>
Make DeferredEventCollection and CoordinatorBatch static classes.
DeferredEventCollection only needs to access the logger and
CoordinatorBatch is only non-static because it holds
DeferredEventCollections.
Reviewers: David Jacot <djacot@confluent.io>
Some of these classes are generally useful for testing.
MockCoordinatorShard is already shared by SnapshottableCoordinatorTest.
Also do some minor refactors.
Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Given that now we support Java 17 on our brokers, this PR replace the
use of :
Collections.singletonList() and Collections.emptyList() with List.of()
Collections.singletonMap() and Collections.emptyMap() with Map.of()
Collections.singleton() and Collections.emptySet() with Set.of()
Affected modules: server and coordinator-common
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
When group.coordinator.threads is greater than 1, we lose track of thread idle time because of integer arithmetic. Use doubles instead.
Reviewers: David Jacot <djacot@confluent.io>
CoordinatorRecordSerde does not validate the version of the value to check whether the version is supported by the current version of the software. This is problematic if a future and unsupported version of the record is read by an older version of the software because it would misinterpret the bytes. Hence CoordinatorRecordSerde must throw an error if the version is unknown. This is also consistent with the handling in the old coordinator.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
HdrHistogram can throw an exception if the recorded value is greater than a configured limit. Expand the ceiling from per-metric to all invocations.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Ensure that unloading a coordinator always succeeds. Previously, we have
guarded against exceptions from DeferredEvent completions. All that
remains is handling exceptions from the onUnloaded() method of the
coordinator state machine.
Reviewers: David Jacot <djacot@confluent.io>
This patch does a few things:
1) Replace ApiMessageAndVersion by ApiMessage in CoordinatorRecord for the key
2) Leverage the fact that ApiMessage exposes the apiKey. Hence we don't need to specify the key anymore.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch updates the transaction coordinator record to use the new coordinator record definition.
Reviewers: Andrew Schofield <aschofield@confluent.io>
This patch updates the GroupCoordinatorSerde and the ShareGroupCoordinatorSerde to leverage the CoordinatorRecordType to deserialize records. With this, newly added record are automatically picked up. In other words, the serdes work with all defined records without doing anything.
Reviewers: Andrew Schofield <aschofield@confluent.io>
* Avoid attaching empty writes to empty batches.
* Handle flushes of empty batches, which would return a 0 offset otherwise.
Reviewers: David Jacot <djacot@confluent.io>
In this PR, we've added a class ShareCoordinatorOffsetsManager, which tracks the last redundant offset for each share group state topic partition. We have also added a periodic timer job in ShareCoordinatorService which queries for the redundant offset at regular intervals and if a valid value is found, issues the deleteRecords call to the ReplicaManager via the PartitionWriter. In this way the size of the partitions is kept manageable.
Reviewers: Jun Rao <junrao@gmail.com>, David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Removes the client side AddPartitionsToTxn/AddOffsetsToTxn calls so that the partition is implicitly added as part of KIP-890 part 2.
This change also requires updating the valid state transitions. The client side can not know for certain if a partition has been added server side when the request times out (partial completion). Thus for TV2, the transition to PrepareAbort is now valid for Empty, CompleteCommit, and CompleteAbort.
For readability, the V1 and V2 endTransaction methods have been separated.
Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ritika Reddy <rreddy@confluent.io>
This patch introduces the asynchronous resolution of regular expressions. Let me unpack a few details about the implementations:
1) I have decided to finally update all the regular expressions within a consumer group together. My assumption is that the number of regular expressions in a group will be generally small but the number of topics in a cluster is large. Hence grouping has two benefits. Firstly, it allows to go through the list of topics once for all the regular expressions. Secondly, it reduces the number of potential rebalances because all the regular expressions are updated at the same time.
2) An update is triggered when the group is subscribed to at least one regular expressions.
3) An update is triggered when there is no ongoing update.
4) An update is triggered only of the previous one is older than 10s.
5) An update is triggered when the group has unresolved regular expressions.
6) An update is triggered when the metadata image has new topics.
Reviewers: Jeff Kim <jeff.kim@confluent.io>
This patch introduces the `CoordinatorExecutor` construct into the `CoordinatorRuntime`. It allows scheduling asynchronous tasks from within a `CoordinatorShard` while respecting the runtime semantic. It will be used to asynchronously resolve regular expressions.
The `GroupCoordinatorService` uses a default `ExecutorService` with a single thread to back it at the moment. It seems that it should be sufficient. In the future, we could consider making the number of threads configurable.
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This patch fixes a few things:
* Typos.
* Merge the tests for fetchOffsets and fetchAllOffsets together into parameterized tests since they share the same structure.
* Use Topic.GROUP_METADATA_TOPIC_NAME instead of __consumer_offsets in new group coordinator tests.
Reviewers: Ken Huang <s7133700@gmail.com>, David Jacot <djacot@confluent.io>
Introduces the share coordinator. This coordinator is built on the new coordinator runtime framework. It
is responsible for persistence of share-group state in a new internal topic named "__share_group_state".
The responsibility for being a share coordinator is distributed across the brokers in a cluster.
Reviewers: David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
Introduce ShareCoordinator interface and related classes.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, David Arthur <mumrah@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch introduces a wrapper around [HdrHistogram](https://github.com/HdrHistogram/HdrHistogram) to use for group coordinator histograms, event queue time, event processing time, flush time, and purgatory time.
Reviewers: David Jacot <djacot@confluent.io>
There is a lot of code in group-coordinator which is not share/consumer/classic group specific.
Since we are introducing a share-coordinator as part of KIP-932 (in a new module), it would make sense to get the common coordinator functionality into a separate common coordinator module so that share-coordinator need not depend on group-coordinator.
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jeff Kim <jeff.kim@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>