Previously, when in KRaft mode, CreateTopics did not return the active configurations for the
topic(s) it had just created. This PR addresses that gap. We will now return these topic
configuration(s) when the user has DESCRIBE_CONFIGS permission. (In the case where the user does
not have this permission, we will omit the configurations and set TopicErrorCode. We will also omit
the number of partitions and replication factor data as well.)
For historical reasons, we use different names to refer to each topic configuration when it is set
in the broker context, as opposed to the topic context. For example, the topic configuration
"segment.ms" corresponds to the broker configuration "log.roll.ms". Additionally, some broker
configurations have synonyms. For example, the broker configuration "log.roll.hours" can be used to
set the log roll time instead of "log.roll.ms". In order to track all of this, this PR adds a
table in LogConfig.scala which maps each topic configuration to an ordered list of ConfigSynonym
classes. (This table is then passed to KafkaConfigSchema as a constructor argument.)
Some synonyms require transformations. For example, in order to convert from "log.roll.hours" to
"segment.ms", we must convert hours to milliseconds. (Note that our assumption right now is that
topic configurations do not have synonyms, only broker configurations. If this changes, we will
need to add some logic to handle it.)
This PR makes the 8-argument constructor for ConfigEntry public. We need this in order to make full
use of ConfigEntry outside of the admin namespace. This change is probably inevitable in general
since otherwise we cannot easily test the output from various admin APIs in junit tests outside the
admin package.
Testing:
This PR adds PlaintextAdminIntegrationTest#testCreateTopicsReturnsConfigs. This test validates
some of the configurations that it gets back from the call to CreateTopics, rather than just checking
if it got back a non-empty map like some of the existing tests. In order to test the
configuration override logic, testCreateDeleteTopics now sets up some custom static and dynamic
configurations.
In QuorumTestHarness, we now allow tests to configure what the ID of the controller should be. This
allows us to set dynamic configurations for the controller in testCreateDeleteTopics. We will have
a more complete fix for setting dynamic configuations on the controller later.
This PR changes ConfigurationControlManager so that it is created via a Builder. This will make it
easier to add more parameters to its constructor without having to update every piece of test code
that uses it. It will also make the test code easier to read.
Reviewers: David Arthur <mumrah@gmail.com>
Add a note in `control.plane.listener.name` doc to mention the value can't be identical with `inter.broker.listener.name`.
Reviewers: Luke Chen <showuon@gmail.com>
Fix upperbound for sliding window, making it compatible with no grace period (kafka-13739)
Added unit test for early sliding window and "normal" sliding window for both events within one time difference (small input) and above window time difference (large input).
Fixing this window interval may slightly change stream behavior but probability to happen is extremely slow and may not have a huge impact on the result given.
Reviewers Leah Thomas <lthomas@confluent.io>, Bill Bejeck <bbejeck@apache.org>
Partitions are assigned to fetcher threads based on their hash modulo the number of fetcher threads. When we resize the fetcher thread pool, we basically re-distribute all the partitions based on the new fetcher thread pool size. The issue is that the logic that resizes the fetcher thread pool updates the `fetcherThreadMap` while iterating over it. The `Map` does not give any guarantee in this case - especially when the underlying map is re-hashed - and that led to not iterating over all the fetcher threads during the process and thus in leaving some partitions in the wrong fetcher threads.
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
KIP-800 introduced a mechanism to pass a reason in the join group request and in the leave group request. A default reason is used unless one is provided by the user. In this case, the custom reason is prefixed by the default one.
When we tried to used this in Kafka Streams, we noted a significant degradation of the performances, see https://github.com/apache/kafka/pull/11873. It is not clear wether the prefixing is the root cause of the issue or not. To be on the safe side, I think that we should remove the prefixing. It does not bring much anyway as we are still able to distinguish a custom reason from the default one on the broker side.
This patch removes prefixing the user provided reasons. So if a the user provides a reason, the reason is used directly. If the reason is empty or null, the default reason is used.
Reviewers: Luke Chen <showuon@gmail.com>, <jeff.kim@confluent.io>, Hao Li <hli@confluent.io>
In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In the previous implementation, partitions were just ignored if there is no offset for them, however, we will print -1 instead now, This PR fix this inconsistency.
Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
In Fix FetchResponse#responseData, we did a double-checked lock for the responseData, but the assignment of lazy-initialized object(responseData) didn't assign in the last step, which would let other threads get the partial object.
Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
Since the topology-level cache size config only controls whether we disable the caching layer entirely for that topology, setting it to anything other than 0 has no effect. The actual cache memory is still just split evenly between the threads, and shared by all topologies.
It's possible we'll want to change this in the future, but for now we should make sure to log a warning so that users who do try to set this override to some nonzero value are made aware that it doesn't work like this.
Also includes some minor refactoring plus a fix for an off-by-one error in #11796
Reviewers: Luke Chen <showuon@gmail.com>, Walker Carlson <wcarlson@confluent.io>, Sagar Rao <sagarmeansocean@gmail.com>
With major server components like the new quorum controller being moved outside of the `core` module, it is useful to have shared dependencies moved into `server-common`. An example of this is Yammer metrics which server components still rely heavily upon. All server components should have access to the default registry used by the broker so that new metrics can be registered and metric naming conventions should be standardized. This is particularly important in KRaft where we are attempting to recreate identically named metrics in the controller context. This patch takes a step in this direction. It moves `KafkaYammerMetrics` into `server-common` and it implements
standard metric naming utilities there.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
With this change we stop including the non-production grade connectors that are meant to be used for demos and quick starts by default in the CLASSPATH and plugin.path of Connect deployments. The package of these connector will still be shipped with the Apache Kafka distribution and will be available for explicit inclusion.
The changes have been tested through the system tests and the existing unit and integration tests.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Randall Hauch <rhauch@gmail.com>
This bumps the slf4j version to 1.7.36 and swaps out log4j 1.2.17 with
reload4j 1.2.19
Signed-off-by: Mike Lothian <mike@fireburn.co.uk>
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Bruno Cadonna <cadonna@apache.org>
CVE-2020-36518 vulnerability affects jackson-databind (see GHSA-57j2-w4cx-62h2).
Upgrading to jackson-databind version 2.12.6.1 addresses this CVE.
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Increase wait in ZooKeeperClientTest.testReinitializeAfterAuthFailure
so that the testcase of https://github.com/apache/kafka/pull/11563
actually fails without the corresponding source code fix.
Followup of https://issues.apache.org/jira/browse/KAFKA-13461.
Co-Authored-By: Gantigmaa Selenge <gantigmaa.selenge1@uk.ibm.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Luke Chen <showuon@gmail.com>, Chris Egerton <fearthecellos@gmail.com>
Co-authored-by: Chris Egerton <fearthecellos@gmail.com>
RocksDB 6.27.3 does not run on arm64 M1 Macs which would prevent people on this platform to run Kafka Streams. Thus, this PR upgrades RocksDB to 6.29.4.1 which contains the following fix to allow to run RocksDB on arm64 M1 Macs:
facebook/rocksdb#7720
The source compatibility report between 6.27.3 and 6.29.4.1 (attached to the ticket) reports a couple of incompatibilities. However, the incompatibilities do not seem to affect Kafka Streams' backwards compatibility.
The changes to class RocksDB only apply when inheriting from RocksDB. RocksDB is not exposed to users in Streams.
The changes to class WriteBatch and class WriteBatchInterface also only apply with inheritance. Both classes are not exposed to users in Streams.
-The change to enum SanityLevel seem also not to apply to Streams since SanityLevel is only used in ConfigOptions which is only used to load options from files and properties objects. Loading options from files or properties is not exposed to users in Streams.
Reviewers: Bill Bejeck <bbejeck@apache.org>, Matthias J. Sax <mjsax@apache.org>, A. Sophie Blee-Goldman <ableegoldman@apache.org>
Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
Update the read/write paths not to throw an exception in this case (kept the exception
in the `handshake` method).
With the default configuration, key updates happen after 2^37 bytes are encrypted.
There is a security property to adjust this configuration, but the change has to be
done before it is used for the first time and it cannot be changed after that. As such,
it is best done via a system test (filed KAFKA-13779).
To validate the change, I wrote a unit test that forces key updates and manually ran
a producer workload that produced more than 2^37 bytes. Both cases failed without
these changes and pass with them.
Note that Shylaja Kokoori attached a patch with the SslTransportLayer fix and hence
included them as a co-author of this change.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Co-authored-by: Shylaja Kokoori
Adds documentation for KIP-708: Rack awareness for Kafka Streams
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
When a replica selector is configured, the partition leader computes a preferred read replica for any fetch from the consumers. When the preferred read replica is not the leader, the leader returns the preferred read replica with `FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)` to the `ReplicaManager`. This causes the fetch to go into in the fetch purgatory because the exit conditions are not met. In turns out that the delayed fetch is not completed until the timeout is reached because the delayed fetch ignores partition with an unknown offset (-1). If the fetch contains only one partition, the fetch is unnecessarily delayed by the timeout time (500ms by default) to only inform the consumer that it has to read from a follower.
This patch fixes the issue by completing the fetch request immediately when a preferred read replica is defined.
Reviewers: David Jacot <djacot@confluent.io>
I noticed two issues in the log4j entry:
1. It's formatted as "{}...{}" + param1, param2; effectively it is one param only, and the printed line is effectively mis-aligned: we always print Subtopology [sourceTopics set] was missing source topics {}
2. Even fix 1) is not enough, since topologyName may be null. On the other hand I think the original goal is not to print the topology name but the sub-topology id since it's within the per-sub-topology loop.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
This patch adds display names for KRaft and ZK tests. Without this, it becomes hard to understand in Jenkins test reports which test failed. With this addition, it becomes more clear which method in the test suite fails.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
#11356 inadvertently changed
the (undefined) header forwarding behavior of stream-stream joins.
This change does not define the behavior, but just restores the prior
undefined behavior for continuity's sake. Defining the header-forwarding
behavior is future work.
Reviewers: Matthias J. Sax <mjsax@apache.org>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
The task assignor is modified to consider the Streams client with the most caught up states if no Streams client exists that is caught up, i.e., the lag of the states on that client is less than the acceptable recovery lag.
Unit test for case task assignment where no caught up nodes exist.
Existing unit and integration tests to verify no other behaviour has been changed
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewer: Bruno Cadonna <cadonna@apache.org>
In this test, we have another thread to let broker down and up, to test if consumer can still work as expected. During the broker down and up, we tried to verify the assignment is as what we expected. But the rebalance will keep triggering while broker down and up. It doesn't make sense to verify the assignment here. Remove it to make the test reliable.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
In KIP-811, we added a new config repartition.purge.interval.ms to set repartition purge interval. In this flaky test, we expected the purge interval is the same as commit interval, which is not correct anymore (default is 30 sec). Set the purge interval explicitly to fix this issue.
Reviewers: Bruno Cadonna <cadonna@apache.org>, Guozhang Wang <wangguoz@gmail.com>
The `retryEndOffsets(…)` method in `TopicAdmin` recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduce a regression when talking to older brokers (0.10.x or earlier).
The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup.
The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem.
This patch includes metadata wait time in total blocked time. First, this patch adds a new metric for total producer time spent waiting on metadata, called metadata-wait-time-ms-total. Then, this time is included in the total blocked time computed from StreamsProducer.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
When a log entry is appended to a Kafka topic using KafkaLog4jAppender, the producer.send operation
may hit a deadlock if the producer network thread also tries to append a log at the same log level.
This issue is triggered when idempotence is enabled for the KafkaLog4jAppender and the producer
tries to acquire the TransactionManager lock.
This is a temporary workaround to avoid deadlocks by disabling idempotence explicitly in
KafkaLog4jAppender.
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>
The caching store layers were passing down writes into lower store layers upon eviction, but not setting the context to the evicted records' context. Instead, the context was from whatever unrelated record was being processed at the time.
Reviewers: Matthias J. Sax <mjsax@apache.org>
With AK 3.0, idempotence was enabled by default in Kafka producers. However, if idempotence is enabled, Connect won't be able to communicate via its producers with Kafka brokers older than version 0.11. Perhaps more importantly, for brokers older than version 2.8 the IDEMPOTENT_WRITE ACL is required to be granted to the principal of the Connect worker.
Therefore this commit disables producer idempotence by default to all the producers instantiated by Connect. Users can still choose to enable producer idempotence by explicitly setting the right worker and/or connector properties.
The changes were tested via existing unit, integration and system tests.
Reviewers: Randall Hauch <rhauch@gmail.com>
The current README instruction for local publishing boils the ocean by building and installing every jar in the project with both 2.12 and 2.13. While that is some times what people want to do, they are also often trying to just build a specific jar.
Reviewers: Bill Bejeck <bbejeck@apache.org>
This reverts commit 76cf7a5793.
Connect already allows users to enable idempotent producers for connectors and the Connect workers. Although Kafka producers enabled idempotency by default in 3.0, due to compatibility requirements and the fact that [KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent) hasn't been explicitly approved, the changes here are reverted. A separate commit will explicitly disable idempotency in producers instantiated by Connect by default until KIP-318 is approved and scheduled for release.
This patch fixes a bug in the `AlterConfigPolicy.RequestMetadata.equals` method where we were not comparing the class correctly.
Co-authored-by: David Jacot <djacot@confluent.io>
Reviewers: David Jacot <djacot@confluent.io>
This patch fixes a few cases where we use `==` instead of `equals` to compare UUID. The impact of this bug is low because `Uuid.ZERO_UUID` is used by default everywhere.
Reviewers: Justine Olshan <jolshan@confluent.io>, dengziming <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>