This PR includes the changes to feature flags that were outlined in KIP-778. Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
dry-run support to the RPC so that the controller, rather than the client, can attempt the upgrade
without committing it. It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.
The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands. Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.
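For illustration, here is roughly how the new sub-commands are invoked (a sketch based on the KIP; exact flag spellings may differ in the released tool):
```bash
# Describe the current finalized feature levels.
bin/kafka-features.sh --bootstrap-server localhost:9092 describe

# Ask the controller to validate an upgrade without applying it.
bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --metadata 3.3 --dry-run

# Disable (set to level 0) a hypothetical feature flag.
bin/kafka-features.sh --bootstrap-server localhost:9092 disable --feature my.feature
```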
Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>
In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric
names: periods are replaced with underscores, so both topics map to the same metric name. We should
enforce the same collision check in KRaft mode. This PR also changes TopicCommandIntegrationTest to
support KRaft mode.
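A minimal sketch of the check involved, assuming the helpers in `org.apache.kafka.common.internals.Topic` behave as described:
```java
import org.apache.kafka.common.internals.Topic;

public class CollisionCheck {
    public static void main(String[] args) {
        // Metric names replace '.' with '_', so these two topics collide.
        System.out.println(Topic.unifyCollisionChars("foo.bar"));      // foo_bar
        System.out.println(Topic.hasCollision("foo.bar", "foo_bar")); // true
    }
}
```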
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This patch adds audit support through the kafka.authorizer.logger logger to StandardAuthorizer. It
follows the same conventions as AclAuthorizer with a similarly formatted log message. When
logIfAllowed is set in the Action, the log message is at DEBUG level; otherwise, we log at
TRACE. When logIfDenied is set, the log message is at INFO level; otherwise, we again log at
TRACE.
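To surface these audit messages, the logger can be tuned in log4j.properties, for example (the authorizerAppender name follows the convention in the config shipped with Kafka):
```properties
# DEBUG also captures allowed operations that set logIfAllowed.
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
```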
Reviewers: Colin P. McCabe <cmccabe@apache.org>
Verify that ReplicaManager.stopReplica is called if the stop replica
request doesn't result in a stale broker epoch error.
Reviewers: Mickael Maison <mimaison@users.noreply.github.com>
The `DESTROYED` state is represented internally as a tombstone record when running in distributed mode and by the removal of the connector/task from the in-memory status map when running in standalone mode. As a result, it will never appear to users of the REST API, and we should remove mention of it from our docs so that developers creating tooling against the REST API don't write unnecessary logic to account for that state.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This patch adds a validator to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, and improves the corresponding test cases to verify that an invalid compression.type value throws a ConfigException.
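A small sketch of how such a validator rejects a bad value (the valid-value list below mirrors the producer codecs; the real definitions live in ProducerConfig and KafkaConfig):
```java
import java.util.Collections;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class CompressionTypeValidation {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef().define(
                "compression.type", ConfigDef.Type.STRING, "none",
                ConfigDef.ValidString.in("none", "gzip", "snappy", "lz4", "zstd"),
                ConfigDef.Importance.HIGH, "The compression codec to use.");
        try {
            def.parse(Collections.singletonMap("compression.type", "bogus"));
        } catch (ConfigException e) {
            System.out.println("Rejected as expected: " + e.getMessage());
        }
    }
}
```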
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Fixes a small copy/paste error from #10390 that changed the parameter names
for fetchSession from the singular session form (e.g. `startTime`) to the range
form (e.g. `earliestSessionStartTime`).
Reviewers: John Roesler <vvcephei@apache.org>
When migrating from EasyMock to Mockito, the Mockito implementation
didn't have the same semantics as the EasyMock implementation.
Without this fix, the mocking of LogSegment.size() always returns 0 because
a new AtomicInteger was created for each invocation of
LogSegment.size().
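A toy illustration of the difference, using a stand-in interface for LogSegment:
```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicInteger;

public class SizeMockSketch {
    interface Segment { int size(); }

    public static void main(String[] args) {
        // Broken shape: the answer re-creates the counter, so size() is always 0.
        Segment broken = mock(Segment.class);
        when(broken.size()).thenAnswer(inv -> new AtomicInteger(0).get());

        // Fixed shape: one shared counter that "appends" can increment.
        AtomicInteger size = new AtomicInteger(0);
        Segment fixed = mock(Segment.class);
        when(fixed.size()).thenAnswer(inv -> size.get());

        size.addAndGet(100);
        System.out.println(broken.size()); // 0
        System.out.println(fixed.size());  // 100
    }
}
```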
Reviewers: Mickael Maison <mimaison@users.noreply.github.com>
In a comparator, objects that are not equal must have a stable order; otherwise, binary search may fail to find them. This patch improves the producer batch comparator accordingly.
With KAFKA-13527 / KIP-784 we introduced a new top-level error code for
the DescribeLogDirs API for versions 3 and above. However, the change
regressed the error handling for versions lower than 3: the response
converter (rightly) fails to write the non-zero error code for those
versions and drops the response, so the client eventually times out
instead of receiving an empty log dirs response and processing it as a
cluster authorization failure.
With this change, the API conditionally propagates the error code out to
the client if the request API version is 3 and above. This keeps the
semantics of the error handling the same for all versions and restores
the behavior for older versions.
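A minimal sketch of the version gate (not the actual KafkaApis code; 31 is the protocol's CLUSTER_AUTHORIZATION_FAILED error code):
```java
public class DescribeLogDirsErrorSketch {
    static final short CLUSTER_AUTHORIZATION_FAILED = 31;
    static final short NONE = 0;

    // Only versions >= 3 support a top-level error code (KIP-784); older
    // versions fall back to an empty result that clients interpret as a
    // cluster authorization failure.
    static short topLevelErrorCode(short requestVersion, short error) {
        return requestVersion >= 3 ? error : NONE;
    }

    public static void main(String[] args) {
        System.out.println(topLevelErrorCode((short) 0, CLUSTER_AUTHORIZATION_FAILED)); // 0
        System.out.println(topLevelErrorCode((short) 3, CLUSTER_AUTHORIZATION_FAILED)); // 31
    }
}
```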
See current behavior in the broker log:
```bash
[ERROR] 2022-04-08 01:22:56,406 [data-plane-kafka-request-handler-10] kafka.server.KafkaApis - [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=DESCRIBE_LOG_DIRS, apiVersion=0, clientId=sarama, correlationId=1) -- DescribeLogDirsRequestData(topics=null)
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
[ERROR] 2022-04-08 01:22:56,407 [data-plane-kafka-request-handler-10] kafka.server.KafkaRequestHandler - [Kafka Request Handler 10 on Broker 0], Exception when handling request
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
```
Reviewers: Ismael Juma <ismael@juma.me.uk>
This regressed in ca375d8004 due to a typo. We need tests
for our builds. :)
I verified that passing the commitId via `-PcommitId=123`
works correctly.
Reviewers: Ismael Juma <ismael@juma.me.uk>
The MetricsReporter.contextChange contract states that the method should always be called before MetricsReporter.init. This is done correctly for reporters enabled by default (e.g. JmxReporter) but not for metrics reporters configured dynamically.
This fixes the call ordering for dynamically configured metrics reporters and updates tests to enforce the ordering.
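A sketch of the required ordering using the public API:
```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;

public class ReporterOrdering {
    public static void main(String[] args) {
        MetricsReporter reporter = new JmxReporter();
        reporter.configure(Collections.emptyMap());
        // Per the contract, contextChange must be called before init.
        reporter.contextChange(new KafkaMetricsContext("kafka.server"));
        List<KafkaMetric> current = Collections.emptyList();
        reporter.init(current);
    }
}
```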
Reviewers: David Jacot <djacot@confluent.io>
This PR allows limiting the batches included in the output when log segments are inspected via the kafka-dump-log.sh script.
The idea is to take samples from the log segments without affecting a production cluster: the current script reads the whole files, which can cause performance issues.
See KIP-824 for details.
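A hypothetical invocation, assuming the `--max-bytes` option proposed in KIP-824:
```bash
# Dump only the first 4096 bytes worth of batches from a segment.
bin/kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.log --max-bytes 4096
```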
Reviewers: Jun Rao <junrao@gmail.com>
The test expects the connection to become idle before the mock time is moved forward, but the processor thread runs concurrently and may perform some activity on the connection after the mock time has been moved forward, in which case the connection never expires.
The solution is to wait until the message is received on the socket, and only then wait until the connection is unmuted (waiting for unmute alone is not enough, because the channel might not have been muted yet when the wait starts).
Reviewers: David Jacot <djacot@confluent.io>
As part of KIP-376 we had ConsumerInterceptor implement AutoCloseable
but forgot to do the same for ProducerInterceptor. This fixes the
inconsistency and also addresses KAFKA-6204 at the same time.
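The resulting interface shape (abridged; javadoc and existing comments omitted):
```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Configurable;

// ProducerInterceptor now extends AutoCloseable, matching ConsumerInterceptor.
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    void close();
}
```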
Reviewers: John Roesler <vvcephei@apache.org>
Fixes a bug in the comparator used to sort producer inflight batches for a topic partition. This can cause batches in the map `inflightBatchesBySequence` to be removed incorrectly: i.e. one batch may be removed by another batch with the same sequence number. This leads to an `IllegalStateException` when the inflight request finally returns. This patch fixes the comparator to check equality of the `ProducerBatch` instances if the base sequences match.
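A self-contained sketch of the idea, with a stand-in for `ProducerBatch` (the real patch checks instance equality; the identity-hash tie-break below is only for illustration):
```java
import java.util.Comparator;

public class BatchComparatorSketch {
    static final class Batch {
        final int baseSequence;
        Batch(int baseSequence) { this.baseSequence = baseSequence; }
    }

    // Equal base sequences no longer imply equal batches: distinct instances
    // get a tie-break so that removal targets the exact batch, not a lookalike.
    static final Comparator<Batch> COMPARATOR = (a, b) -> {
        int cmp = Integer.compare(a.baseSequence, b.baseSequence);
        if (cmp != 0 || a == b)
            return cmp;
        return Integer.compare(System.identityHashCode(a), System.identityHashCode(b));
    };

    public static void main(String[] args) {
        Batch b1 = new Batch(5);
        Batch b2 = new Batch(5);
        System.out.println(COMPARATOR.compare(b1, b1));      // 0: same instance
        System.out.println(COMPARATOR.compare(b1, b2) != 0); // true: same sequence, different batch
    }
}
```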
Reviewers: Jason Gustafson <jason@confluent.io>
The current preferred read replica selection logic relies on `partition.leaderReplicaIdOpt` to determine whether the selection must run. The issue is that `partition.leaderReplicaIdOpt` is defined for both the leader and the followers, so the logic runs all the time. The impact is limited because, when a follower runs the selection, the leader is chosen most of the time and then filtered out. However, under certain rare conditions the selection on a follower could redirect the consumer to another follower. For instance, with the `RackAwareReplicaSelector`, the follower must have stale replica state from a previous leadership and must have other followers in the same rack. Other implementations of the selection logic could be more affected.
This patch ensures that the preferred read replica selection is only run by the leader.
Reviewers: David Jacot <djacot@confluent.io>
Fixes a regression introduced in https://github.com/apache/kafka/pull/11452. Following [KIP-480](https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner), the `Partitioner` will receive a callback when a batch has been completed so that it can choose another partition. Because of this, we have to wait until the batch has been successfully appended to the accumulator before adding the partition in `TransactionManager.maybeAddPartition`. This is still safe because the `Sender` cannot dequeue a batch from the accumulator until it has been added to the transaction successfully.
Reviewers: Artem Livshits <84364232+artemlivshits@users.noreply.github.com>, David Jacot <djacot@confluent.io>, Tom Bentley <tbentley@redhat.com>
Fix FetchResponse#`fetchData` and `forgottenTopics`: with double-checked locking, the assignment of a lazily-initialized member must be the last step.
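A generic illustration of the pattern (not the FetchResponse code): the fully-built object must be published to the volatile field as the last step.
```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LazyHolder {
    private volatile Map<String, Long> data;

    Map<String, Long> data() {
        Map<String, Long> result = data;
        if (result == null) {
            synchronized (this) {
                result = data;
                if (result == null) {
                    // Build the map fully first; assigning the field earlier
                    // could let another thread observe a partially-populated
                    // map through the non-null reference.
                    Map<String, Long> built = new LinkedHashMap<>();
                    built.put("example", 42L);
                    data = built;  // publish last
                    result = built;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(new LazyHolder().data());
    }
}
```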
Reviewers: Luke Chen <showuon@gmail.com>
Previously, when in KRaft mode, CreateTopics did not return the active configurations for the
topic(s) it had just created. This PR addresses that gap. We will now return these topic
configuration(s) when the user has DESCRIBE_CONFIGS permission. (In the case where the user does
not have this permission, we will omit the configurations and set TopicErrorCode. We will also omit
the number of partitions and the replication factor.)
For historical reasons, we use different names to refer to each topic configuration when it is set
in the broker context, as opposed to the topic context. For example, the topic configuration
"segment.ms" corresponds to the broker configuration "log.roll.ms". Additionally, some broker
configurations have synonyms. For example, the broker configuration "log.roll.hours" can be used to
set the log roll time instead of "log.roll.ms". In order to track all of this, this PR adds a
table in LogConfig.scala which maps each topic configuration to an ordered list of ConfigSynonym
classes. (This table is then passed to KafkaConfigSchema as a constructor argument.)
Some synonyms require transformations. For example, in order to convert from "log.roll.hours" to
"segment.ms", we must convert hours to milliseconds. (Note that our assumption right now is that
topic configurations do not have synonyms, only broker configurations. If this changes, we will
need to add some logic to handle it.)
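A self-contained sketch of the synonym table idea, with a stand-in for the ConfigSynonym class (constructor shape assumed):
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class SynonymTableSketch {
    static final class ConfigSynonym {
        final String name;
        final Function<String, String> converter;
        ConfigSynonym(String name) { this(name, Function.identity()); }
        ConfigSynonym(String name, Function<String, String> converter) {
            this.name = name;
            this.converter = converter;
        }
    }

    static final Function<String, String> HOURS_TO_MILLISECONDS =
        hours -> String.valueOf(Long.parseLong(hours) * 60L * 60L * 1000L);

    public static void main(String[] args) {
        // "segment.ms" resolves to log.roll.ms first, then log.roll.hours
        // (converted from hours to milliseconds), as an ordered list.
        Map<String, List<ConfigSynonym>> synonyms = new HashMap<>();
        synonyms.put("segment.ms", Arrays.asList(
            new ConfigSynonym("log.roll.ms"),
            new ConfigSynonym("log.roll.hours", HOURS_TO_MILLISECONDS)));
        System.out.println(synonyms.get("segment.ms").get(1).converter.apply("24")); // 86400000
    }
}
```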
This PR makes the 8-argument constructor for ConfigEntry public. We need this in order to make full
use of ConfigEntry outside of the admin namespace. This change is probably inevitable in general
since otherwise we cannot easily test the output from various admin APIs in junit tests outside the
admin package.
Testing:
This PR adds PlaintextAdminIntegrationTest#testCreateTopicsReturnsConfigs. This test validates
some of the configurations that it gets back from the call to CreateTopics, rather than just checking
if it got back a non-empty map like some of the existing tests. In order to test the
configuration override logic, testCreateDeleteTopics now sets up some custom static and dynamic
configurations.
In QuorumTestHarness, we now allow tests to configure what the ID of the controller should be. This
allows us to set dynamic configurations for the controller in testCreateDeleteTopics. We will have
a more complete fix for setting dynamic configurations on the controller later.
This PR changes ConfigurationControlManager so that it is created via a Builder. This will make it
easier to add more parameters to its constructor without having to update every piece of test code
that uses it. It will also make the test code easier to read.
Reviewers: David Arthur <mumrah@gmail.com>
Add a note to the `control.plane.listener.name` doc mentioning that the value cannot be identical to `inter.broker.listener.name`.
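For example, a valid configuration keeps the two listener names distinct (values illustrative):
```properties
listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
listener.security.protocol.map=CONTROLLER:SSL,INTERNAL:PLAINTEXT,EXTERNAL:SSL
control.plane.listener.name=CONTROLLER
inter.broker.listener.name=INTERNAL
```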
Reviewers: Luke Chen <showuon@gmail.com>
Fix the upper bound for sliding windows, making them compatible with no grace period (KAFKA-13739).
Added unit tests for early sliding windows and "normal" sliding windows, covering events both within one time difference (small input) and beyond the window time difference (large input).
Fixing this window interval may slightly change stream behavior, but the probability of that happening is extremely low and it should not have a significant impact on the results.
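The configuration this fix targets can be constructed as follows:
```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.SlidingWindows;

public class NoGraceSlidingWindow {
    public static void main(String[] args) {
        // A sliding window with no grace period.
        SlidingWindows windows =
            SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(500));
        System.out.println(windows.timeDifferenceMs()); // 500
        System.out.println(windows.gracePeriodMs());    // 0
    }
}
```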
Reviewers: Leah Thomas <lthomas@confluent.io>, Bill Bejeck <bbejeck@apache.org>
Partitions are assigned to fetcher threads based on their hash modulo the number of fetcher threads. When we resize the fetcher thread pool, we re-distribute all the partitions based on the new pool size. The issue is that the resizing logic updates the `fetcherThreadMap` while iterating over it. The `Map` gives no guarantee in this case - especially when the underlying map is re-hashed - which led to not all fetcher threads being visited during the process and thus to some partitions being left on the wrong fetcher threads.
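A generic illustration of the hazard and the safe pattern (not the actual AbstractFetcherManager code): snapshot the entries before mutating the map.
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class ResizeSketch {
    public static void main(String[] args) {
        Map<Integer, String> fetcherThreadMap = new HashMap<>();
        for (int i = 0; i < 8; i++)
            fetcherThreadMap.put(i, "partition-set-" + i);

        // Iterate over a snapshot so that structural modifications (remove/put,
        // possibly triggering a re-hash) cannot disturb the iteration.
        for (Map.Entry<Integer, String> entry :
                 new ArrayList<>(fetcherThreadMap.entrySet())) {
            fetcherThreadMap.remove(entry.getKey());
            fetcherThreadMap.put(entry.getKey() % 4, entry.getValue()); // new pool size: 4
        }
        System.out.println(fetcherThreadMap);
    }
}
```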
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
KIP-800 introduced a mechanism to pass a reason in the join group and leave group requests. A default reason is used unless one is provided by the user, in which case the custom reason is prefixed by the default one.
When we tried to use this in Kafka Streams, we noted a significant performance degradation; see https://github.com/apache/kafka/pull/11873. It is not clear whether the prefixing is the root cause of the issue or not. To be on the safe side, I think that we should remove the prefixing. It does not bring much anyway, as we are still able to distinguish a custom reason from the default one on the broker side.
This patch removes the prefixing of user-provided reasons. If the user provides a reason, it is used directly; if the reason is empty or null, the default reason is used.
Reviewers: Luke Chen <showuon@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Hao Li <hli@confluent.io>
In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In the previous implementation, partitions with no offset were simply ignored, whereas now -1 is printed for them. This PR fixes this inconsistency.
Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>