Discovered while researching KAFKA-14718
Currently, we perform a check during zombie fencing that causes the round of zombie fencing to fail when a rebalance is pending (i.e., when we've detected from a background poll of the config topic that a new connector has been created, that an existing connector has been deleted, or that a new set of connector tasks has been generated).
It's possible but not especially likely that this check causes issues when running vanilla Kafka Connect. Even when it does, it's easy enough to restart failed tasks via the REST API.
However, when running MirrorMaker 2 in dedicated mode, this check is more likely to cause issues as we write three connector configs to the config topic in rapid succession on startup. And in that mode, there is no API to restart failed tasks aside from restarting the worker that they are hosted on.
In either case, this check can lead to test flakiness in integration tests for MirrorMaker 2 both in dedicated mode and when deployed onto a vanilla Kafka Connect cluster.
This check is not actually necessary, and we can safely remove it. Copied from Jira:
>If the worker that we forward the zombie fencing request to is a zombie leader (i.e., a worker that believes it is the leader but in reality is not), it will fail to finish the round of zombie fencing because it won't be able to write to the config topic with a transactional producer.
>If the connector has just been deleted, we'll still fail the request since we force a read-to-end of the config topic and refresh our snapshot of its contents before checking to see if the connector exists.
>And regardless, the worker that owns the task will still do a read-to-end of the config topic and verify that (1) no new task configs have been generated for the connector and (2) the worker is still assigned the connector, before allowing the task to process any data.
In addition, while waiting on a fix for KAFKA-14718 that adds more granularity for diagnosing failures in the DedicatedMirrorIntegrationTest suite (#13284), some of the timeouts in that test are bumped to work better on our CI infrastructure.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Most of the content in the README.md was already covered in the docs, so only the section for Exactly Once support had to be added.
Reviewers: Luke Chen <showuon@gmail.com>
If the Kafka broker cluster and the Kafka Connect cluster are started together and Connect tries to create its internal topics, there's a high chance that topic creation fails with an InvalidReplicationFactorException.
---------
Co-authored-by: Daniel Urban <durban@cloudera.com>
Reviewers: Daniel Urban <durban@cloudera.com>, Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Chris Egerton <chrise@aiven.io>, Laszlo Hunyadi <laszlo.istvan.hunyady@gmail.com>
`KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but it has been used outside of Connect and we try to preserve backward compatibility whenever possible. KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply by renaming the new Future-returning send methods and reinstating the older send methods to delegate to the newer ones.
This refactoring changes no functionality other than restoring the older methods.
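As a rough sketch of the delegation pattern, assuming illustrative names that may differ from the actual patch:

```java
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Minimal sketch of the renaming-and-delegation approach described above;
// the method name sendWithReceipt is an assumption, not necessarily the
// name chosen in the actual patch.
public class BinaryCompatSketch<K, V> {
    private final KafkaProducer<K, V> producer;
    private final String topic;

    public BinaryCompatSketch(KafkaProducer<K, V> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
    }

    // New method: exposes the Future introduced by KAFKA-14455.
    public Future<RecordMetadata> sendWithReceipt(K key, V value) {
        return producer.send(new ProducerRecord<>(topic, key, value));
    }

    // Reinstated method with the original void signature, restoring binary
    // compatibility; it simply delegates to the Future-returning variant.
    public void send(K key, V value) {
        sendWithReceipt(key, value);
    }
}
```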
Reviewers: Randall Hauch <rhauch@gmail.com>
#13557 introduced a utility method to close executors silently. This PR leverages that method to close executors in the Connect runtime, removing the code that was previously duplicated across call sites.
Note that a few more executors used in the Connect runtime don't follow this pattern of shutdown, await, and forced shutdown. Some of them, such as the executor in Worker, have additional logic around shutdown, so those places are left unchanged.
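For reference, a sketch of the consolidated pattern; the helper name and timeout handling here are assumptions, not necessarily the exact utility from #13557:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the shutdown/await/forced-shutdown pattern that the shared
// utility consolidates.
public final class ExecutorShutdownSketch {
    private ExecutorShutdownSketch() {}

    public static void shutdownQuietly(ExecutorService executor, long timeout, TimeUnit unit) {
        if (executor == null)
            return;
        executor.shutdown(); // reject new tasks, let in-flight tasks finish
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow(); // force shutdown if tasks don't finish in time
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }
}
```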
---------
Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>
Reviewers: Daniel Urban <durban@cloudera.com>, Yash Mayya <yash.mayya@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
We currently handle the complex ListOffsets workflow by chaining together MetadataCall and ListOffsetsCall instances, which involves a lot of complex and error-prone logic. In this PR we rewrote it on top of the `AdminApiDriver` infrastructure. Notable improvements over the old logic:
1. Retry the lookup stage on `NOT_LEADER_OR_FOLLOWER` and `LEADER_NOT_AVAILABLE`, whereas in the past we failed the partition directly without retrying.
2. Remove the class field `supportsMaxTimestamp` and calculate it on the fly to avoid mutable state; this doesn't change any behavior of the client.
3. Retry the fulfillment stage on any `RetriableException`, whereas in the past we only retried the fulfillment stage on `InvalidMetadataException`; this means we now also retry on `TimeoutException` and other `RetriableException`s.
We also added `handleUnsupportedVersionException` to `AdminApiHandler` and `AdminApiLookupStrategy`; these keep consistency with the old logic, and we can continue to improve them.
Reviewers: Ziming Deng <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>
KafkaStatusBackingStore uses infinite retry logic on producer send, which can lead to a stack overflow.
To avoid the problem, a background thread was added, and the producer callback now submits the retry onto that background thread.
New add.source.alias.to.metrics setting to add the source cluster alias to the MirrorSourceConnector metrics
Reviewers: Chris Egerton <fearthecellos@gmail.com>
This fixes a regression introduced in #12828, which caused workers to start unconditionally loading (and therefore validating) SSL-related properties when issuing REST requests to other workers. That was fine for the most part, but caused unnecessary failures when workers were configured with invalid SSL-related properties and their REST API used HTTP instead of HTTPS.
Reviewers: Ian McDonald <imcdonald@confluent.io>, Greg Harris <greg.harris@aiven.io>, Yash Mayya <yash.mayya@gmail.com>, Justine Olshan <jolshan@confluent.io>
KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2
KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2
KAFKA-12566: Fix flaky MirrorMaker 2 integration tests
Reviewers: Chris Egerton <chrise@aiven.io>
* assertEquals called on array
* Method is identical to its super method
* Simplifiable assertions
* Unused imports
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Divij Vaidya <diviv@amazon.com>
Reviewers: Daniel Urban <durban@cloudera.com>, Greg Harris <greg.harris@aiven.io>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
This is a small refactor extracted from https://github.com/apache/kafka/pull/12845. It basically moves the logic to handle the backward compatibility of `JoinGroupResponseData.protocolName` from `KafkaApis` to `JoinGroupResponse`.
The patch adds a new unit test for `JoinGroupResponse` and relies on existing tests as well.
Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Tom Bentley <tombentley@users.noreply.github.com>
Co-authored-by: Tom Bentley <tombentley@users.noreply.github.com>
Co-authored-by: oibrahim3 <omnia@apple.com>
Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect.
Reviewers: Nigel Liang <nigel@nigelliang.com>, Kalpesh Patel <kpatel@confluent.io>, John Roesler <vvcephei@apache.org>, Tom Bentley <tbentley@redhat.com>
The motivation for this change is to guard against timing attacks when using InternalRequestSignature.equals().
Pros of this PR
If the InternalRequestSignature.equals() method could be used for a timing attack, then this PR fixes a security vulnerability.
Cons of this PR
MessageDigest.isEqual() is slower than Arrays.equals(), since the former is time-constant with respect to the contents being compared, i.e., it does not short-circuit on the first mismatched byte. Its execution time is a function only of the length of the byte array it is being tested against.
Even if InternalRequestSignature.equals() is not used anywhere in the code today where it may cause a timing attack, we should still guard against the possibility that a future change starts using it that way (especially in an open source project where changes are contributed and reviewed by many different people). The downside of a slower equality comparison over a signature is a risk worth accepting given the upside of safeguarding future use cases.
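To illustrate the trade-off, a small sketch contrasting the two comparison styles:

```java
import java.security.MessageDigest;
import java.util.Arrays;

// Sketch contrasting the two comparison styles discussed above.
public final class SignatureCompareSketch {
    private SignatureCompareSketch() {}

    // Short-circuits on the first mismatching byte, so response time leaks
    // how long a prefix of the expected signature an attacker has guessed.
    static boolean timingUnsafeEquals(byte[] expected, byte[] actual) {
        return Arrays.equals(expected, actual);
    }

    // Examines every byte regardless of where the first mismatch occurs, so
    // timing depends only on the array length, not on its contents.
    static boolean constantTimeEquals(byte[] expected, byte[] actual) {
        return MessageDigest.isEqual(expected, actual);
    }
}
```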
Co-authored-by: Nandini Krishna Anagondi <nandini@mac.local>
Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Karan Kumar <karankumar1100@gmail.com>, Andrew Eugene Choi <andrew.choi@uwaterloo.ca>
Why
The current set of integration tests leaks files in the /tmp directory, which becomes cumbersome if you don't restart the machine often.
Fix
Replace the usage of File.createTempFile with the existing TestUtils.tempFile method across the test files. TestUtils.tempFile automatically cleans up the temp files it generates in the /tmp/ folder.
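A minimal before/after sketch, assuming the `org.apache.kafka.test.TestUtils` helper:

```java
import java.io.File;
import java.io.IOException;

import org.apache.kafka.test.TestUtils;

public class TempFileSketch {
    public static void main(String[] args) throws IOException {
        // Before: the file lingers in /tmp unless the test deletes it explicitly.
        File leaked = File.createTempFile("connect-test", ".tmp");

        // After: TestUtils.tempFile registers the file for automatic cleanup.
        File managed = TestUtils.tempFile();

        System.out.println(leaked + " vs " + managed);
    }
}
```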
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <mlists@juma.me.uk>
Also includes a minor quality-of-life improvement to clarify why some internal REST requests to workers may fail while that worker is still starting up.
Reviewers: Tom Bentley <tbentley@redhat.com>, Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
This implements KIP-830: https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
It adds a new configuration, `auto.include.jmx.reporter`, that can be set to `false` to disable the JMX reporter. Per the KIP, this configuration is introduced as deprecated and will be removed in the next major version.
Reviewers: Tom Bentley <tbentley@redhat.com>, Christo Lolov <christo_lolov@yahoo.com>
Cache the Kafka cluster Id once it has been retrieved to avoid creating many Admin clients at startup.
Reviewers: Chris Egerton <fearthecellos@gmail.com>
This PR is created on top of #10904 and includes commits from original author for attribution.
## Testing
1. `./gradlew connect:runtime:unitTest --tests WorkerGroupMemberTest` is successful.
2. Verified that test is run as part of `./gradlew connect:runtime:unitTest` (see report in the PR)
Reviewers: Ismael Juma <ismael@juma.me.uk>
Co-authored-by: Chun-Hao Tang <tang7526@gmail.com>
Catches valid 404 exceptions, triggered by any HTTP request to a nonexistent path on the Connect REST API, higher in the call stack so that we no longer emit an ERROR log that can be mistaken for a real problem.
Reviewers: Chris Egerton <fearthecellos@gmail.com>
We should not treat UNKNOWN_MEMBER_ID as an unexpected error in the Admin client. In MirrorMaker, check the result of committing offsets and log a useful error message in case the commit failed with UNKNOWN_MEMBER_ID.
Reviewers: Chris Egerton <fearthecellos@gmail.com>
The HeaderConverter interface extends Closeable, but we weren't closing them anywhere before. This change causes header converters to be closed as part of task shutdown.
Reviewers: Kvicii <42023367+Kvicii@users.noreply.github.com>, Chris Egerton <fearthecellos@gmail.com>
In KAFKA-13310, we tried to fix an issue where consumer#poll(duration) could return later than the provided duration. This happens because, if a rebalance is needed, we try to commit the current offsets synchronously before rebalancing, and if the offset commit takes too long, consumer#poll spends more time than the provided duration. To fix that, we changed the synchronous commit before rebalance (i.e., in onPrepareJoin) to an asynchronous commit.
However, in this ticket, we found that the async commit keeps sending a new commit request during each Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer is kicked out of the group after the rebalance timeout without rejoining. That is, if we have consumer A in group G and consumer B joins the group, after the rebalance only consumer B remains in the group.
Besides, another bug was found while fixing this one. Before KAFKA-13310, we committed offsets synchronously with rebalanceTimeout, retrying on retriable errors until the timeout expired. After KAFKA-13310, we thought we still had retries, but the retry only happens after partitions have been revoked. That is, even if the retried offset commit succeeds, some partition offsets remain uncommitted, and after the rebalance other consumers will consume overlapping records.
Reviewers: RivenSun <riven.sun@zoom.us>, Luke Chen <showuon@gmail.com>
Make sure to ack all records where produce failed, when a connector's `errors.tolerance` config property is set to `all`. Acking is essential so that the task will continue to commit future record offsets properly and remove the records from internal tracking, preventing a memory leak.
(cherry picked and slightly modified from commit 63e06aafd0)
Reviewers: Chris Egerton <fearthecellos@gmail.com>, Randall Hauch <rhauch@gmail.com>
Implements embedded end-to-end integration tests for KIP-618, and brings together previously-decoupled logic from upstream PRs.
Reviewers: Luke Chen <showuon@gmail.com>, Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>
This causes the artificial reductions in the Connect REST request timeout to be more isolated. Specifically, they now only take place in the tests that need them (instead of any tests that happen to be running after the reduction has taken place and before it has been reset), and they are only performed for the requests that are expected to time out, before being immediately reset. This should help reduce spurious test failures (especially in slow environments like Jenkins) for all Connect integration tests that interact with the REST API, not just the BlockingConnectorTest test suite.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Implements support for per-connector offsets topics as described in KIP-618.
Reviewers: Luke Chen <showuon@gmail.com>, Tom Bentley <tbentley@redhat.com>
Use the newly added function to replace the old addMetric function that may throw illegal argument exceptions.
Although in some cases concurrency should not currently be possible, that may not remain true in the future, so it's better to use the new API to be less error-prone.
Reviewers: Bruno Cadonna <cadonna@apache.org>
New gradle task `connect:runtime:genConnectOpenAPIDocs` that generates `connect_rest.yaml` under `docs/generated`.
This task is executed when `siteDocsTar` runs.
Implements the behavior described in KIP-618: using a transactional producer for writes to the config topic that should only be performed by the leader of the cluster.
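For illustration, a minimal sketch of the kind of leader-only transactional write described here; the topic name, transactional id, and bootstrap address are placeholder assumptions, not Connect's actual values:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class LeaderConfigWriteSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-configs");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Only one producer per transactional id can write at a time; a
            // zombie leader's producer is fenced when the new leader calls this.
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("connect-configs",
                    "task-example-0".getBytes(StandardCharsets.UTF_8),
                    "{}".getBytes(StandardCharsets.UTF_8)));
            producer.commitTransaction();
        }
    }
}
```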
Reviewers: Luke Chen <showuon@gmail.com>, Tom Bentley <tbentley@redhat.com>
As an initial step to address the notoriously flaky BlockingConnectorTest test suite, we can try increasing test timeouts.
This approach may not be sufficient, and even if it is, it's still suboptimal. Although it may address flakiness on Jenkins, it will make genuine failures harder to detect when testing local changes. Additionally, if the workload on Jenkins continues to increase, we'll probably have to bump these timeouts in the future again at some point.
Potential next steps, for this PR and beyond:
Stop leaking threads that block during test runs
Instead of artificially reducing the REST request timeout at the beginning of every test, reduce it selectively right before issuing a REST request that is expected to time out, and then immediately reset it.
Eliminate artificial reduction of the REST request timeout entirely, as it may be negatively impacting other Connect integration tests that are being run concurrently.
Test repeatedly on Jenkins, ideally at least 50 times
Gather information on the number of CPU cores available to each Jenkins node and the distribution of how many threads are allocated over a given time period (maybe a day?); this is especially relevant since local testing indicates that these tests all do much better when parallelism is reduced, which shouldn't be too surprising considering that each Connect integration test spins up separate threads for at least one Zookeeper node, one Kafka broker, one Connect worker, and usually at least one connector and one task.
I'd like to test these changes as a first step before investigating any of the above (except maybe items 1 and 2, which should be fairly straightforward). To trigger new runs I plan on pushing empty commits or, if those do not trigger new Jenkins runs, dummy commits. If this is objectionable let me know and hopefully we can find a suitable alternative.
Reviewers: Kvicii <Karonazaba@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Minor change to use ' and not LEFT SINGLE QUOTATION MARK in this log message, as it's the only place we are using such a quote and it can break ingestion pipelines
Reviewers: Kvicii <Karonazaba@gmail.com>, Divij Vaidya <diviv@amazon.com>, Konstantine Karantasis <k.karantasis@gmail.com>
The goals here include:
1. Create an overloaded variant of the IncrementalCooperativeAssignor::performTaskAssignment method that is more testing friendly
2. Simplify the parameter list for the IncrementalCooperativeAssignor::handleLostAssignments method, which in turn simplifies the logic for testing this class
3. Capture repeated Java 8 streams logic in simple, reusable, easily-verifiable utility methods added to the ConnectUtils class
Reviewers: Luke Chen <showuon@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Tom Bentley <tbentley@redhat.com>, Hector Geraldino <hgeraldino@bloomberg.net>, Andrew Eugene Choi <andrew.choi@uwaterloo.ca>
The `DESTROYED` state is represented internally as a tombstone record when running in distributed mode and by the removal of the connector/task from the in-memory status map when running in standalone mode. As a result, it will never appear to users of the REST API, and we should remove mention of it from our docs so that developers creating tooling against the REST API don't write unnecessary logic to account for that state.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Luke Chen <showuon@gmail.com>, Chris Egerton <fearthecellos@gmail.com>
Co-authored-by: Chris Egerton <fearthecellos@gmail.com>
The `retryEndOffsets(…)` method was recently added to `TopicAdmin` (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduced a regression when talking to older brokers (0.10.x or earlier).
The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup.
The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and verified the fix corrects the problem.
With AK 3.0, idempotence was enabled by default in Kafka producers. However, if idempotence is enabled, Connect won't be able to communicate via its producers with Kafka brokers older than version 0.11. Perhaps more importantly, for brokers older than version 2.8 the IDEMPOTENT_WRITE ACL is required to be granted to the principal of the Connect worker.
Therefore this commit disables producer idempotence by default in all the producers instantiated by Connect. Users can still choose to enable producer idempotence by explicitly setting the right worker and/or connector properties.
The changes were tested via existing unit, integration and system tests.
Reviewers: Randall Hauch <rhauch@gmail.com>
This reverts commit 76cf7a5793.
Connect already allows users to enable idempotent producers for connectors and the Connect workers. Although Kafka producers enabled idempotency by default in 3.0, due to compatibility requirements and the fact that [KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent) hasn't been explicitly approved, the changes here are reverted. A separate commit will explicitly disable idempotency in producers instantiated by Connect by default until KIP-318 is approved and scheduled for release.
I collected a list of the most flaky tests observed lately, checked / created their corresponding tickets, and marked them as ignored for now. Many of these failures:
0. were failing very frequently in the past (at least in my observations);
1. have not been investigated for some time;
2. have a PR for review (mostly thanks to @showuon!), but have not been reviewed for some time.
Because of 0), these test failures are hindering our development; and from 1) and 2) above, people are either too busy to look after them, or honestly the tests are not considered as providing value, since otherwise people would care enough to panic and try to resolve them. So I think it's reasonable to disable all these tests for now. If we later learn our lesson the hard way, that would motivate us to tackle flaky tests more diligently as well.
I'm only disabling tests that have been failing for a while; if in all that time no one has looked into them, I'm concerned that just gossiping about the flakiness would not bring people's attention to them either. So my psychological motivation is: "if people do not care about those failed tests for weeks (which is not a good thing! :P), let's teach ourselves the lesson the hard way when it indeed buries a bug that bites us, or not learn the lesson at all, which would indicate those tests are indeed not valuable". Tests whose failures I only saw very recently were not disabled.
Reviewers: John Roesler <vvcephei@apache.org>, Matthias J. Sax <mjsax@apache.org>, Luke Chen <showuon@gmail.com>, Randall Hauch <rhauch@gmail.com>
This is an addendum to KAFKA-12879 (#11797) to fix some tests that are somewhat flaky when a build machine is heavily loaded (when the timeouts are too small).
- Add an if check to avoid sleep(0)
- Increase timeout in the tests
Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead perform the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses.
This change should be backward compatible with KAFKA-12339, so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog`'s existing thread already has its own retry logic, and this is not changed.
Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function.
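A hedged sketch of such a parameterized retry loop; the real `RetryUtil` API may differ in naming and signature:

```java
import java.util.function.Supplier;

import org.apache.kafka.common.errors.RetriableException;

public final class RetryUtilSketch {
    private RetryUtilSketch() {}

    // Retries the supplied action on RetriableException until the retry
    // timeout elapses, mirroring the behavior described above.
    public static <T> T retryUntilTimeout(Supplier<T> action, long timeoutMs, long backoffMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return action.get();
            } catch (RetriableException e) {
                if (System.currentTimeMillis() >= deadline)
                    throw e; // retry budget exhausted; surface the last failure
                try {
                    Thread.sleep(backoffMs); // back off before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```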
Make SetSchemaMetadata SMT ignore records with null value and valueSchema or key and keySchema.
The transform has been unit tested for handling null values gracefully while still providing the necessary validation for non-null values.
Reviewers: Konstantine Karantasis<konstantine@confluent.io>, Bill Bejeck <bbejeck@apache.org>
Lower the log level of a message in `WorkerSourceTask` indicating that no messages have been produced by the task, since it is spammy and causes confusion for users.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch adds null value check to the connector config validation, and extends unit tests to cover this functionality.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chris Egerton <chrise@confluent.io>, Boyang Chen <bchen11@outlook.com>, Andras Katona <akatona@cloudera.com>
Title: KafkaConsumer cannot jump out of the poll method, and CPU and traffic on the broker side increase sharply
Description: local testing has passed; this solves the problem described in the JIRA.
JIRA link: https://issues.apache.org/jira/browse/KAFKA-13310
Reviewers: Luke Chen <showuon@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
In v3.0, we changed the default value for `enable.idempotence` to true, but we didn't adjust the validator and the `idempotence` enabled check method. So if a user didn't explicitly enable idempotence, this feature won't be turned on. This patch addresses the problem, cleans up associated logic, and fixes tests that broke as a result of properly applying the new default. Specifically it does the following:
1. fix the `ProducerConfig#idempotenceEnabled` method, to make it correctly detect if `idempotence` is enabled or not
2. remove some unnecessary config overrides and checks, since we already default the `acks`, `retries`, and `enable.idempotence` configs.
3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`. The config validation should be the responsibility of `ProducerConfig` class.
4. add an `AbstractConfig#hasKeyInOriginals` method, to avoid copying the `originals` configs when we only want to check for the existence of a key.
5. fix many broken tests. As mentioned, we didn't actually enable idempotence in v3.0. After this PR, some tests broke due to behavioral differences between the idempotent and non-idempotent producer.
6. add additional tests to validate configuration behavior
Reviewers: Kirk True <kirk@mustardgrain.com>, Ismael Juma <ismael@juma.me.uk>, Mickael Maison <mimaison@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
EmbeddedKafkaCluster in other projects uses 2MB for its offset map to reduce
memory consumption in test runs. Generally we allocate multiple of these offset maps,
one for each broker.
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ismael Juma <ismael@juma.me.uk>
I've skipped the following classes as they use powermock to stub/access private and static fields/methods:
- KafkaConfigBackingStoreTest
- KafkaOffsetBackingStoreTest
Those will require some refactoring and will be updated in a separate PR.
Reviewers: Tom Bentley <tbentley@redhat.com>, dengziming <dengziming1993@gmail.com>
Although committing source task offsets without blocking on the delivery of all in-flight records is beneficial most of the time, it can lead to duplicate record delivery if there are in-flight records at the time of the task's end-of-life offset commit.
A best-effort attempt is made here to wait for any such in-flight records to be delivered before proceeding with the end-of-life offset commit for source tasks. Connect will block for up to offset.flush.timeout.ms milliseconds before calculating the latest committable offsets for the task and flushing those to the persistent offset store.
Author: Chris Egerton <chrise@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.
Two test cases are added to `WorkerSinkTaskTest`:
- A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario
- A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class
The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; including it here as the original context for identifying this bug was debugging failed system tests and the logic in this part of the tests was originally suspected as a cause of the test failure.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
When running in dedicated mode, Connect runtimes are configured to use the `.internal` suffix for their topics.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>
Currently, the `WorkerSinkTask`'s consumer rebalance listener (and related logic) is hardcoded to assume eager rebalancing, which means that all partitions are revoked any time a rebalance occurs and then the set of partitions included in `onPartitionsAssigned` is assumed to be the complete assignment for the task. Not only does this cause failures when the cooperative consumer protocol is used, it fails to take advantage of the benefits provided by that protocol.
These changes alter framework logic to not only not break when the cooperative consumer protocol is used for a sink connector, but to reap the benefits of it as well, by not revoking partitions unnecessarily from tasks just to reopen them immediately after the rebalance has completed.
This change will be necessary in order to support [KIP-726](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248), which currently proposes that the default consumer partition assignor be changed to the `CooperativeStickyAssignor`.
Two integration tests are added to verify sink task behavior with both eager and cooperative consumer protocols, and new and existing unit tests are adapted as well.
Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <k.karantasis@gmail.com>
Replaces the current logic for committing source offsets, which is batch-based and blocks until the entirety of the current batch is fully written to and acknowledged by the broker, with a new non-blocking approach that commits source offsets for source records that have been "fully written" by the producer. The new logic considers a record fully written only if that source record and all records before it with the same source partition have all been written to Kafka and acknowledged.
This new logic uses a deque for every source partition that a source task produces records for. Each element in that deque is a SubmittedRecord with a flag to track whether the producer has ack'd the delivery of that source record to Kafka. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offsets from these elements, storing them in a snapshot that is then committed on the separate source task offset thread.
The behavior of the `offset.flush.timeout.ms` property is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during `WorkerSourceTask::commitOffsets` waiting on the acknowledgment of records by the producer.
This behavior also does not change how the records are dispatched to the producer nor how the producer sends or batches those records.
It's possible that memory exhaustion may occur if, for example, a single Kafka partition is offline for an extended period. In cases like this, the collection of deques in the SubmittedRecords class may continue to grow indefinitely until the partition comes back online and the SubmittedRecords in those deques that targeted the formerly-offline Kafka partition are acknowledged and can be removed. Although this may be suboptimal, it is no worse than the existing behavior of the framework in these cases.
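A simplified sketch of the per-partition deque bookkeeping described above; class and method names are illustrative, not the actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class SubmittedRecordsSketch<P, O> {
    static class Submitted<O> {
        final O offset;
        volatile boolean acked; // set by the producer callback on delivery
        Submitted(O offset) { this.offset = offset; }
    }

    private final Map<P, Deque<Submitted<O>>> records = new HashMap<>();

    // Called on the task thread when a record is dispatched to the producer.
    Submitted<O> submit(P partition, O offset) {
        Submitted<O> r = new Submitted<>(offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(r);
        return r; // the producer callback later sets r.acked = true
    }

    // Collects the latest committable offset per partition: the offset of the
    // last acked record with no unacked record before it in its deque.
    Map<P, O> committableOffsets() {
        Map<P, O> committable = new HashMap<>();
        records.forEach((partition, deque) -> {
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                committable.put(partition, deque.pollFirst().offset);
            }
        });
        return committable;
    }
}
```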
Author: Chris Egerton <chrise@confluent.io>
Reviewed: Randall Hauch <rhauch@gmail.com>
* URL wasn't URL-encoded when forwarding a reconfiguration request to the leader Connect worker
* Handle previously swallowed errors in the Connect RestClient
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>
Co-authored-by: Andras Katona <akatona@cloudera.com>
Co-authored-by: Daniel Urban <durban@cloudera.com>
Even after the implementation of KIP-745 it makes sense to return a response code of 204 NO CONTENT when the request is to restart the connector but not the tasks.
This maintains the current behavior for this existing REST call and is also aligned with the description in the RFC:
https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.2.5
Reviewers: Kalpesh Patel <kpatel@confluent.io>, Randall Hauch <rhauch@gmail.com>
Setting the max.poll.interval.ms to MAX_VALUE causes overflow when computing the joinGroupTimeoutMs and results in the JoinGroup timeout being set to the request.timeout.ms instead, which is much lower.
This can easily make consumers drop out of the group, since they must now rejoin within 30s (by default) yet have almost no obligation to ever call poll() given the high max.poll.interval.ms, especially when each record takes a long time to process or `max.poll.records` is also very large. We just need to check for overflow and pin the timeout to Integer.MAX_VALUE when it occurs.
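A small sketch of the saturating computation; the buffer added on top of max.poll.interval.ms is an illustrative parameter:

```java
public final class JoinGroupTimeoutSketch {
    private JoinGroupTimeoutSketch() {}

    static int joinGroupTimeoutMs(int maxPollIntervalMs, int bufferMs) {
        int timeout = maxPollIntervalMs + bufferMs;
        // int addition wraps around silently, so the sum goes negative when
        // maxPollIntervalMs is close to Integer.MAX_VALUE
        return timeout < 0 ? Integer.MAX_VALUE : timeout;
    }
}
```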
Reviewers: Luke Chen <showuon@gmail.com>, John Roesler <vvcephei@apache.org>
Moved the responsibility for recording task and connector startup and failure metrics from the invocation code
into the status listener. The reason behind this is that the WorkerTasks (and subclasses) were either not propagating exceptions upwards, or were unable to do so easily because they were running on completely different threads.
Also split out WorkerMetricsGroup from being an inner class into being a standard class. This was to make sure
the Data Abstraction Count checkStyle rule was not violated.
Author: Michael Carter <michael.carter@instaclustr.com>
Reviewers: Chris Egerton <chrise@confluent.io>, Randall Hauch <rhauch@gmail.com>
[Jira](https://issues.apache.org/jira/browse/KAFKA-13017)
Reverts https://github.com/apache/kafka/pull/7496, which added `ERROR`-level logging for deserialization errors in sink tasks even when connectors had logging for these errors disabled.
No information is lost by this change that cannot be retained in an opt-in fashion by setting `errors.log.enable` and `errors.log.include.messages` to `true` in a connector config.
Reviewers: Arjun Satish <arjun@confluent.io>, Tom Bentley <tbentley@redhat.com>
* Adds SynchronizationTest class for concurrency testing of the classloading isolation mechanism
* Adds a test which deterministically reproduced a deadlock between simultaneous upward (Plugin -> Delegating) & downward (Delegating -> Plugin) class loading operations.
* Makes PluginClassLoader parallel capable, resolving the above deadlock by allowing multiple threads to concurrently use the PluginClassLoader.
* Makes DelegatingClassLoader parallel capable to allow parallel loading of classes from the parent loader (usually the system class loader)
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>, Tom Bentley <tbentley@redhat.com>, Randall Hauch <rhauch@gmail.com>
The following error message
`org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.Long for field: "moderate_time"`
can be confusing because java.lang.Long is an acceptable type for schema type INT64.
In fact, in this case `org.apache.kafka.connect.data.Timestamp` is used, but this information is not included in the message.
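A small example of the situation, using Connect's data API:

```java
import java.util.Date;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;

public class TimestampFieldExample {
    public static void main(String[] args) {
        // The schema type of this field is INT64, but its logical type is
        // Timestamp, so a java.util.Date is expected, not a java.lang.Long.
        Schema schema = SchemaBuilder.struct()
                .field("moderate_time", Timestamp.SCHEMA)
                .build();
        Struct struct = new Struct(schema)
                .put("moderate_time", new Date()); // a raw Long here triggers the DataException above
        struct.validate();
        System.out.println(struct.get("moderate_time"));
    }
}
```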
Reviewers: Randall Hauch <rhauch@gmail.com>, Chris Egerton <chrise@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
This implements KIP-699: https://cwiki.apache.org/confluence/display/KAFKA/KIP-699%3A+Update+FindCoordinator+to+resolve+multiple+Coordinators+at+a+time
It updates FindCoordinator request and response to support resolving multiple coordinators at a time. If a broker does not support the new FindCoordinator version, clients can revert to the previous behaviour and use a request for each coordinator.
Reviewers: David Jacot <djacot@confluent.io>, Tom Bentley <tbentley@redhat.com>, Sanjana Kaundinya <skaundinya@gmail.com>
This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. This implementation is described in KIP-382 (adopted), originally as "LegacyReplicationPolicy".
This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and enables some additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Such a migration would have been difficult with either MM1 or MM2 previously.
When using IdentityReplicationPolicy, operators should be aware that MM2 will not be able to detect cycles among replicated topics. A misconfigured topology may result in replicating the same records back-and-forth or in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to prevent cycles.
Reviewers: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Ryanne Dolan <rdolan@twitter.com>
Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com>
Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
Implements KIP-745 https://cwiki.apache.org/confluence/display/KAFKA/KIP-745%3A+Connect+API+to+restart+connector+and+tasks to change connector REST API to restart a connector and its tasks as a whole.
Testing strategy
- [x] Unit tests added for all possible combinations of onlyFailed and includeTasks
- [x] Integration tests added for all possible combinations of onlyFailed and includeTasks
- [x] System tests for happy path
Reviewers: Randall Hauch <rhauch@gmail.com>, Diego Erdody <erdody@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
This commit implements KIP-716. It introduces a new setting `offset-syncs.topic.location` that allows specifying where the offset-syncs topic is created.
Reviewers: Tom Bentley <tbentley@redhat.com>, Edoardo Comar <ecomar@uk.ibm.com>
Remove the `rest.host.name` and `rest.port` Connect worker configs that were deprecated in KIP-208 and AK 1.1.
Author: Kalpesh Patel <kalpeshpatel.india@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, wenbingshen <oliver.shen999@gmail.com>
The constant is specified in milliseconds, and so the MILLISECOND time unit should be used instead of SECONDS.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Changes the default value for the `connector.client.config.override.policy` worker configuration property from `None` to `All`. Modified unit tests to verify all policies still work, and that by default connectors can override all client policies.
See https://cwiki.apache.org/confluence/display/KAFKA/KIP-722%3A+Enable+connector+client+overrides+by+default
Updated the documentation for the worker's client overrides policy to mention the new default.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
They have been deprecated since 0.10.0. Full list of removed configs:
* port
* host.name
* advertised.port
* advertised.host.name
Also adjust tests to take the removals into account. Some tests were
no longer relevant and have been removed.
Finally, took the chance to:
* Clean up unnecessary usage of `KafkaConfig$.MODULE$` in
related files.
* Add missing `Test` annotations to `AdvertiseBrokerTest` and
make necessary changes for the tests to pass.
Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
* replace deprecated Class.newInstance() with Class.getDeclaredConstructor().newInstance()
* throw ReflectiveOperationException to cover all other exceptions
Reviewers: Tom Bentley <tbentley@redhat.com>
Concurrent requests to the validate endpoint for the same connector type call AbstractHerder::getConnector to get the cached connector instance. If the connector hasn't been cached yet, there is a race condition in the AbstractHerder::getConnector method: it can fail to detect that an instance of the connector has already been created and, as a result, can create another instance.
Existing tests are present with enough coverage so no new tests are added.
Reviewers: Chris Egerton <chrise@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
These SMTs were originally specified in KIP-145 but never implemented
at the time.
HeaderTo is not included since its original specification doesn't deal with
the fact that there can be >1 header with the same name, but a field can only
have a single value (which could be an array, but not if the headers for
the given name had different schemas).
Reviewers: Chris Egerton <chrise@confluent.io>, Mickael Maison <mickael.maison@gmail.com>
The methods have been deprecated since 0.11 without replacement since
message format 2 moved the checksum to the record batch (instead of the
record).
Unfortunately, we did not deprecate the constructors that take a checksum
(even though we intended to) so we cannot remove them. I have deprecated
them for removal in 4.0 and added a single non deprecated constructor to
`ConsumerRecord` and `RecordMetadata` that take all remaining parameters.
`ConsumerRecord` could do with one additional convenience constructor, but
that requires a KIP and hence should be done separately.
Also:
* Removed `ChecksumMessageFormatter`, which is technically not public
API, but may have been used with the console consumer.
* Updated all usages of `ConsumerRecord`/`RecordMetadata` constructors
to use the non deprecated ones.
* Added tests for deprecated `ConsumerRecord`/`RecordMetadata`
constructors.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io>
Uncaught exceptions logged during task stop were misleading because the task is already on its way to being shut down.
Suppressing the exception changes behavior: the caller now invokes `statusListener.onShutdown` instead of `statusListener.onFailure`, which is the right behavior. A new test was added for uncaught exceptions during shutdown, and an existing test was modified to cover uncaught exceptions during normal execution.
Reviewers: Chris Egerton <chrise@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
If a distributed worker fails to write (or read back) a new session key to/from the config topic, it dies. This fix softens the blow a bit by instead restarting the herder tick loop anew and forcing a read to the end of the config topic until the worker is able to successfully read to the end.
At this point, if the worker was able to successfully write a new session key in its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was not able to write that key at all, it will try again to write a new key (if it is still the leader).
Verified with new unit tests for both cases (failure to write, failure to read back after write).
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
The config has been deprecated since Kafka 2.6 (released ~1 year before
3.0), but it was the default before it got deprecated. As such, it's
reasonably unlikely that people would have set it explicitly.
Given the confusing `default` name even though it's _not_ the default, I
think we should remove it in 3.0.
Also remove `ClientDnsLookup.DEFAULT` (not public API), which unlocks
a number of code simplifications.
Reviewers: David Jacot <djacot@confluent.io>
1. Create a reason string to be used for INFO log entry whenever we request re-join or reset generation state.
2. Some minor cleanups.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>
Cast SMT transformation for bytes -> string.
Without this fix, the conversion becomes ByteBuffer.toString(), which always gives this useless result:
"java.nio.HeapByteBuffer[pos=0 lim=4 cap=4]"
With this change, the byte array is converted into a base64 string of the byte buffer content.
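A sketch of the conversion; the helper name is illustrative, not the actual method in the Cast SMT:

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public final class BytesToStringSketch {
    private BytesToStringSketch() {}

    // Encodes the raw bytes as base64 rather than falling back to
    // ByteBuffer.toString(), which only prints position/limit/capacity.
    static String castToString(Object value) {
        byte[] bytes;
        if (value instanceof ByteBuffer) {
            ByteBuffer buf = ((ByteBuffer) value).slice(); // don't disturb the original position
            bytes = new byte[buf.remaining()];
            buf.get(bytes);
        } else {
            bytes = (byte[]) value;
        }
        return Base64.getEncoder().encodeToString(bytes);
    }
}
```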
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
Close the producer in `WorkerSourceTask` when the latter is cancelled. If the broker does not autocreate the topic, and the connector is not configured to create the topics written to by the source connector, then the `WorkerSourceTask` main thread will block forever until the topic is created, and will not stop if cancelled or scheduled for shutdown by the worker.
Expanded an existing unit test for the WorkerSourceTask class to ensure that the producer is closed when the task is abandoned, and added a new integration test that guarantees that tasks are still shut down even when their producers are trying to write to topics that do not exist.
Author: Chris Egerton <chrise@confluent.io>
Reviewed: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
MM2 creates new topics on the destination cluster with default configurations. It has an async periodic task to refresh topic configurations from the source to destination. However, this opens up a window where the destination cluster has data produced to it with default configurations. In the worst case, this could cause data loss if the destination topic is created without the right cleanup.policy. This commit fixes the above issue by ensuring that the right configurations are supplied to AdminClient#createTopics when MM2 creates topics on the destination cluster.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Connect uses a high request timeout as a holdover from the days prior to KIP-91 when this was required to guarantee records would not get timed out in the accumulator. Having a high request timeout makes it harder for the producer to gracefully handle unclean connection terminations, which might happen in the case of sudden broker death.
Reducing that value to the default of 30 seconds should address that issue, without compromising the existing delivery guarantees of the Connect framework. Since the delivery timeout is still set to a very-high value, this change shouldn't make it more likely for `Producer::send` to throw an exception and fail the task.
Reviewers: Jason Gustafson <jason@confluent.io>
Refactored the KafkaBasedLog logic to read end offsets into a separate method to make it easier to test. Also changed the TopicAdmin.endOffsets method to throw the original UnsupportedVersionException, LeaderNotAvailableException, and TimeoutException rather than wrapping, to better conform with the consumer method and how the KafkaBasedLog retries those exceptions.
Added new tests to verify various scenarios and errors.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
These Kafka*BackingStore classes used in Connect have a recently-added deprecated constructor, which is not used within AK. However, this commit corrects an AdminClient resource leak that occurs if those deprecated constructors are used outside of AK. The fix simply ensures that the AdminClient created by the “default” supplier is always closed when the Kafka*BackingStore is stopped.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Fix Typo in metric name of MirrorMaker README file from 'replication-latecny-ms' to 'replication-latency-ms'
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Eric Beaudet <galpha@gmail.com>
Fixes the recent change to the `MirrorMaker` class (used only in the MirrorMaker 2 executable), which uses a `SharedTopicAdmin` client as part of Connect, so that the correct properties are passed into the `SharedTopicAdmin`.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
We don't need or use the additional functionality provided by
AtomicReference.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.
However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.
Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.
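For reference, a minimal sketch of fetching end offsets via the admin client; retries and error translation are omitted for brevity:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public final class EndOffsetsSketch {
    private EndOffsetsSketch() {}

    static Map<TopicPartition, Long> endOffsets(Admin admin, Set<TopicPartition> partitions)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> request = partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        ListOffsetsResult result = admin.listOffsets(request);
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            // partitionResult returns a KafkaFuture<ListOffsetsResultInfo>
            offsets.put(tp, result.partitionResult(tp).get().offset());
        }
        return offsets;
    }
}
```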
Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.
The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.
These changes are implemented as follows:
1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets.
2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance.
3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
6. Change `MirrorMaker` similarly to `ConnectDistributed`.
7. Change existing unit tests to no longer use deprecated constructors.
8. Add unit tests for new functionality.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Cleanup to remove redundant type casts in Connect and use the diamond operator when needed
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
When a source connector that is configured to create missing topics has multiple tasks generating records for the same topic, it is possible for several tasks to simultaneously describe the topic, find it does not exist, and attempt to create it. One of those create-topic requests will succeed, and the other concurrent tasks will receive a response from the topic admin indicating that they did not create the topic, and will fail unnecessarily.
This change corrects the logic by moving the `TopicAdmin` logic to create a topic to a new `createOrFindTopics(…)` method that returns the set of created topic names and the set of existing topic names. This allows the existing `createTopics(…)` and `createTopic(…)` methods to retain the same behavior, but also allows the `WorkerSourceTask` to know from its single call to this new method whether the topic was created or was found to exist.
This adds one unit test and modifies several unit tests in `WorkerSourceTaskWithTopicCreationTest` that use mocks to verify the behavior, and modifies several existing unit tests for `TopicAdminTest` to ensure the logic of the new method is as expected.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
Problem: when calling the connect status endpoint, a 500 error is returned, e.g.
```
{
"error_code": 500,
"message": "Could not read properties from file /tmp/somefile.properties"
}
```
when any of the connectors has an exception from the config provider. This is because the `AbstractHerder` is trying to use the resolved config to infer the type of the connector. However, only the `connector.class` is needed from the config to infer if a specific connector is of source or sink type. The endpoint should still return the status of the connector instead of a 500 error.
This change uses the raw config from the config backing store to retrieve the connector class to avoid any variable resolution.
Unit tests have been updated to reflect this change.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Fixes a regression introduced by https://github.com/apache/kafka/pull/9806 in the original fix for KAFKA-10895
It was discovered that if an invalid JAAS config is present on the worker, invoking Configuration::getConfiguration throws an exception. The changes from #9806 cause that exception to be thrown during plugin scanning, which causes the worker to fail even if it is not configured to use the basic auth extension at all.
This follow-up handles invalid JAAS configurations more gracefully, and only throws an exception if the worker is actually configured to use the basic auth extension, at the time the extension is instantiated and configured.
Two unit tests are added.
Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
Replace BrokerStates.scala with BrokerState.java, to make it easier to use from Java code if needed. This also makes it easier to go from a numeric type to an enum.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
First issue: When more than one worker joins the Connect group, the incremental cooperative assignor revokes and reassigns at most the average number of tasks per worker.
Side-effect: The additional workers that joined the group stay idle, and more rebalances are required in the future to reach an even distribution of tasks.
Fix: As part of the task assignment calculation following a deployment, the reassignment is calculated by revoking all the tasks above the rounded-up (ceiling) average number of tasks per worker.
Second issue: When more than one worker is lost and rejoins the group, at most one worker is reassigned the lost tasks from all the workers that left the group.
Side-effect: In scenarios where more than one worker is lost and rejoins the group, only one of them gets assigned all the tasks that were lost. The other rejoining workers do not get any tasks assigned until a future rebalance.
Fix: As part of lost-task reassignment, all the new workers that have joined the group are considered for task assignment and receive the lost tasks in a round-robin fashion.
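A self-contained sketch of the arithmetic behind both fixes (simplified; the real assignor works on per-worker loads of connectors and tasks):
```
import java.util.*;

public class AssignorSketch {
    public static void main(String[] args) {
        int totalTasks = 10;
        List<String> workers = Arrays.asList("w1", "w2", "w3");

        // First fix: revoke down to the rounded-up (ceiling) average, not the
        // floor, so newly joined workers are not left idle.
        int ceilAverage = (totalTasks + workers.size() - 1) / workers.size(); // ceil(10 / 3) = 4
        System.out.println("Each existing worker keeps at most " + ceilAverage + " tasks");

        // Second fix: lost tasks are handed out round-robin across all
        // rejoining workers, instead of all going to a single worker.
        List<Integer> lostTasks = Arrays.asList(7, 8, 9);
        Map<String, List<Integer>> assignment = new HashMap<>();
        for (int i = 0; i < lostTasks.size(); i++) {
            String worker = workers.get(i % workers.size());
            assignment.computeIfAbsent(worker, w -> new ArrayList<>()).add(lostTasks.get(i));
        }
        System.out.println("Round-robin lost-task assignment: " + assignment);
    }
}
```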
Testing strategy:
* System testing in a Kubernetes environment completed
* New integration tests to test for balanced tasks
* Updated unit tests.
Co-authored-by: Rameshkrishnan Muthusamy <rameshkrishnan_muthusamy@apple.com>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Co-authored-by: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
By default, MirrorMaker 2 creates herders for all the possible cluster combinations, even if the replication "links" are not enabled.
This is because the heartbeats are emitted from the "opposite" herder.
If there is a replication flow from A to B and heartbeats are required, two herders are needed:
- A->B for the MirrorSourceConnector
- B->A for the MirrorHeartbeatConnector
The MirrorHeartbeatConnector on B->A emits heartbeats into the heartbeats topic on cluster A.
The MirrorSourceConnector on A->B then replicates whichever topics are configured, as well as heartbeats.
In deployments with multiple clusters (10 and more), this leads to a very large number of unnecessary connections, file descriptors, and configuration topics created in every target cluster.
With this code change, we leverage the top-level property "emit.heartbeats.enabled", which defaults to "true".
We skip creating the A->B herder whenever A->B.emit.heartbeats.enabled=false and A->B.enabled=false (the latter defaults to false).
Existing users will not see any change, and if they depend on these "opposite" herders for their monitoring, it will still work.
New users with more complex use cases can change this property and fine-tune their heartbeat generation.
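For example, with this change a configuration along these lines (illustrative) no longer spins up the unused B->A herder:
```
clusters = A, B
A->B.enabled = true
# Heartbeats are not needed for this flow, so the B->A herder
# (which would only host the MirrorHeartbeatConnector) is not created.
B->A.enabled = false
B->A.emit.heartbeats.enabled = false
```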
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Sanjana Kaundinya <skaundinya@gmail.com>, Jason Gustafson <jason@confluent.io>
When two cooperative rebalances take place in quick succession, the first rebalance may not complete before the second one starts.
Under Eager rebalancing, no tasks would have been started after an unresolved rebalance, so the subsequent onRevoked call is intentionally skipped whenever rebalanceResolved is false.
Under Incremental Cooperative rebalancing, the same logic causes the DistributedHerder to skip stopping all of the connectors/tasks revoked in the second rebalance.
The DistributedHerder still removes the revoked connectors/tasks from its assignment, leaving the DistributedHerder and the Worker with different views of which connectors/tasks are running.
As a result, the connector/task instances that should have been stopped disappear from the rebalance protocol and are left running until their workers are halted, or until they fail.
Connectors/tasks that were reassigned to other workers by the rebalance protocol are duplicated and run concurrently with the zombie connectors/tasks.
Connectors/tasks that were reassigned back to the same worker encounter exceptions in the Worker, indicating that the connector/task already exists and is running.
* Added test for revoking and then reassigning a connector under normal circumstances
* Added test for revoking and then reassigning a connector following an incomplete cooperative rebalance
* Changed expectRebalance to make assignment fields mutable before passing them into the DistributedHerder
* Only skip revocation for the Eager protocol, and never skip revocation for incremental cooperative protocols
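A condensed sketch of the corrected guard from the last item; the field, constant, and helper names are illustrative:
```
// Eager (protocol V0): nothing was started after an unresolved rebalance,
// so skipping revocation is safe. Cooperative protocols must always stop
// the revoked connectors/tasks, even after an incomplete rebalance.
boolean skipRevocation = !rebalanceResolved && protocolVersion == CONNECT_PROTOCOL_V0;
if (!skipRevocation) {
    stopRevokedConnectorsAndTasks(revokedConnectors, revokedTasks); // hypothetical helper
}
```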
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Fixes errors such as:
```
java.lang.NullPointerException at
org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopics(MirrorSourceConnector.java:348) at
org.apache.kafka.connect.mirror.MirrorSourceConnector.findSourceTopicPartitions(MirrorSourceConnector.java:192) at
org.apache.kafka.connect.mirror.MirrorSourceConnectorTest.testRefreshTopicPartitionsTopicOnTargetFirst(MirrorSourceConnectorTest.java:222)
```
This was a difficult issue to debug due to class-loading interference between the Connect worker and Mockito. Digging into Mockito showed it's not about JUnit 5, but about the class loader: Mockito relies on the class loader to generate the proxy instance ([source](https://github.com/mockito/mockito/blob/release/3.x/src/main/java/org/mockito/internal/creation/bytebuddy/SubclassBytecodeGenerator.java#L91)) that intercepts method calls, and if the class loader is not the expected one, it generates the wrong proxy instance (with the wrong class path). We set the context class loader during connector start to resolve conflicting dependencies ([KIP-146](https://cwiki.apache.org/confluence/display/KAFKA/KIP-146+-+Classloading+Isolation+in+Connect)), so we should set it back to the original class loader after connector stop in tests (`EmbeddedConnectCluster` is only used in tests) so that subsequent Mockito usage works as expected.
In short, Connect integration tests interfere with the MM2 unit tests when they run first in the same JVM, causing the Mockito mocks used in those unit tests to misbehave.
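The fix boils down to the standard save-and-restore idiom for the context class loader; a minimal self-contained illustration:
```
// Stand-in for the plugin's isolated loader (hypothetical in this sketch).
ClassLoader pluginLoader = new java.net.URLClassLoader(new java.net.URL[0]);
ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
    Thread.currentThread().setContextClassLoader(pluginLoader);
    // ... start and stop the connector under the plugin loader ...
} finally {
    // Always restore, so later code (e.g. Mockito proxy generation in a
    // following unit test) sees the class loader it expects.
    Thread.currentThread().setContextClassLoader(original);
}
```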
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Also:
* Remove unused powermock dependency
* Remove "examples" from the JUnit 4 list since one module was already
converted and the other has no tests
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
If a connector, converter, etc. invokes [Configuration::setConfiguration](https://docs.oracle.com/javase/8/docs/api/javax/security/auth/login/Configuration.html#setConfiguration-javax.security.auth.login.Configuration-), it will cause the Connect basic auth filter to use that JAAS config instead of the one configured at startup with the `-Djava.security.auth.login.config` JVM property. This can cause requests to the worker to succeed initially but start to be rejected after the JVM's global JAAS config is altered.
To alleviate this, this change instructs the Connect worker to cache the JVM's global JAAS configuration once at startup (as soon as the `BasicAuthSecurityRestExtension` class is loaded) and use that for all future authentication.
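A minimal sketch of the caching approach; the class shape is illustrative:
```
import javax.security.auth.login.Configuration;

public class BasicAuthExtensionSketch {
    // Captured once, when this class is loaded, before any connector or
    // converter has a chance to call Configuration.setConfiguration(...).
    private static final Configuration JAAS_CONFIG_AT_STARTUP = Configuration.getConfiguration();

    public Configuration jaasConfiguration() {
        // Later overwrites of the JVM-global configuration are ignored.
        return JAAS_CONFIG_AT_STARTUP;
    }
}
```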
Existing tests for the JAAS basic auth filter are modified to work with the new internal logic. The `testEmptyCredentialsFile` test is corrected to actually operate on an empty credentials file (instead of the non-existent credentials file it previously operated on). A new test is added to ensure that, even if the global JAAS config is later overwritten, the basic auth filter keeps using the configuration it found first.
Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
This PR includes the following changes:
1. Replace @Test(expected = Exception.class) with assertThrows (see the sketch below)
2. Remove references to org.scalatest.Assertions
3. Change the magic code from 1 to 2 for testAppendAtInvalidOffset to test ZSTD
4. Rename testMaybeAddPartitionToTransactionXXXX to testNotReadyForSendXXX
5. Increase maxBlockMs from 1s to 3s to avoid unexpected timeouts in TransactionsTest#testTimeout
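For reference, a minimal before/after of the assertThrows migration in item 1 (test and method names are illustrative):
```
import static org.junit.Assert.assertThrows;
import org.junit.Test;

public class ExampleTest {
    // Before: the annotation cannot pin down *which* statement throws.
    // @Test(expected = IllegalStateException.class)
    // public void testFails() { setUp(); doWork(); }

    @Test
    public void testFails() {
        setUp(); // must not throw
        // After: the assertion is scoped to the exact call that should throw.
        assertThrows(IllegalStateException.class, this::doWork);
    }

    private void setUp() { }

    private void doWork() {
        throw new IllegalStateException("expected");
    }
}
```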
Reviewers: Ismael Juma <ismael@confluent.io>
If a connector fails on startup, the original cause of the error gets discarded by the framework and the only message that gets logged looks like this:
```
[2020-12-04 16:46:30,464] ERROR [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'conn-1' (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: conn-1
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$5(DistributedHerder.java:1297)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:258)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1321)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1400(DistributedHerder.java:127)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1329)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:1325)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
```
With these changes, the original cause of the connector startup failure is logged as well:
```
[2020-12-30 09:56:35,481] ERROR [test-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'conn-1' (org.apache.kafka.connect.runtime.distributed.DistributedHerder:599)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: conn-1
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$4(DistributedHerder.java:1298)
at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:294)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1322)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.processConnectorConfigUpdates(DistributedHerder.java:597)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:416)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:294)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements
```
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This patch updates the request logger to output request and response payloads in JSON. Payloads are converted to JSON based on their auto-generated schema.
Reviewers: Lucas Bradstreet <lucas@confluent.io>, David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>
The connection id is now only present in `NetworkSend`, which is now
the class used by `Selector`/`NetworkClient`/`KafkaChannel` (a natural fit,
since `NetworkReceive` is the class used for received data).
The previous `NetworkSend` was also responsible for adding a size
prefix. This logic is already present in `SendBuilder`, but for the
minority of cases where `SendBuilder` is not used (including
a number of tests), we now have `ByteBufferSend.sizePrefixed()`.
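Conceptually, a size-prefixed send is just a 4-byte length followed by the payload; a minimal sketch of the idea (the real `ByteBufferSend.sizePrefixed()` returns a `Send`, so details differ):
```
import java.nio.ByteBuffer;

// Prefix a payload with its 4-byte length, as the old NetworkSend did inline.
static ByteBuffer[] sizePrefixed(ByteBuffer payload) {
    ByteBuffer size = ByteBuffer.allocate(4);
    size.putInt(payload.remaining());
    size.flip(); // make the prefix readable
    return new ByteBuffer[] { size, payload };
}
```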
Regarding the request/message utilities:
* Renamed `toByteBuffer`/`toBytes` in `MessageUtil` to
`toVersionPrefixedByteBuffer`/`toVersionPrefixedBytes` for clarity.
* Introduced new `MessageUtil.toByteBuffer` that does not include
the version as the prefix.
* Renamed `serializeBody` in `AbstractRequest/Response` to
`serialize` for symmetry with `parse`.
* Introduced `RequestTestUtils` and moved relevant methods from
`TestUtils`.
* Moved `serializeWithHeader` methods that were only used in
tests to `RequestTestUtils`.
* Deleted `MessageTestUtil`.
Finally, a couple of changes to simplify coding patterns:
* Added `flip()` and `buffer()` to `ByteBufferAccessor`.
* Added `MessageSizeAccumulator.sizeExcludingZeroCopy`.
* Used lambdas instead of `TestCondition`.
* Used `Arrays.copyOf` instead of `System.arraycopy` in `MessageUtil`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
Generated request/response classes have code to serialize/deserialize directly to
`ByteBuffer` so the intermediate conversion to `Struct` can be skipped for them.
We have recently completed the transition to generated request/response classes,
so we can also remove the `Struct` based fallbacks.
Additional noteworthy changes:
* `AbstractRequest.parseRequest` has a more efficient computation of request size that
relies on the received buffer instead of the parsed `Struct`.
* Used `SendBuilder` for `AbstractRequest/Response` `toSend`, made the superclass
implementation final and removed the overrides that are no longer necessary.
* Removed request/response constructors that assume the latest version, as they are unsafe
outside of tests.
* Removed redundant version fields in requests/responses.
* Removed unnecessary work in `OffsetFetchResponse`'s constructor when version >= 2.
* Made `AbstractResponse.throttleTimeMs()` abstract.
* Used `toSend` in `SaslClientAuthenticator` instead of `serialize`.
* Various changes in request/response classes to make them more consistent and to
rely on the Data classes as much as possible when it comes to their state.
* Removed the version argument from `AbstractResponse.toString`.
* Fixed `getErrorResponse` for `ProduceRequest` and `DescribeClientQuotasRequest` to
use `ApiError`, which processes the error message sent back to the clients. This was
uncovered by an accidental fix to a `RequestResponseTest` test (it was calling
`AbstractResponse.toString` instead of `AbstractResponse.toString(short)`).
Rely on existing protocol tests to ensure this refactoring does not change
observed behavior (aside from improved performance).
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Normally the `EmbeddedConnectCluster` class masks the `Exit` procedures used within the Connect worker. This works well when a single instance of the embedded cluster is used. However, the `MirrorConnectorsIntegrationTest` uses two `EmbeddedConnectCluster` instances, and when the first one is stopped it would reset the (static) exit procedures, so any problems during shutdown of the second embedded Connect cluster would cause the worker to shut down the JVM running the tests.
Instead, the `MirrorConnectorsIntegrationTest` class should mask the `Exit` procedures itself and instruct the `EmbeddedConnectCluster` instances (via the existing builder method) not to mask the procedures.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
Changes the `WorkerSourceTask` class to only call `SourceTask::stop` from the task thread when the task is actually stopped (via the task's `close` method, just before `WorkerTask::run` completes), and only if an attempt has been made to start the task (which will not be the case if it was created in the paused state and then shut down before being started). This prevents `SourceTask::stop` from being indirectly invoked on the herder's thread, which can have adverse effects if the task is unable to shut down promptly.
Unit tests are tweaked where necessary to account for this new logic, which covers some edge cases mentioned in PR #5020 that were unaddressed up until now.
The existing integration tests for blocking connectors are expanded to also include cases for blocking source and sink tasks. Full coverage of every source/sink task method is intentionally omitted from these expanded tests in order to avoid inflating test runtime (each one adds an extra 5 seconds at minimum) and because the tests that are added here were sufficient to reproduce the bug with source task shutdown.
Author: Chris Egerton <chrise@confluent.io>
Reviewers: Nigel Liang <nigel@nigelliang.com>, Tom Bentley <tbentley@redhat.com>, Randall Hauch <rhauch@gmail.com>
The map passed as an argument remains read-only and therefore can be initialized using Collections#emptyMap instead of being passed a new Map instance.
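A trivial self-contained illustration of the change (names are illustrative):
```
import java.util.Collections;
import java.util.Map;

public class EmptyMapExample {
    // Hypothetical callee: the map is only read, never mutated.
    static void printAll(Map<String, String> config) {
        config.forEach((k, v) -> System.out.println(k + "=" + v));
    }

    public static void main(String[] args) {
        // Before: printAll(new HashMap<>()) allocated a map that was never mutated.
        // After: the shared immutable empty map avoids the allocation and makes
        // the read-only intent explicit.
        printAll(Collections.emptyMap());
    }
}
```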
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
StandaloneHerder and DistributedHerder have identical implementations of connectorConfig (apart from one line of logging). This commit moves the common implementation of connectorConfig to AbstractHerder.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
The DistributedHerder enters the synchronized method updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class.
Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a synchronized block on that same internal object, reads a SESSION_KEY record and calls updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder.
This results in a deadlock.
To avoid this, updateListener should be called with the new session key outside the synchronized block, as is already done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).
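The fix follows the standard pattern of invoking callbacks outside the lock; a self-contained illustration of the pattern (not the actual classes):
```
public class CallbackOutsideLock {
    private final Object lock = new Object();
    private String sessionKey;

    interface Listener {
        void onSessionKeyUpdate(String key); // may take other locks (e.g. the herder's)
    }

    void handleRecord(String newKey, Listener updateListener) {
        String toNotify;
        synchronized (lock) {
            sessionKey = newKey; // update internal state under the lock
            toNotify = sessionKey;
        }
        // Invoke the callback outside the synchronized block so this lock is
        // never held while the listener tries to acquire the herder's lock.
        updateListener.onSessionKeyUpdate(toNotify);
    }
}
```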
Co-authored-by: Taisiia Goltseva <tado0618@netcracker.com>
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Trigger task reconfiguration when:
- topic-partitions are created or deleted on the source cluster
- topic-partitions are missing on the target cluster
Authors: Mickael Maison <mickael.maison@gmail.com>, Edoardo Comar <ecomar@uk.ibm.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
Connect should not always add an error to configuration values in validation results that don't have a `ConfigKey` defined in the connector's `ConfigDef`, and any errors on such configuration values included by the connector should be counted in the total number of errors. Added more unit tests for `AbstractHerder.generateResult(...)`.
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
ErrantRecordReporter uses a RetryWithToleranceOperator instance, which is necessarily stateful: it holds a ProcessingContext, of which there is supposed to be one per task. That ProcessingContext is used by both RetryWithToleranceOperator.executeFailed() and execute(), so it is not enough to synchronize just executeFailed().
Instead, all public methods of RetryWithToleranceOperator are made synchronized, so that RetryWithToleranceOperator is now threadsafe.
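A minimal self-contained analogue of the change (not the actual class):
```
// Both methods mutate the same per-task state (like ProcessingContext), so
// both must be synchronized; guarding only one of them still allows races.
class SharedContextOperator {
    private Throwable lastError; // shared mutable state

    synchronized void execute(Runnable operation) {
        try {
            operation.run();
            lastError = null;
        } catch (RuntimeException e) {
            lastError = e;
        }
    }

    synchronized void executeFailed(Throwable error) {
        lastError = error; // would race with execute() if left unsynchronized
    }

    synchronized Throwable lastError() {
        return lastError;
    }
}
```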
Tested with the addition of a multithreaded test case that fails consistently if the methods are not properly synchronized.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
When using an error-tracking system, two error log messages result in two different alerts.
It's best to group the logs and have one error with all the information.
For example, when used with Sentry, this double log.error creates two different issues. One can merge the issues, but it is simpler to emit a single error log line.
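A minimal illustration of the difference, given an SLF4J `Logger log`, a task id `id`, and a `Throwable t` (messages are illustrative):
```
// Before: two log calls produce two events, hence two alerts in tools like Sentry.
log.error("Task {} threw an uncaught and unrecoverable exception", id);
log.error("Task is being killed and will not recover until manually restarted", t);

// After: one call carries both the full message and the stack trace,
// so the error tracker groups everything into a single issue.
log.error("Task {} threw an uncaught and unrecoverable exception. "
        + "Task is being killed and will not recover until manually restarted", id, t);
```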
Signed-off-by: Benoit Maggi <benoit.maggi@gmail.com>
Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Konstantine Karantasis <k.karantasis@gmail.com>
Changes the Connect `ReplaceField` SMT's configuration properties, deprecating and replacing `blacklist` with `exclude`, and `whitelist` with `include`. The old configurations are still allowed (ensuring backward compatibility), but warning messages are written to the log to suggest users change to `include` and `exclude`.
This is part of KIP-629.
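For example, a connector config using the new property name might look like this (connector and transform names are illustrative):
```
transforms=dropSecrets
transforms.dropSecrets.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# Formerly "blacklist"; the old name still works but logs a deprecation warning.
transforms.dropSecrets.exclude=ssn,password
```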
Author: Xavier Léauté <xvrl@apache.org>
Reviewer: Randall Hauch <rhauch@gmail.com>