This fix is trying to skip the assignment validation for built-in cooperative sticky assignor, since (a) we know the assignment is valid since we do essentially this same check already in the cooperative sticky assignor, and (b) the check is broken anyways due to potential for claimed `ownedPartitions` to be incorrect
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This is the fix 1 and fix 2 in #10985 for v2.6, including the tests. Uses the generation to invalidate previous assignments that claim partitions but no longer own them, and implements an additional safety net to handle any case in which doubly-claimed partitions slip in to the input anyway
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
`testResolveDnsLookupResolveCanonicalBootstrapServers` added in
#11091 treats the result from
`ClientUtils.resolve(...,
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY. ...)` as if
it should differ from `ClientUtils.resolve(...,
ClientDnsLookup.DEFAULT, ...)`, but the branching in resolve is on
`ClientDnsLookup.USE_ALL_DNS_IPS == clientDnsLookup` -- everything else
gets a single result.
Reviewers: Ismael Juma <ismael@juma.me.uk>
When expiring transactionalIds, we group the tombstones together into batches. Currently there is no limit on the size of these batches, which can lead to `MESSAGE_TOO_LARGE` errors when a bunch of transactionalIds need to be expired at the same time. This patch fixes the problem by ensuring that the batch size respects the configured limit. Any transactionalIds which are eligible for expiration and cannot be fit into the batch are postponed until the next periodic check.
Reviewers: David Jacot <djacot@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This focuses on the currently failing test, #9315 is a more complete fix
that we should also review and merge.
Reviewers: David Jacot <djacot@confluent.io>
The latest JDKs no longer support TLS 1.0/1.1 causing the test to fail.
We have already fixed this in trunk and 3.0, so this is for 2.8 and older
branches.
The relevant trunk commit is 530224e4fe. We had another test with
the same issue and it was fixed for all branches via #10922.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Support for TLS 1.0/1.1 was disabled in recent versions of Java 8/11
and all versions of 16 causing this test to fail.
It is possible to make it work by updating the relevant security property,
but it has to be done before the affected classes are loaded and it can
not be disabled after that. Given the low value of the test, we remove
it.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Luke Chen <showuon@gmail.com>
From Java 9 onwards, LoginContext#logout() throws an NPE if invoked multiple times due to https://bugs.openjdk.java.net/browse/JDK-8173069. KerberosLogin currently attempts logout followed by login in a background refresh thread. If login fails we retry the same sequence. As a result, a single login failure prevents subsequent re-login. And clients will never be able to authenticate successfully after the first failure, until the process is restarted. The commit checks if logout is necessary before invoking LoginContext#logout(). Also adds a test for this case.
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
We use a background thread for Kerberos to perform re-login before tickets expire. The thread performs logout() followed by login(), relying on the Java library to clear and then populate credentials in Subject. This leaves a timing window where clients fail to authenticate because credentials are not available. We cannot introduce any form of locking since authentication is performed on the network thread. So this commit treats NO_CRED as a transient failure rather than a fatal authentication exception in clients.
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end.
At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader).
Verified with new unit tests for both cases (failure to write, failure to read back after write).
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
`KafkaAdmin.listOffsets` did not handle topic-level errors, hence the UnknownTopicOrPartitionException on topic-level can obstruct a Connect worker from running when the new internal topic is NOT synced to all brokers. The method did handle partition-level retriable errors by retrying, so this changes to handle topic-level retriable errors in the same way.
This allows a Connect worker to start up and have the admin client retry when the worker is trying to read to the end of the newly-created internal topics until the internal topic metadata is synced to all brokers.
Author: Chia-Ping Tsai <chia7712@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
Refactored the KafkaBasedLog logic to read end offsets into a separate method to make it easier to test. Also changed the TopicAdmin.endOffsets method to throw the original UnsupportedVersionException, LeaderNotAvailableException, and TimeoutException rather than wrapping, to better conform with the consumer method and how the KafkaBasedLog retries those exceptions.
Added new tests to verify various scenarios and errors.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
These Kafka*BackingStore classes used in Connect have a recently-added deprecated constructor, which is not used within AK. However, this commit corrects a AdminClient resource leak if those deprecated constructors are used outside of AK. The fix simply ensures that the AdminClient created by the “default” supplier is always closed when the Kafka*BackingStore is stopped.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Fixes the recent change to the `MirrorMaker` class (used only in the MirrorMaker 2 executable) that uses a `SharedTopicAdmin` client as part of Connect, so that the correct properties into the `SharedTopicAdmin`.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
This patch changes the NetworkClient behavior to resolve the target node's hostname after disconnecting from an established connection, rather than waiting until the previously-resolved addresses are exhausted. This is to handle the scenario when the node's IP addresses have changed during the lifetime of the connection, and means that the client does not have to try to connect to invalid IP addresses until it has tried each address.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Satish Duggana <satishd@apache.org>, David Jacot <djacot@confluent.io>
The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.
However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.
Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.
Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.
The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.
These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
A few important fixes:
* ZOOKEEPER-3829: Zookeeper refuses request after node expansion
* ZOOKEEPER-3842: Rolling scale up of zookeeper cluster does not work
with reconfigEnabled=false
* ZOOKEEPER-3830: After add a new node, zookeeper cluster won't commit
any proposal if this new node is leader
Full release notes:
https://zookeeper.apache.org/doc/r3.5.9/releasenotes.html
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Fixes a regression introduced by https://github.com/apache/kafka/pull/9806 in the original fix for KAFKA-10895
It was discovered that if an invalid JAAS config is present on the worker, invoking Configuration::getConfiguration throws an exception. The changes from #9806 cause that exception to be thrown during plugin scanning, which causes the worker to fail even if it is not configured to use the basic auth extension at all.
This follow-up handles invalid JAAS configurations more gracefully, and only throws them if the worker is actually configured to use the basic auth extension, at the time that the extension is instantiated and configured.
Two unit tests are added.
Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
First issue: When more than one workers join the Connect group the incremental cooperative assignor revokes and reassigns at most average number of tasks per worker.
Side-effect: This results in the additional workers joining the group stay idle and would require more future rebalances to happen to have even distribution of tasks.
Fix: As part of task assignment calculation following a deployment, the reassignment of tasks are calculated by revoking all the tasks above the rounded up (ceil) average number of tasks.
Second issue: When more than one worker is lost and rejoins the group at most one worker will be re assigned with the lost tasks from all the workers that left the group.
Side-effect: In scenarios where more than one worker is lost and rejoins the group only one among them gets assigned all the partitions that were lost in the past. The additional workers that have joined would not get any task assigned to them until a rebalance that happens in future.
Fix: As part fo lost task re assignment all the new workers that have joined the group would be considered for task assignment and would be assigned in a round robin fashion with the new tasks.
Testing strategy :
* System testing in a Kubernetes environment completed
* New integration tests to test for balanced tasks
* Updated unit tests.
Co-authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthusamy@apple.com>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Co-authored-by: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
When two cooperative rebalances take place soon after one another, a prior rebalance may not complete before the next rebalance is started.
Under Eager rebalancing, no tasks would have been started, so the subsequent onRevoked call is intentionally skipped whenever rebalanceResolved was false.
Under Incremental Cooperative rebalancing, the same logic causes the DistributedHerder to skip stopping all of the connector/task revocations which occur in the second rebalance.
The DistributedHerder still removes the revoked connectors/tasks from its assignment, so that the DistributedHerder and Worker have different knowledge of running connectors/tasks.
This causes the connector/task instances that would have been stopped to disappear from the rebalance protocol, and left running until their workers are halted, or they fail.
Connectors/Tasks which were then reassigned to other workers by the rebalance protocol would be duplicated, and run concurrently with zombie connectors/tasks.
Connectors/Tasks which were reassigned back to the same worker would encounter exceptions in Worker, indicating that the connector/task existed and was already running.
* Added test for revoking and then reassigning a connector under normal circumstances
* Added test for revoking and then reassigning a connector following an incomplete cooperative rebalance
* Changed expectRebalance to make assignment fields mutable before passing them into the DistributedHerder
* Only skip revocation for the Eager protocol, and never skip revocation for incremental cooperative protocols
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
ducktape 0.7.11 fixes a bug where a unicode exception message would cause test runner to hang up and never finish.
This change should be backported to all the branches using ducktape 0.7.10
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Changes the `WorkerSourceTask` class to only call `SourceTask::stop` from the task thread when the task is actually stopped (via `Source:task::close` just before `WorkerTask::run` completes), and only if an attempt has been made to start the task (which will not be the case if it was created in the paused state and then shut down before being started). This prevents `SourceTask::stop` from being indirectly invoked on the herder's thread, which can have adverse effects if the task is unable to shut down promptly.
Unit tests are tweaked where necessary to account for this new logic, which covers some edge cases mentioned in PR #5020 that were unaddressed up until now.
The existing integration tests for blocking connectors are expanded to also include cases for blocking source and sink tasks. Full coverage of every source/sink task method is intentionally omitted from these expanded tests in order to avoid inflating test runtime (each one adds an extra 5 seconds at minimum) and because the tests that are added here were sufficient to reproduce the bug with source task shutdown.
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>
If a connector, converter, etc. invokes [Configuration::setConfiguration](https://docs.oracle.com/javase/8/docs/api/javax/security/auth/login/Configuration.html#setConfiguration-javax.security.auth.login.Configuration-), it will cause the Connect basic auth filter to use that JAAS config instead of the one configured at startup with the `-Djava.security.auth.login.config` JVM property. This can cause requests to the worker to succeed initially but start to be rejected after the JVM's global JAAS config is altered.
To alleviate this the current PR instructs the Connect Worker to cache the JVM's global JAAS configuration at the beginning (as soon as the `BasicAuthSecurityRestExtension` class is loaded), and use that for all future authentication.
Existing tests for the JAAS basic auth filter are modified to work with the new internal logic. The `testEmptyCredentialsFile` test is corrected to actually operate on an empty credentials file (instead of a non-existent credentials file, which it currently operates on). A new test is added to ensure that, even if the global JAAS config is overwritten, the basic auth filter will use the first one it could find.
Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
This is similar to config change logs for topic/broker. This will be useful for debugging any acl related issues.
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Closes#9841 from omkreddy/acl-change-log
KAFKA-10235 fixed a consistency issue with the transaction timeout and the progress timeout. Since the test case relies on transaction timeouts, we need to wait at last as long as the timeout in order to ensure progress. However, having a low transaction timeout makes the test prone to the issue identified in KAFKA-9802, in which the coordinator timed out the transaction while the producer was awaiting a Produce response.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Boyang Chen <boyang@confluent.io>, Jun Rao <junrao@gmail.com>
Fixes:
* DOMDeserializer: setExpandEntityReferences(false) may not prevent
external entity
expansion in all cases (CVE-2020-25649)
Full details:
https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.10#micro-patches
The cherry-pick to 2.5 also updated jackson from 2.10.2 to 2.10.5.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Normally the `EmbeddedConnectCluster` class masks the `Exit` procedures using within the Connect worker. This normally works great when a single instance of the embedded cluster is used. However, the `MirrorConnectorsIntegrationTest` uses two `EmbeddedConnectCluster` instances, and when the first one is stopped it would reset the (static) exit procedures, and any problems during shutdown of the second embedded Connect cluster would cause the worker to shut down the JVM running the tests.
Instead, the `MirrorConnectorsIntegrationTest` class should mask the `Exit` procedures and instruct the `EmbeddedConnectClusters` instances (via the existing builder method) to not mask the procedures.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch fixes a liveness bug which prevents follower truncation from completing after a leader election. If there are consecutive leader elections without writing any data entries, then the leader and follower may have conflicting epoch entries at the end of the log.
The problem is the shortcut return in `Log.truncateTo` when the truncation offset is larger than or equal to the end offset, which prevents the conflicting entries from being resolved. Here we change this case to ensure `LeaderEpochFileCache.truncateFromEnd` is still called.
Reviewers: Jun Rao <junrao@gmail.com>
Ducktape version 0.7.10 pinned paramiko to version 2.3.2 to deal with random SSHExceptions confluent had been seeing since ducktape was updated to a later version of paramiko.
The idea is that we can backport ducktape 0.7.10 change as far back as possible, while 2.7 and trunk can update to 0.8.0 and python3 separately.
Tested:
In progress, but unlikely to affect anything, since the only difference between ducktape 0.7.9 and 0.7.10 is paramiko version downgrade.
Author: Stanislav Vodetskyi <stan@confluent.io>
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
Closes#9490 from stan-confluent/ducktape-710-26
(cherry picked from commit 1cbc4da0c9)
Signed-off-by: Manikumar Reddy <manikumar.reddy@gmail.com>
DistributedHerder goes to updateConfigsWithIncrementalCooperative() synchronized method and called configBackingStore.snapshot() which take a lock on internal object in KafkaConfigBackingStore class.
Meanwhile KafkaConfigBackingStore in ConsumeCallback inside synchronized block on internal object gets SESSION_KEY record and calls updateListener.onSessionKeyUpdate() which take a lock on DistributedHerder.
This results to a deadlock.
To avoid this, updateListener with new session key should be called outside synchronized block as it's done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).
Co-authored-by: Taisiia Goltseva <tado0618@netcracker.com>
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
ClusterConnectStatesTest and ClientUtilsTest were failing because they expected
kafka.apache.org to resolve to 2 IP addresses. This updates the tests so they reflect
that DNS resolves to 3 addresses.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Trigger task reconfiguration when:
- topic-partitions are created or deleted on source cluster
- topic-partitions are missing on target cluster
Authors: Mickael Maison <mickael.maison@gmail.com>, Edoardo Comar <ecomar@uk.ibm.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
Connect should not always add an error to configuration values in validation results that don't have a `ConfigKey` defined in the connector's `ConfigDef`, and any errors on such configuration values included by the connector should be counted in the total number of errors. Added more unit tests for `AbstractHerder.generateResult(...)`.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
The `org.apache.kafka.connect.data.Values#parse` method parses integers, which are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`.
That means we are losing precision for these larger integers.
For example:
`SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");`
returns:
`SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}`
Also, this method parses values that can be parsed as `FLOAT32` to `FLOAT64`.
This PR changes parsing logic, to use `FLOAT32`/`FLOAT64` for numbers that don't have fraction part(`decimal.scale()!=0`) only, and use an arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise.
Also, it updates the method to parse numbers, that can be represented as `float` to `FLOAT64`.
Added unit tests, that cover parsing `BigInteger`, `Byte`, `Short`, `Integer`, `Long`, `Float`, `Double` types.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
System.currentTimeMillis() is not monotonic, so using that to calculate time to sleep can result in negative values. That will throw IllegalArgumentException.
This change checks for that and sleeps for a second (to avoid tight loop) if the value returned is negative.
Author: Shaik Zakir Hussain <zhussain@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
Fixes a regression introduced in `JsonConverter` with previous upgrades from Jackson Databind 2.9.x to 2.10.x. Jackson Databind version 2.10.0 included a backward-incompatible behavioral change to use `JsonNodeType.MISSING` (and `MissingNode`, the subclass of `JsonNode` that has a type of `MISSING`) instead of `JsonNodeType.NULL` / `NullNode`. See https://github.com/FasterXML/jackson-databind/issues/2211 for details of this change.
This change makes recovers the older `JsonConverter` behavior of returning null on empty input.
Added two unit tests for this change. Both unit tests were independently tested with earlier released versions and passed on all versions that used Jackson 2.9.x and earlier, and failed on all versions that used 2.10.x and that did not have the fixed included in the PR. Both of the new unit tests pass with this fix to `JsonConverter`.
Author: Shaik Zakir Hussain <zhussain@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
Add logic to reset the existing `canReadConfigs` in `DistributedHerder` once the herder is able to successfully read the configs again. Added unit test to verify the functionality.
Author: Chris Egerton <chrise@confluent.io>
Reviewer: Nigel Liang <nigel@nigelliang.com>, Randall Hauch <rhauch@gmail.com>