Commit Graph

517 Commits

Author SHA1 Message Date
Randall Hauch 221a6d3595 MINOR: Use MessageDigest equals when comparing signature (#10898) 2021-06-18 11:59:44 -05:00
Chris Egerton d5e85efb95 KAFKA-12474: Handle failure to write new session keys gracefully (#10396)
If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end.

At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader).

Verified with new unit tests for both cases (failure to write, failure to read back after write).
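The recovery flow described above can be sketched as a simplified tick loop. All names below are hypothetical stand-ins, not Connect's actual API; the point is the pattern of restarting the tick and forcing a read to the end of the config topic instead of dying:

```java
/**
 * Minimal sketch (hypothetical names) of the softened failure handling:
 * instead of dying on a failed session-key write, the herder restarts its
 * tick loop and forces a read to the end of the config topic before retrying.
 */
public class SessionKeyRetrySketch {
    interface ConfigTopic {
        void writeSessionKey() throws Exception;   // may fail
        boolean readToEnd();                       // true once caught up
    }

    /** Returns the tick on which the key was rotated, or -1 if maxTicks ran out. */
    public static int runTicks(ConfigTopic topic, int maxTicks) {
        boolean needsReadToEnd = false;
        for (int tick = 1; tick <= maxTicks; tick++) {
            if (needsReadToEnd && !topic.readToEnd()) {
                continue;              // keep retrying the read on the next tick
            }
            needsReadToEnd = false;
            try {
                topic.writeSessionKey();
                return tick;           // key written (and read back) successfully
            } catch (Exception e) {
                needsReadToEnd = true; // don't die: re-read config topic, then retry
            }
        }
        return -1;
    }
}
```

If the first write succeeded but the read-back failed, the same loop applies: the worker keeps reading to the end of the config topic until it observes the key (or writes a new one on a later tick, if still the leader).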

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
2021-04-01 12:27:14 -05:00
Luke Chen 98596b3c49 KAFKA-10192: Increase max time to wait for worker to start in some integration tests (#10118)
Author: Luke Chen <showuon@gmail.com>
Reviewers: Chris Egerton <chrise@confluent.io>, Randall Hauch <rhauch@gmail.com>
2021-03-09 12:34:20 -06:00
Randall Hauch f4bef70dc6 KAFKA-12343: Handle exceptions better in TopicAdmin, including UnsupportedVersionException (#10158)
Refactored the KafkaBasedLog logic to read end offsets into a separate method to make it easier to test. Also changed the TopicAdmin.endOffsets method to throw the original UnsupportedVersionException, LeaderNotAvailableException, and TimeoutException rather than wrapping, to better conform with the consumer method and how the KafkaBasedLog retries those exceptions.

Added new tests to verify various scenarios and errors.

Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2021-02-19 16:20:55 -06:00
Randall Hauch 4b02c40e8e KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)
These Kafka*BackingStore classes used in Connect have a recently-added deprecated constructor, which is not used within AK. However, this commit corrects an AdminClient resource leak that occurs if those deprecated constructors are used outside of AK. The fix simply ensures that the AdminClient created by the “default” supplier is always closed when the Kafka*BackingStore is stopped.
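The ownership rule behind the fix can be sketched as follows (hypothetical names, not the actual Connect classes): close only the client you created yourself, never one that was supplied from outside.

```java
/**
 * Sketch of the leak fix (hypothetical names): a store that may receive an
 * externally supplied client, or lazily create a "default" one that it must
 * then close itself when stopped.
 */
public class OwnedClientSketch {
    interface AdminClient extends AutoCloseable {
        @Override void close();
    }

    static class BackingStore {
        private final AdminClient supplied;   // null => create our own
        private AdminClient owned;            // only set if we created it

        BackingStore(AdminClient supplied) { this.supplied = supplied; }

        AdminClient admin() {
            if (supplied != null) return supplied;
            if (owned == null) owned = newDefaultAdmin();
            return owned;
        }

        /** The fix: always close the client we created; never close a supplied one. */
        void stop() {
            if (owned != null) {
                owned.close();
                owned = null;
            }
        }

        protected AdminClient newDefaultAdmin() {
            return () -> { };  // stand-in for a real client
        }
    }
}
```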

Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2021-02-19 11:54:14 -06:00
Randall Hauch db803ec1c6 KAFKA-12326: Corrected regression in MirrorMaker 2 executable introduced with KAFKA-10021 (#10122)
Fixes the recent change to the `MirrorMaker` class (used only in the MirrorMaker 2 executable) that uses a `SharedTopicAdmin` client as part of Connect, so that the correct properties are passed into the `SharedTopicAdmin`.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2021-02-12 17:38:37 -06:00
Randall Hauch 0338995d73 KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)
The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) elapses and the consumer returns no more records. In other words, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBasedLog` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffsets` broker API, so the functionality is ultimately the same, but we don't have to block on any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (via the deprecated constructor, which is no longer used in AK), the consumer is still used to get the latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When these classes instantiate their `KafkaBasedLog` instance, they pass the admin supplier and an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore` objects, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.
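The core of items 1, 3, and 5 above is a lazily created, shared, closeable admin supplier. A minimal sketch of that pattern with a generic stand-in (hypothetical class, simplified from the real `SharedTopicAdmin`):

```java
import java.util.function.Supplier;

/**
 * Sketch of the SharedTopicAdmin idea (simplified, hypothetical API): a
 * Supplier that lazily creates a single client on first use, hands the same
 * instance to every caller, and closes it when the supplier itself is closed.
 */
public class SharedAdminSketch<T extends AutoCloseable> implements Supplier<T>, AutoCloseable {
    private final Supplier<T> factory;
    private T instance;
    private boolean closed;

    public SharedAdminSketch(Supplier<T> factory) { this.factory = factory; }

    @Override public synchronized T get() {
        if (closed) throw new IllegalStateException("already closed");
        if (instance == null) instance = factory.get();  // created only when needed
        return instance;
    }

    @Override public synchronized void close() {
        closed = true;
        if (instance != null) {
            try { instance.close(); } catch (Exception e) { /* best effort */ }
        }
    }
}
```

All three backing stores would hold the same supplier, so only one client is ever created, and shutdown (here, `close()`) releases it exactly once.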

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2021-02-09 19:50:21 -06:00
Chris Egerton 8cef4cd5a5 KAFKA-10895: Gracefully handle invalid JAAS configs (follow up fix) (#9987)
Fixes a regression introduced by https://github.com/apache/kafka/pull/9806 in the original fix for KAFKA-10895

It was discovered that if an invalid JAAS config is present on the worker, invoking Configuration::getConfiguration throws an exception. The changes from #9806 cause that exception to be thrown during plugin scanning, which causes the worker to fail even if it is not configured to use the basic auth extension at all.

This follow-up handles invalid JAAS configurations more gracefully, and throws the resulting exception only if the worker is actually configured to use the basic auth extension, at the time that the extension is instantiated and configured.
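The deferral pattern can be sketched like this (hypothetical names; the real code captures the exception from `Configuration::getConfiguration`): remember the error at load time, and rethrow it only if the extension is actually configured.

```java
import java.util.function.Supplier;

/**
 * Sketch of deferred failure (hypothetical names): instead of letting a bad
 * JAAS config kill plugin scanning, capture the error and rethrow it only if
 * and when the extension is actually configured.
 */
public class DeferredConfigSketch {
    private RuntimeException loadError;
    private String config;

    /** Loading at construction/scan time must never throw. */
    public DeferredConfigSketch(Supplier<String> configLoader) {
        try {
            config = configLoader.get();
        } catch (RuntimeException e) {
            loadError = e;             // remember, don't propagate yet
        }
    }

    /** Only a worker that actually uses the extension pays for the bad config. */
    public String configure() {
        if (loadError != null) throw loadError;
        return config;
    }
}
```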

Two unit tests are added.

Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-03 15:17:29 -08:00
Ramesh Krishnan M a645c25669 KAFKA-10413: Allow for even distribution of lost/new tasks when multiple Connect workers join at the same time (#9319)
First issue: When more than one worker joins the Connect group, the incremental cooperative assignor revokes and reassigns at most the average number of tasks per worker.
Side effect: The additional workers joining the group stay idle, and more rebalances are required in the future to reach an even distribution of tasks.
Fix: As part of the task assignment calculation following a deployment, the reassignment of tasks is calculated by revoking all the tasks above the rounded-up (ceiling) average number of tasks per worker.

Second issue: When more than one worker is lost and rejoins the group, at most one worker will be reassigned the lost tasks from all the workers that left the group.
Side effect: In scenarios where more than one worker is lost and rejoins the group, only one of them gets assigned all the tasks that were lost. The additional workers that have joined do not get any tasks assigned to them until a future rebalance.
Fix: As part of lost task reassignment, all the new workers that have joined the group are considered for task assignment, and the lost tasks are assigned to them in a round-robin fashion.
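Both fixes reduce to simple arithmetic over the assignment. A toy model (hypothetical names, simplified from the real assignor) of the ceiling-average cap and the round-robin distribution of lost tasks:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the two fixes (simplified model, hypothetical names): revoke
 * everything above the rounded-up average so joining workers get work, and
 * spread lost tasks over all returning workers round-robin instead of
 * piling them onto one worker.
 */
public class EvenAssignmentSketch {
    /** Ceiling average: the per-worker cap used when computing revocations. */
    public static int ceilAverage(int totalTasks, int workers) {
        return (totalTasks + workers - 1) / workers;
    }

    /** Round-robin lost tasks across all candidate workers. */
    public static Map<String, List<String>> assignLost(List<String> lostTasks, List<String> workers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String w : workers) assignment.put(w, new ArrayList<>());
        for (int i = 0; i < lostTasks.size(); i++) {
            assignment.get(workers.get(i % workers.size())).add(lostTasks.get(i));
        }
        return assignment;
    }
}
```

For example, 10 tasks across 4 workers caps each worker at 3 tasks rather than truncating to 2, so revocations free enough tasks for the newcomers.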

Testing strategy:
* System testing in a Kubernetes environment completed
* New integration tests to test for balanced tasks
* Updated unit tests.

Co-authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthusamy@apple.com>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Co-authored-by: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-02 12:25:39 -08:00
Greg Harris 7896915190 KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations (#9765)
When two cooperative rebalances take place soon after one another, a prior rebalance may not complete before the next rebalance is started.
Under Eager rebalancing, no tasks would have been started, so the subsequent onRevoked call is intentionally skipped whenever rebalanceResolved was false.
Under Incremental Cooperative rebalancing, the same logic causes the DistributedHerder to skip stopping all of the connector/task revocations which occur in the second rebalance.
The DistributedHerder still removes the revoked connectors/tasks from its assignment, so the DistributedHerder and Worker end up with different views of the running connectors/tasks.
This causes the connector/task instances that should have been stopped to disappear from the rebalance protocol and be left running until their workers are halted, or until they fail.
Connectors/Tasks which were then reassigned to other workers by the rebalance protocol would be duplicated, and run concurrently with zombie connectors/tasks.
Connectors/Tasks which were reassigned back to the same worker would encounter exceptions in Worker, indicating that the connector/task existed and was already running.

* Added test for revoking and then reassigning a connector under normal circumstances
* Added test for revoking and then reassigning a connector following an incomplete cooperative rebalance
* Changed expectRebalance to make assignment fields mutable before passing them into the DistributedHerder
* Only skip revocation for the Eager protocol, and never skip revocation for incremental cooperative protocols
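The corrected skip condition from the last bullet can be sketched as a single predicate (hypothetical method; `EAGER`, `COMPATIBLE`, and `SESSIONED` stand in for the Connect protocol names, the latter two being the incremental cooperative protocols):

```java
/**
 * Sketch of the corrected revocation logic (hypothetical names): under the
 * eager protocol an unresolved rebalance means nothing was started, so
 * revocation may be skipped; under incremental cooperative protocols the
 * revocation callback must always run.
 */
public class RevocationSketch {
    enum Protocol { EAGER, COMPATIBLE, SESSIONED }

    public static boolean shouldRevoke(Protocol protocol, boolean rebalanceResolved) {
        if (protocol == Protocol.EAGER) {
            return rebalanceResolved;  // old behavior, still correct for eager
        }
        return true;                   // fix: never skip for cooperative protocols
    }
}
```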

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-01-26 11:39:07 -08:00
Chris Egerton 36ae1f5e12
KAFKA-10792: Prevent source task shutdown from blocking herder thread (#9669) (#9880)
Changes the `WorkerSourceTask` class to only call `SourceTask::stop` from the task thread when the task is actually stopped (via `SourceTask::close` just before `WorkerTask::run` completes), and only if an attempt has been made to start the task (which will not be the case if it was created in the paused state and then shut down before being started). This prevents `SourceTask::stop` from being indirectly invoked on the herder's thread, which can have adverse effects if the task is unable to shut down promptly.

Unit tests are tweaked where necessary to account for this new logic, which covers some edge cases mentioned in PR #5020 that were unaddressed up until now.

The existing integration tests for blocking connectors are expanded to also include cases for blocking source and sink tasks. Full coverage of every source/sink task method is intentionally omitted from these expanded tests in order to avoid inflating test runtime (each one adds an extra 5 seconds at minimum) and because the tests that are added here were sufficient to reproduce the bug with source task shutdown.

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>
2021-01-14 12:32:08 -06:00
Chris Egerton 522ce9d0d3 KAFKA-10895: Attempt to prevent JAAS config from being overwritten for basic auth filter in Connect (#9806)
If a connector, converter, etc. invokes [Configuration::setConfiguration](https://docs.oracle.com/javase/8/docs/api/javax/security/auth/login/Configuration.html#setConfiguration-javax.security.auth.login.Configuration-), it will cause the Connect basic auth filter to use that JAAS config instead of the one configured at startup with the `-Djava.security.auth.login.config` JVM property. This can cause requests to the worker to succeed initially but start to be rejected after the JVM's global JAAS config is altered.

To alleviate this, this PR instructs the Connect worker to cache the JVM's global JAAS configuration at startup (as soon as the `BasicAuthSecurityRestExtension` class is loaded), and use that for all future authentication.

Existing tests for the JAAS basic auth filter are modified to work with the new internal logic. The `testEmptyCredentialsFile` test is corrected to actually operate on an empty credentials file (instead of a non-existent credentials file, which it currently operates on). A new test is added to ensure that, even if the global JAAS config is overwritten, the basic auth filter will use the first one it could find.
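The capture-at-class-load idea can be modeled with a static initializer (a toy sketch with hypothetical fields; the real code snapshots `Configuration::getConfiguration`): the snapshot is taken before any connector code can call the setter.

```java
/**
 * Sketch of the caching approach (hypothetical names): capture the global
 * configuration once, in a static initializer that runs when the class is
 * loaded, so later calls to a setter cannot affect authentication.
 */
public class CachedJaasSketch {
    static String globalConfig = "from -Djava.security.auth.login.config";

    // Captured as soon as the class is loaded, before anyone can overwrite it.
    private static final String CACHED = globalConfig;

    public static void setGlobalConfig(String c) { globalConfig = c; }

    /** Authentication always uses the snapshot, not the current global value. */
    public static String configForAuthentication() { return CACHED; }
}
```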

Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
2021-01-11 21:47:46 -08:00
Chris Egerton b2484755b2 KAFKA-9767: Add logging to basic auth rest extension (#8357)
Add logging to basic auth rest extension.

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Magesh Nandakumar <magesh.n.kumar@gmail.com>, Randall Hauch <rhauch@gmail.com>
2021-01-11 21:46:34 -08:00
Randall Hauch 874198b873 KAFKA-10811: Correct the MirrorConnectorsIntegrationTest to correctly mask the exit procedures (#9698)
Normally the `EmbeddedConnectCluster` class masks the `Exit` procedures used within the Connect worker. This works great when a single instance of the embedded cluster is used. However, the `MirrorConnectorsIntegrationTest` uses two `EmbeddedConnectCluster` instances, and when the first one is stopped it resets the (static) exit procedures, so any problems during shutdown of the second embedded Connect cluster would cause the worker to shut down the JVM running the tests.

Instead, the `MirrorConnectorsIntegrationTest` class should mask the `Exit` procedures and instruct the `EmbeddedConnectClusters` instances (via the existing builder method) to not mask the procedures.

Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2020-12-07 13:29:08 -06:00
xakassi 962c624af9 KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)
DistributedHerder enters the synchronized method updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class.

Meanwhile, KafkaConfigBackingStore, in its ConsumeCallback inside a synchronized block on that same internal object, receives a SESSION_KEY record and calls updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.

This results in a deadlock.

To avoid this, updateListener should be called with the new session key outside the synchronized block, as is already done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).
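The fix is the classic pattern of moving a callback outside the critical section. A sketch with hypothetical names (the real classes are KafkaConfigBackingStore and DistributedHerder):

```java
/**
 * Sketch of the fix (hypothetical names): update state under the store's
 * lock, but invoke the herder's listener only after releasing the lock, so
 * the listener can take the herder's lock without creating a lock cycle.
 */
public class ListenerOutsideLockSketch {
    interface Listener { void onSessionKeyUpdate(String key); }

    private String sessionKey;

    public void handleSessionKeyRecord(String record, Listener updateListener) {
        String newKey;
        synchronized (this) {
            sessionKey = record;   // state update stays under the store's lock
            newKey = sessionKey;
        }
        // The fix: the callback runs outside the synchronized block, as is
        // already done for onTaskConfigUpdate(updatedTasks).
        updateListener.onSessionKeyUpdate(newKey);
    }
}
```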

Co-authored-by: Taisiia Goltseva <tado0618@netcracker.com>

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2020-10-20 09:08:05 -07:00
Mickael Maison 71f7ed3923 KAFKA-10332: Update MM2 refreshTopicPartitions() logic (#9343)
Trigger task reconfiguration when:
- topic-partitions are created or deleted on source cluster
- topic-partitions are missing on target cluster

Authors: Mickael Maison <mickael.maison@gmail.com>, Edoardo Comar <ecomar@uk.ibm.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
2020-10-19 12:12:54 -05:00
Randall Hauch 6688ea2235 KAFKA-10600: Connect should not add error to connector validation values for properties not in connector’s ConfigDef (#9425)
Connect should not always add an error to configuration values in validation results that don't have a `ConfigKey` defined in the connector's `ConfigDef`, and any errors on such configuration values included by the connector should be counted in the total number of errors. Added more unit tests for `AbstractHerder.generateResult(...)`.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2020-10-16 09:17:34 -05:00
Chris Egerton 673eaea10e KAFKA-10574: Fix infinite loop in Values::parseString (#9375)
Fix infinite loop in Values::parseString

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>
2020-10-12 11:49:21 -05:00
Chris Egerton 47c517931c KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910) 2020-10-06 13:59:49 -05:00
Alex Diachenko b926ccbbd1 KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (#9320)
The `org.apache.kafka.connect.data.Values#parse` method parses integers, which are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`.

That means we are losing precision for these larger integers.

For example:
`SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");`
returns:
`SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}`

Also, this method parses values that can be parsed as `FLOAT32` to `FLOAT64`.

This PR changes the parsing logic to use `FLOAT32`/`FLOAT64` only for numbers that have a fractional part (`decimal.scale() != 0`), and to use an arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise.
Also, it updates the method to parse numbers that fit in a `float` as `FLOAT64`.

Added unit tests that cover parsing `BigInteger`, `Byte`, `Short`, `Integer`, `Long`, `Float`, and `Double` types.
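The routing rule can be demonstrated with the JDK alone (this is a sketch of the idea, not the real `Values` implementation): values with a fractional part stay floating point, while huge integers keep full precision.

```java
import java.math.BigDecimal;

/**
 * Sketch of the routing rule using only the JDK: values with a fractional
 * part (scale != 0) stay floating point, while huge integers keep full
 * precision as BigDecimal instead of collapsing to double.
 */
public class BigIntegerParseSketch {
    /** Returns either a Double (lossy, for fractional values) or a BigDecimal. */
    public static Number parseNumber(String literal) {
        BigDecimal decimal = new BigDecimal(literal);
        if (decimal.scale() != 0) {
            return decimal.doubleValue();       // fractional => FLOAT64-style value
        }
        return decimal;                         // integral => arbitrary precision
    }
}
```

With the old behavior, "9223372036854775808" (Long.MAX_VALUE + 1) collapsed to the double 9.223372036854776E18; under this rule its exact value survives.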

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2020-10-05 17:27:33 -07:00
Vikas Singh d2c332a1be KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
System.currentTimeMillis() is not monotonic, so using it to calculate how long to sleep can result in negative values, which cause Thread.sleep to throw IllegalArgumentException.

This change checks for that and sleeps for a second (to avoid tight loop) if the value returned is negative.
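The guard can be sketched as a pure function (hypothetical names; the fallback of one second mirrors the description above):

```java
/**
 * Sketch of the guard (hypothetical names): a wall-clock deadline can be in
 * the past because System.currentTimeMillis() is not monotonic, so clamp the
 * computed sleep to a small positive fallback instead of letting
 * Thread.sleep throw IllegalArgumentException.
 */
public class SafeSleepSketch {
    static final long FALLBACK_MS = 1000L;  // one second, to avoid a tight loop

    public static long sleepMillis(long deadlineMs, long nowMs) {
        long remaining = deadlineMs - nowMs;
        return remaining < 0 ? FALLBACK_MS : remaining;
    }
}
```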

Author: Shaik Zakir Hussain <zhussain@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
2020-10-05 14:10:27 -05:00
Shaik Zakir Hussain 94fd8ff345 KAFKA-10477: Fix JsonConverter regression to treat MISSING nodes as NULL nodes (#9306)
Fixes a regression introduced in `JsonConverter` with previous upgrades from Jackson Databind 2.9.x to 2.10.x. Jackson Databind version 2.10.0 included a backward-incompatible behavioral change to use `JsonNodeType.MISSING` (and `MissingNode`, the subclass of `JsonNode` that has a type of `MISSING`) instead of `JsonNodeType.NULL` / `NullNode`. See https://github.com/FasterXML/jackson-databind/issues/2211 for details of this change.

This change recovers the older `JsonConverter` behavior of returning null on empty input.

Added two unit tests for this change. Both unit tests were independently run against earlier released versions: they passed on all versions that used Jackson 2.9.x and earlier, and failed on all versions that used 2.10.x without the fix included in this PR. Both of the new unit tests pass with this fix to `JsonConverter`.

Author: Shaik Zakir Hussain <zhussain@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
2020-10-02 09:25:43 -05:00
Chris Egerton 8ac397bf4f KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once (#8973)
Add logic to reset the existing `canReadConfigs` in `DistributedHerder` once the herder is able to successfully read the configs again. Added unit test to verify the functionality.

Author: Chris Egerton <chrise@confluent.io>
Reviewer: Nigel Liang <nigel@nigelliang.com>, Randall Hauch <rhauch@gmail.com>
2020-09-28 18:33:42 -05:00
Randall Hauch 89978e1dcf
KAFKA-9066: Retain metrics for failed tasks (#8502) (#8854)
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>
2020-08-12 17:42:12 -05:00
showuon 06341cfbac KAFKA-9509: Increase timeout when consuming records to fix flaky test in MM2 (#8894)
A simple increase in the timeout of the consumer that verifies that records have been replicated seems to fix the integration tests in `MirrorConnectorsIntegrationTest` that have been failing more often recently. 

Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Sanjana Kaundinya <skaundinya@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
2020-06-29 13:24:04 -07:00
Chris Egerton a23bd36d2a KAFKA-9845: Warn users about using config providers with plugin.path property (#8455)
* KAFKA-9845: Fix plugin.path when config provider is used

* Revert "KAFKA-9845: Fix plugin.path when config provider is used"

This reverts commit 96caaa9a49.

* KAFKA-9845: Emit ERROR-level log message when config provider is used for plugin.path property

* KAFKA-9845: Demote log message level from ERROR to WARN

Co-Authored-By: Nigel Liang <nigel@nigelliang.com>

* KAFKA-9845: Fix failing unit tests

* KAFKA-9845: Add warning message to docstring for plugin.path config

* KAFKA-9845: Apply suggestions from code review

Co-authored-by: Randall Hauch <rhauch@gmail.com>

Co-authored-by: Nigel Liang <nigel@nigelliang.com>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
2020-06-10 23:38:11 -05:00
Lucent-Wong 3b6b9d1ae7 KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment (#8453)
With Incremental Cooperative Rebalancing, if a worker returns after it's been out of the group for some time (essentially as a zombie worker) and hasn't voluntarily revoked its own connectors and tasks in the meantime, there's the possibility that these assignments have been distributed to other workers and redundant connectors and tasks might be running now in the Connect cluster.

This PR complements previous fixes such as KAFKA-9184, KAFKA-9849 and KAFKA-9851 providing a last line of defense against zombie tasks: if at any rebalance round the leader worker detects that there are duplicate assignments in the group, it revokes them completely and resolves duplication with a correct assignment in the rebalancing round that will follow task revocation. 

Author: Wang <ywang50@ebay.com>

Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2020-06-10 20:15:51 -07:00
Mario Molina 9f0ec4d537 KAFKA-9985: Sink connector may exhaust broker when writing in DLQ (#8663)
* Validate topic name against DLQ topic in Sink connector config

* Adding parseTopicsList method

* Suppress warning

* KAFKA-9985: Minor changes to improve readability of exception messages

Co-authored-by: Randall Hauch <rhauch@gmail.com>
2020-06-10 21:55:25 -05:00
Randall Hauch 95d6ae81a9
MINOR: Fix PluginUtilsTest that resulted from a bad backport (#8848) 2020-06-10 21:54:47 -05:00
Greg Harris 0cde74e977 KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630)
This fix excludes `ConnectorClientConfigRequest` and its inner class from class loading isolation in a similar way that KAFKA-8415 excluded `ConnectorClientConfigOverridePolicy`.

Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2020-06-10 15:19:10 -07:00
Auston 8134f0cec6 KAFKA-8938: Improve allocations during Struct validation in ConnectSchema (#7384)
Struct value validation in Kafka Connect can be optimized to avoid creating an Iterator when the expectedClasses list is of size 1. This is a meaningful enhancement for high throughput connectors.
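The optimization is a size-1 fast path. A sketch of the pattern (hypothetical method name; the real code lives in `ConnectSchema`): the enhanced-for loop allocates an Iterator per call, so the single-class case indexes directly instead.

```java
import java.util.List;

/**
 * Sketch of the allocation-conscious check: for the common single-class case,
 * index directly instead of allocating an Iterator on every validation.
 */
public class StructValidationSketch {
    public static boolean isInstanceOfAny(Object value, List<Class<?>> expectedClasses) {
        if (expectedClasses.size() == 1) {
            return expectedClasses.get(0).isInstance(value);  // hot path: no Iterator
        }
        for (Class<?> c : expectedClasses) {                  // rare multi-class path
            if (c.isInstance(value)) return true;
        }
        return false;
    }
}
```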

Reviewers: Konstantine Karantasis <konstantine@confluent.io>
2020-06-10 00:14:33 -07:00
Konstantine Karantasis 1444ec6586 KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
When Incremental Cooperative Rebalancing is enabled and a worker fails to read to the end of the config topic, it needs to voluntarily revoke its locally running tasks on time, before these tasks get assigned to another worker, creating a situation where redundant tasks are running in the Connect cluster. 

Additionally, instead of using the delay `worker.unsync.backoff.ms` that was defined for the eager rebalancing protocol and has a long default value (which coincidentally is equal to the default value of the rebalance delay of the incremental cooperative protocol), the worker should quickly attempt to re-read the config topic and back off for a fraction of the rebalance delay. After this fix, the worker will retry at most 5 times before it revokes its running assignment, with a cumulative delay of less than the configured `scheduled.rebalance.max.delay.ms`.

Unit tests are added to cover the backoff logic with incremental cooperative rebalancing. 

Reviewers: Randall Hauch <rhauch@gmail.com>
2020-06-09 13:10:52 -07:00
Konstantine Karantasis ea14a20feb KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group (#8805)
In the first version of the incremental cooperative protocol, in the presence of a failed sync request by the leader, the assignor was designed to treat the unapplied assignments as lost and trigger a rebalance delay.

This commit applies optimizations in these cases to avoid the unnecessary activation of the rebalancing delay. First, if the worker that loses the sync group request or response is the leader, then it detects this failure by checking what the expected generation is when it performs task assignments. If it's not the expected one, it resets its view of the previous assignment because it wasn't successfully applied and doesn't represent a correct state. Furthermore, if the worker that has missed the assignment sync is an ordinary worker, then the leader is able to detect that there are lost assignments and, instead of triggering a rebalance delay among the same members of the group, it treats the lost tasks as new tasks and reassigns them immediately. If the lost assignment included revocations that were not applied, the leader reapplies these revocations again.

Existing unit tests and integration tests are adapted to test the proposed optimizations.

Reviewers: Randall Hauch <rhauch@gmail.com>
2020-06-09 10:08:33 -07:00
Evelyn Bayes e90e8ca724 KAFKA-9216: Enforce internal config topic settings for Connect workers during startup (#8270)
Currently, Kafka Connect creates its config backing topic with a fire-and-forget approach.
This is fine unless someone has already manually created that topic with the wrong partition count.

In such a case Kafka Connect may run for some time, especially in standalone mode, but once switched to distributed mode it will almost certainly fail.

This commit adds a check when the KafkaConfigBackingStore is starting.
This check throws a ConfigException if there is more than one partition in the backing store.

This exception is then caught upstream and logged by either:
- DistributedHerder#run
- ConnectStandalone#main

A unit test was added in KafkaConfigBackingStoreTest to verify the behaviour.
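The startup check itself is small. A sketch (hypothetical names and message text; the real check inspects the topic's partition metadata): the config topic must have exactly one partition so all configs are totally ordered, and the worker should fail fast otherwise.

```java
/**
 * Sketch of the startup check (hypothetical names): the config topic must
 * have exactly one partition; fail fast with a descriptive exception instead
 * of running in a broken state.
 */
public class ConfigTopicCheckSketch {
    static class ConfigException extends RuntimeException {
        ConfigException(String msg) { super(msg); }
    }

    public static void verifyConfigTopic(String topic, int partitionCount) {
        if (partitionCount != 1) {
            throw new ConfigException("Config topic '" + topic
                    + "' is required to have 1 partition but found " + partitionCount);
        }
    }
}
```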

Author: Evelyn Bayes <evelyn@confluent.io>
Co-authored-by: Randall Hauch <rhauch@gmail.com>

Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2020-06-07 12:44:39 -07:00
Konstantine Karantasis 40449d4512 KAFKA-9851: Revoking Connect tasks due to connectivity issues should also clear the running assignment (#8804)
Until recently revocation of connectors and tasks was the result of a rebalance that contained a new assignment. Therefore the view of the running assignment was kept consistent outside the call to `RebalanceListener#onRevoke`. However, after KAFKA-9184 the need appeared for the worker to revoke tasks voluntarily and proactively without having received a new assignment.

This commit allows the worker to restart tasks that have been stopped as a result of voluntary revocation after a rebalance reassigns these tasks to the worker.

The fix is tested by extending an existing integration test.

Reviewers: Randall Hauch <rhauch@gmail.com>
2020-06-05 16:13:32 -07:00
Chris Egerton 2ee9eedd5d KAFKA-9570: Define SSL configs in all worker config classes, not just distributed (#8135)
Define SSL configs in all worker config classes, not just distributed

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Nigel Liang <nigel@nigelliang.com>, Randall Hauch <rhauch@gmail.com>
2020-06-05 16:19:20 -05:00
Randall Hauch bef00a1c41 MINOR: Change the order that Connect calls `config()` and `validate()` to avoid validating if the required ConfigDef is null (#8810)
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2020-06-05 15:02:56 -05:00
Chris Egerton b59f880f5d KAFKA-9472: Remove deleted Connect tasks from status store (#8118)
Although the statuses for tasks are removed from the status store when their connector is deleted, their statuses are not removed when only the task is deleted, which happens in the case that the number of tasks for a connector is reduced.

This commit adds logic for deleting the statuses for those tasks from the status store whenever a rebalance has completed and the leader of a distributed cluster has detected that there are recently-deleted tasks. Standalone is also updated to accomplish this.

Unit tests for the `DistributedHerder` and `StandaloneHerder` classes are updated and an integration test has been added.

Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
2020-05-25 09:12:23 -07:00
Chris Egerton 2d92d2d73b KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)
The changes made in KIP-454 involved adding a `connectorConfig` method to the ConnectClusterState interface that REST extensions could use to query the worker for the configuration of a given connector. The implementation for this method returns the Java `Map` that's stored in the worker's view of the config topic (when running in distributed mode). No copying is performed, which causes mutations of that `Map` to persist across invocations of `connectorConfig` and, even worse, propagate to the worker when, e.g., starting a connector.

In this commit the map is copied before it's returned to REST extensions.

An existing unit test is modified to ensure that REST extensions receive a copy of the connector config, not the original.
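The copy-on-read idea can be sketched as follows (a hypothetical stand-in for the worker's config view, not the actual ConnectClusterState implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the worker's view of the config topic; the real
// implementation lives in the herder, but the copy-on-read idea is the same.
class ConnectorConfigStore {
    private final Map<String, Map<String, String>> configs = new HashMap<>();

    void put(String connector, Map<String, String> config) {
        configs.put(connector, new HashMap<>(config));
    }

    // Return a copy so callers (e.g. REST extensions) cannot mutate the
    // worker's internal state across invocations.
    Map<String, String> connectorConfig(String connector) {
        Map<String, String> stored = configs.get(connector);
        return stored == null ? null : new HashMap<>(stored);
    }
}
```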

Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
2020-05-23 15:39:17 -07:00
Randall Hauch 003c9d50d6 MINOR: Correct MirrorMaker2 integration test configs for Connect internal topics (#8653)
The replication factor configs for the Connect workers’ internal topics were incorrectly named and therefore not actually used.
2020-05-21 10:56:37 -05:00
Chris Egerton 887a7869f7 KAFKA-9950: Construct new ConfigDef for MirrorTaskConfig before defining new properties (#8608)
The `MirrorTaskConfig` class mutates the shared `ConfigDef` by defining additional properties. This leads to a potential `ConcurrentModificationException` during worker configuration validation, and to unintended inclusion of those new properties in the `ConfigDef` for the connectors, which in turn is visible via the REST API's `/connectors/{name}/config/validate` endpoint.

The fix here is a one-liner that just creates a copy of the `ConfigDef` before defining new properties.
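The copy-before-define pattern looks roughly like this, with a plain `Map` standing in for `ConfigDef` (names here are illustrative, not the MirrorMaker 2 source):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch of the copy-before-define fix.
class MirrorConfigDefs {
    // Shared definition used by connector validation.
    static final Map<String, String> CONNECTOR_CONFIG_DEF = new LinkedHashMap<>();
    static {
        CONNECTOR_CONFIG_DEF.put("topics", "Topics to replicate");
    }

    // Copy first, then define task-only properties, so the shared definition
    // is never mutated and task-only keys never leak into connector validation.
    static Map<String, String> taskConfigDef() {
        Map<String, String> def = new LinkedHashMap<>(CONNECTOR_CONFIG_DEF);
        def.put("task.assigned.partitions", "Partitions assigned to this task");
        return def;
    }
}
```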

Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
2020-05-20 21:20:46 -07:00
Chris Egerton 06844e78e7 KAFKA-8869: Remove task configs for deleted connectors from config snapshot (#8444)
Currently, if a connector is deleted, its task configurations will remain in the config snapshot tracked by the KafkaConfigBackingStore. This causes issues with incremental cooperative rebalancing, which utilizes that config snapshot to determine which connectors and tasks need to be assigned across the cluster. Specifically, it first checks to see which connectors are present in the config snapshot, and then, for each of those connectors, queries the snapshot for that connector's task configs.

The lifecycle of a connector is:
1. its configuration is written to the config topic;
2. that write is picked up by the workers in the cluster and triggers a rebalance;
3. the connector is assigned to and started by a worker;
4. task configs are generated by the connector and then written to the config topic;
5. that write is picked up by the workers in the cluster and triggers a second rebalance;
6. finally, the tasks are assigned to and started by workers across the cluster.

There is a brief period in between the first time the connector is started and when the second rebalance has completed during which those stale task configs from a previously-deleted version of the connector will be used by the framework to start tasks for that connector. This fix aims to eliminate that window by preemptively clearing the task configs from the config snapshot for a connector whenever it has been deleted.

An existing unit test is modified to verify this behavior, and should provide sufficient guarantees that the bug has been fixed.
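The snapshot cleanup can be sketched like this (a hypothetical simplification of the config snapshot, not the KafkaConfigBackingStore source):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: deleting a connector also clears its task configs
// from the snapshot so a later rebalance can't start stale tasks.
class ClusterConfigSnapshot {
    final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    final Map<String, Map<String, String>> taskConfigs = new HashMap<>();

    void removeConnector(String connector) {
        connectorConfigs.remove(connector);
        // Task ids are assumed to look like "<connector>-<n>" for this sketch.
        taskConfigs.keySet().removeIf(taskId -> taskId.startsWith(connector + "-"));
    }
}
```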

Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
2020-05-20 20:17:37 -07:00
Greg Harris e4fc22c051 KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)
* If two exceptions are thrown, the `closePartitions` exception is suppressed
* Add unit tests that throw exceptions in put and close to verify that
  the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute
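
The suppression pattern can be sketched as follows (class and method shapes are hypothetical, not the WorkerSinkTask source):

```java
// Sketch: when both the task body and the cleanup throw, the cleanup failure
// is attached as a suppressed exception instead of shadowing the original.
class SinkTaskErrors {
    static RuntimeException execute(Runnable put, Runnable closePartitions) {
        RuntimeException putError = null;
        try {
            put.run();
        } catch (RuntimeException e) {
            putError = e;
        } finally {
            try {
                closePartitions.run();
            } catch (RuntimeException closeError) {
                if (putError != null) {
                    putError.addSuppressed(closeError); // don't shadow the put failure
                } else {
                    putError = closeError; // a close failure alone still propagates
                }
            }
        }
        return putError;
    }
}
```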

Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
2020-05-15 17:55:25 -07:00
Jeremy Custenborder e7df585864 KAFKA-9537 - Cleanup error messages for abstract transformations (#8090)
Added a check for whether the transformation is abstract; if so, an error with guidance for the user is thrown. Also ensured that the child classes are not abstract.
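
The guard can be sketched with plain reflection (the error text and class name here are hypothetical):

```java
import java.lang.reflect.Modifier;

// Sketch: reject abstract transformation classes with a clear message
// instead of an opaque instantiation failure later on.
class TransformationChecks {
    static void checkNotAbstract(Class<?> cls) {
        if (Modifier.isAbstract(cls.getModifiers())) {
            throw new IllegalArgumentException(
                "Transformation " + cls.getName() + " is abstract and cannot be "
                + "instantiated; specify a concrete subclass instead.");
        }
    }
}
```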

Author: Jeremy Custenborder <jcustenborder@gmail.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
2020-05-15 12:15:28 -05:00
Andy Coates 162448a2ae KAFKA-9667: Connect JSON serde strip trailing zeros (#8230)
This change turns on exact decimal processing in JSON Converter for deserializing decimals, meaning trailing zeros are maintained. Serialization was already using the decimal scale to output the right value, so this change means a value of `1.2300` can now be serialized to JSON and deserialized back to Connect without any loss of information.
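
The round trip can be illustrated with plain `BigDecimal` (not the JsonConverter source): parsing from the string form keeps the scale, so trailing zeros survive.

```java
import java.math.BigDecimal;

// Why exact decimal processing matters: BigDecimal preserves the scale
// (here 4), so "1.2300" round-trips without collapsing to "1.23".
class DecimalRoundTrip {
    static String roundTrip(String decimalLiteral) {
        BigDecimal value = new BigDecimal(decimalLiteral); // scale preserved
        return value.toPlainString();
    }
}
```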

Author: Andy Coates <big-andy-coates@users.noreply.github.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Almog Gavra <almog@confluent.io>
2020-05-07 17:21:15 -05:00
Chris Egerton 050ebef6b1 KAFKA-9768: Fix handling of rest.advertised.listener config (#8360)
The rest.advertised.listener config is currently broken: setting it to http when listeners are configured for both https and http causes the framework to choose whichever of the two listeners is listed first. The changes here fix this by checking not only that ServerConnector::getName begins with the specified protocol, but also that the protocol is immediately followed by an underscore, which the framework uses as a delimiter between the protocol and the remainder of the connector name.

An existing unit test for the RestServer::advertisedUrl method has been expanded to include a case that fails with the framework in its current state and passes with the changes in this commit. 

* KAFKA-9768: Fix handling of rest.advertised.listener config
* KAFKA-9768: Add comments on server connector names
* KAFKA-9768: Update RestServerTest comment
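
The fixed matching logic can be sketched as follows (the `<protocol>_<rest>` connector naming is taken from the description above; the class name is hypothetical):

```java
// Requiring the underscore delimiter prevents "http" from matching a
// connector named "https_...".
class ListenerNames {
    static boolean matchesProtocol(String connectorName, String protocol) {
        return connectorName.startsWith(protocol + "_");
    }
}
```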

Co-authored-by: Randall Hauch <rhauch@gmail.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Andrew Choi <andchoi@linkedin.com>
2020-05-06 18:10:49 -07:00
Tom Bentley 89317c6a9e KAFKA-9633: Ensure ConfigProviders are closed (#8204)
ConfigProvider extends Closeable, but providers were not closed in the following contexts:
* AbstractConfig
* WorkerConfigTransformer
* Worker

This commit ensures that ConfigProviders are closed in the above contexts.

It also adds MockFileConfigProvider.assertClosed().
Gradle executes test classes concurrently, so MockFileConfigProvider
can't simply use a static field to hold its closure state.
Instead, a protocol is used whereby the MockFileConfigProvider is
configured with a unique key identifying the test, which is also used
when calling assertClosed().
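
The per-key tracking protocol can be sketched like this (names are illustrative, not the MockFileConfigProvider source):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: each test configures its provider with a unique key, so
// concurrently executing test classes can't clobber each other's state.
class MockConfigProvider implements AutoCloseable {
    private static final Map<String, Boolean> CLOSED_BY_KEY = new ConcurrentHashMap<>();
    private final String key;

    MockConfigProvider(String key) {
        this.key = key;
        CLOSED_BY_KEY.put(key, false);
    }

    @Override
    public void close() {
        CLOSED_BY_KEY.put(key, true);
    }

    static boolean wasClosed(String key) {
        return Boolean.TRUE.equals(CLOSED_BY_KEY.get(key));
    }
}
```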

Reviewers: Konstantine Karantasis <konstantine@confluent.io>
2020-04-30 10:35:00 -07:00
Chia-Ping Tsai f47e039bd9 MINOR: Fix unused arguments used in formatted string and log messages (#8036)
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Konstantine Karantasis <konstantine@confluent.io>
2020-04-29 22:41:29 -07:00
Chris Egerton 794230a1cc KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
Simple logging additions at TRACE level that should help when the worker can't get caught up to the end of an internal topic.

Reviewers: Gwen Shapira <cshapi@gmail.com>, Aakash Shah <ashah@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>
2020-04-29 20:43:38 -07:00
Greg Harris bf50b8410e KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (#8442)
* The DeadLetterQueueReporter has a KafkaProducer that it must close to clean up resources
* Currently, the producer and its threads are leaked every time a task is stopped
* Responsibility for cleaning up ErrorReporters is transitively assigned to the
    ProcessingContext, RetryWithToleranceOperator, and WorkerSinkTask/WorkerSourceTask classes
* One new unit test in ErrorReporterTest asserts that the producer is closed by the dlq reporter
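
The shape of the fix can be sketched as follows (a hypothetical reporter with a `Runnable` standing in for the producer's close):

```java
// Sketch: the reporter owns a producer and implements AutoCloseable, so
// stopping a task releases the producer's threads instead of leaking them.
class DlqReporter implements AutoCloseable {
    private final Runnable producerCloser; // stand-in for KafkaProducer::close
    private boolean closed;

    DlqReporter(Runnable producerCloser) {
        this.producerCloser = producerCloser;
    }

    @Override
    public void close() {
        closed = true;
        producerCloser.run();
    }

    boolean isClosed() {
        return closed;
    }
}
```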

Reviewers: Arjun Satish <arjun@confluent.io>, Chris Egerton <chrise@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
2020-04-29 19:27:46 -07:00