Commit Graph

615 Commits

Author SHA1 Message Date
Ismael Juma ad541b9759
MINOR: KafkaBroker.brokerState should be volatile instead of AtomicReference (#10080)
We don't need or use the additional functionality provided by
AtomicReference.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
2021-02-10 07:12:06 -08:00
Randall Hauch 982ea2f6a4
KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)
The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.

However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.

Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.

Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.

The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.

These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2021-02-09 11:09:41 -06:00
dengziming 3769bc21b5
MINOR: replace hard-coding utf-8 with StandardCharsets.UTF_8 (#10079)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-02-09 10:06:01 +08:00
Lev Zemlyanov 07843cfbf4
KAFKA-10834: Remove redundant type casts in Connect (#10053)
Cleanup up to remove redundant type casts in Connect and use the diamond operator when needed 

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-04 17:08:56 -08:00
Mickael Maison b78fa9cea2
KAFKA-10833: Expose task configurations in Connect REST API (KIP-661) (#9726)
This PR adds a new REST endpoint to Connect: GET /{connector}/tasks-config, that returns the configuration of all tasks for the connector.

Details in: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-661%3A+Expose+task+configurations+in+Connect+REST+API

Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Oliver Dineen <dineeno@uk.ibm.com>

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-04 17:01:07 -08:00
Lev Zemlyanov c19a35d1b7
KAFKA-10835: Replace Runnable and Callable overrides with lambdas in Connect (#9867)
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-04 09:15:49 -08:00
Randall Hauch 5eb8a238e2
KAFKA-12270: Handle race condition when Connect tasks attempt to create topics (#10032)
When a source connector is configured to create missing topics has multiple tasks that generate records for the same topic, it is possible that multiple tasks may simultaneously describe the topic, find it does not exist, and attempt to create the task. One of those create topic requests will succeed, and the other concurrent tasks will receive the response from the topic admin as having not created the task and will fail unnecessarily.

This change corrects the logic by moving the `TopicAdmin` logic to create a topic to a new `createOrFindTopics(…)` method that returns the set of created topic names and the set of existing topic names. This allows the existing `createTopics(…)` and `createTopic(…)` methods to retain the same behavior, but also allows the `WorkerSourceTask` to know from its single call to this new method whether the topic was created or was found to exist.

This adds one unit test and modifies several unit tests in `WorkerSourceTaskWithTopicCreationTest` that use mocks to verify the behavior, and modifies several existing unit tests for `TopicAdminTest` to ensure the logic of the new method is as expected.

Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
2021-02-03 18:29:55 -06:00
Yilong Chang c64241f9c2
KAFKA-12259: Use raw config to infer the connector type when returning a connector status response (#10040)
Problem: when calling the connect status endpoint, a 500 error is returned, e.g.
```
{
  "error_code": 500,
  "message": "Could not read properties from file /tmp/somefile.properties"
}
```
when any of the connectors has an exception from the config provider. This is because the `AbstractHerder` is trying to use the resolved config to infer the type of the connector. However, only the `connector.class` is needed from the config to infer if a specific connector is of source or sink type. The endpoint should still return the status of the connector instead of a 500 error.

This change uses the raw config from the config backing store to retrieve the connector class to avoid any variable resolution.

Unit tests have been updated to reflect this change.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-03 15:39:32 -08:00
Chris Egerton 0fe9fde376
KAFKA-10895: Gracefully handle invalid JAAS configs (follow up fix) (#9987)
Fixes a regression introduced by https://github.com/apache/kafka/pull/9806 in the original fix for KAFKA-10895

It was discovered that if an invalid JAAS config is present on the worker, invoking Configuration::getConfiguration throws an exception. The changes from #9806 cause that exception to be thrown during plugin scanning, which causes the worker to fail even if it is not configured to use the basic auth extension at all.

This follow-up handles invalid JAAS configurations more gracefully, and only throws them if the worker is actually configured to use the basic auth extension, at the time that the extension is instantiated and configured.

Two unit tests are added.

Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-03 13:51:03 -08:00
Colin Patrick McCabe 772f2cfc82
MINOR: Replace BrokerStates.scala with BrokerState.java (#10028)
Replace BrokerStates.scala with BrokerState.java, to make it easier to use from Java code if needed.  This also makes it easier to go from a numeric type to an enum.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-02-03 13:41:38 -08:00
Ramesh Krishnan M e260f64a9c
KAFKA-10413: Allow for even distribution of lost/new tasks when multiple Connect workers join at the same time (#9319)
First issue: When more than one workers join the Connect group the incremental cooperative assignor revokes and reassigns at most average number of tasks per worker.
Side-effect: This results in the additional workers joining the group stay idle and would require more future rebalances to happen to have even distribution of tasks.
Fix: As part of task assignment calculation following a deployment, the reassignment of tasks are calculated by revoking all the tasks above the rounded up (ceil) average number of tasks.

Second issue: When more than one worker is lost and rejoins the group at most one worker will be re assigned with the lost tasks from all the workers that left the group.
Side-effect: In scenarios where more than one worker is lost and rejoins the group only one among them gets assigned all the partitions that were lost in the past. The additional workers that have joined would not get any task assigned to them until a rebalance that happens in future.
Fix: As part fo lost task re assignment all the new workers that have joined the group would be considered for task assignment and would be assigned in a round robin fashion with the new tasks.

Testing strategy : 
* System testing in a Kubernetes environment completed
* New integration tests to test for balanced tasks
* Updated unit tests. 

Co-authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthusamy@apple.com>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Co-authored-by: Konstantine Karantasis <konstantine@confluent.io>

Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
2021-02-02 12:04:06 -08:00
Ismael Juma bf4afae8f5 MINOR: Fix MirrorMakerConfigTest build failures 2021-01-28 18:32:35 -08:00
Julien Chanaud a63094930f
KAFKA-10710; MM2 - Create herders only if source->target.enabled=true and heartbeats are disabled (#9589)
By default Mirror Maker 2 creates herders for all the possible combinations even if the "links" are not enabled.

This is because the beats are emitted from the "opposite" herder.
If there is a replication flow from A to B and heartbeats are required, 2 herders are needed :

- A->B for the MirrorSourceConnector
- B->A for the MirrorHeartbeatConnector

The MirrorHeartbeatConnector on B->A emits beats into topic heartbeats on cluster A.
The MirrorSourceConnector on A->B then replicates whichever topic is configured as well as heartbeats.

In cases with multiple clusters (10 and more), this leads to an incredible amount of connections, file descriptors and configuration topics created in every target clusters that are not necessary.

With this code change, we will leverage the top level property "emit.heartbeats.enabled" which defaults to "true".
We skip creating the A->B herder whenever A->B.emit.heartbeats.enabled=false (defaults to true) and A->B.enabled=false (defaults to false). 

Existing users will not see any change and if they depend on these "opposites" herders for their monitoring, it will still work.
New users with more complex use case can change this property and fine tune their heartbeat generation.

Reviewers: Ryanne Dolan <ryannedolan@gmail.com>,  Sanjana Kaundinya <skaundinya@gmail.com>, Jason Gustafson <jason@confluent.io>
2021-01-28 14:52:51 -08:00
Chia-Ping Tsai 923dea34b8
KAFKA-10658 ErrantRecordReporter.report always return completed futur… (#9525)
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-01-28 14:12:54 +08:00
Geordie fb6c7beb29
MINOR: Remove redundant casting and if condition from ConnectSchema (#9959)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-01-27 14:52:33 +08:00
Chris Egerton 8bdab2e4cf
MINOR: Remove outdated comment in Connect's WorkerCoordinator (#9805)
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2021-01-27 14:50:01 +08:00
Greg Harris f572545611
KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations (#9765)
When two cooperative rebalances take place soon after one another, a prior rebalance may not complete before the next rebalance is started.
Under Eager rebalancing, no tasks would have been started, so the subsequent onRevoked call is intentionally skipped whenever rebalanceResolved was false.
Under Incremental Cooperative rebalancing, the same logic causes the DistributedHerder to skip stopping all of the connector/task revocations which occur in the second rebalance.
The DistributedHerder still removes the revoked connectors/tasks from its assignment, so that the DistributedHerder and Worker have different knowledge of running connectors/tasks.
This causes the connector/task instances that would have been stopped to disappear from the rebalance protocol, and left running until their workers are halted, or they fail.
Connectors/Tasks which were then reassigned to other workers by the rebalance protocol would be duplicated, and run concurrently with zombie connectors/tasks.
Connectors/Tasks which were reassigned back to the same worker would encounter exceptions in Worker, indicating that the connector/task existed and was already running.

* Added test for revoking and then reassigning a connector under normal circumstances
* Added test for revoking and then reassigning a connector following an incomplete cooperative rebalance
* Changed expectRebalance to make assignment fields mutable before passing them into the DistributedHerder
* Only skip revocation for the Eager protocol, and never skip revocation for incremental cooperative protocols

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-01-26 10:17:05 -08:00
Luke Chen 45b7a0a334
KAFKA-12229: Restore original class loader in integration tests using EmbeddedConnectCluster during shutdown (#9942)
Fixes errors such as: 
```
java.lang.NullPointerException at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopics(MirrorSourceConnector.java:348) at 
org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:192) at 
org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst(MirrorSourceConnectorTest.java:222)
```

It was a difficult to debug issue due to class loading interference between the Connect worker and Mockito. Digging into the Mockito, found it's not about JUnit 5, it's because of the class loader. In Mockito, we rely on the class loader to generate the proxy instance ([source](https://github.com/mockito/mockito/blob/release/3.x/src/main/java/org/mockito/internal/creation/bytebuddy/SubclassBytecodeGenerator.java#L91)) to intercept the method call, and if the class loader is not expected, we'll generate the wrong proxy instance (with wrong class path). We set the class loader during connector start to resolve conflicting dependencies ([KIP-146](https://cwiki.apache.org/confluence/display/KAFKA/KIP-146+-+Classloading+Isolation+in+Connect)), so we should set it back to the original class loader after connector stop in tests (`EmbeddedConnectCluster` is only used in tests) for the following Mockito works as expected.

So, there's an interference of integration tests with unit tests when Connect integration tests run before the MM2 unit tests, and that will cause the Mockito used in unit tests not work as expected.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2021-01-25 19:36:57 -08:00
Chia-Ping Tsai f2ac0c62ef
KAFKA-12221 remove PowerMock from connect-json module and connect-transforms module (#9924)
Reviewers: Ismael Juma <ismael@juma.me.uk>
2021-01-19 00:05:44 +08:00
Geordie 462be6da9b
KAFKA-12196: Migrate connect:api module to JUnit 5 (#9909)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
2021-01-17 19:17:47 -08:00
CHUN-HAO TANG 962b69b5b3
KAFKA-12200: Migrate connect:file module to JUnit 5 (#9917)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
2021-01-17 17:01:51 -08:00
Chia-Ping Tsai e0ebb1d202
KAFKA-12202 Migrate connect:mirror module to JUnit 5 (#9894)
1. Replace junit 4 APIs by junit 5
2. Remove the dependencies of junit 4 from `EmbeddedKafkaCluster`

Reviewers: Ismael Juma <ismael@juma.me.uk>
2021-01-16 09:30:04 -08:00
dengziming 4126cfd049
KAFKA-12197: Migrate connect:transforms module to JUnit 5 (#9907)
Reviewers: Ismael Juma <ismael@juma.me.uk>
2021-01-16 07:17:37 -08:00
dengziming 3e6c6f5950
KAFKA-12198: Migrate connect:json module to JUnit 5 (#9890)
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
2021-01-16 03:14:08 +08:00
Chia-Ping Tsai 0c92b8398d
KAFKA-12203 Migrate connect:mirror-client module to JUnit 5 (#9889)
Reviewers: Ismael Juma <ismael@juma.me.uk>
2021-01-15 13:42:07 +08:00
Ismael Juma a68c14190e
KAFKA-12201: Migrate connect:basic-auth-extensio module to JUnit 5 (#9892)
Also:
* Remove unused powermock dependency
* Remove "examples" from the JUnit 4 list since one module was already
converted and the other has no tests

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-01-14 15:43:47 -08:00
Ning Zhang 2cde6f61b8
KAFKA-10304: Refactor MM2 integration tests (#9224)
Co-authored-by: Ning Zhang <nzhang1220@fb.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2021-01-14 14:48:17 +00:00
Chris Egerton 7455b70102
KAFKA-10895: Attempt to prevent JAAS config from being overwritten for basic auth filter in Connect (#9806)
If a connector, converter, etc. invokes [Configuration::setConfiguration](https://docs.oracle.com/javase/8/docs/api/javax/security/auth/login/Configuration.html#setConfiguration-javax.security.auth.login.Configuration-), it will cause the Connect basic auth filter to use that JAAS config instead of the one configured at startup with the `-Djava.security.auth.login.config` JVM property. This can cause requests to the worker to succeed initially but start to be rejected after the JVM's global JAAS config is altered.

To alleviate this the current PR instructs the Connect Worker to cache the JVM's global JAAS configuration at the beginning (as soon as the `BasicAuthSecurityRestExtension` class is loaded), and use that for all future authentication. 

Existing tests for the JAAS basic auth filter are modified to work with the new internal logic. The `testEmptyCredentialsFile` test is corrected to actually operate on an empty credentials file (instead of a non-existent credentials file, which it currently operates on). A new test is added to ensure that, even if the global JAAS config is overwritten, the basic auth filter will use the first one it could find.

Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
2021-01-11 21:20:57 -08:00
dengziming 119a2d9127
MINOR: Substitute assertEquals(null) with assertNull (#9852)
Reviewers: David Jacot <djacot@confluent.io>
2021-01-10 20:06:37 +01:00
Chia-Ping Tsai 913a019d6c
MINOR: replace test "expected" parameter by assertThrows (#9520)
This PR includes following changes.

1. @Test(expected = Exception.class) is replaced by assertThrows
2. remove reference to org.scalatest.Assertions
3. change the magic code from 1 to 2 for testAppendAtInvalidOffset to test ZSTD
4. rename testMaybeAddPartitionToTransactionXXXX to testNotReadyForSendXXX
5. increase maxBlockMs from 1s to 3s to avoid unexpected timeout from TransactionsTest#testTimeout

Reviewers: Ismael Juma <ismael@confluent.io>
2021-01-10 20:20:13 +08:00
dengziming 0e91d053f3
MINOR: rename @returns to @return (#9808)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-01-03 21:04:28 +08:00
Chris Egerton afa5423356
MINOR: Fix connector startup error logging (#9784)
If a connector fails on startup, the original cause of the error gets discarded by the framework and the only message that gets logged looks like this:

```
[2020-12-04 16:46:30,464] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'conn-1' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: conn-1
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$5(DistributedHerder.java:1297)
        at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:258)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1321)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:127)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1329)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1325)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
```

The changes here should cause the original cause of the connector startup failure to be logged as well.

```
[2020-12-30 09:56:35,481] ERROR [test-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'conn-1' (org.apache.kafka.connect.runtime.distributed.DistributedHerder:599)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: conn-1
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$4(DistributedHerder.java:1298)
	at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:294)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1322)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:597)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:416)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:294)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements
```

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-30 23:06:29 +08:00
APaMio 1670362236
MINOR: Replace Collection.toArray(new T[size]) by Collection.toArray(new T[0]) (#9750)
This PR is based on the research of https://shipilev.net/blog/2016/arrays-wisdom-ancients

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-21 17:38:33 +08:00
Tom Bentley 67150b815e
KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed (#9735)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-18 12:01:38 +08:00
Anastasia Vela 1a10c3445e
KAFKA-10525: Emit JSONs with new auto-generated schema (KIP-673) (#9526)
This patch updates the request logger to output request and response payloads in JSON. Payloads are converted to JSON based on their auto-generated schema.

Reviewers:  Lucas Bradstreet <lucas@confluent.io>, David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>
2020-12-15 14:33:36 +01:00
Ismael Juma 1f98112e99
MINOR: Remove connection id from Send and consolidate request/message utils (#9714)
Connection id is now only present in `NetworkSend`, which is now
the class used by `Selector`/`NetworkClient`/`KafkaChannel` (which
works well since `NetworkReceive` is the class used for
received data).

The previous `NetworkSend` was also responsible for adding a size
prefix. This logic is already present in `SendBuilder`, but for the
minority of cases where `SendBuilder` is not used (including
a number of tests), we now have `ByteBufferSend.sizePrefixed()`.

With regards to the request/message utilities:
* Renamed `toByteBuffer`/`toBytes` in `MessageUtil` to
`toVersionPrefixedByteBuffer`/`toVersionPrefixedBytes` for clarity.
* Introduced new `MessageUtil.toByteBuffer` that does not include
the version as the prefix.
* Renamed `serializeBody` in `AbstractRequest/Response` to
`serialize` for symmetry with `parse`.
* Introduced `RequestTestUtils` and moved relevant methods from
`TestUtils`.
* Moved `serializeWithHeader` methods that were only used in
tests to `RequestTestUtils`.
* Deleted `MessageTestUtil`.

Finally, a couple of changes to simplify coding patterns:
* Added `flip()` and `buffer()` to `ByteBufferAccessor`.
* Added `MessageSizeAccumulator.sizeExcludingZeroCopy`.
* Used lambdas instead of `TestCondition`.
* Used `Arrays.copyOf` instead of `System.arraycopy` in `MessageUtil`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
2020-12-09 11:15:58 -08:00
APaMio c5575801b7
MINOR: Using primitive data types for loop index (#9705)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-09 10:44:55 +08:00
Ismael Juma 6f27bb02da
KAFKA-10818: Skip conversion to `Struct` when serializing generated requests/responses (#7409)
Generated request/response classes have code to serialize/deserialize directly to
`ByteBuffer` so the intermediate conversion to `Struct` can be skipped for them.
We have recently completed the transition to generated request/response classes,
so we can also remove the `Struct` based fallbacks.

Additional noteworthy changes:
* `AbstractRequest.parseRequest` has a more efficient computation of request size that
relies on the received buffer instead of the parsed `Struct`.
* Use `SendBuilder` for `AbstractRequest/Response` `toSend`, made the superclass
implementation final and removed the overrides that are no longer necessary.
* Removed request/response constructors that assume latest version as they are unsafe
outside of tests.
* Removed redundant version fields in requests/responses.
* Removed unnecessary work in `OffsetFetchResponse`'s constructor when version >= 2.
* Made `AbstractResponse.throttleTimeMs()` abstract.
* Using `toSend` in `SaslClientAuthenticator` instead of `serialize`.
* Various changes in Request/Response classes to make them more consistent and to
rely on the Data classes as much as possible when it comes to their state.
* Remove the version argument from `AbstractResponse.toString`.
* Fix `getErrorResponse` for `ProduceRequest` and `DescribeClientQuotasRequest` to
use `ApiError` which processes the error message sent back to the clients. This was
uncovered by an accidental fix to a `RequestResponseTest` test (it was calling
`AbstractResponse.toString` instead of `AbstractResponse.toString(short)`).

Rely on existing protocol tests to ensure this refactoring does not change 
observed behavior (aside from improved performance).

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-07 15:39:57 -08:00
Randall Hauch 8db3b1a09a
KAFKA-10811: Correct the MirrorConnectorsIntegrationTest to correctly mask the exit procedures (#9698)
Normally the `EmbeddedConnectCluster` class masks the `Exit` procedures using within the Connect worker. This normally works great when a single instance of the embedded cluster is used. However, the `MirrorConnectorsIntegrationTest` uses two `EmbeddedConnectCluster` instances, and when the first one is stopped it would reset the (static) exit procedures, and any problems during shutdown of the second embedded Connect cluster would cause the worker to shut down the JVM running the tests.

Instead, the `MirrorConnectorsIntegrationTest` class should mask the `Exit` procedures and instruct the `EmbeddedConnectClusters` instances (via the existing builder method) to not mask the procedures.

Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2020-12-07 09:34:34 -06:00
Chris Egerton 4f2f08eb00
KAFKA-10792: Prevent source task shutdown from blocking herder thread (#9669)
Changes the `WorkerSourceTask` class to only call `SourceTask::stop` from the task thread when the task is actually stopped (via `Source:task::close` just before `WorkerTask::run` completes), and only if an attempt has been made to start the task (which will not be the case if it was created in the paused state and then shut down before being started). This prevents `SourceTask::stop` from being indirectly invoked on the herder's thread, which can have adverse effects if the task is unable to shut down promptly.

Unit tests are tweaked where necessary to account for this new logic, which covers some edge cases mentioned in PR #5020 that were unaddressed up until now.

The existing integration tests for blocking connectors are expanded to also include cases for blocking source and sink tasks. Full coverage of every source/sink task method is intentionally omitted from these expanded tests in order to avoid inflating test runtime (each one adds an extra 5 seconds at minimum) and because the tests that are added here were sufficient to reproduce the bug with source task shutdown.

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Nigel Liang <nigel@nigelliang.com>, Tom Bentley <tbentley@redhat.com>, Randall Hauch <rhauch@gmail.com>
2020-12-04 11:48:23 -06:00
ArunParthiban-ST cc1aa3b83d
KAFKA-10770: Remove duplicate defination of Metrics#getTags (#9659)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-01 13:10:22 +08:00
Tom Bentley 0df461582c
KAFKA-10720: Document prohibition on header mutation by SMTs (#9597)
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chris Egerton <fearthecellos@gmail.com>
2020-11-26 22:39:41 +00:00
abc863377 5200eaab25
MINOR: Remove unnecessary statement from WorkerConnector#doRun (#9653)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-11-25 22:48:02 +08:00
abc863377 047ad654da
MINOR: Remove redundant argument from TaskMetricsGroup#recordCommit (#9642)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-11-24 15:02:56 +08:00
Samuel Cantero 1625984149
MINOR: Allow Checkpoints for consumers using static partition assignments (#9545)
Reviewers: Mickael Maison <mickael.maison@gmail.com>
2020-11-19 15:01:22 +00:00
Chris Egerton 83f75464c2
MINOR: Include connector name in error message (#9599)
Reviewers: Randall Hauch <rhauch@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2020-11-18 15:25:21 +08:00
abc863377 aafab199e3
MINOR: Initialize ConnectorConfig constructor with emptyMap and avoid instantiating a new Map (#9603)
The map passed as an argument remains read-only and therefore can be initialized using Collections#emptyMap instead of being passed a new Map instance. 

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2020-11-17 09:51:12 -08:00
Ivan Yurchenko ec3feb4f09
MINOR: move connectorConfig to AbstractHerder (#6820)
StandaloneHerder and DistributedHerder have identical implementations of connectorConfig (apart from one line of logging). This commit moves the common implementation of connectorConfig to AbstractHerder.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2020-11-04 11:20:11 +08:00
Matthias J. Sax cf78fbe41e
MINOR: improve `null` checks for headers (#9513)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Luke Chen @showuon
2020-10-29 16:45:43 -07:00
xakassi f38ebbaa5b
KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)
DistributedHerder goes to updateConfigsWithIncrementalCooperative() synchronized method and called configBackingStore.snapshot() which take a lock on internal object in KafkaConfigBackingStore class.

Meanwhile KafkaConfigBackingStore in ConsumeCallback inside synchronized block on internal object gets SESSION_KEY record and calls updateListener.onSessionKeyUpdate() which take a lock on DistributedHerder.

This results to a deadlock.

To avoid this, updateListener with new session key should be called outside synchronized block as it's done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).

Co-authored-by: Taisiia Goltseva <tado0618@netcracker.com>

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
2020-10-20 09:05:30 -07:00