We don't need or use the additional functionality provided by
AtomicReference.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether it is caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.
However, the Connect internal topics are often written to very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or until the consumer's `fetch.max.wait.ms` setting (defaults to 500ms) elapses and the consumer returns with no more records. In other words, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it's already caught up to the end.
Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have `KafkaBasedLog` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same, but we don't have to block on any ongoing consumer activity.
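As an illustration of the admin-based end-offset lookup, here is a minimal sketch (not the actual `TopicAdmin` code; the class and method names are assumptions) of answering the same question without waiting on a consumer poll:
```
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetSketch {
    // Returns the end offset per partition using the ListOffsets broker API via the admin client,
    // so the call never blocks on an in-flight consumer fetch.
    static Map<TopicPartition, Long> endOffsets(Admin admin, Set<TopicPartition> partitions) throws Exception {
        Map<TopicPartition, OffsetSpec> request = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResultInfo> result = admin.listOffsets(request).all().get();
        return result.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}
```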
Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client; all three instances of `KafkaBasedLog` should share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses, and closes an admin client instance when it checks and initializes that store's internal topic. If we change the `Kafka*BackingStore` classes to share one admin client instance, we can change that initialization logic to also reuse the shared admin client instance.
The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.
These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor that accepts a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than an instance because `KafkaBasedLog` is instantiated before Connect starts up, so the admin instance should be created only when needed. At the same time, change the existing init function parameter from a no-arg function to one that accepts an admin instance as an argument, allowing the init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (via the deprecated constructor, which is no longer used in AK), the consumer is still used to get the latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters plus an admin supplier, and deprecate the old constructor. When each class instantiates its `KafkaBasedLog` instance, it passes the admin supplier along with an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed. (A minimal sketch of this lazy pattern follows this list.)
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (which is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, to pass the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore` objects, and finally to always close the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to accept in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.
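For reference, here is a minimal sketch (with assumed names, not the actual `SharedTopicAdmin` implementation) of the lazy, shared, closeable admin supplier pattern described above:
```
import java.util.Map;
import java.util.function.Supplier;

import org.apache.kafka.clients.admin.Admin;

public class LazyAdminSupplier implements Supplier<Admin>, AutoCloseable {
    private final Map<String, Object> adminConfig;
    private Admin admin;

    public LazyAdminSupplier(Map<String, Object> adminConfig) {
        this.adminConfig = adminConfig;
    }

    @Override
    public synchronized Admin get() {
        if (admin == null) {
            admin = Admin.create(adminConfig);   // created only on first use, after Connect has started
        }
        return admin;
    }

    @Override
    public synchronized void close() {
        if (admin != null) {
            admin.close();
            admin = null;
        }
    }
}
```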
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Clean up redundant type casts in Connect and use the diamond operator where applicable.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
When a source connector that is configured to create missing topics has multiple tasks that generate records for the same topic, multiple tasks may simultaneously describe the topic, find that it does not exist, and attempt to create it. Only one of those create-topic requests will succeed; the other concurrent tasks will see from the topic admin's response that they did not create the topic and will fail unnecessarily.
This change corrects the logic by moving the `TopicAdmin` logic to create a topic to a new `createOrFindTopics(…)` method that returns the set of created topic names and the set of existing topic names. This allows the existing `createTopics(…)` and `createTopic(…)` methods to retain the same behavior, but also allows the `WorkerSourceTask` to know from its single call to this new method whether the topic was created or was found to exist.
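The following is a minimal sketch of the idea (method and variable names are illustrative, not the actual `TopicAdmin` code): a concurrent `TopicExistsException` means another task won the race, so the topic is reported as existing rather than treated as a failure.
```
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;

public class CreateOrFindSketch {
    static void createOrFind(Admin admin, NewTopic topic,
                             Set<String> created, Set<String> existing) throws InterruptedException {
        Map<String, KafkaFuture<Void>> futures =
                admin.createTopics(Collections.singleton(topic)).values();
        for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
            try {
                entry.getValue().get();
                created.add(entry.getKey());        // this call actually created the topic
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    existing.add(entry.getKey());   // topic already exists: not an error for this caller
                } else {
                    throw new RuntimeException(e.getCause());
                }
            }
        }
    }
}
```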
This adds one unit test and modifies several unit tests in `WorkerSourceTaskWithTopicCreationTest` that use mocks to verify the behavior, and modifies several existing unit tests for `TopicAdminTest` to ensure the logic of the new method is as expected.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Problem: when calling the Connect status endpoint, a 500 error is returned, e.g.
```
{
"error_code": 500,
"message": "Could not read properties from file /tmp/somefile.properties"
}
```
when any of the connectors has an exception from the config provider. This is because the `AbstractHerder` tries to use the resolved config to infer the type of the connector. However, only the `connector.class` value is needed from the config to determine whether a connector is a source or a sink. The endpoint should still return the status of the connector instead of a 500 error.
This change uses the raw config from the config backing store to retrieve the connector class to avoid any variable resolution.
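A small, illustrative sketch of the approach (`rawConfig` stands for the unresolved config from the config backing store; this is not the actual herder code):
```
import java.util.Map;

public class ConnectorTypeLookup {
    static String connectorClassName(Map<String, String> rawConfig) {
        // No variable resolution is performed; the literal connector.class value is
        // enough to decide whether the plugin is a source or a sink connector.
        return rawConfig.get("connector.class");
    }
}
```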
Unit tests have been updated to reflect this change.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Fixes a regression introduced by https://github.com/apache/kafka/pull/9806 in the original fix for KAFKA-10895
It was discovered that if an invalid JAAS config is present on the worker, invoking Configuration::getConfiguration throws an exception. The changes from #9806 cause that exception to be thrown during plugin scanning, which causes the worker to fail even if it is not configured to use the basic auth extension at all.
This follow-up handles invalid JAAS configurations more gracefully, and only throws an exception if the worker is actually configured to use the basic auth extension, at the time the extension is instantiated and configured.
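A minimal sketch of the deferral (class and method names are assumptions, not the actual extension code): the failure is captured when the class loads but only surfaced if the basic auth extension is actually used.
```
import java.util.function.Supplier;

import javax.security.auth.login.Configuration;

public class DeferredJaasConfig {
    private static final Supplier<Configuration> CONFIGURATION;

    static {
        Supplier<Configuration> supplier;
        try {
            Configuration config = Configuration.getConfiguration();
            supplier = () -> config;   // valid config: hand it out later
        } catch (Exception e) {
            // Invalid JAAS config: remember the failure instead of failing plugin scanning.
            supplier = () -> { throw new RuntimeException("Invalid JAAS configuration", e); };
        }
        CONFIGURATION = supplier;
    }

    // Called only when the basic auth extension is instantiated and configured,
    // so unrelated workers are unaffected by a broken JAAS config.
    public static Configuration forBasicAuth() {
        return CONFIGURATION.get();
    }
}
```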
Two unit tests are added.
Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
Replace BrokerStates.scala with BrokerState.java, to make it easier to use from Java code if needed. This also makes it easier to go from a numeric type to an enum.
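For illustration, a Java enum carrying a numeric code makes the numeric-to-enum conversion straightforward (the values below are illustrative, not the exact set in BrokerState.java):
```
public enum ExampleBrokerState {
    NOT_RUNNING((byte) 0),
    STARTING((byte) 1),
    RUNNING((byte) 3);

    private final byte value;

    ExampleBrokerState(byte value) {
        this.value = value;
    }

    public byte value() {
        return value;
    }

    public static ExampleBrokerState fromValue(byte value) {
        for (ExampleBrokerState state : values()) {
            if (state.value == value) {
                return state;
            }
        }
        throw new IllegalArgumentException("Unknown broker state value: " + value);
    }
}
```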
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
First issue: When more than one worker joins the Connect group, the incremental cooperative assignor revokes and reassigns at most the average number of tasks per worker.
Side-effect: The additional workers that join the group stay idle, and more rebalances are needed in the future to reach an even distribution of tasks.
Fix: As part of the task assignment calculation following a deployment, the reassignment is computed by revoking all tasks above the rounded-up (ceiling) average number of tasks per worker.
Second issue: When more than one worker is lost and then rejoins the group, at most one worker is reassigned the lost tasks from all the workers that left the group.
Side-effect: In scenarios where more than one worker is lost and rejoins the group, only one of them gets assigned all the tasks that were lost. The other workers that joined do not get any tasks assigned to them until a future rebalance.
Fix: As part of lost-task reassignment, all the new workers that have joined the group are considered for task assignment, and the lost tasks are distributed among them in a round-robin fashion (see the sketch below).
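A minimal, illustrative sketch of both fixes (simplified types and names, not the actual assignor code):
```
import java.util.List;

public class AssignmentSketch {
    // A worker keeps at most the rounded-up (ceiling) average; anything above it is revoked.
    static int tasksToRevoke(int tasksOnWorker, int totalTasks, int totalWorkers) {
        int ceilingAverage = (totalTasks + totalWorkers - 1) / totalWorkers;
        return Math.max(0, tasksOnWorker - ceilingAverage);
    }

    // Lost tasks are spread over all newly joined workers in a round-robin fashion
    // instead of being piled onto a single worker.
    static void assignLostTasksRoundRobin(List<String> lostTasks, List<List<String>> newWorkerAssignments) {
        for (int i = 0; i < lostTasks.size(); i++) {
            newWorkerAssignments.get(i % newWorkerAssignments.size()).add(lostTasks.get(i));
        }
    }
}
```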
Testing strategy:
* System testing in a Kubernetes environment completed
* New integration tests to test for balanced tasks
* Updated unit tests.
Co-authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthusamy@apple.com>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Co-authored-by: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
By default, MirrorMaker 2 creates herders for all the possible combinations, even if the "links" are not enabled.
This is because the heartbeats are emitted from the "opposite" herder.
If there is a replication flow from A to B and heartbeats are required, two herders are needed:
- A->B for the MirrorSourceConnector
- B->A for the MirrorHeartbeatConnector
The MirrorHeartbeatConnector on B->A emits heartbeats into the heartbeats topic on cluster A.
The MirrorSourceConnector on A->B then replicates whichever topics are configured, as well as heartbeats.
In setups with many clusters (10 or more), this leads to an enormous number of unnecessary connections, file descriptors, and configuration topics created in every target cluster.
With this code change, we leverage the top-level property "emit.heartbeats.enabled", which defaults to "true".
We skip creating the A->B herder whenever both A->B.emit.heartbeats.enabled=false (defaults to true) and A->B.enabled=false (defaults to false).
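Expressed as a condition (an illustrative sketch, not the actual MirrorMaker code), a herder is created unless both flags for that flow are false:
```
public class HeartbeatHerderFilter {
    // Skip the A->B herder only when A->B.enabled=false and A->B.emit.heartbeats.enabled=false.
    static boolean shouldCreateHerder(boolean flowEnabled, boolean emitHeartbeatsEnabled) {
        return flowEnabled || emitHeartbeatsEnabled;
    }
}
```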
Existing users will not see any change, and if they depend on these "opposite" herders for their monitoring, it will still work.
New users with more complex use cases can change this property and fine-tune their heartbeat generation.
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Sanjana Kaundinya <skaundinya@gmail.com>, Jason Gustafson <jason@confluent.io>
When two cooperative rebalances take place soon after one another, a prior rebalance may not complete before the next rebalance is started.
Under Eager rebalancing, no tasks would have been started, so the subsequent onRevoked call is intentionally skipped whenever rebalanceResolved is false.
Under Incremental Cooperative rebalancing, the same logic causes the DistributedHerder to skip stopping the connectors/tasks revoked in the second rebalance.
The DistributedHerder still removes the revoked connectors/tasks from its assignment, so the DistributedHerder and the Worker end up with different views of which connectors/tasks are running.
This causes the connector/task instances that should have been stopped to disappear from the rebalance protocol and to be left running until their workers are halted or they fail.
Connectors/tasks that were then reassigned to other workers by the rebalance protocol would be duplicated and run concurrently with the zombie connectors/tasks.
Connectors/tasks that were reassigned back to the same worker would encounter exceptions in the Worker indicating that the connector/task already existed and was running.
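A minimal, illustrative sketch of the corrected guard (names are assumptions): an unresolved previous rebalance only justifies skipping revocation under the eager protocol, where nothing had been started yet.
```
public class RevocationGuardSketch {
    enum Protocol { EAGER, COMPATIBLE, SESSIONED }

    static boolean skipRevocation(Protocol protocol, boolean rebalanceResolved) {
        // Incremental cooperative protocols must always stop revoked connectors/tasks.
        return protocol == Protocol.EAGER && !rebalanceResolved;
    }
}
```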
* Added test for revoking and then reassigning a connector under normal circumstances
* Added test for revoking and then reassigning a connector following an incomplete cooperative rebalance
* Changed expectRebalance to make assignment fields mutable before passing them into the DistributedHerder
* Only skip revocation for the Eager protocol, and never skip revocation for incremental cooperative protocols
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Fixes errors such as:
```
java.lang.NullPointerException at
org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopics(MirrorSourceConnector.java:348) at
org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:192) at
org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst(MirrorSourceConnectorTest.java:222)
```
This was a difficult issue to debug due to class loading interference between the Connect worker and Mockito. Digging into Mockito showed that it's not about JUnit 5; it's the class loader. Mockito relies on the class loader to generate the proxy instance ([source](https://github.com/mockito/mockito/blob/release/3.x/src/main/java/org/mockito/internal/creation/bytebuddy/SubclassBytecodeGenerator.java#L91)) that intercepts method calls, and if the class loader is not the expected one, the wrong proxy instance (with the wrong class path) is generated. We set the class loader during connector start to resolve conflicting dependencies ([KIP-146](https://cwiki.apache.org/confluence/display/KAFKA/KIP-146+-+Classloading+Isolation+in+Connect)), so in tests (`EmbeddedConnectCluster` is only used in tests) we should set it back to the original class loader after the connector stops so that subsequent Mockito usage works as expected.
In other words, there is interference between integration tests and unit tests: when the Connect integration tests run before the MM2 unit tests, the Mockito mocks used in the unit tests do not work as expected.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Also:
* Remove unused powermock dependency
* Remove "examples" from the JUnit 4 list since one module was already
converted and the other has no tests
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
If a connector, converter, etc. invokes [Configuration::setConfiguration](https://docs.oracle.com/javase/8/docs/api/javax/security/auth/login/Configuration.html#setConfiguration-javax.security.auth.login.Configuration-), it will cause the Connect basic auth filter to use that JAAS config instead of the one configured at startup with the `-Djava.security.auth.login.config` JVM property. This can cause requests to the worker to succeed initially but start to be rejected after the JVM's global JAAS config is altered.
To alleviate this, this PR instructs the Connect worker to cache the JVM's global JAAS configuration at startup (as soon as the `BasicAuthSecurityRestExtension` class is loaded) and to use that configuration for all future authentication.
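A minimal sketch of the caching idea (illustrative, not the actual extension code): the JVM-wide JAAS Configuration is captured once when the class loads, so a later call to Configuration::setConfiguration by a connector cannot change which credentials the basic auth filter validates against.
```
import javax.security.auth.login.Configuration;

public class CachedJaasConfig {
    // Captured at class-load time, before any plugin has a chance to replace it.
    private static final Configuration INITIAL_JAAS_CONFIG = Configuration.getConfiguration();

    public static Configuration jaasConfig() {
        return INITIAL_JAAS_CONFIG;
    }
}
```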
Existing tests for the JAAS basic auth filter are modified to work with the new internal logic. The `testEmptyCredentialsFile` test is corrected to actually operate on an empty credentials file (instead of a non-existent credentials file, which it previously operated on). A new test is added to ensure that, even if the global JAAS config is overwritten, the basic auth filter continues to use the configuration it first found.
Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
This PR includes the following changes:
1. Replace @Test(expected = Exception.class) with assertThrows (see the small example after this list)
2. Remove references to org.scalatest.Assertions
3. Change the magic value from 1 to 2 in testAppendAtInvalidOffset to test ZSTD
4. Rename testMaybeAddPartitionToTransactionXXXX to testNotReadyForSendXXX
5. Increase maxBlockMs from 1s to 3s to avoid unexpected timeouts in TransactionsTest#testTimeout
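A small illustrative example of the migration in item 1 (the test and method names are made up): assertThrows makes the failing call explicit instead of annotating the whole test method.
```
import static org.junit.Assert.assertThrows;

import org.junit.Test;

public class AssertThrowsExample {
    @Test
    public void shouldRejectNegativeOffset() {
        // Before: @Test(expected = IllegalArgumentException.class) on the method.
        assertThrows(IllegalArgumentException.class, () -> validateOffset(-1));
    }

    private void validateOffset(long offset) {
        if (offset < 0)
            throw new IllegalArgumentException("offset must be non-negative");
    }
}
```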
Reviewers: Ismael Juma <ismael@confluent.io>
If a connector fails on startup, the original cause of the error gets discarded by the framework and the only message that gets logged looks like this:
```
[2020-12-04 16:46:30,464] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'conn-1' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: conn-1
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$5(DistributedHerder.java:1297)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:258)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1321)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:127)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1329)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1325)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
```
The changes here cause the original cause of the connector startup failure to be logged as well, for example:
```
[2020-12-30 09:56:35,481] ERROR [test-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'conn-1' (org.apache.kafka.connect.runtime.distributed.DistributedHerder:599)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: conn-1
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$4(DistributedHerder.java:1298)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:294)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1322)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:597)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:416)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:294)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements
```
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch updates the request logger to output request and response payloads in JSON. Payloads are converted to JSON based on their auto-generated schema.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>
Connection id is now only present in `NetworkSend`, which is now
the class used by `Selector`/`NetworkClient`/`KafkaChannel` (which
works well since `NetworkReceive` is the class used for
received data).
The previous `NetworkSend` was also responsible for adding a size
prefix. This logic is already present in `SendBuilder`, but for the
minority of cases where `SendBuilder` is not used (including
a number of tests), we now have `ByteBufferSend.sizePrefixed()`.
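For clarity, "size prefixed" here simply means a 4-byte length field precedes the payload; a minimal sketch of the concept (not the actual `ByteBufferSend.sizePrefixed()` implementation):
```
import java.nio.ByteBuffer;

public class SizePrefixSketch {
    static ByteBuffer[] sizePrefixed(ByteBuffer payload) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(payload.remaining());   // length of the payload that follows
        sizeBuffer.flip();
        return new ByteBuffer[] { sizeBuffer, payload };
    }
}
```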
With regards to the request/message utilities:
* Renamed `toByteBuffer`/`toBytes` in `MessageUtil` to
`toVersionPrefixedByteBuffer`/`toVersionPrefixedBytes` for clarity.
* Introduced new `MessageUtil.toByteBuffer` that does not include
the version as the prefix.
* Renamed `serializeBody` in `AbstractRequest/Response` to
`serialize` for symmetry with `parse`.
* Introduced `RequestTestUtils` and moved relevant methods from
`TestUtils`.
* Moved `serializeWithHeader` methods that were only used in
tests to `RequestTestUtils`.
* Deleted `MessageTestUtil`.
Finally, a couple of changes to simplify coding patterns:
* Added `flip()` and `buffer()` to `ByteBufferAccessor`.
* Added `MessageSizeAccumulator.sizeExcludingZeroCopy`.
* Used lambdas instead of `TestCondition`.
* Used `Arrays.copyOf` instead of `System.arraycopy` in `MessageUtil`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
Generated request/response classes have code to serialize/deserialize directly to
`ByteBuffer` so the intermediate conversion to `Struct` can be skipped for them.
We have recently completed the transition to generated request/response classes,
so we can also remove the `Struct` based fallbacks.
Additional noteworthy changes:
* `AbstractRequest.parseRequest` has a more efficient computation of request size that
relies on the received buffer instead of the parsed `Struct`.
* Use `SendBuilder` for `AbstractRequest/Response` `toSend`, made the superclass
implementation final and removed the overrides that are no longer necessary.
* Removed request/response constructors that assume latest version as they are unsafe
outside of tests.
* Removed redundant version fields in requests/responses.
* Removed unnecessary work in `OffsetFetchResponse`'s constructor when version >= 2.
* Made `AbstractResponse.throttleTimeMs()` abstract.
* Using `toSend` in `SaslClientAuthenticator` instead of `serialize`.
* Various changes in Request/Response classes to make them more consistent and to
rely on the Data classes as much as possible when it comes to their state.
* Remove the version argument from `AbstractResponse.toString`.
* Fix `getErrorResponse` for `ProduceRequest` and `DescribeClientQuotasRequest` to
use `ApiError` which processes the error message sent back to the clients. This was
uncovered by an accidental fix to a `RequestResponseTest` test (it was calling
`AbstractResponse.toString` instead of `AbstractResponse.toString(short)`).
Rely on existing protocol tests to ensure this refactoring does not change
observed behavior (aside from improved performance).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Normally the `EmbeddedConnectCluster` class masks the `Exit` procedures used within the Connect worker. This works great when a single instance of the embedded cluster is used. However, the `MirrorConnectorsIntegrationTest` uses two `EmbeddedConnectCluster` instances, and when the first one is stopped it resets the (static) exit procedures, so any problem during shutdown of the second embedded Connect cluster causes the worker to shut down the JVM running the tests.
Instead, the `MirrorConnectorsIntegrationTest` class should mask the `Exit` procedures itself and instruct the `EmbeddedConnectCluster` instances (via the existing builder method) not to mask the procedures.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Changes the `WorkerSourceTask` class to only call `SourceTask::stop` from the task thread when the task is actually stopped (via the worker task's `close`, just before `WorkerTask::run` completes), and only if an attempt has been made to start the task (which will not be the case if it was created in the paused state and then shut down before being started). This prevents `SourceTask::stop` from being indirectly invoked on the herder's thread, which can have adverse effects if the task is unable to shut down promptly.
Unit tests are tweaked where necessary to account for this new logic, which covers some edge cases mentioned in PR #5020 that were unaddressed up until now.
The existing integration tests for blocking connectors are expanded to also include cases for blocking source and sink tasks. Full coverage of every source/sink task method is intentionally omitted from these expanded tests in order to avoid inflating test runtime (each one adds an extra 5 seconds at minimum) and because the tests that are added here were sufficient to reproduce the bug with source task shutdown.
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Nigel Liang <nigel@nigelliang.com>, Tom Bentley <tbentley@redhat.com>, Randall Hauch <rhauch@gmail.com>
The map passed as an argument remains read-only and therefore can be initialized using Collections#emptyMap instead of being passed a new Map instance.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
StandaloneHerder and DistributedHerder have identical implementations of connectorConfig (apart from one line of logging). This commit moves the common implementation of connectorConfig to AbstractHerder.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The DistributedHerder enters the synchronized method updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class.
Meanwhile, KafkaConfigBackingStore, inside its ConsumeCallback in a block synchronized on that same internal object, receives a SESSION_KEY record and calls updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
This results in a deadlock.
To avoid this, updateListener should be called with the new session key outside the synchronized block, as is already done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).
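A minimal, illustrative sketch of the fix (simplified types, not the actual KafkaConfigBackingStore code): the state change happens under the lock, but the herder callback runs only after the lock is released.
```
public class SessionKeyCallbackSketch {
    interface UpdateListener {
        void onSessionKeyUpdate(Object sessionKey);
    }

    private final Object lock = new Object();
    private Object sessionKey;

    void handleSessionKeyRecord(Object newKey, UpdateListener updateListener) {
        synchronized (lock) {
            sessionKey = newKey;   // the state change stays under the internal lock
        }
        // The callback runs outside the lock, so it can safely synchronize on the herder.
        updateListener.onSessionKeyUpdate(newKey);
    }
}
```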
Co-authored-by: Taisiia Goltseva <tado0618@netcracker.com>
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>