Consider the following scenario:
1. A Fetch request contains partitions P1 and P2. The data of P1 is in LogCache, while the data of P2 is not.
2. First, a fast read is attempted: P1 returns data and consumes Network Out, while P2 returns a FastReadException.
3. Because of the FastReadException, the entire Fetch falls back to a slow read, in which both P1 and P2 return data and consume Network Out.
4. As a result, the Network Out consumed by P1 in step 2 is counted twice.
Solution: Move the S3Stream network-out consumption to ElasticReplicaManager, so that network-out traffic is not over-consumed when a Fetch mixes tail-read and catch-up-read partitions.
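A minimal sketch of the idea, assuming hypothetical names (NetworkOutAccounting, NetworkOutLimiter, onFetchComplete) rather than the actual AutoMQ code: network out is charged once, at the replica-manager level, against the final per-partition results of the Fetch, regardless of whether they came from the fast or the slow read.
```java
import java.util.Map;

public class NetworkOutAccounting {

    /** Hypothetical limiter abstraction; the real AutoMQ limiter API may differ. */
    public interface NetworkOutLimiter {
        void consume(long bytes);
    }

    private final NetworkOutLimiter limiter;

    public NetworkOutAccounting(NetworkOutLimiter limiter) {
        this.limiter = limiter;
    }

    /**
     * Called once with the final per-partition results of the Fetch. If a fast read
     * failed with FastReadException and the whole Fetch was retried as a slow read,
     * only the results of the retry reach this point, so each byte is charged once.
     */
    public void onFetchComplete(Map<String, Integer> bytesReadPerPartition) {
        long total = bytesReadPerPartition.values().stream().mapToLong(Integer::longValue).sum();
        limiter.consume(total);
    }
}
```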
fix: Added support for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (#2747)
Added support for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to the
list of supported environment variables. Issue #2746
Co-authored-by: Saumya Pandey <saumyapandeycse98@gmail.com>
- Add Time dependency to StreamReader and StreamReaders for time-related operations
- Update constructors to accept Time, defaulting to Time.SYSTEM
- Replace System.currentTimeMillis() with time.milliseconds() throughout
- Refactor StreamReadersTest to use MockTime for simulating time passage
- Remove reflection-based time manipulation in tests for cleaner and safer testing
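A minimal sketch of the pattern using Kafka's org.apache.kafka.common.utils.Time; the class and fields below are illustrative, not the actual StreamReaders code:
```java
import org.apache.kafka.common.utils.Time;

public class StreamReaders {
    private final Time time;
    private volatile long lastAccessTimestamp;

    // Production code uses the system clock by default.
    public StreamReaders() {
        this(Time.SYSTEM);
    }

    // Tests can inject org.apache.kafka.common.utils.MockTime instead.
    public StreamReaders(Time time) {
        this.time = time;
        this.lastAccessTimestamp = time.milliseconds();
    }

    public void touch() {
        // Replaces System.currentTimeMillis() so tests can advance time deterministically.
        this.lastAccessTimestamp = time.milliseconds();
    }

    public boolean expired(long timeoutMs) {
        return time.milliseconds() - lastAccessTimestamp > timeoutMs;
    }
}
```
In a test, constructing the class with a MockTime and calling mockTime.sleep(timeoutMs + 1) advances the clock without real waiting, which is what replaces the reflection-based time manipulation.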
---------
Signed-off-by: Gezi-lzq <lzqtxwd@gmail.com>
* fix(log): Prevent potential offset overflow in ElasticLogSegment
This commit addresses an issue where a log segment could accommodate more than Integer.MAX_VALUE records, leading to a potential integer overflow when calculating relative offsets.
The root cause was that the check `offset - baseOffset <= Integer.MAX_VALUE` allowed a relative offset to be exactly `Integer.MAX_VALUE`. Since offsets are 0-based, this allows for `Integer.MAX_VALUE + 1` records, which cannot be represented by a standard Integer.
This fix implements the following changes:
1. In `ElasticLogSegment`, the offset validation is changed from `<=` to `< Integer.MAX_VALUE` to ensure the relative offset strictly fits within an Integer's bounds.
2. In `LogCleaner`, a new segment grouping method `groupSegmentsBySizeV2` is introduced for `ElasticUnifiedLog`. This method uses the same stricter offset check to prevent incorrectly grouping segments that would exceed the offset limit.
3. The corresponding unit tests in `LogCleanerTest` have been updated to reflect these new boundaries and validate the fix.
Fixes: #2718
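A minimal sketch of the tightened bound, with an illustrative helper name; the actual ElasticLogSegment validation differs in detail:
```java
public final class ElasticSegmentOffsetCheck {
    private ElasticSegmentOffsetCheck() { }

    // Relative offsets are stored as ints and offsets are 0-based, so a relative offset of
    // exactly Integer.MAX_VALUE would imply Integer.MAX_VALUE + 1 records in one segment.
    public static boolean canConvertToRelativeOffset(long offset, long baseOffset) {
        long relativeOffset = offset - baseOffset;
        return relativeOffset >= 0 && relativeOffset < Integer.MAX_VALUE; // was <= before the fix
    }
}
```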
* fix(logCleaner): unify segment grouping logic
* fix(logCleaner): extract offset range check for segment grouping to prevent overflow in ElasticLogSegment
* style(logCleaner): fix indentation in segment grouping condition for readability
* style(logCleaner): fix line break in offset range check for readability
* chore: add AutoMQ inject
* style(logCleaner): remove unnecessary blank line after segment grouping
* fix(stream): validate record batch count to prevent negative values in append
fix: resolve Base64 decoding error in certificate parsing (#2615) (#2693)
- Fix IllegalArgumentException: Illegal base64 character 20 in S3StreamKafkaMetricsManager
- Replace single newline removal with comprehensive whitespace cleanup using replaceAll("\s", "")
- Add graceful error handling for both Base64 and certificate parsing failures
- Add comprehensive unit tests covering various whitespace scenarios and edge cases
- Improve logging with specific error messages for failed certificate parsing
Fixes #2615
(cherry picked from commit 75bdea05e5)
Co-authored-by: Vivek Chavan <111511821+vivekchavan14@users.noreply.github.com>
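A minimal sketch of the whitespace cleanup and graceful error handling described in this fix, assuming a PEM-style Base64 certificate string; it is not the actual S3StreamKafkaMetricsManager code:
```java
import java.io.ByteArrayInputStream;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Base64;
import java.util.Optional;

public final class CertDecoding {
    private CertDecoding() { }

    public static Optional<X509Certificate> decode(String base64Cert) {
        try {
            // Strip ALL whitespace (spaces, tabs, newlines), not just '\n',
            // to avoid "Illegal base64 character 20".
            byte[] der = Base64.getDecoder().decode(base64Cert.replaceAll("\\s", ""));
            CertificateFactory factory = CertificateFactory.getInstance("X.509");
            return Optional.of((X509Certificate) factory.generateCertificate(new ByteArrayInputStream(der)));
        } catch (IllegalArgumentException e) {
            // Malformed Base64 input.
            return Optional.empty();
        } catch (java.security.cert.CertificateException e) {
            // Malformed certificate bytes.
            return Optional.empty();
        }
    }
}
```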
* feat(catalog): Avoid static global credentials provider
Refactors `CredentialProviderHolder` to prevent "Connection Pool Shutdown"
errors by creating a new provider instance for each catalog.
Previously, a single static `AwsCredentialsProvider` was shared globally.
If this provider was closed, it would affect all subsequent operations.
By creating a new provider on each `create()` call from Iceberg, this
change removes the global singleton and isolates provider instances.
Fixes #2680
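A minimal sketch of the per-call provider creation, assuming the AWS SDK v2 DefaultCredentialsProvider; the real CredentialProviderHolder differs in detail:
```java
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

public final class CredentialProviderHolderSketch {
    private CredentialProviderHolderSketch() { }

    // Instead of handing out one static, globally shared provider (which breaks every
    // catalog once that provider is closed), build a fresh provider on each create() call.
    public static AwsCredentialsProvider create() {
        return DefaultCredentialsProvider.builder().build();
    }
}
```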
* Update core/src/main/java/kafka/automq/table/CredentialProviderHolder.java
* fix(credentials): update DefaultCredentialsProvider instantiation to use builder pattern
---------
Signed-off-by: Gezi-lzq <lzqtxwd@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* feat(s3stream/block-wal): complete appends sequentially (#2665)
* feat(s3stream/block-wal): complete appends sequentially
* fix: use a lock to ensure there is at most one callback thread
---------
* feat(s3stream/wal): write a padding record when no space at the end of device (#2673)
* refactor(RecordHeader): remove useless methods
* feat(SlidingWindowService): write a padding block when not enough space
* feat(recovery): handle padding records
* fix: fix incorrect assertion
---------
* feat(s3stream/wal): defaults to using version V1 and forward compatible (#2676)
* feat: introduce the Block WAL V1
* feat: impl `RecoverIteratorV1` which only recovers continuous records
* feat: wal forward compatibility
* test: fix tests
* test: test recover from WAL V1
* test: test upgrade
---------
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
fix(worker): update CommitResponse to use partition type from writerFactory (#2677)
* fix(worker): update CommitResponse to use partition type from writerFactory
* fix(worker): mock partitionSpec in TopicPartitionsWorkerTest for unpartitioned partitions
* fix(worker): reorganize imports in TopicPartitionsWorkerTest for clarity
This commit fixes an issue where the doList method in AwsObjectStorage.java
did not handle paginated results from the S3 listObjectsV2 API. The
method now recursively fetches all pages of objects, ensuring that it can
retrieve more than the default 1000 objects.
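A minimal synchronous sketch of the pagination loop using the AWS SDK v2; the actual doList implementation in AwsObjectStorage differs in structure:
```java
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

public final class ListAllObjects {
    private ListAllObjects() { }

    // Follow the continuation token until the listing is no longer truncated,
    // so more than the default 1000 keys per page can be returned.
    public static List<S3Object> listAll(S3Client s3, String bucket, String prefix) {
        List<S3Object> all = new ArrayList<>();
        String token = null;
        ListObjectsV2Response resp;
        do {
            ListObjectsV2Request req = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix)
                .continuationToken(token)
                .build();
            resp = s3.listObjectsV2(req);
            all.addAll(resp.contents());
            token = resp.nextContinuationToken();
        } while (Boolean.TRUE.equals(resp.isTruncated()));
        return all;
    }
}
```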
* fix(e2e): fix e2e test performance and log compaction
* fix(e2e): fix e2e test reassign and round_trip
* fix(e2e): fix e2e test GroupModeTransactionsTest
* fix(e2e): fix e2e test reassignTest
* fix(e2e): fix e2e test Kafka start failure caused by unsupported file WAL
* fix(e2e): fix e2e test Kafka start failure caused by unsupported file WAL
* fix(e2e): fix e2e test Kafka start failure caused by unsupported file WAL
* fix(e2e): fix e2e test Kafka start failure caused by the wait logic
* fix(e2e): fix e2e test Kafka start failure caused by too short a wait
* fix(e2e): format code
* fix(e2e): fix e2e test Kafka start failure caused by unsupported file WAL
* fix(e2e): format code
* fix(e2e): format code
* fix(e2e): format code
* chore(tools/perf): add an option "--send-throughput"
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(tools/perf): log cpu and memory usages (#2607)
* feat: introduce `CpuMonitor`
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: introduce `MemoryMonitor`
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(tools/perf): log cpu and memory usages
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(tools/perf): log the min latency (#2608)
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
perf(s3stream): recover and upload data in segments (#2593)
* feat: pause recovery once the cache is full
* feat(s3stream): recover and upload data in segments
* test: test segmented recovery
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf(tool/perf): reduce the record header size
Signed-off-by: Ning Yu <ningyu@automq.com>
* style: fix lint
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(container): automq kafka container features and patch file for u… (#2561)
feat(container): automq kafka container features and patch file for upgrade
* fix(dockerfile): update Dockerfile and scripts for AutoMQ packaging im… (#2570)
fix(container): update Dockerfile and scripts for AutoMQ packaging improvements
* feat(docker): Docker release for bitnami chart (#2563)
* feat(docker): add GitHub Actions workflow for bitnami chart Docker image release
* feat(docker): add GitHub Actions workflow for bitnami chart Docker image release
* fix(docker): update image tag format for automq-bitnami in release workflow and temporarily remove the latest tag until the AutoMQ Docker Compose is refactored into Bitnami Docker Compose.
* feat(helm): add demo-values.yaml and update README for AutoMQ deployment (#2562)
* feat(helm): add demo-values.yaml and update README for AutoMQ deployment
* fix(demo/readme): fix demo-values.yaml and change readme description
* fix(README): update Helm chart references to use 'kafka'
* feat(values): update demo-values.yaml and README for AutoMQ deployment
* fix(demo): image tag
* fix(readme): bitnami helm chart version
* fix(readme): bitnami helm chart version
* fix(docker): update Dockerfile for AutoMQ while github action packagi… (#2574)
fix(docker): update Dockerfile for AutoMQ while github action packaging installations and permissions
* refactor(s3stream/object-wal): complete appends sequentially (#2426)
* chore: add todos
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor(s3stream/object-wal): sequentially succeed
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor(s3stream/object-wal): drop discontinuous objects during recovery
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: introduce `MockObjectStorage`
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test sequentially succeed
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: record endOffset in the object path
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: different version of wal object header
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: adapt to the new object header format
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: recover from the trim offset
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test recover continuous objects from trim offset
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test marshal and unmarshal wal object header
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: fix tests
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test recover from discontinuous objects
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test recover from v0 and v1 objects
Signed-off-by: Ning Yu <ningyu@automq.com>
* style: fix lint
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: fix tests
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
fix(storage): ensure proper reference counting for ByteBuf in write o… (#2452)
* fix(storage): ensure proper reference counting for ByteBuf in write operations
* feat(storage): implement fast retry mechanism and improve resource management in write operations
* test(storage): add concurrency test for write operations and ensure buffer release
* test(storage): add test for write permit acquisition and blocking behavior
* style(test): format code for consistency in AbstractObjectStorageTest
* feat(storage): add constructor for MemoryObjectStorage with concurrency support
* fix(storage): ensure proper release of ByteBuf resources in write operations
* chore: polish code
* fix(storage): improve error handling and resource management in write operations
* fix(storage): ensure proper release of resources on timeout in AbstractObjectStorage
* test(storage): increase timeout duration for resource cleanup assertions
perf(s3stream/objectstorage): unify the throttle criteria (#2386)
* perf(s3stream/objectstorage): unify the throttle criteria
* refactor(s3stream/objectstorage): retry on 403 responses
* refactor(objectstorage): use `TimeoutException` instead of `ApiCallAttemptTimeoutException`
* refactor: suppress the cause of `ObjectNotExistException`
* style: fix lint
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* chore(objectstorage): log next retry delay
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: a `TrafficLimiter` to limit the network traffic
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: a `TrafficMonitor` to monitor the network traffic
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: record success and failed write requests
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: queued pending write tasks
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: run write tasks one by one
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: use a `TrafficRegulator` to control the rate of write requests
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: limit the inflight force upload tasks
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: correct retry count
Signed-off-by: Ning Yu <ningyu@automq.com>
* chore: fix commit object logs
Signed-off-by: Ning Yu <ningyu@automq.com>
* chore: log force uploads
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: catch exceptions
Signed-off-by: Ning Yu <ningyu@automq.com>
* style: fix lint
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: fix re-trigger run write task
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf: increase if no traffic
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: remove useless try-catch
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf: ensure only one inflight force upload task
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: move inner classes outside
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf: increase rate limit slower
Signed-off-by: Ning Yu <ningyu@automq.com>
* chore: add a prefix in `AbstractObjectStorage#logger`
Signed-off-by: Ning Yu <ningyu@automq.com>
* chore: reduce useless logs
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf: reduce the sample count on warmup
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: introduce `TrafficVolumeLimiter` based on IBM `AsyncSemaphore`
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: limit the inflight write requests
Signed-off-by: Ning Yu <ningyu@automq.com>
* chore: reduce useless logs
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: fix release size
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: release permits once the request failed
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf: increase to max after 2 hours
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: limit the request size
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf: adjust constants
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
feat(s3stream/wal): add constraints in recovery mode (#2301)
* feat(s3stream/wal): add constraints in recovery mode
* refactor: log it rather than throw an exception
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
fix(network): adjust number of permits if the request is huge (#2294)
* refactor: use only one semaphore to limit the queues requests size
* fix(network): adjust number of permits if the request is huge
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor(controller): consider brokers that have recently `CONTROLLED_SHUTDOWN` as `SHUTTING_DOWN`
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test `BrokerHeartbeatManager#brokerState`
Signed-off-by: Ning Yu <ningyu@automq.com>
* revert(NodeState): revert `SHUTDOWN` and `SHUTTING_DOWN` to `FENCED` and `CONTROLLED_SHUTDOWN`
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(config): add table topic conversion type configurations
* feat(config): rename table topic type to schema type and update related configurations
* feat(config): add table topic schema registry URL configuration and validation
* test(config): add unit tests for ControllerConfigurationValidator table topic schema configuration
* fix(tests): update exception type in ControllerConfigurationValidatorTableTest for schema validation
* feat(config): polish code
* feat(backpressure): make back pressure manager configurable
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test disabled
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: move backpressure from s3stream to kafka.core
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: init `BackPressureManager` in `BrokerServer`
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: introduce `BackPressureConfig`
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: make `BackPressureManager` reconfigurable
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: test reconfigurable
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: rename config key
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: move metric "back_pressure_state" from s3stream to core
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(backpressure): log it on recovery from backpressure
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: add metric fetch_limiter_waiting_task_num
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: add metric fetch_limiter_timeout_count
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: add metric fetch_limiter_time
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: add metric back_pressure_state
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: add metric broker_quota_limit
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix(backpressure): run checkers with fixed delay
Signed-off-by: Ning Yu <ningyu@automq.com>
* style: fix lint
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf: drop too large values
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor: record -1 for other states
Signed-off-by: Ning Yu <ningyu@automq.com>
* test: fix tests
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(quota): exclude internal client IDs from broker quota
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(autobalancer): mark producers and consumers internal clients
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: fix logs
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(quota): support to get current quota metric value
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor(backpressure): remove `Regulator#minimize`
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf(quota): increase the max of broker quota throttle time
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf(backpressure): decrease cooldown time
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf(quota): increase the max of broker quota throttle time
Signed-off-by: Ning Yu <ningyu@automq.com>
* docs: update comments
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor(quota): refactor `maybeRecordAndGetThrottleTimeMs`
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix(quota): throttle the produce request regardless of the acks setting
Signed-off-by: Ning Yu <ningyu@automq.com>
* refactor(quota): separate `Request` in `ClientQuotaManager` and `RequestRate` in `BrokerQuotaManager`
Signed-off-by: Ning Yu <ningyu@automq.com>
* style: fix lint
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(quota): support to update broker request rate quota
Signed-off-by: Ning Yu <ningyu@automq.com>
* test(quota): test update quota
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
fix(stream): release `FetchResult`s if the subsequent fetch fails (#2172)
* fix(stream): release `FetchResult`s if the subsequent fetch fails
* revert: "fix(stream): release `FetchResult`s if the subsequent fetch fails"
This reverts commit 5836a6afa0.
* refactor: add the `FetchResult` into the list in order rather than in reverse order
* fix: release `FetchResult`s if failed to fetch
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat(tools/perf): log progress on resetting offsets
Signed-off-by: Ning Yu <ningyu@automq.com>
* fix: reset timeouts
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: increase the log interval
Signed-off-by: Ning Yu <ningyu@automq.com>
* perf(tools/perf): assuming all partitions have the same offset at the same time
Signed-off-by: Ning Yu <ningyu@automq.com>
* feat: limit the min of --backlog-duration
Signed-off-by: Ning Yu <ningyu@automq.com>
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
* docs: add todos
* perf(network): limit the inflight requests by size
* perf(ReplicaManager): limit the queue size of the `fetchExecutor`s
* perf(KafkaApis): limit the queue size of async request handlers
* refactor(network): make "queued.max.requests.size.bytes" configurable
* style: fix lint
* fix(network): limit the min queued request size per queue
---------
Signed-off-by: Ning Yu <ningyu@automq.com>
We fail the entire CreateTopicsRequest if more than 10k total partitions would be created by this
specific request. The usual pattern for this API is to try and succeed with some topics, but since
the 10k limit applies to all topics in the request, no topic should be created if together they
exceed it.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
KIP-853 adds support for dynamic KRaft quorums. This means that the quorum topology is
no longer statically determined by the controller.quorum.voters configuration. Instead, it
is contained in the storage directories of each controller and broker.
Users of dynamic quorums must format at least one controller storage directory with either
the --initial-controllers or --standalone flags. If they fail to do this, no quorum can be
established. This PR changes the storage tool to warn about the case where a KIP-853 flag has
not been supplied to format a KIP-853 controller. (Note that broker storage directories
can continue to be formatted without a KIP-853 flag.)
There are cases where we don't want to specify initial voters when formatting a controller. One
example is where we format a single controller with --standalone, and then dynamically add 4
more controllers with no initial topology. In this case, we want the 4 later controllers to grab
the quorum topology from the initial one. To support this case, this PR adds the
--no-initial-controllers flag.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Federico Valeri <fvaleri@redhat.com>
According to KIP-950, remote.log.manager.thread.pool.size should be marked as deprecated and replaced by two new configurations: remote.log.manager.copier.thread.pool.size and remote.log.manager.expiration.thread.pool.size. Fix default handling so that -1 works as expected.
Reviewers: Luke Chen <showuon@gmail.com>, Gaurav Narula <gaurav_narula2@apple.com>, Satish Duggana <satishd@apache.org>, Colin P. McCabe <cmccabe@apache.org>
During ZK migration, always include control.plane.listener.name in advertisedBrokerListeners, to be
bug-compatible with earlier Apache Kafka versions that ignored this misconfiguration. (Just as
before, control.plane.listener.name is not supported in KRaft mode itself.)
Reviewers: Luke Chen <showuon@gmail.com>
Co-authored-by: Mickael Maison <mimaison@users.noreply.github.com>
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
KAFKA-16534 introduced a change to send an UpdateVoterRequest every "3 * fetchTimeoutMs" if the voter's configured endpoints differ from the endpoints persisted in the KRaft log. It also introduced a regression: if the voter nodes did not need an update, updateVoterTimer wasn't reset. This resulted in a busy loop in the KafkaRaftClient#poll method, causing high CPU usage.
This PR modifies the conditions in pollFollowerAsVoter to reset updateVoterTimer appropriately.
Reviewers: José Armando García Sancio <jsancio@apache.org>
Previously, Apache Kafka was uploading release candidate (RC) artifacts
to users' home directories on home.apache.org. However, since this
resource has been decommissioned, we need to follow the standard
approach of putting release candidate artifacts into the appropriate
subversion directory, at https://dist.apache.org/repos/dist/dev/kafka/.
Reviewers: Justine Olshan <jolshan@confluent.io>
When reverting the ZK migration, we must also remove the /migration ZNode in order to allow the migration to be re-attempted in the future.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
When a replica restarts in the follower state it is possible for the set of leader endpoints to not match the latest set of leader endpoints. Voters will discover the latest set of leader endpoints through the BEGIN_QUORUM_EPOCH request. This means that KRaft needs to allow for the replica to transition from Follower to Follower when only the set of leader endpoints has changed.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Alyssa Huang <ahuang@confluent.io>
Kafka Streams system tests were failing with this error:
Failed to parse host name from entry 3001@d for the configuration controller.quorum.voters. Each entry should be in the form `{id}@{host}:{port}`.
The cause is that in kafka.py line 876, we create a delimited string from a list comprehension, but the input is a string itself, so each character gets appended vs. the bootstrap server string of host:port. To fix this, this PR adds split(',') to controller_quorum_bootstrap_servers. Note that this only applies when dynamicRaftQuorum=False
Reviewers: Alyssa Huang <ahuang@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Fixes a regression introduced by #16669 which inadvertently stopped processing SCRAM arguments from kafka-storage.sh
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Federico Valeri <fedevaleri@gmail.com>
This change fixes a few issues.
KAFKA-17608; KRaft controller crashes when active controller is removed
When a control batch is committed, the quorum controller currently increases the last stable offset but fails to create a snapshot for that offset. This causes an issue if the quorum controller renounces and needs to revert to that offset (which has no snapshot present). Since the control batches are no-ops for the quorum controller, it does not need to update its offsets for control records. We now skip the commit-handling logic for control batches.
KAFKA-17604; Describe quorum output missing added voters endpoints
Describe quorum output will miss endpoints of voters which were added via AddRaftVoter. This is due to a bug in LeaderState's updateVoterAndObserverStates which will pull replica state from observer states map (which does not include endpoints). The fix is to populate endpoints from the lastVoterSet passed into the method.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
Several Kafka log configurations have synonyms. For example, log retention can be configured
either by log.retention.ms, or by log.retention.minutes, or by log.retention.hours. There is also
a facility in Kafka to dynamically change broker configurations without restarting the broker. These
dynamically set configurations are stored in the metadata log and override what is in the broker
properties file.
Unfortunately, these two features interacted poorly; there was a bug where the dynamic log
configuration update code ignored synonyms. For example, if you set log.retention.minutes and then
reconfigured something unrelated that triggered the LogConfig update path, the retention value that
you had configured was overwritten.
The reason for this was incorrect handling of synonyms. The code tried to treat the Kafka broker
configuration as a bag of key/value entities rather than extracting the correct retention time (or
other setting with overrides) from the KafkaConfig object.
Reviewers: Luke Chen <showuon@gmail.com>, Jun Rao <junrao@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Christo Lolov <lolovc@amazon.com>, Federico Valeri <fedevaleri@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>, amangandhi94 <>
This test expects that each partition can receive the record, so using a non-null key helps distribute the records more randomly.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR tries to improve the error message when broker.id is set to -1 and ZK migration is enabled. It is not
needed to disable the broker.id.generation.enable option. It is sufficient to just not use it (by not setting
the broker.id to -1).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Luke Chen <showuon@gmail.com>
Change the configurations under config/kraft to use controller.quorum.bootstrap.servers instead of controller.quorum.voters. Add comments explaining how to use the older static quorum configuration where appropriate.
In docs/ops.html, remove the reference to "tentative timelines for ZooKeeper removal" and "Tiered storage is considered as an early access feature" since they are no longer up-to-date. Add KIP-853 information.
In docs/quickstart.html, move the ZK instructions to be after the KRaft instructions. Update the KRaft instructions to use KIP-853.
In docs/security.html, add an explanation of --bootstrap-controller and document controller.quorum.bootstrap.servers instead of controller.quorum.voters.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Alyssa Huang <ahuang@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
This reverts commit 391778b8d7.
Unfortunately that commit re-introduced bug #15127 which prevented the publishing of kafka-clients
artifacts to remote maven. As that bug says:
The issue triggers only with publishMavenJavaPublicationToMavenRepository due to signing.
Generating signed asc files error out for shadowed release artifacts as the module name
(clients) differs from the artifact name (kafka-clients).
The fix is basically to explicitly define artifact of shadowJar to signing and publish plugin.
project.shadow.component(mavenJava) previously outputs the name as client-<version>-all.jar
though the classifier and archivesBaseName are already defined correctly in :clients and
shadowJar construction.
Reviewers: David Arthur <mumrah@gmail.com>
When brokers undergoing ZK migration register with the controller, it should verify that they have
provided a way to contact them via their inter.broker.listener. Otherwise the migration will fail
later on with a more confusing error message.
Reviewers: David Arthur <mumrah@gmail.com>
Failed tasks discovered when removed from the state updater during assignment or revocation are added to the task registry. From there they are retrieved and handled as normal tasks. This leads to a couple of IllegalStateExceptions because it breaks some invariants that ensure that only good tasks are assigned and processed.
This commit solves this bug by distinguishing failed from non-failed tasks in the task registry.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
There is a race condition between KRaftMigrationDriver running its first poll() and being notified by Raft about a leader change. If onControllerChange is called before RecoverMigrationStateFromZKEvent is run, we will end up getting stuck in the INACTIVE state.
This patch fixes the race by enqueuing a RecoverMigrationStateFromZKEvent from onControllerChange if the driver has not yet initialized. If another RecoverMigrationStateFromZKEvent was already enqueued, the second one to run will just be ignored.
Reviewers: Luke Chen <showuon@gmail.com>
The 3.8 controller assumes that unknown features have min version = 0, but KAFKA-17011 replaced min=0 with min=1 when BrokerRegistrationRequest < 4. Hence, to support upgrading from 3.8.0 to 3.9, this PR changes the implementation of ApiVersionsResponse (<4) and BrokerRegistrationRequest (<4) to skip features with a supported minVersion of 0 instead of replacing 0 with 1.
Reviewers: Jun Rao <junrao@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
Update the leader before calling handleLeaderChange and use the given epoch in LocalLogManager#prepareAppend. This should hopefully fix several flaky QuorumControllerTest tests.
Reviewers: José Armando García Sancio <jsancio@apache.org>
This patch raises the minimum MetadataVersion for migrations to 3.6-IV1 (metadata transactions). This is only enforced on the controller during bootstrap (when the log is empty). If the log is not empty on controller startup, as in the case of a software upgrade, we allow the migration to continue where it left off.
The broker will log an ERROR message if migrations are enabled and the IBP is not at least 3.6-IV1.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Part of KIP-1033.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Matthias J. Sax <matthias@confluent.io>
Why doesn't df04887ba5 fix it?
The fix in df04887ba5 is to NOT collect the log from path `/mnt/kafka/kafka-operational-logs/debug/xxxx.log` if the task is successful. It does not change the log level. See ducktape b2ad7693f2/ducktape/tests/test.py (L181)
Why doesn't df04887ba5 hit the "sort" error?
df04887ba5 does NOT show the error since the number of features is only one (only metadata.version). Hence, the bug is not triggered, as there is no need to "sort". Now we have two features - metadata.version and kraft.version - so the sort is executed and we see the "hello bug".
Why should we change kafka.log_level to INFO?
The template of log4j.properties is controlled by `log_level` (https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/kafka/templates/log4j.properties#L16), and the bug happens when writing a debug message (e4ca066680/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala (L274)). Hence, changing the log level to INFO avoids triggering the bug.
Reviewers: Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
7496e62434 fixed an error that caused an exception to be thrown on broker startup when debug logs were on. This fix made it into every version except 3.2.
The KRaft upgrade tests currently turn debug logs off entirely, but I think we only need to turn them off for the broken version.
Note: this bug is also present in 3.1, but there is no logging on startup like in subsequent versions.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <david.jacot@gmail.com>
Add the version check to server side for the specific timestamp:
- the version must be >=8 if timestamp=-4L
- the version must be >=9 if timestamp=-5L
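A minimal sketch of the server-side check, with the timestamp constants taken from the description above (-4L and -5L); the actual validation in the request handling code differs in detail:
```java
import org.apache.kafka.common.errors.UnsupportedVersionException;

public final class ListOffsetsTimestampCheck {
    private ListOffsetsTimestampCheck() { }

    private static final long EARLIEST_LOCAL_TIMESTAMP = -4L; // requires request version >= 8
    private static final long LATEST_TIERED_TIMESTAMP = -5L;  // requires request version >= 9

    public static void validate(short requestVersion, long timestamp) {
        if (timestamp == EARLIEST_LOCAL_TIMESTAMP && requestVersion < 8) {
            throw new UnsupportedVersionException("Timestamp -4 requires ListOffsets version >= 8");
        }
        if (timestamp == LATEST_TIERED_TIMESTAMP && requestVersion < 9) {
            throw new UnsupportedVersionException("Timestamp -5 requires ListOffsets version >= 9");
        }
    }
}
```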
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Because of KIP-902 (Upgrade Zookeeper version to 3.8.2), it is not possible to upgrade from a Kafka version
earlier than 2.4 to a version later than 2.4. Therefore, we should not test these upgrade scenarios
in upgrade_test.py. They do happen to work sometimes, but only in the trivial case where we don't
create topics or make changes during the upgrade (which would reveal the ZK incompatibility).
Instead, we should test only supported scenarios.
Reviewers: José Armando García Sancio <jsancio@gmail.com>
The docker image for Native Apache Kafka was introduced with KIP-974 and was first released with the 3.8 AK release.
The docker image for Native Apache Kafka is currently intended for local development and testing purposes.
This PR intends to add a logline indicating the same during docker image startup.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Segments in the COPY_SEGMENT_STARTED state were counted when calculating the remote retention size. This caused unexpected retention-size-breach segment deletions. This PR fixes it by
1. only counting segments in the COPY_SEGMENT_FINISHED and DELETE_SEGMENT_STARTED states when calculating the remote log size.
2. deleting the segment immediately if errors are encountered while copying it.
3. Tests added.
Co-authored-by: Guillaume Mallet <>
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Guillaume Mallet <>
Previously in KRaft mode, we could request an unclean leader election for a specific topic using
the electLeaders API. This PR adds an additional way to trigger unclean leader election when in
KRaft mode via the static controller configuration and various dynamic configurations.
In order to support all possible configuration methods, we have to do a multi-step configuration
lookup process:
1. check the dynamic topic configuration for the topic.
2. check the dynamic node configuration.
3. check the dynamic cluster configuration.
4. check the controller's static configuration.
Fortunately, we already have the logic to do this multi-step lookup in KafkaConfigSchema.java.
This PR reuses that logic. It also makes setting a configuration schema in
ConfigurationControlManager mandatory. Previously, it was optional for unit tests.
Of course, the dynamic configuration can change over time, or the active controller can change
to a different one with a different configuration. These changes can make unclean leader
elections possible for partitions that they were not previously possible for. In order to address
this, I added a periodic background task which scans leaderless partitions to check if they are
eligible for an unclean leader election.
Finally, this PR adds the UncleanLeaderElectionsPerSec metric.
Co-authored-by: Luke Chen showuon@gmail.com
Reviewers: Igor Soarez <soarez@apple.com>, Luke Chen <showuon@gmail.com>
Implement the remaining ForwardingManager metrics from KIP-938: Add more metrics for measuring KRaft performance:
kafka.server:type=ForwardingManager,name=QueueTimeMs.p99
kafka.server:type=ForwardingManager,name=QueueTimeMs.p999
kafka.server:type=ForwardingManager,name=QueueLength
kafka.server:type=ForwardingManager,name=RemoteTimeMs.p99
kafka.server:type=ForwardingManager,name=RemoteTimeMs.p999
Reviewers: Colin P. McCabe <cmccabe@apache.org>
AccessTokenRetrieverFactory uses the value of sasl.oauthbearer.header.urlencode provided by the user, or null if no value was provided for that configuration. When the HttpAccessTokenRetriever is created and the JVM attempts to unbox the value into a boolean, a NullPointerException is thrown.
The fix is to explicitly check the Boolean, and if it's null, use Boolean.FALSE.
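A minimal sketch of the null-safe handling, using the configuration key as a literal string; the actual AccessTokenRetrieverFactory code differs:
```java
import java.util.Map;

public final class UrlencodeConfig {
    private UrlencodeConfig() { }

    private static final String SASL_OAUTHBEARER_HEADER_URLENCODE = "sasl.oauthbearer.header.urlencode";

    // Auto-unboxing a null Boolean into a primitive boolean throws a NullPointerException,
    // so fall back to Boolean.FALSE when the configuration was not provided.
    public static boolean urlencodeHeader(Map<String, Object> configs) {
        Boolean value = (Boolean) configs.get(SASL_OAUTHBEARER_HEADER_URLENCODE);
        return value != null ? value : Boolean.FALSE;
    }
}
```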
Reviewers: bachmanity1 <81428651+bachmanity1@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
The given test took 5 seconds because the logic waited the full 5 seconds for the expiration task to complete. This adds a copySegmentLatch countdown after the expiration task is over.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1) When the local.retention.ms/bytes is set to -2, we didn't replace it with the server-side retention.ms/bytes config, so the -2 local retention won't take effect.
2) When setting retention.ms/bytes to -2, we can notice this log message:
```
Deleting segment LogSegment(baseOffset=10045, size=1037087, lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to local log retention size -2 breach. Local log size after deletion will be 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]
```
This is not helpful for users. We should replace -2 with the real retention value when logging.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Add the version check to client side when building ListOffsetRequest for the specific timestamp:
1) the version must be >=8 if timestamp=-4L (EARLIEST_LOCAL_TIMESTAMP)
2) the version must be >=9 if timestamp=-5L (LATEST_TIERED_TIMESTAMP)
Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
- Because the server config UNSTABLE_API_VERSIONS_ENABLE_CONFIG is true, we couldn't previously test the scenario where the ListOffsetsRequest version is unstable. We want to test this case in this PR.
- get the MV from metadataCache.metadataVersion() instead of config.interBrokerProtocolVersion since MV can be set dynamically.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When a broker tries to register with the controller quorum, its registration should be rejected if it doesn't support a feature that is currently enabled. (A feature is enabled if it is set to a non-zero feature level.) This is important for the newly added kraft.version feature flag.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
When a voter fails as leader (LeaderState) the quorum-state still states that it is the leader of
the epoch. When the voter starts it never starts as leader and instead starts as resigned
(ResignedState) if it was previously a leader. This causes the KRaft client to immediately notify
the state machine (e.g QuorumController) that it is leader or active. This is incorrect for two
reasons.
One, the controller cannot be notified of leadership until it has reached the LEO. If the
controller is notified before that it will generate and append records that are not based on the
latest state.
Two, it is not practical to notify of local leadership when it is resigned since any write
operation (prepareAppend and schedulePreparedAppend) will fail with NotLeaderException while KRaft
is in the resigned state.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
This change includes two improvements.
When the leader removes itself from the voter set, clients of RaftClient may call resign. In those cases the leader is not in the voter set and resign should not throw an exception.
Controllers that are observers must flush the log on every append because the leader may be trying to add them to the voter set. The leader always assumes that voters flush their disk before sending a Fetch request.
Reviewers: David Arthur <mumrah@gmail.com>, Alyssa Huang <ahuang@confluent.io>
This change fixes the Kafka configuration validation to take into account the reconfiguration changes to configuration and allows KRaft observers to start with an unknown set of voters.
For the Kafka configuration validation, the high-level change is that the user now only needs to specify either the controller.quorum.bootstrap.servers property or the controller.quorum.voters property. The other notable change in the configuration is that controller listeners can now be (and should be) specified in the advertised.listeners property.
Because Kafka can now be configured without any voters and with just the bootstrap servers, the KRaft client needs to allow for an unknown set of voters during the initial startup. This is done by adding the VoterSet#empty set of voters to the KRaftControlRecordStateMachine.
Lastly the RaftClientTestContext type is updated to support this new configuration for KRaft and a test is added to verify that observers can start and send Fetch requests when the voters are unknown.
Reviewers: David Arthur <mumrah@gmail.com>
These two classes are package-protected but are exposed through the public API of public
methods. To keep the APIs clean, we should make this consistent.
Static class ReplicaState is exposed in RaftUtil#singletonDescribeQuorumResponse method which is public.
RequestSender is implemented by a public class and it's exposed in the public constructor of AddVoterHandler.
Reviewers: José Armando García Sancio <jsancio@apache.org>
RaftClient API is changed to separate the batch accumulation (RaftClient#prepareAppend) from scheduling the append of accumulated batches (RaftClient#schedulePreparedAppend) to the KRaft log. This change is needed to better match the controller's flow of replaying the generated records before replicating them. When the controller replays records it needs to know the offset associated with each record. To compute that offset, the KRaft client needs to be aware of the records and their log position.
The controller uses this new API by generating the cluster metadata records, computing their offset using RaftClient#prepareAppend, replaying the records in the state machine, and finally allowing KRaft to append the records with RaftClient#schedulePreparedAppend.
To implement this API the BatchAccumulator is changed to also support this access pattern. This is done by adding a drainOffset to the implementation. The batch accumulator is allowed to return any record and batch that is less than the drain offset.
Lastly, this change also removes some functionality that is no longer needed like non-atomic appends and validation of the base offset.
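A rough sketch of the controller flow enabled by the split API; the interfaces below are stand-ins with assumed shapes (including the assumption that prepareAppend returns the last offset of the accumulated batch), not the real RaftClient signatures:
```java
import java.util.List;

public class PrepareAppendFlow<T> {

    /** Minimal stand-in for the relevant part of the RaftClient API (assumed shape). */
    public interface Raft<T> {
        long prepareAppend(int epoch, List<T> records); // accumulate and return the batch's last offset (assumed)
        void schedulePreparedAppend();                  // hand accumulated batches to KRaft for appending
    }

    /** Minimal stand-in for the controller's state machine. */
    public interface StateMachine<T> {
        void replay(T record, long offset);
    }

    public void generateAndReplicate(Raft<T> raft, StateMachine<T> stateMachine,
                                     int epoch, List<T> records) {
        // 1. Accumulate the records and learn the offsets they will be appended at.
        long lastOffset = raft.prepareAppend(epoch, records);
        // 2. Replay the records locally, now that their log positions are known.
        long firstOffset = lastOffset - records.size() + 1;
        for (int i = 0; i < records.size(); i++) {
            stateMachine.replay(records.get(i), firstOffset + i);
        }
        // 3. Only then let KRaft append (and later replicate) the prepared batches.
        raft.schedulePreparedAppend();
    }
}
```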
Reviewers: Colin Patrick McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
- Mark 3.9-IV0 as stable. Metadata version 3.9-IV0 should return Fetch version 17.
- Move ELR to 4.0-IV0. Remove 3.9-IV1 since it's no longer needed.
- Create a new 4.0-IV1 MV for KIP-848.
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Justine Olshan <jolshan@confluent.io>
In MetadataVersion 3.7-IV2 and above, the broker's AssignmentsManager sends an RPC to the
controller informing it about which directory we have chosen to place each new replica on.
Unfortunately, the code does not check to see if the topic still exists in the MetadataImage before
sending the RPC. It will also retry infinitely. Therefore, after a topic is created and deleted in
rapid succession, we can get stuck including the now-defunct replica in our subsequent
AssignReplicasToDirsRequests forever.
In order to prevent this problem, the AssignmentsManager should check if a topic still exists (and
is still present on the broker in question) before sending the RPC. In order to prevent log spam,
we should not log any error messages until several minutes have gone past without success.
Finally, rather than creating a new EventQueue event for each assignment request, we should simply
modify a shared data structure and schedule a deferred event to send the accumulated RPCs. This
will improve efficiency.
Reviewers: Igor Soarez <i@soarez.me>, Ron Dagostino <rndgstn@gmail.com>
Only voters should be able to transition to Candidate state. This removes VotedState as one of the EpochStates and moves voted information into UnattachedState.
Reviewers: José Armando García Sancio <jsancio@apache.org>
This change implements the KRaft voter sending UpdateVoter request. The
UpdateVoter RPC is used to update a voter's listeners and supported
kraft versions. The UpdateVoter RPC is sent if the replicated voter set
(VotersRecord in the log) doesn't match the local voter's supported
kraft versions and controller listeners.
To not starve the Fetch request, the UpdateVoter request is sent at most
every 3 fetch timeouts. This is required to make sure that replication
is making progress and eventually the voter set in the replicated log
matches the local voter configuration.
This change also modifies the semantic for UpdateVoter. Now the
UpdateVoter response is sent right after the leader has created the new
voter set. This is required so that updating voter can transition from
sending UpdateVoter request to sending Fetch request. If the leader
waits for the VotersRecord control record to commit before sending the
UpdateVoter response, it may never send the UpdateVoter response. This
can happen if the leader needs that voter's Fetch request to commit the
control record.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This PR adds support for add-controller and remove-controller in the kafka-metadata-quorum.sh
command-line tool. It also fixes some minor server-side bugs that blocked the tool from working.
In kafka-metadata-quorum.sh, the implementation of remove-controller is fairly straightforward. It
just takes some command-line flags and uses them to invoke AdminClient. The add-controller
implementation is a bit more complex because we have to look at the new controller's configuration
file. The parsing logic for the advertised.listeners and listeners server configurations that we
need was previously implemented in the :core module. However, the gradle module where
kafka-metadata-quorum.sh lives, :tools, cannot depend on :core. Therefore, I moved listener parsing
into SocketServerConfigs.listenerListToEndPoints. This will be a small step forward in our efforts
to move Kafka configuration out of :core.
I also made some minor changes in kafka-metadata-quorum.sh and Kafka-storage-tool.sh to handle
--help without displaying a backtrace on the screen, and give slightly better error messages on
stderr. Also, in DynamicVoter.toString, we now enclose the host in brackets if it contains a colon
(as IPV6 addresses can).
This PR fixes our handling of clusterId in addRaftVoter and removeRaftVoter, in two ways. Firstly,
it marks clusterId as nullable in the AddRaftVoterRequest.json and RemoveRaftVoterRequest.json
schemas, as it was always intended to be. Secondly, it allows AdminClient to optionally send
clusterId, by using AddRaftVoterOptions and RemoveRaftVoterOptions. We now also remember to
properly set timeoutMs in AddRaftVoterRequest. This PR adds unit tests for
KafkaAdminClient#addRaftVoter and KafkaAdminClient#removeRaftVoter, to make sure they are sending
the right things.
Finally, I fixed some minor server-side bugs that were blocking the handling of these RPCs.
Firstly, ApiKeys.ADD_RAFT_VOTER and ApiKeys.REMOVE_RAFT_VOTER are now marked as forwardable so that
forwarding from the broker to the active controller works correctly. Secondly,
org.apache.kafka.raft.KafkaNetworkChannel has now been updated to enable API_VERSIONS_REQUEST and
API_VERSIONS_RESPONSE.
Co-authored-by: Murali Basani muralidhar.basani@aiven.io
Reviewers: José Armando García Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>
Handle local log deletion when remote.log.copy.disabled=true based on the KIP-950.
When tiered storage is disabled or becomes read-only on a topic, the local retention configuration becomes irrelevant, and all data expiration follows the topic-wide retention configuration exclusively.
- Added a remoteLogEnabledAndRemoteCopyEnabled method to check whether the topic has tiered storage enabled and remote log copy enabled. We should adopt local.retention.ms/bytes when remote.storage.enable=true and remote.log.copy.disable=false.
- Changed to use retention.bytes/retention.ms when remote copy is disabled.
- Added validation to ask users to set local.retention.ms == retention.ms and local.retention.bytes == retention.bytes
- Added tests
Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>
The replicaDirectoryId field for FetchRequest and FetchSnapshotRequest should be ignorable. This allows data objects with the directory id to be serialized to any version of the requests.
Reviewers: José Armando García Sancio <jsancio@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
related to https://issues.apache.org/jira/browse/KAFKA-17235
The root cause of this issue is a change we introduced in KAFKA-16879, where we modified the PushHttpMetricsReporter constructor to use Time.System [1]. However, Time.System doesn't exist in Kafka versions 0.8.2 and 0.9.
In test_performance_services.py, we have system tests for Kafka versions 0.8.2 and 0.9 [2]. These tests always use the tools JAR from the trunk branch, regardless of the Kafka version being tested [3], while the client JAR aligns with the Kafka version specified in the test suite [4]. This discrepancy is what causes the issue to arise.
To resolve this issue, we have a few options:
1) Add Time.System to Kafka 0.8.2 and 0.9: This isn't practical, as we no longer maintain these versions.
2) Modify the PushHttpMetricsReporter constructor to use new SystemTime() instead of Time.System: This would contradict the intent of KAFKA-16879, which aims to make SystemTime a singleton.
3) Implement Time in PushHttpMetricsReporter and use it to get the current time
4) Remove system tests for Kafka 0.8.2 and 0.9 from test_performance_services.py
Given that we no longer maintain Kafka 0.8.2 and 0.9, and altering the constructor goes against the design goals of KAFKA-16879, option 4 appears to be the most feasible solution. However, I'm not sure whether it's acceptable to remove these old version tests. Maybe someone else has a better solution
"We'll proceed with option 3 since support for versions 0.8 and 0.9 is still required, meaning we can't remove those Kafka versions from the system tests."
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add support for handling the update voter RPC. The update voter RPC is used to automatically update
the voters supported kraft versions and available endpoints as the operator upgrades and
reconfigures the KRaft controllers.
The UpdateVoter RPC is handled as follows:
1. Check that the leader has fenced the previous leader(s) by checking that the HWM is known;
otherwise, return the REQUEST_TIMED_OUT error.
2. Check that the cluster supports kraft.version 1; otherwise, return the UNSUPPORTED_VERSION error.
3. Check that there are no uncommitted voter changes, otherwise return the REQUEST_TIMED_OUT error.
4. Check that the updated voter still supports the currently finalized kraft.version; otherwise
return the INVALID_REQUEST error.
5. Check that the updated voter is still listening on the default listener.
6. Append the updated VotersRecord to the log. The KRaft internal listener will read this
uncommitted record from the log and update the voter in the set of voters.
7. Wait for the VotersRecord to commit using the majority of the voters. Return a REQUEST_TIMED_OUT
error if it doesn't commit in time.
8. Send the UpdateVoter successful response to the voter.
This change also implements the ability for the leader to update its own entry in the voter
set when it becomes leader for an epoch. This is done by updating the voter set and writing a
control batch as the first batch in a new leader epoch.
Finally, fix a bug in KafkaAdminClient's handling of removeRaftVoterResponse where we tried to cast
the response to the wrong type.
Reviewers: Alyssa Huang <ahuang@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
As part of KIP-853, storage-tool.sh now has two new flags: --standalone, and --initial-voters. This PR implements these two flags in storage-tool.sh.
There are currently two valid ways to format a cluster:
The pre-KIP-853 way, where you use a statically configured controller quorum. In this case, neither --standalone nor --initial-voters may be specified, and kraft.version must be set to 0.
The KIP-853 way, where one of --standalone and --initial-voters must be specified with the initial value of the dynamic controller quorum. In this case, kraft.version must be set to 1.
This PR moves the formatting logic out of StorageTool.scala and into Formatter.java. The tool file was never intended to get so huge, or to implement complex logic like generating metadata records. Those things should be done by code in the metadata or raft gradle modules. This is also useful for junit tests, which often need to do formatting. (The 'info' and 'random-uuid' commands remain in StorageTool.scala, for now.)
Reviewers: José Armando García Sancio <jsancio@apache.org>
* KAFKA-17227: Update zstd-jni lib
* Add note in upgrade docs
* Change zstd-jni version in docker native file and add warning in dependencies.gradle file
* Add reference to snappy in upgrade
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
This PR supports EarliestLocalSpec and LatestTierSpec in GetOffsetShell, and adds integration tests.
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
Follow up code cleanup for KIP-1033.
This PR unifies the handling of both error cases for exception handlers:
- handler throws an exception
- handler returns null
The unification happens for all 5 handler cases:
- deserialization
- production / serialization
- production / send
- processing
- punctuation
Reviewers: Sebastien Viale <sebastien.viale@michelin.com>, Loic Greffier <loic.greffier@michelin.com>, Bill Bejeck <bill@confluent.io>
1. Use oldestAllowedVersion as 9 if using ListOffsetsRequest#EARLIEST_LOCAL_TIMESTAMP or ListOffsetsRequest#LATEST_TIERED_TIMESTAMP.
2. Add test cases to ListOffsetsRequestTest#testListOffsetsRequestOldestVersion to make sure requireTieredStorageTimestamp returns 9 as minVersion.
3. Add EarliestLocalSpec and LatestTierSpec to OffsetSpec.
4. Add more cases to KafkaAdminClient#getOffsetFromSpec.
5. Add testListOffsetsEarliestLocalSpecMinVersion and testListOffsetsLatestTierSpecSpecMinVersion to KafkaAdminClientTest to make sure request builder has oldestAllowedVersion as 9.
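A hedged usage sketch with the Admin client; the OffsetSpec factory method names (earliestLocal(), latestTiered()) are assumed from the additions described above, and "my-topic"/"localhost:9092" are placeholders:
```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public final class TieredOffsetsExample {
    private TieredOffsetsExample() { }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // Factory method names are assumed from the OffsetSpec additions in this PR.
            ListOffsetsResult earliestLocal = admin.listOffsets(Map.of(tp, OffsetSpec.earliestLocal()));
            ListOffsetsResult latestTiered = admin.listOffsets(Map.of(tp, OffsetSpec.latestTiered()));
            System.out.println("earliest-local offset: " + earliestLocal.partitionResult(tp).get().offset());
            System.out.println("latest-tiered offset:  " + latestTiered.partitionResult(tp).get().offset());
        }
    }
}
```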
Signed-off-by: PoAn Yang <payang@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>
Fix an issue that caused system tests to fail when using AsyncKafkaConsumer.
A configuration option, group.coordinator.rebalance.protocols, was introduced to specify the rebalance protocols used by the group coordinator. By default, the rebalance protocol is set to classic. When the new group coordinator is enabled, the rebalance protocols are set to classic,consumer.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Kirk True <kirk@kirktrue.pro>, Justine Olshan <jolshan@confluent.io>
We need to migrate GroupMetadataMessageFormatter from Scala to Java, and make the message format a JSON pattern.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
describe --status now includes directory id and endpoint information for voters and observers.
describe --replication now includes directory id.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@apache.org>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions from punctuate.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This reverts commit 15a4501bde.
We consider this change backward incompatible and will fix forward for 4.0
release via KIP-1065, but need to revert for 3.9 release.
Reviewers: Josep Prat <josep.prat@aiven.io>, Bill Bejeck <bill@confluent.io>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR catches the exceptions thrown while handling a processing exception.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR exposes the new ErrorHandlerContext as a parameter to the Deserialization exception handlers and deprecates the previous handle signature.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR exposes the new ErrorHandlerContext as a parameter to the Production exception handler and deprecates the previous handle signature.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
<option name="notice" value="Copyright 2024, AutoMQ HK Limited. The use of this file is governed by the Business Source License, as detailed in the file &quot;/LICENSE.S3Stream&quot; included in this repository. As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, Version 2.0"/>
[AutoMQ](https://www.automq.com/) is a cloud-native alternative to Kafka by decoupling durability to cloud storage services like S3. 10x Cost-Effective. No Cross-AZ Traffic Cost. Autoscale in seconds. Single-digit ms latency.
This Helm chart simplifies the deployment of AutoMQ into your Kubernetes cluster using the Software model.
AutoMQ is fully compatible with Bitnami's Helm Charts, so you can customize your AutoMQ Kubernetes cluster based on the relevant values.yaml of Bitnami.
The quickest way to set up a Kubernetes cluster to install Bitnami Charts is by following the "Bitnami Get Started" guides for the different services:
[Get Started with Bitnami Charts using the Amazon Elastic Container Service for Kubernetes (EKS)](https://docs.bitnami.com/kubernetes/get-started-eks/)
### Installing AutoMQ with the Bitnami Chart
As an alternative to supplying the configuration parameters as arguments, you can create a supplemental YAML file containing your specific config parameters. Any parameters not specified in this file will default to those set in [values.yaml](values.yaml).
1. Create an empty `automq-values.yaml` file
2. Edit the file with your specific parameters:
You can refer to the [demo-values.yaml](/chart/bitnami/demo-values.yaml) we provide, which is based on the Bitnami [values.yaml](https://github.com/bitnami/charts/blob/main/bitnami/kafka/values.yaml), for deploying AutoMQ on AWS across 3 Availability Zones using m7g.xlarge instances (4 vCPUs, 16 GB memory, 156 MiB/s network bandwidth).
You need to replace the bucket configurations in the ${} placeholders, such as ops-bucket, data-bucket, region, endpoint, and access-key/secret-key.
3. Install or upgrade the AutoMQ Helm chart using your custom yaml file:
We recommend using `--version` [31.x.x (31.1.0 ~ 31.5.0)](https://artifacthub.io/packages/helm/bitnami/kafka) of the Bitnami Helm chart when installing AutoMQ.