Minimum fix needed to stop this test failing and unblock others
Co-authored-by: Luke Chen <showuon@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Fix the failing testMultiConsumerStickyAssignment by correcting the logic error in the allSubscriptionsEqual method.
We build consumerToOwnedPartitions to keep the set of previously owned partitions encoded in each Subscription; it is the basis for the reassignment. In allSubscriptionsEqual, we read the member generation from each subscription and discard previously owned partitions as invalid whenever a higher generation is seen. However, the logic before this fix also removed the member holding the current highest generation from consumerToOwnedPartitions, even though that member should be kept precisely because it is on the highest generation. This change fixes that logic error.
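A condensed sketch of the intended generation handling, using illustrative names rather than the actual AbstractStickyAssignor fields:

```java
import java.util.*;

public class GenerationFilterSketch {
    // Only members on the highest observed generation keep their previously owned
    // partitions; members on older generations have their claims treated as invalid.
    static Map<String, List<String>> buildConsumerToOwnedPartitions(
            Map<String, Integer> memberGenerations,
            Map<String, List<String>> previouslyOwned) {
        int maxGeneration = Collections.max(memberGenerations.values());
        Map<String, List<String>> consumerToOwnedPartitions = new HashMap<>();
        for (Map.Entry<String, Integer> member : memberGenerations.entrySet()) {
            if (member.getValue() == maxGeneration) {
                // Keep the highest-generation member's partitions; it must not be removed.
                consumerToOwnedPartitions.put(member.getKey(),
                        previouslyOwned.getOrDefault(member.getKey(), Collections.emptyList()));
            } else {
                // Stale generation: discard its previously owned partitions.
                consumerToOwnedPartitions.put(member.getKey(), Collections.emptyList());
            }
        }
        return consumerToOwnedPartitions;
    }
}
```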
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Fixed spotBugs error introduced by c6633a1:
>Dead store to isFreshAssignment in org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(Map, Map)
Reviewers: Ismael Juma <ismael@juma.me.uk>
Motivation and pseudo code algorithm in the ticket.
Added a scale test with a large number of topic partitions and consumers and a 30s timeout.
With these changes, an assignment with 2,000 consumers and 200 topics of 2,000 partitions each completes within a few seconds.
When the same test is ported to trunk, it takes 2 minutes even with a 100x reduction in the number of topics (i.e., 2 minutes for 2,000 consumers and 2 topics of 2,000 partitions each).
Should be cherry-picked to 2.6, 2.5, and 2.4
Reviewers: Guozhang Wang <wangguoz@gmail.com>
We have been seeing increased flakiness in transaction system tests. I believe the cause might be KIP-537, which increased the default zk session timeout from 6s to 18s and the default replica lag timeout from 10s to 30s. The system test uses the default transaction timeout of 10s. However, since the test involves hard failures, a Produce request could block for as long as the larger of those two timeouts while waiting for an ISR shrink. Hence this patch increases the transaction timeout to 30s.
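For reference, a minimal producer configuration reflecting the new timeout (the transactional id and bootstrap servers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TransactionTimeoutConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "system-test-txn-id");
        // Raise transaction.timeout.ms to 30s so a Produce request that blocks while
        // waiting for an ISR shrink does not outlive the transaction timeout.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30_000);
        return props;
    }
}
```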
Reviewers: Bob Barrett <bob.barrett@confluent.io>, Ismael Juma <github@juma.me.uk>
Newer clients were getting stuck entering the validation phase even when a broker didn't support it. This commit will bypass the AWAITING_VALIDATION state when the broker is on an older version of the OffsetsForLeaderEpoch RPC.
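Roughly, the state transition decision now looks like this (a simplified sketch, not the actual fetcher code):

```java
public class OffsetValidationBypassSketch {
    enum FetchState { AWAITING_VALIDATION, FETCHING }

    // If the broker's OffsetsForLeaderEpoch RPC version is too old to validate
    // the fetch position, skip AWAITING_VALIDATION and go straight to fetching.
    static FetchState nextState(boolean brokerSupportsOffsetValidation) {
        return brokerSupportsOffsetValidation ? FetchState.AWAITING_VALIDATION : FetchState.FETCHING;
    }
}
```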
Although task statuses are removed from the status store when their connector is deleted, they are not removed when only the task is deleted, which happens when the number of tasks for a connector is reduced.
This commit adds logic for deleting the statuses for those tasks from the status store whenever a rebalance has completed and the leader of a distributed cluster has detected that there are recently-deleted tasks. Standalone is also updated to accomplish this.
Unit tests for the `DistributedHerder` and `StandaloneHerder` classes are updated and an integration test has been added.
Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
The changes made in KIP-454 involved adding a `connectorConfig` method to the ConnectClusterState interface that REST extensions could use to query the worker for the configuration of a given connector. The implementation for this method returns the Java `Map` that's stored in the worker's view of the config topic (when running in distributed mode). No copying is performed, which causes mutations of that `Map` to persist across invocations of `connectorConfig` and, even worse, propagate to the worker when, e.g., starting a connector.
In this commit the map is copied before it's returned to REST extensions.
An existing unit test is modified to ensure that REST extensions receive a copy of the connector config, not the original.
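A minimal sketch of the defensive copy (the surrounding structure here is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectorConfigCopySketch {
    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();

    // Return a copy so REST extensions cannot mutate the worker's view of the config topic.
    public Map<String, String> connectorConfig(String connName) {
        Map<String, String> config = connectorConfigs.get(connName);
        return config == null ? null : new HashMap<>(config);
    }
}
```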
Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
The `MirrorTaskConfig` class mutates the `ConfigDef` by defining additional properties, which leads to a potential `ConcurrentModificationException` during worker configuration validation and to the unintended inclusion of those new properties in the connectors' `ConfigDef`, which is then visible via the REST API's `/connectors/{name}/config/validate` endpoint.
The fix here is a one-liner that just creates a copy of the `ConfigDef` before defining new properties.
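The shape of the fix, sketched with an illustrative property name:

```java
import org.apache.kafka.common.config.ConfigDef;

public class ConfigDefCopySketch {
    // Copy the shared base ConfigDef before defining task-only properties so the
    // connector-level ConfigDef is never mutated.
    static ConfigDef taskConfigDef(ConfigDef baseConfigDef) {
        return new ConfigDef(baseConfigDef)
                .define("task.assigned.partitions", ConfigDef.Type.STRING, "",
                        ConfigDef.Importance.LOW, "Partitions assigned to this task (illustrative).");
    }
}
```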
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
Currently, if a connector is deleted, its task configurations will remain in the config snapshot tracked by the KafkaConfigBackingStore. This causes issues with incremental cooperative rebalancing, which utilizes that config snapshot to determine which connectors and tasks need to be assigned across the cluster. Specifically, it first checks to see which connectors are present in the config snapshot, and then, for each of those connectors, queries the snapshot for that connector's task configs.
The lifecycle of a connector is:
* its configuration is written to the config topic
* that write is picked up by the workers in the cluster and triggers a rebalance
* the connector is assigned to and started by a worker
* task configs are generated by the connector and then written to the config topic
* that write is picked up by the workers in the cluster and triggers a second rebalance
* finally, the tasks are assigned to and started by workers across the cluster
There is a brief window between the first time the connector is started and the completion of the second rebalance, during which stale task configs from a previously-deleted version of the connector will be used by the framework to start tasks for that connector. This fix aims to eliminate that window by preemptively clearing a connector's task configs from the config snapshot whenever the connector is deleted.
An existing unit test is modified to verify this behavior, and should provide sufficient guarantees that the bug has been fixed.
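A rough sketch of the preemptive cleanup, using hypothetical snapshot structures:

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotCleanupSketch {
    // Hypothetical snapshot state: task configs keyed by "connectorName-taskId".
    private final Map<String, Map<String, String>> taskConfigs = new HashMap<>();

    // When a connector's removal is read from the config topic, drop its task configs
    // so the next rebalance cannot assign tasks from a previously-deleted connector.
    void removeConnector(String connectorName) {
        taskConfigs.keySet().removeIf(taskId -> taskId.startsWith(connectorName + "-"));
    }
}
```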
Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
Fixes EmbeddedKafkaCluster.deleteTopicAndWait for use with kafka_2.13
Reviewers: Boyang Chen <boyang@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, John Roesler <vvcephei@apache.org>
* If two exceptions are thrown, the `closePartitions` exception is suppressed (see the sketch after this list)
* Add unit tests that throw exceptions in put and close to verify that the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute
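A sketch of the suppression pattern described above (simplified, not the actual WorkerSinkTask code):

```java
public class SuppressedCloseSketch {
    // If both the task body and closePartitions fail, propagate the original
    // error and attach the close failure as a suppressed exception.
    static void runAndClose(Runnable body, Runnable closePartitions) {
        RuntimeException error = null;
        try {
            body.run();
        } catch (RuntimeException e) {
            error = e;
        } finally {
            try {
                closePartitions.run();
            } catch (RuntimeException closeError) {
                if (error != null) {
                    error.addSuppressed(closeError);
                } else {
                    error = closeError;
                }
            }
        }
        if (error != null) {
            throw error;
        }
    }
}
```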
Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
Added a check for whether the transformation class is abstract; if so, throw an error with guidance for the user. Also ensure that the child classes are not abstract.
Author: Jeremy Custenborder <jcustenborder@gmail.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
ZooKeeper 3.5.8 fixes 30 issues, including third-party CVE fixes, several leader-election related fixes, and a compatibility issue with applications built against earlier 3.5 client libraries (by restoring a few non-public APIs).
See ZooKeeper 3.5.8 Release Notes for details: https://zookeeper.apache.org/doc/r3.5.8/releasenotes.html
Reviewers: Manikumar Reddy <manikumar@confluent.io>
Prior to KAFKA-8106, we allowed the v0 and v1 message formats to contain non-consecutive inner offsets. Inside `LogValidator`, we would detect this case and rewrite the batch. After KAFKA-8106, we changed the logic to raise an error in the case of the v1 message format (v0 was still expected to be rewritten). This caused an incompatibility for older clients which were depending on the looser validation. This patch reverts the old logic of rewriting the batch to fix the invalid inner offsets.
Note that the v2 message format has always had stricter validation. This patch also adds a test case for this.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Ismael Juma <ismael@juma.me.uk>
This change turns on exact decimal processing in JSON Converter for deserializing decimals, meaning trailing zeros are maintained. Serialization was already using the decimal scale to output the right value, so this change means a value of `1.2300` can now be serialized to JSON and deserialized back to Connect without any loss of information.
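To illustrate why exact decimal handling matters, in plain java.math rather than the converter's actual code path:

```java
import java.math.BigDecimal;

public class ExactDecimalSketch {
    public static void main(String[] args) {
        // With exact decimal handling the textual value keeps its scale,
        // so 1.2300 round-trips with four decimal places intact.
        BigDecimal exact = new BigDecimal("1.2300");
        System.out.println(exact.scale());          // 4
        System.out.println(exact.toPlainString());  // 1.2300

        // Going through a binary double first loses the trailing zeros.
        BigDecimal viaDouble = BigDecimal.valueOf(Double.parseDouble("1.2300"));
        System.out.println(viaDouble.toPlainString()); // 1.23
    }
}
```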
Author: Andy Coates <big-andy-coates@users.noreply.github.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Almog Gavra <almog@confluent.io>
The rest.advertised.listener config is currently broken: setting it to http when listeners are configured for both https and http causes the framework to choose whichever of the two listeners is listed first. The changes here fix this by checking not only that ServerConnector::getName begins with the specified protocol, but also that the protocol is immediately followed by an underscore, which the framework uses as a delimiter between the protocol and the remainder of the connector name.
An existing unit test for the RestServer::advertisedUrl method has been expanded to include a case that fails with the framework in its current state and passes with the changes in this commit.
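The essence of the check, sketched with an illustrative connector name format:

```java
public class AdvertisedListenerMatchSketch {
    // Matching on the protocol prefix alone would let a name like "https_<host>:<port>"
    // match a requested protocol of "http"; requiring the trailing underscore avoids that.
    static boolean matchesProtocol(String connectorName, String protocol) {
        return connectorName.startsWith(protocol + "_");
    }

    public static void main(String[] args) {
        System.out.println(matchesProtocol("https_example:8443", "http"));  // false
        System.out.println(matchesProtocol("http_example:8083", "http"));   // true
    }
}
```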
* KAFKA-9768: Fix handling of rest.advertised.listener config
* KAFKA-9768: Add comments on server connector names
* KAFKA-9768: Update RestServerTest comment
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Andrew Choi <andchoi@linkedin.com>
The CircularIterator class uses a wrapping index-based approach to iterate over a list. This can be an O(n^2) performance problem for a LinkedList. Also, the index counter itself is never reset; a modulo is applied to it on every list access. At some point the counter may overflow to a negative value, causing a negative index read and an ArrayIndexOutOfBoundsException.
This fix changes the implementation to avoid both scenarios. It uses the Collection's Iterator so no index counter is needed, and it no longer has to seek to the correct index on every access, which avoids the LinkedList performance issue.
I have added unit tests to validate the new implementation.
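A condensed sketch of the iterator-based approach (the real CircularIterator carries more JavaDoc and checks):

```java
import java.util.Collection;
import java.util.Iterator;

// No index counter, no modulo, and therefore no overflow; hasNext() is always
// true because iteration wraps around a non-empty collection forever.
public class CircularIteratorSketch<T> implements Iterator<T> {
    private final Collection<T> collection;
    private Iterator<T> iterator;
    private T nextValue;

    public CircularIteratorSketch(Collection<T> collection) {
        if (collection.isEmpty()) {
            throw new IllegalArgumentException("CircularIterator requires a non-empty collection");
        }
        this.collection = collection;
        this.iterator = collection.iterator();
        advance();
    }

    // Load the next value up front, wrapping around when the underlying iterator is exhausted.
    private void advance() {
        if (!iterator.hasNext()) {
            iterator = collection.iterator();
        }
        nextValue = iterator.next();
    }

    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public T next() {
        T result = nextValue;
        advance();
        return result;
    }

    public T peek() {
        return nextValue;
    }
}
```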
* KAFKA-9419: Integer Overflow Possible with CircularIterator
* Added JavaDoc. Support null values in the underlying collection
* Always return true for hasNext(). Add more JavaDoc
* Use an advance method to load next value and always return true in hasNext()
* Simplify test suite
* Use assertThrows in tests and remove redundant 'this' identifier
Co-authored-by: David Mollitor <dmollitor@apache.org>
Co-authored-by: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Ron Dagostino <rdagostino@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>
* Broker throttles were incorrectly marked as sensitive configurations. Fix this, so that their values can be returned via DescribeConfigs as expected.
* Previously, changes to broker configs that consisted only of deletions were ignored by the brokers because of faulty delta calculation logic that considered only alterations, not deletions, as changes. Fix this and add a regression test.
Reviewers: Colin P. McCabe <cmccabe@apache.org>
(cherry picked from commit 5fc3cd61fc)
ConfigProvider extends Closeable, but ConfigProvider instances were not closed in the following contexts:
* AbstractConfig
* WorkerConfigTransformer
* Worker
This commit ensures that ConfigProviders are closed in the above contexts.
It also adds MockFileConfigProvider.assertClosed().
Gradle executes test classes concurrently, so MockFileConfigProvider
can't simply use a static field to hold its closure state.
Instead, use a protocol whereby the MockFileConfigProvider is configured with a unique key identifying the test, which is also used when calling assertClosed().
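A rough sketch of that keyed-closure protocol (names and structure here are illustrative, not the actual MockFileConfigProvider):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Each test configures the provider with a unique id, and assertClosed(id) checks
// that the instance created for that id was closed, even with tests running concurrently.
public class KeyedClosureTrackingSketch {
    private static final Map<String, Boolean> CLOSED_BY_KEY = new ConcurrentHashMap<>();
    private String key;

    public void configure(Map<String, ?> configs) {
        this.key = (String) configs.get("testId");
        CLOSED_BY_KEY.put(key, Boolean.FALSE);
    }

    public void close() {
        CLOSED_BY_KEY.put(key, Boolean.TRUE);
    }

    public static void assertClosed(String key) {
        if (!Boolean.TRUE.equals(CLOSED_BY_KEY.get(key))) {
            throw new AssertionError("ConfigProvider configured with key " + key + " was not closed");
        }
    }
}
```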
Reviewers: Konstantine Karantasis <konstantine@confluent.io>
Simple logging additions at TRACE level that should help when the worker can't catch up to the end of an internal topic.
Reviewers: Gwen Shapira <cshapi@gmail.com>, Aakash Shah <ashah@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>
The href attribute was missing the opening double quote, so the hyperlink is interpreted as https://docs.oracle.com/.../agent.html", with a redundant trailing double quote.
Add the missing opening double quote to fix this issue.
Reviewers: Konstantine Karantasis <konstantine@confluent.io>
The class kafka.examples.SimpleConsumerDemo was removed, but java-simple-consumer-demo.sh was not removed and the README was not updated.
This commit removes java-simple-consumer-demo.sh and updates the demo instructions in the examples README.
Author: Jiamei Xie <jiamei.xie@arm.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>
* The DeadLetterQueueReporter has a KafkaProducer that it must close to clean up resources (see the sketch after this list)
* Currently, the producer and its threads are leaked every time a task is stopped
* Responsibility for cleaning up ErrorReporters is transitively assigned to the ProcessingContext, RetryWithToleranceOperator, and WorkerSinkTask/WorkerSourceTask classes
* One new unit test in ErrorReporterTest asserts that the producer is closed by the dlq reporter
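A simplified sketch of the ownership chain: whoever stops the task must close the reporter, which closes the producer and releases its threads.

```java
import org.apache.kafka.clients.producer.KafkaProducer;

public class DlqReporterCloseSketch implements AutoCloseable {
    private final KafkaProducer<byte[], byte[]> producer;

    public DlqReporterCloseSketch(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    @Override
    public void close() {
        // Closing the reporter closes its producer, so no threads are leaked on task stop.
        producer.close();
    }
}
```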
Reviewers: Arjun Satish <arjun@confluent.io>, Chris Egerton <chrise@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
In KAFKA-9826, a log whose first dirty offset was past the start of the active segment and past the last cleaned point resulted in an endless cycle of picking the segment to clean and discarding it. Though this didn't interfere with cleaning other log segments, it kept the log cleaner thread continuously busy (potentially wasting CPU and impacting other running threads) and filled the logs with lots of extraneous messages.
This was determined to be because the active segment was mistakenly picked for cleaning, and because the logSegments code handles the (start == end) case only when (start, end) falls on a segment boundary: the intent is to return an empty list, but if the offsets are not on a segment boundary, the routine returns that segment.
This fix has two parts:
1. It changes logSegments to always return an empty List when start == end (see the sketch below).
2. It changes the definition of calculateCleanableBytes to not consider anything past the firstUncleanableOffset; previously, it could shift the firstUncleanableOffset to match the firstDirtyOffset even when the firstDirtyOffset was past the firstUncleanableOffset. This has no real effect now given the fix in (1), but it makes the code read more like the model it is attempting to follow.
These changes require modifications to a few test cases that exercised this particular situation; they were introduced in the context of KAFKA-8764. Those situations are now handled elsewhere in the code, but the tests themselves allowed a dirty offset in the active segment and expected the active segment to be selected for cleaning.
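An illustrative version of the range lookup over segments keyed by base offset (not the actual Scala Log code): an empty range now always yields an empty list, whether or not the bounds fall exactly on a segment base offset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;

public class LogSegmentsRangeSketch {
    static <T> List<T> segmentsInRange(NavigableMap<Long, T> segments, long start, long end) {
        if (start == end) {
            // An empty range never selects a segment, even mid-segment.
            return Collections.emptyList();
        }
        // Include the segment that contains the start offset, then everything before end.
        Long floor = segments.floorKey(start);
        long from = (floor != null) ? floor : start;
        return new ArrayList<>(segments.subMap(from, true, end, false).values());
    }
}
```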
Reviewer: Jun Rao <junrao@gmail.com>
Reuse the same pseudo-topic for serializing the LHS value in the foreign-key join resolver as
we originally used to serialize it before sending the subscription request.
Reviewers: Boyang Chen <boyang@confluent.io>
These two options are essentially incompatible, as caching will do nothing to reduce downstream traffic and writes when the store has to allow non-unique keys (skipping records where the value is also the same is a separate issue, see KIP-557). But enabling caching on a store that's configured to retain duplicates is more than just ineffective: it currently causes incorrect results.
We should just log a warning and disable caching whenever a store is retaining duplicates to avoid introducing a regression. Maybe when 3.0 comes around we should consider throwing an exception instead to alert the user more aggressively.
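An illustrative guard for that behavior (names are hypothetical, not the actual Streams code):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DuplicateRetentionCachingSketch {
    private static final Logger log = LoggerFactory.getLogger(DuplicateRetentionCachingSketch.class);

    // Duplicate-retaining stores cannot benefit from caching, so warn and drop the
    // caching flag instead of producing incorrect results.
    static boolean effectiveCachingEnabled(boolean cachingRequested, boolean retainDuplicates) {
        if (cachingRequested && retainDuplicates) {
            log.warn("Disabling caching: it is not compatible with stores that retain duplicates");
            return false;
        }
        return cachingRequested;
    }
}
```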
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <john@confluent.io>
Fixed typos in two MM2 configs that define the replication factor for internal Connect topics.
Only a single test was affected.
Reviewers: Ryanne Dolan <ryannedolan@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker receives the latest broker epoch before the controller does: when the broker registers with ZK, there are a few more instructions to process before the broker "knows" its epoch, while the controller may already have been notified and sent an UPDATE_METADATA request (as an example) with the new epoch. This results in clients getting stale metadata from this broker.
With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch.
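The core of the new check, reduced to a comparison (illustrative, not the actual broker code):

```java
public class BrokerEpochCheckSketch {
    // Previously, an epoch in the request larger than the locally known epoch triggered
    // an IllegalStateException, even though the controller can legitimately learn the
    // new epoch before the broker does. Now only genuinely stale epochs are rejected.
    static boolean isStaleBrokerEpoch(long requestBrokerEpoch, long currentBrokerEpoch) {
        return requestBrokerEpoch < currentBrokerEpoch;
    }
}
```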
Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
When the Connect worker forwards a REST API request to the leader, it might get back a `RequestTargetException` that suggests the worker should forward the request to a different worker. This can happen when the leader changes and the worker that receives the original request forwards it to the worker that it thinks is the current leader, but that worker is not the current leader. In most cases, the worker that received the forwarded request includes the URL of the current leader, but it is possible (albeit rare) that the worker doesn't know the current leader and will include a null leader URL in the resulting `RequestTargetException`.
When this rare case happens, the user gets a null pointer exception in their response and the NPE is logged. Instead, the worker should catch this condition and provide a more useful error message, similar to other existing error messages that might occur.
Added a unit test that verifies this corner case is caught and this particular NPE does not occur.
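An illustrative guard for the rare case (the helper and message text here are hypothetical):

```java
import org.apache.kafka.connect.errors.ConnectException;

public class LeaderUrlCheckSketch {
    // If the forwarding target is unknown, fail with a clear message instead of
    // building a URL from a null leader and hitting an NPE.
    static String forwardUrl(String leaderUrl, String path) {
        if (leaderUrl == null) {
            throw new ConnectException("Cannot complete request because the leader URL is not yet known "
                    + "(for example, during a worker rebalance); please retry");
        }
        return leaderUrl + path;
    }
}
```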
Author: Randall Hauch <rhauch@gmail.com>
Reviewer: Konstantine Karantasis <konstantine@confluent.io>
- Added additional synchronization and increased timeouts to handle flakiness
- Added some precautionary retries when trying to obtain the lag map
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The patch adds a new test case for validating concurrent read/write behavior in the `Log` implementation. In the process of verifying this, we found a race condition in `read`. The previous logic checks whether the start offset is equal to the end offset before collecting the high watermark. It is possible that the log is truncated in between these two conditions which could cause the high watermark to be equal to the log end offset. When this happens, `LogSegment.read` fails because it is unable to find the starting position to read from.
Reviewers: Guozhang Wang <wangguoz@gmail.com>