The previous URL http://lambda-architecture.net/ seems to now be controlled by spammers
Co-authored-by: Shashank <hsshashank.grad@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
The `state-change.log` file is being incorrectly rotated to
`stage-change.log.[date]`. This change fixes the typo to have the log
file correctly rotated to `state-change.log.[date]`
_No functional changes._
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Christo Lolov
<lolovc@amazon.com>, Luke Chen <showuon@gmail.com>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
All state updater threads use the same metrics instance, but do not use
unique names for their sensors. This can have the following symptoms:
1) Data inserted into one sensor by one thread can affect the metrics of
all state updater threads.
2) If one state updater thread is shutdown, the metrics associated to
all state updater threads are removed.
3) If one state updater thread is started, while another one is removed,
it can happen that a metric is registered with the `Metrics` instance,
but not associated to any `Sensor` (because it is concurrently removed),
which means that the metric will not be removed upon shutdown. If a
thread with the same name later tries to register the same metric, we
may run into a `java.lang.IllegalArgumentException: A metric named ...
already exists`, as described in the ticket.
This change fixes the bug giving unique names to the sensors. A test is
added that there is no interference of the removal of sensors and
metrics during shutdown.
Reviewers: Matthias J. Sax <matthias@confluent.io>
The changes update the OpenJDK base image from 17-buster to 17-bullseye:
- Updates tests/docker/Dockerfile to use openjdk:17-bullseye instead of
openjdk:17-buster
- Updates tests/docker/ducker-ak script to use the new default image
- Updates documentation in tests/README.md with the new image name
examples
Reviewers: Federico Valeri <fedevaleri@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
Update catch to handle compression errors
Before :

After
```
Sent message: KR Message 376
[kafka-producer-network-thread | kr-kafka-producer] INFO
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter -
KR: Failed to compress telemetry payload for compression: zstd, sending
uncompressed data
Sent message: KR Message 377
```
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Bill Bejeck
<bbejeck@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
* Coordinator starts with a smaller buffer, which can grow as needed.
* In freeCurrentBatch, release the appropriate buffer:
* The Coordinator recycles the expanded buffer
(`currentBatch.builder.buffer()`), not `currentBatch.buffer`, because
`MemoryBuilder` may allocate a new `ByteBuffer` if the existing one
isn't large enough.
* There are two cases that buffer may exceeds `maxMessageSize` 1.
If there's a single record whose size exceeds `maxMessageSize` (which,
so far, is derived from `max.message.bytes`) and the write is in
`non-atomic` mode, it's still possible for the buffer to grow beyond
`maxMessageSize`. In this case, the Coordinator should revert to using a
smaller buffer afterward. 2. Coordinator do not recycles the buffer
that larger than `maxMessageSize`. If the user dynamically reduces
`maxMessageSize` to a value even smaller than `INITIAL_BUFFER_SIZE`, the
Coordinator should avoid recycling any buffer larger than
`maxMessageSize` so that Coordinator can allocate the smaller buffer in
the next round.
* Add tests to verify the above scenarios.
Reviewers: David Jacot <djacot@confluent.io>, Sean Quah
<squah@confluent.io>, Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The `AdminClient` adds a telemetry reporter to the metrics reporters
list in the constructor. The problem is that the reporter was already
added in the `createInternal` method. In the `createInternal` method
call, the `clientTelemetryReporter` is added to a
`List<MetricReporters>` which is passed to the `Metrics` object, will
get closed when `Metrics.close()` is called. But adding a reporter to
the reporters list in the constructor is not used by the `Metrics`
object and hence doesn't get closed, causing a memory leak.
All related tests pass after this change.
Reviewers: Apoorv Mittal <apoorvmittal10@apache.org>, Matthias J. Sax
<matthias@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>,
Jhen-Yung Hsu <jhenyunghsu@gmail.com>
This fixes librdkafka older than the recently released 2.11.0 with
Kerberos authentication and Apache Kafka 4.x.
Even though this is a bug in librdkafka, a key goal of KIP-896 is not to
break the popular client libraries listed in it. Adding back JoinGroup
v0 & v1 is a very small change and worth it from that perspective.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Log segment closure results in right sizing the segment on disk along
with the associated index files.
This is specially important for TimeIndexes where a failure to right
size may eventually cause log roll failures leading to under replication
and log cleaner failures.
This change uses `Utils.closeAll` which propagates exceptions, resulting
in an "unclean" shutdown. That would then cause the broker to attempt to
recover the log segment and the index on next startup, thereby avoiding
the failures described above.
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Backports the flakyness fix in #18451 to 4.0 branch
> Sometimes we didn't get into abortable state before aborting, so the
epoch didn't get bumped. Now we force abortable state with an attempt to
send before aborting so the epoch bump occurs as expected.
>
> Reviewers: Jeff Kim <jeff.kim@confluent.io>
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Co-authored-by: Justine Olshan <jolshan@confluent.io>
ConsumerGroupDescribe with an empty group id returns a response containing `null` groupId in a non-nullable field. Since the response cannot be serialized, this results in UNKNOWN_SERVER_ERROR being returned to the client. This PR sets the group id in the response to an empty string instead and adds request tests for empty group id.
Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
https://issues.apache.org/jira/browse/KAFKA-19383 When applying the
ClearElrRecord, it may pick up the topicId in the image without checking
if the topic has been deleted. This can cause the creation of a new
TopicRecord with an old topic ID.
Reviewers: Alyssa Huang <ahuang@confluent.io>, Artem Livshits <alivshits@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
No conflicts.
Previously, we could wait for up to half of the broker session timeout
for an RPC to complete, and then delay by up to half of the broker
session timeout. When taken together, these two delays could lead to
brokers erroneously missing heartbeats.
This change removes exponential backoff for heartbeats sent from the
broker to the controller. The load caused by heartbeats is not heavy,
and controllers can easily time out heartbeats when the queue length is
too long. Additionally, we now set the maximum RPC time to the length of
the broker period. This minimizes the impact of heavy load.
Reviewers: José Armando García Sancio <jsancio@apache.org>, David Arthur <mumrah@gmail.com>
If there are more deletion filters after we initially hit the
`MAX_RECORDS_PER_USER_OP` bound, we will add an additional deletion
record ontop of that for each additional filter.
The current error message returned to the client is not useful either,
adding logic so client doesn't just get `UNKNOWN_SERVER_EXCEPTION` with
no details returned.
## Summary
- Fix potential race condition in
LogSegment#readMaxTimestampAndOffsetSoFar(), which may result in
non-monotonic offsets and causes replication to stop.
- See https://issues.apache.org/jira/browse/KAFKA-19407 for the details
how it happen.
Reviewers: Vincent PÉRICART <mauhiz@gmail.com>, Jun Rao
<junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This is a follow up to
[https://github.com/apache/kafka/pull/19910](https://github.com/apache/kafka/pull/url)
The coordinator failed to write an epoch fence transition for producer
jt142 to the transaction log with error COORDINATOR_NOT_AVAILABLE. The
epoch was increased to 2 but not returned to the client
(kafka.coordinator.transaction.TransactionCoordinator) -- as we don't
bump the epoch with this change, we should also update the message to
not say "increased" and remove the
epochAndMetadata.transactionMetadata.hasFailedEpochFence = true line
In the test, the expected behavior is:
First append transaction to the log fails with
COORDINATOR_NOT_AVAILABLE (epoch 1)
We try init_pid again, this time the SINGLE epoch bump succeeds, and
the following things happen simultaneously (epoch 2)
-> Transition to COMPLETE_ABORT
-> Return CONCURRENT_TRANSACTION error to the client
The client retries, and there is another epoch bump; state
transitions to EMPTY (epoch 3)
Reviewers: Justine Olshan <jolshan@confluent.io>
[0b2e410d61](url)
Bug fix in 4.0
**Conflicts:**
- The Transaction Coordinator had some conflicts, mainly with the
transaction states. Ex: ongoing in 4.0 is TransactionState.ONGOING in
4.1.
- The TransactionCoordinatorTest file had conflicts w.r.t the 2PC
changes from KIP-939 in 4.1 and the above mentioned state changes
Reviewers: Justine Olshan <jolshan@confluent.io>, Artem Livshits
<alivshits@confluent.io>
Minor fix to correct the validate condition for GetTelemetryRequests.
Added respective tests as well.
Reviewers: Andrew Schofield <aschofield@confluent.io>
## Summary
- MetadataShell may deletes lock file unintentionally when it exists or
fails to acquire lock. If there's running server, this causes unexpected
result as below:
* MetadataShell succeeds on 2nd run unexpectedly
* Even worse, LogManager/RaftManager's lock also no longer work from
concurrent Kafka process startup
Reviewers: TengYao Chi <frankvicky@apache.org>
# Conflicts:
# shell/src/test/java/org/apache/kafka/shell/MetadataShellIntegrationTest.java
At the retry limit binaryExponentialElectionBackoffMs it becomes
statistically likely that the exponential backoff returned
electionBackoffMaxMs. This is an issue as multiple replicas can get
stuck starting elections at the same cadence.
This change fixes that by added a random jitter to the max election
backoff.
Reviewers: José Armando García Sancio <jsancio@apache.org>, TaiJuWu
<tjwu1217@gmail.com>, Yung <yungyung7654321@gmail.com>
### Issue:
API Responses missing latest version in [Kafka protocol
guide](https://kafka.apache.org/protocol.html)
#### For example:
These are missing:
- ApiVersions Response (Version: 4) — Only versions 0–3 are documented,
though version 4 of the request is included.
- DescribeTopicPartitions Response — Not listed at all.
- Fetch Response (Version: 17) — Only versions 4–16 are documented,
though version 17 of the request is included.
#### After the fix:
docs/generated/protocol_messages.html
<img width="1045" alt="image"
src="https://github.com/user-attachments/assets/5ea79ced-aab5-4c47-8e09-9956047c9bf1"
/>
Reviewers: dengziming <dengziming1993@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This PR fixes a regression bug introduced with KAFKA-17203. We need to
pass in mutable collections into `closeTaskClean(...)`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bruno Cadonna <bruno@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Group Coordinator Shards are not unloaded when `__consumer_offsets`
topic is deleted. The unloading is scheduled but it is ignored because
the epoch is equal to the current epoch:
```
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1]
Scheduling unloading of metadata for __consumer_offsets-0 with epoch
OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling
unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0]
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading
metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current
epoch is 0.
(org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
```
This patch fixes the issue by not setting the leader epoch in this case.
The coordinator expects the leader epoch to be incremented when the
resignation code is called. When the topic is deleted, the epoch is not
incremented. Therefore, we must not use it. Note that this is aligned
with deleted partitions are handled too.
Reviewers: Dongnuo Lyu <dlyu@confluent.io>, José Armando García Sancio <jsancio@apache.org>
Fix the issue where JMC is unable to correctly display client-state and
thread-state metrics. The root cause is that these two metrics directly
return the `State` class to JMX. If the user has not set up the RMI
server, JMC or other monitoring tools will be unable to interpret the
`State` class. To resolve this, we should return a string representation
of the state instead of the State class in these two metrics.
Reviewers: Luke Chen <showuon@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When the number of partitions is not divisible by the number of members,
some members will end up with one more partition than others.
Previously, we required these to be the members at the start of the
iteration order, which meant that partitions could be reassigned even
when the previous assignment was already balanced.
Allow any member to have the extra partition, so that we do not move
partitions around when the previous assignment is already balanced.
Before the PR
```
Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode Cnt Score Error Units
ServerSideAssignorBenchmark.doAssignment FULL RANGE false 10000 50 HOMOGENEOUS 1000 avgt 2 26.175 ms/op
ServerSideAssignorBenchmark.doAssignment FULL RANGE false 10000 50 HETEROGENEOUS 1000 avgt 2 123.955 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 50 HOMOGENEOUS 1000 avgt 2 24.408 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 50 HETEROGENEOUS 1000 avgt 2 114.873 ms/op
```
After the PR
```
Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode Cnt Score Error Units
ServerSideAssignorBenchmark.doAssignment FULL RANGE false 10000 50 HOMOGENEOUS 1000 avgt 2 24.259 ms/op
ServerSideAssignorBenchmark.doAssignment FULL RANGE false 10000 50 HETEROGENEOUS 1000 avgt 2 118.513 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 50 HOMOGENEOUS 1000 avgt 2 24.636 ms/op
ServerSideAssignorBenchmark.doAssignment INCREMENTAL RANGE false 10000 50 HETEROGENEOUS 1000 avgt 2 115.503 ms/op
```
Reviewers: David Jacot <djacot@confluent.io>
When a group has pending transactional offsets but no committed offsets,
we can accidentally delete it while cleaning up expired offsets. Add a
check to avoid this case.
Reviewers: David Jacot <djacot@confluent.io>
### Motivation
While investigating “events skipped in group
rebalancing” ([spring‑projects/spring‑kafka#3703](https://github.com/spring-projects/spring-kafka/issues/3703))
I discovered a race
condition between
- the main poll/commit thread, and
- the consumer‑coordinator heartbeat thread.
If the main thread enters
`ConsumerCoordinator.sendOffsetCommitRequest()` while the heartbeat
thread is finishing a rebalance (`SyncGroupResponseHandler.handle()`),
the group state transitions in the following order:
```
COMPLETING_REBALANCE → (race window) → STABLE
```
Because we read the state twice without a lock:
1. `generationIfStable()` returns `null` (state still
`COMPLETING_REBALANCE`),
2. the heartbeat thread flips the state to `STABLE`,
3. the main thread re‑checks with `rebalanceInProgress()` and wrongly
decides that a rebalance is still active,
4. a spurious `CommitFailedException` is returned even though the commit
could succeed.
For more details, please refer to sequence diagram below. <img
width="1494" alt="image"
src="https://github.com/user-attachments/assets/90f19af5-5e2d-4566-aece-ef764df2d89c"
/>
### Impact
- The exception is semantically wrong: the consumer is in a stable
group, but reports failure.
- Frameworks and applications that rely on the semantics of
`CommitFailedException` and `RetryableCommitException` (for example
`Spring Kafka`) take the wrong code path, which can ultimately skip the
events and break “at‑most‑once” guarantees.
### Fix
We enlarge the synchronized block in
`ConsumerCoordinator.sendOffsetCommitRequest()` so that the consumer
group state is examined atomically with respect to the heartbeat thread:
### Jira
https://issues.apache.org/jira/browse/KAFKA-19242
https: //github.com/spring-projects/spring-kafka/issues/3703
Signed-off-by: chickenchickenlove <ojt90902@naver.com>
Reviewers: David Jacot <david.jacot@gmail.com>
When fetching stable offsets in the group coordinator, we iterate over
all requested partitions. For each partition, we iterate over the
group's ongoing transactions to check if there is a pending
transactional offset commit for that partition.
This can get slow when there are a large number of partitions and a
large number of pending transactions. Instead, maintain a list of
pending transactions per partition to speed up lookups.
Reviewers: Shaan, Dongnuo Lyu <dlyu@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, David Jaco <djacot@confluent.io>