This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication.
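For illustration, a hedged sketch of setting the property when building broker config in a test (the value shown is illustrative, not necessarily the default):

```java
import java.util.Properties;

Properties brokerProps = new Properties();
// Cap, in bytes, the size of any request accepted before and during SASL
// authentication; oversized receives fail the connection early.
brokerProps.setProperty("sasl.server.max.receive.size", "524288");
```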
Reviewers: Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
When deserializing KRPC (which is used for RPCs sent to Kafka, Kafka Metadata records, and some
other things), check that we have at least N bytes remaining before allocating an array of size N.
Remove DataInputStreamReadable since it was hard to make this class aware of how many bytes were
remaining. Instead, when reading an individual record in the Raft layer, simply create a
ByteBufferAccessor with a ByteBuffer containing just the bytes we're interested in.
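A minimal sketch of the check (names are illustrative, not the actual generated KRPC code):

```java
import java.nio.ByteBuffer;

// Before allocating an array of `size` elements, verify that at least that
// many bytes remain; otherwise a corrupt or hostile length prefix could
// trigger a huge allocation with no data behind it.
static byte[] readByteArray(ByteBuffer buf, int size) {
    if (size > buf.remaining())
        throw new RuntimeException("Tried to allocate a collection of size " + size
                + ", but only " + buf.remaining() + " bytes remain.");
    byte[] data = new byte[size];
    buf.get(data);
    return data;
}
```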
Add SimpleArraysMessageTest and ByteBufferAccessorTest. Also add some additional tests in
RequestResponseTest.
Reviewers: Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>, Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Colin McCabe <colin@cmccabe.xyz>
Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Update the notable changes documentation to mention that the IDEMPOTENT_WRITE
permission is required when producing messages with the default (idempotent)
configuration against brokers older than version 2.8.0.
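For reference, a hedged sketch of issuing that grant with the Admin client (the principal and connection config are hypothetical):

```java
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

// Grant IDEMPOTENT_WRITE on the cluster resource to a (hypothetical) principal.
try (Admin admin = Admin.create(adminProps)) { // adminProps: your connection config
    AclBinding binding = new AclBinding(
        new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL),
        new AccessControlEntry("User:producer-app", "*",
            AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW));
    admin.createAcls(Collections.singleton(binding)).all().get();
}
```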
Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>
When cleaning a topic with transactional data, if the keys used in the user data happen to conflict with the keys in the transaction markers, it is possible for the markers to get removed before the corresponding data from the transaction is removed. This results in a hanging transaction or the loss of the transaction's atomicity since it would effectively get bundled into the next transaction in the log. Currently control records are excluded when building the offset map, but not when doing the cleaning. This patch fixes the problem by checking for control batches in the `shouldRetainRecord` callback.
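A hedged sketch of the fix (the cleaner is implemented in Scala; this is illustrative Java):

```java
// Control batches carry transaction markers, not user data. Their retention is
// decided by the marker-removal logic, not by key-based offset-map lookups, so
// the record-level cleaning callback must never treat them as regular records.
boolean shouldRetainRecord(RecordBatch batch, Record record) {
    if (batch.isControlBatch())
        return true; // retain markers; separate logic removes them later
    Long latestOffsetForKey = offsetMap.get(record.key()); // illustrative offset map
    return latestOffsetForKey == null || record.offset() >= latestOffsetForKey;
}
```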
Reviewers: Jun Rao <junrao@gmail.com>
Minor change to use ' rather than LEFT SINGLE QUOTATION MARK in this log message, as it's the only place where we use such a quote and it can break ingestion pipelines.
Reviewers: Kvicii <Karonazaba@gmail.com>, Divij Vaidya <diviv@amazon.com>, Konstantine Karantasis <k.karantasis@gmail.com>
When running our Connect system tests with JDK 10+, we hit the error
`AttributeError: 'ClusterNode' object has no attribute 'version'`
because util.py attempts to check the `version` attribute on non-Kafka service objects.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
In a comparator, objects that are not equal need a stable order; otherwise, binary search may not find them. This change improves the producer batch comparator.
Reviewers: Luke Chen <showuon@gmail.com>
Fixes a bug in the comparator used to sort producer inflight batches for a topic partition. This can cause batches in the map `inflightBatchesBySequence` to be removed incorrectly: i.e. one batch may be removed by another batch with the same sequence number. This leads to an `IllegalStateException` when the inflight request finally returns. This patch fixes the comparator to check equality of the `ProducerBatch` instances if the base sequences match.
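A hedged sketch of the corrected ordering (simplified from the actual code):

```java
import java.util.Comparator;

// Order first by base sequence; if two *distinct* batch instances share a base
// sequence, fall back to a tiebreaker so binary search locates the exact
// instance being removed instead of an arbitrary batch with the same sequence.
Comparator<ProducerBatch> batchComparator = (b1, b2) -> {
    if (b1.baseSequence() != b2.baseSequence())
        return Integer.compare(b1.baseSequence(), b2.baseSequence());
    return b1.equals(b2) ? 0
        : Integer.compare(System.identityHashCode(b1), System.identityHashCode(b2));
};
```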
Reviewers: Jason Gustafson <jason@confluent.io>
With this change, we stop including by default the non-production-grade connectors that are meant to be used for demos and quick starts in the CLASSPATH and `plugin.path` of Connect deployments. The packages of these connectors will still be shipped with the Apache Kafka distribution and will be available for explicit inclusion.
The changes have been tested through the system tests and the existing unit and integration tests.
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Randall Hauch <rhauch@gmail.com>
Key updates with TLS 1.3 trigger code paths similar to renegotiation with TLS 1.2.
Update the read/write paths not to throw an exception in this case (the exception is
kept in the `handshake` method).
With the default configuration, key updates happen after 2^37 bytes are encrypted.
There is a security property to adjust this limit, but it has to be set before the
TLS stack is used for the first time and cannot be changed after that. As such,
this scenario is best exercised via a system test (filed KAFKA-13779).
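For reference, a hedged sketch of how a test might lower that limit via the JDK's `jdk.tls.keyLimits` security property so that `KeyUpdate` fires quickly (this assumes it runs before any TLS code is exercised in the JVM; the value format follows the JDK security property documentation):

```java
import java.security.Security;

// Lower the number of bytes that may be encrypted under one key so that a
// TLS 1.3 KeyUpdate is triggered almost immediately in a test workload.
Security.setProperty("jdk.tls.keyLimits", "AES/GCM/NoPadding KeyUpdate 1024");
```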
To validate the change, I wrote a unit test that forces key updates and manually ran
a producer workload that produced more than 2^37 bytes. Both cases failed without
these changes and pass with them.
Note that Shylaja Kokoori attached a patch with the `SslTransportLayer` fix and is
hence included as a co-author of this change.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Co-authored-by: Shylaja Kokoori
The `retryEndOffsets(…)` method was recently added to `TopicAdmin` (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduced a regression when talking to older brokers (0.10.x or earlier).
The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup.
The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method. Added a new unit test that first replicated the problem before the fix, and then verified that the fix corrects the problem.
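A hedged sketch of the change inside the retry wrapper (simplified):

```java
// Simplified: inside the retry loop used by TopicAdmin.retryEndOffsets(...).
try {
    return endOffsets(partitions);
} catch (UnsupportedVersionException e) {
    // Older broker: not retriable, and KafkaBasedLog relies on receiving this
    // exact exception type to fall back to its consumer-based offset lookup.
    throw e;
} catch (RetriableException e) {
    // fall through: the surrounding loop retries until the timeout elapses
}
```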
When a log entry is appended to a Kafka topic using KafkaLog4jAppender, the `producer.send()` operation
may hit a deadlock if the producer network thread also tries to append a log entry at the same log level.
This issue is triggered when idempotence is enabled for the KafkaLog4jAppender and the producer
tries to acquire the TransactionManager lock.
This is a temporary workaround to avoid deadlocks by disabling idempotence explicitly in
KafkaLog4jAppender.
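The workaround amounts to something like this when the appender builds its producer configuration (a hedged sketch):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties producerProps = new Properties();
// With idempotence off, the send path never needs the TransactionManager
// lock, so a log append from the producer network thread cannot deadlock
// against an in-progress send.
producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
```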
Reviewers: Luke Chen <showuon@gmail.com>, Ismael Juma <ismael@juma.me.uk>
With Apache Kafka 3.0, idempotence was enabled by default in Kafka producers. However, if idempotence is enabled, Connect cannot communicate via its producers with Kafka brokers older than version 0.11. Perhaps more importantly, for brokers older than version 2.8 the IDEMPOTENT_WRITE ACL must be granted to the principal of the Connect worker.
Therefore this commit disables producer idempotence by default for all the producers instantiated by Connect. Users can still choose to enable producer idempotence by explicitly setting the right worker and/or connector properties, as sketched below.
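For illustration, a hedged sketch of the relevant properties, using Connect's worker-level `producer.` prefix and per-connector `producer.override.` prefix (whether the per-connector override is allowed depends on the worker's override policy):

```java
import java.util.HashMap;
import java.util.Map;

// Worker config: the "producer." prefix applies to every producer the worker creates.
Map<String, String> workerProps = new HashMap<>();
workerProps.put("producer.enable.idempotence", "true");

// Connector config: the "producer.override." prefix applies to one connector,
// subject to the worker's connector.client.config.override.policy.
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put("producer.override.enable.idempotence", "true");
```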
The changes were tested via existing unit, integration and system tests.
Reviewers: Randall Hauch <rhauch@gmail.com>
It is possible to clean a segment partially if the offset map is filled before reaching the end of the segment. The highest offset that is reached becomes the new dirty offset after the cleaning completes. The data above this offset is nevertheless copied over to the new partially cleaned segment. Hence we need to ensure that the transaction index reflects aborted transactions from both the cleaned and uncleaned portion of the segment. Prior to this patch, this was not the case. We only collected the aborted transactions from the cleaned portion, which means that the reconstructed index could be incomplete. This can cause the aborted data to become effectively committed. It can also cause the deletion of the abort marker before the corresponding data has been removed (i.e. the aborted transaction becomes hanging).
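A hedged sketch of the intent (the actual cleaner is Scala; the helpers named here are hypothetical):

```java
import java.util.List;

// The rebuilt transaction index must reflect aborted transactions from the
// whole offset range that lands in the new segment: both the cleaned portion
// (below the new dirty offset) and the uncleaned portion copied above it.
List<AbortedTxn> aborted =
        collectAbortedTransactions(segmentBaseOffset, segmentEndOffset); // hypothetical helper
rebuildTransactionIndex(newSegment, aborted);                            // hypothetical helper
```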
Reviewers: Jun Rao <junrao@gmail.com>
Ensures we always have the latest published ducktape version.
This way, whenever we release a new one, we won't have to cherry-pick a bunch of commits across a bunch of branches.
This is an addendum to KAFKA-12879 (#11797) to fix some tests that are somewhat flaky when a build machine is heavily loaded (i.e., when the timeouts are too small).
- Add an if check to avoid calling sleep(0), as sketched below
- Increase timeouts in the tests
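The first item is roughly the following (a hedged sketch; names are illustrative):

```java
long remainingMs = deadlineMs - time.milliseconds();
// Guard against sleep(0): on a heavily loaded build machine the computed
// wait can reach zero, and we only want to sleep when time actually remains.
if (remainingMs > 0)
    time.sleep(remainingMs);
```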
Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead perform the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(..)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses.
This change should be backward compatible with KAFKA-12339, so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog`'s existing thread already has its own retry logic, and this is not changed.
Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function.
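A hedged sketch of the parameterized retry shape (simplified; the real `RetryUtil` also handles logging and degenerate timeouts):

```java
import java.time.Duration;
import java.util.function.Supplier;
import org.apache.kafka.common.errors.RetriableException;

static <T> T retryUntilTimeout(Supplier<T> operation, Duration timeout, Duration backoff)
        throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeout.toMillis();
    while (true) {
        try {
            return operation.get();
        } catch (RetriableException e) {
            if (System.currentTimeMillis() + backoff.toMillis() >= deadline)
                throw e; // out of time: surface the last retriable failure
            Thread.sleep(backoff.toMillis());
        }
    }
}
```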
When a socket is closed, the corresponding channel should be retained only if it has complete buffered requests.
Reviewers: David Jacot <djacot@confluent.io>
Disable idempotence when conflicting config values for acks, retries
and max.in.flight.requests.per.connection are set by the user. For the
former two configs, we log at info level when we disable idempotence
due to conflicting configs. For the latter, we log at warn level since
it's due to an implementation detail that is likely to be surprising.
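A hedged sketch of the resulting behavior (names and structure are illustrative):

```java
boolean userSetIdempotence = userProvidedConfigs.contains("enable.idempotence");
if (!userSetIdempotence) {
    if (userProvidedConfigs.contains("acks") && acks != -1) {
        log.info("Idempotence will be disabled because acks is set to {}, not all.", acks);
        idempotenceEnabled = false;
    }
    if (userProvidedConfigs.contains("retries") && retries == 0) {
        log.info("Idempotence will be disabled because retries is set to 0.");
        idempotenceEnabled = false;
    }
    if (maxInFlightRequests > 5) {
        // Warn rather than info: the limit of 5 in-flight requests is an
        // implementation detail of the idempotent producer and likely to surprise.
        log.warn("Idempotence will be disabled because max.in.flight.requests.per.connection"
                + " is set to {}, which is higher than 5.", maxInFlightRequests);
        idempotenceEnabled = false;
    }
}
```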
This mitigates the compatibility impact of enabling idempotence by default.
Added unit tests to verify the change in behavior.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Mickael Maison <mickael.maison@gmail.com>
Make the SetSchemaMetadata SMT ignore records with a null value and value schema, or a null key and key schema.
The transform has been unit tested for handling null values gracefully while still providing the necessary validation for non-null values.
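A hedged sketch of the guard (the SMT's Key and Value variants supply `operatingSchema`/`operatingValue`; simplified here):

```java
@Override
public R apply(R record) {
    // A tombstone (null value/key) with no schema carries nothing to re-label:
    // pass it through rather than failing validation on the null schema.
    if (operatingValue(record) == null && operatingSchema(record) == null)
        return record;
    return updateSchemaMetadata(record); // existing rename/version logic (hypothetical name)
}
```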
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Bill Bejeck <bbejeck@apache.org>
* Mention the `acks=1` to `acks=all` change in the 3.0.0 upgrade docs
* Have a separate section for 3.0.1 and 3.1.1, as some users may skip the
3.0.0/3.1.0 section when upgrading to a bug fix release.
* Move the 3.0.0 note to the top since it's more impactful than the
other changes.
Reviewers: Jason Gustafson <jason@confluent.io>
This patch ensures that the committed offsets are not expired while the group is rebalancing. The issue is that we can't rely on the subscribed topics if the group is not stable.
Reviewers: David Jacot <djacot@confluent.io>
In v3.0, we changed the default value of `enable.idempotence` to true, but we didn't adjust the validator or the idempotence-enabled check method. So if a user didn't explicitly enable idempotence, the feature wasn't actually turned on. This patch addresses the problem, cleans up the associated logic, and fixes tests that broke as a result of properly applying the new default. Specifically, it does the following:
1. fix the `ProducerConfig#idempotenceEnabled` method so that it correctly detects whether idempotence is enabled (see the sketch after this list)
2. remove some unnecessary config overrides and checks, since we already provide defaults for the `acks`, `retries`, and `enable.idempotence` configs
3. move the config validator for the idempotent producer from `KafkaProducer` into `ProducerConfig`, since config validation should be the responsibility of the `ProducerConfig` class
4. add an `AbstractConfig#hasKeyInOriginals` method so we can check for the existence of a key without copying the `originals` configs
5. fix many broken tests. As mentioned, we didn't actually enable idempotence in v3.0, so after this PR some tests broke due to behavioral differences between the idempotent and non-idempotent producer
6. add additional tests to validate configuration behavior
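Item 1 amounts to roughly the following (a hedged sketch inside `ProducerConfig`; the real method also rejects explicitly disabling idempotence while setting `transactional.id`):

```java
boolean idempotenceEnabled() {
    boolean userSetTransactions = originals().containsKey(TRANSACTIONAL_ID_CONFIG);
    boolean idempotence = getBoolean(ENABLE_IDEMPOTENCE_CONFIG);
    // Transactions imply idempotence; otherwise honor the configured value,
    // whose default is now true, instead of requiring the user to set it.
    return userSetTransactions || idempotence;
}
```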
Reviewers: Kirk True <kirk@mustardgrain.com>, Ismael Juma <ismael@juma.me.uk>, Mickael Maison <mimaison@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
We introduced `default.api.timeout.ms` in 53ca52f855 but missed updating `KafkaConsumer.endOffsets`, which still used `request.timeout.ms`. This patch fixes that.
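Callers that need a different bound can still supply one explicitly via the existing overload:

```java
// Without an explicit timeout, endOffsets is now bounded by default.api.timeout.ms;
// the Duration overload still allows a per-call bound.
Map<TopicPartition, Long> end = consumer.endOffsets(partitions, Duration.ofSeconds(15));
```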
Reviewers: David Jacot <djacot@confluent.io>
Sometimes, the Kafka producer encounters an error prior to selecting a topic partition. In this case, we
would like to acknowledge the failure in the producer interceptors, if any are configured. We should also
pass a non-null Metadata object to the producer callback, if there is one. This PR implements that
behavior. It also updates the JavaDoc to clarify that if a partition cannot be selected, we will pass
back a partition id of -1 in the metadata. This is in keeping with KAFKA-3303.
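For example, an interceptor can now observe such failures; a hedged sketch:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FailureLoggingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // After this change, metadata is non-null even for failures that occur
        // before partition selection; partition() is -1 in that case.
        if (exception != null && metadata != null && metadata.partition() == -1)
            System.err.println("Send to " + metadata.topic() + " failed before partitioning");
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```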
Co-authored-by: Kirk True <kirk@mustardgrain.com>
Reviewers: Colin P. McCabe <cmccabe@apache.org>
At the moment, the `NetworkClient` will remain stuck in the `CHECKING_API_VERSIONS` state forever if the `Channel` does not become ready. To prevent this from happening, this patch changes the logic to transition to the `CHECKING_API_VERSIONS` state only when the `ApiVersionsRequest` is queued to be sent out. With this, the connection will time out if the `Channel` does not become ready within the connection setup timeout. Once the `ApiVersionsRequest` is queued up, the request timeout takes over.
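A hedged, much-simplified sketch of the new ordering (the helper is hypothetical; names only gesture at `NetworkClient` internals):

```java
// Previously the node entered CHECKING_API_VERSIONS as soon as the connection
// completed, even if the channel never became ready. Now the connecting phase
// (covered by the connection setup timeout) lasts until the request is queued.
if (channel.ready()) {
    connectionStates.checkingApiVersions(node.idString());
    enqueueApiVersionsRequest(node); // hypothetical helper; request timeout applies from here
}
```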
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Allow the leader epoch to be re-assigned to the new value from the Metadata response if `oldTopicId` is not present in the cache. This is needed because `oldTopicId` is removed from the cache if the topic gets deleted but the leader epoch is not removed. Hence, metadata for the newly recreated topic won't be accepted unless we allow `oldTopicId` to be null.
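A hedged sketch of the relaxed condition (simplified from the client metadata cache logic):

```java
// Accept the epoch from the metadata response when the topic id changed or was
// never recorded (e.g. the topic was deleted and recreated), instead of
// insisting that the new epoch be >= the last seen one.
boolean topicRecreated = oldTopicId == null || !oldTopicId.equals(newTopicId);
Integer lastSeenEpoch = lastSeenLeaderEpochs.get(partition);
if (topicRecreated || lastSeenEpoch == null || newEpoch >= lastSeenEpoch) {
    lastSeenLeaderEpochs.put(partition, newEpoch);
}
```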
Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>
grgit 4.1.0 caused an unsupported version error during Gradle builds.
The reason was that grgit 4.1.0 always uses the latest JGit version
internally. Unfortunately, the latest JGit version was compiled with
a Java version newer than Java 8, which caused the unsupported version
error during Gradle builds with Java 8.
grgit 4.1.1 fixed this issue by upper-bounding the version of JGit
to one that is still compiled with Java 8. Consequently, we can
remove the hotfix we merged in commit d1e0d2b474
and instead bump the grgit version from 4.1.0 to 4.1.1.
Reviewer: John Roesler <vvcephei@apache.org>
If the JAAS configuration does not contain a Client section for ZK clients, an auth failure event is generated. If this occurs after the connection is set up in the controller, we schedule reinitialize(), which causes the controller to resign. In the case where SASL is not mandatory and the connection is alive, the controller maintains the current session and doesn't register its watchers, leaving it in a bad state.
Reviewers: Jun Rao <junrao@gmail.com>
A new version of JGit, which is used by grgit, which is in turn used by Gradle,
causes the following error:
org/eclipse/jgit/storage/file/FileRepositoryBuilder has been compiled
by a more recent version of the Java Runtime (class file version 55.0),
this version of the Java Runtime only recognizes class file versions
up to 52.0
The reason is that version 6.0.0.202111291000-r of JGit was compiled
with a Java version newer than Java 8, probably Java 11.
Explicitly setting the version of JGit in Gradle to 5.12.0.202106070339-r fixes
the issue.
Reviewers: David Jacot <djacot@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Alexander Stohr, David Arthur <mumrah@gmail.com>
Although committing source task offsets without blocking on the delivery of all in-flight records is beneficial most of the time, it can lead to duplicate record delivery if there are in-flight records at the time of the task's end-of-life offset commit.
A best-effort attempt is made here to wait for any such in-flight records to be delivered before proceeding with the end-of-life offset commit for source tasks. Connect will block for up to `offset.flush.timeout.ms` milliseconds before calculating the latest committable offsets for the task and flushing those to the persistent offset store.
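A hedged sketch of the shutdown path (helper names are hypothetical):

```java
// On task shutdown: give in-flight records up to offset.flush.timeout.ms to be
// acknowledged, then commit whatever is safely committable rather than
// blocking indefinitely on full delivery.
long timeoutMs = workerConfig.getLong("offset.flush.timeout.ms");
boolean allDelivered = awaitInFlightDelivery(timeoutMs); // hypothetical helper
if (!allDelivered)
    log.warn("Committing offsets without all in-flight records delivered;"
            + " duplicates are possible after restart");
commitCommittableOffsets(); // hypothetical helper: flush to the offset store
```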
Author: Chris Egerton <chrise@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.
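The change is essentially the following (a hedged sketch; variable names are illustrative):

```java
// After a successful commit: merge the newly committed offsets instead of
// replacing the whole map, so offsets for partitions not included in this
// commit are not forgotten.
lastCommittedOffsets.putAll(committedOffsets);

// On rebalance: drop revoked partitions so the map cannot grow without bound.
for (TopicPartition tp : revokedPartitions)
    lastCommittedOffsets.remove(tp);
```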
Two test cases are added to `WorkerSinkTaskTest`:
- A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario
- A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class
The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; it is included here because this bug was originally identified while debugging failed system tests, and the logic in this part of the tests was initially suspected as the cause of the failure.
Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>