StreamsRebalanceListenerInvoker was implemented to match the behavior of
ConsumerRebalanceListenerInvoker; however, StreamsRebalanceListener has a
subtly different interface from ConsumerRebalanceListener - it does not
throw exceptions, but returns them as an Optional.
In the interest of consistency, this change fixes this mismatch by
changing the StreamsRebalanceListener interface to behave more like the
ConsumerRebalanceListener - throwing exceptions directly.
In another minor fix, the StreamsRebalanceListenerInvoker is changed to
simply skip callback execution instead of throwing an
IllegalStateException when no streamRebalanceListener is defined. This
can happen when the consumer is closed before Consumer.subscribe is
called.
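A minimal sketch of the change in shape (simplified, hypothetical signatures; the real callbacks take Streams task metadata rather than plain strings):

```java
import java.util.Optional;
import java.util.Set;

// Illustrative only; simplified from the real interfaces.
interface OptionalStyle {
    // Before: errors were returned rather than thrown.
    Optional<Exception> onTasksRevoked(Set<String> tasks);
}

interface ThrowingStyle {
    // After: errors propagate directly, like ConsumerRebalanceListener callbacks.
    void onTasksRevoked(Set<String> tasks);
}
```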
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax
<matthias@confluent.io>
We only need to pass in the reset strategy, as the `logMessage`
parameter was removed.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Lucas Brutschy
<lbrutschy@confluent.io>
*What*
https://issues.apache.org/jira/browse/KAFKA-19623
- The PR implements KIP-1147
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1147%3A+Improve+consistency+of+command-line+arguments)
for the console tools i.e. `ConsoleProducer`, `ConsoleConsumer` and
`ConsoleShareConsumer`.
- The previous option names are still usable, but a warning message
states that they are deprecated and will be removed in a future version.
- I have added unit tests and also manually verified with the console
tools that things work as expected.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Jimmy Wang
<48462172+JimmyWang6@users.noreply.github.com>
This is the first part of cleaning up the tests in `TaskManagerTest`:
- Removed dead tests
- Added new tests as suggested earlier
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
In the consumer, we invoke the onPartitionsRevoked or onPartitionsLost
rebalance callbacks when the consumer closes. The point is that the
application may want to commit, or wipe the state if we are closing
unsuccessfully.
In the StreamsRebalanceListener, we did not implement this behavior,
which means that when closing the consumer we may lose some progress,
and in the worst case also miss that we have to wipe our local state
since we got fenced.
In this PR we implement StreamsRebalanceListenerInvoker, very similarly
to ConsumerRebalanceListenerInvoker and invoke it in Consumer.close.
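Below is a rough sketch (hypothetical names, not the actual Kafka classes) of what the close-time invocation looks like, mirroring what ConsumerRebalanceListenerInvoker does for the classic consumer callbacks:

```java
import java.util.Set;

// Rough sketch of the close-time invocation described above.
class StreamsCloseInvokerSketch {
    interface Listener {
        void onTasksRevoked(Set<String> tasks); // app may commit here
        void onAllTasksLost();                  // app may wipe local state here
    }

    private final Listener listener;
    private final Set<String> assignedTasks;

    StreamsCloseInvokerSketch(Listener listener, Set<String> assignedTasks) {
        this.listener = listener;
        this.assignedTasks = assignedTasks;
    }

    void invokeOnClose(boolean fenced) {
        if (fenced) {
            // Unsuccessful close: the app must learn it may have to wipe state.
            listener.onAllTasksLost();
        } else {
            // Normal close: give the app a final chance to commit progress.
            listener.onTasksRevoked(assignedTasks);
        }
    }
}
```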
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax
<matthias@confluent.io>, TengYao Chi <frankvicky@apache.org>,
Uladzislau Blok <123193120+UladzislauBlok@users.noreply.github.com>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This improves the SmokeTestDriverIntegrationTest in three ways:
1) If a SmokeTestClient fails (enters a terminal ERROR state), the
SmokeTestDriverIntegrationTest currently times out, because it keeps
waiting for the NOT_RUNNING state. This makes debugging quite difficult.
This minor change fails the test immediately if a SmokeTestClient
enters the ERROR state.
2) If a test times out or fails prematurely because a SmokeTestClient
crashed, the SmokeTestClients aren't shut down correctly, which will
affect the following test runs. Therefore, I am adding clean-up logic
that shuts down running SmokeTestClients in `@AfterAll`.
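A sketch of the clean-up idea in (2), with a hypothetical registry of started clients (the real SmokeTestClient API differs):

```java
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterAll;

// Sketch: make sure clients that are still running after a failed or
// timed-out test do not leak into the following test runs.
class CleanupSketch {
    // Clients registered as they are started by the test.
    static final List<AutoCloseable> RUNNING_CLIENTS = new ArrayList<>();

    @AfterAll
    static void shutdownLeftoverClients() throws Exception {
        for (AutoCloseable client : RUNNING_CLIENTS) {
            client.close(); // stop any client a failed test left behind
        }
        RUNNING_CLIENTS.clear();
    }
}
```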
3) Finally, I found that the processingThread variation of this test
triggers a subtle race condition. Since this feature is currently not
actively developed, I disabled those variations and created a ticket to
reactivate the test.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>, Bill Bejeck <bill@confluent.io>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
That config has a `Validator` so we already automatically print the
valid values in the generated docs:
https://kafka.apache.org/documentation/#streamsconfigs_upgrade.from
That will be one less place to upgrade each time we make a new release.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Querying the oldest-open-iterator metric can result in a
NoSuchElementException when the last open iterator gets removed, due to
a race condition between the query and the metric update.
To avoid this race condition, this PR caches the metric result instead
of accessing the list of open iterators directly. We don't need to
clear this cache, because the entire metric is removed when the last
iterator gets removed.
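A simplified sketch of the caching idea (hypothetical names; assumes iterator bookkeeping happens under the store's synchronization and FIFO close order):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// The metric query only reads the cached value, so it can never hit an
// empty deque while a concurrent removal is in flight.
class OldestIteratorMetricSketch {
    private final Deque<Long> openIteratorTimestamps = new ArrayDeque<>();
    private volatile long cachedOldestOpenSinceMs;

    synchronized void iteratorOpened(long nowMs) {
        openIteratorTimestamps.addLast(nowMs);
        cachedOldestOpenSinceMs = openIteratorTimestamps.peekFirst();
    }

    synchronized void iteratorClosed() {
        openIteratorTimestamps.pollFirst();
        if (!openIteratorTimestamps.isEmpty()) {
            cachedOldestOpenSinceMs = openIteratorTimestamps.peekFirst();
        }
        // No need to clear the cache when the deque empties: the entire
        // metric is removed together with the last iterator.
    }

    long oldestOpenSinceMs() {
        return cachedOldestOpenSinceMs; // metric query: cached value only
    }
}
```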
Reviewers: Matthias J. Sax <matthias@confluent.io>
This is actually fixing a difference between the old and the new
assignor. Given the assignment ordering, the legacy assignor has a
preference for range-style assignments built in, that is, assigning
C1: 0_0, 1_0
C2: 0_1, 1_1
instead of
C1: 0_0, 0_1
C2: 1_0, 1_1
We add tests to both assignors to check for this behavior, and improve
the new assignor by enforcing corresponding orderings.
Reviewers: Bill Bejeck <bill@confluent.io>
The original test timed out when using the new protocol, because it
used `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG` as the expected
exception's timeout, which is 300s. Also, the tests for the new
protocol and the old protocol used the same group ID, so the failure
was hidden.
What I do:
1. Set the timeout to 5 seconds, so it can be captured within 10s
2. Use a new appId for the new protocol
Reviewers: Lucas Brutschy <lucasbru@apache.org>
This PR removes two System.out.println(...) statements from
StreamsGraphTest. These outputs were left over from debugging and are
not needed in the test logic.
Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi
<kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
With "merge.repartition.topic" optimization enabled, Kafka Streams tries
to push repartition topics upstream, to be able to merge multiple
repartition topics from different downstream branches together.
However, it is not safe to push a repartition topic if the parent node
is value-changing: because of potentially changing data types, the
topology might become invalid and fail with a serde error at runtime.
The optimization itself works correctly; however, processValues() is
not correctly declared as value-changing, which can lead to invalid
topologies, as sketched below.
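A minimal sketch of the hazard, using mapValues() as the value-changing stand-in (processValues() is the operator this fix covers; topic and type choices are illustrative):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;

public class ValueChangingRepartitionSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source =
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));

        source
            .selectKey((k, v) -> v)            // key change: repartitioning is needed downstream
            .mapValues(String::length)         // value change: String -> Integer
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            .count();                          // the repartition topic is created here

        // With the optimization, Streams tries to push the repartition topic
        // up toward selectKey(). Pushing it above the value-changing node
        // would make the topic carry String values that downstream reads with
        // the Integer serde, failing with a serde error at runtime.
        System.out.println(builder.build().describe());
    }
}
```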
Reviewers: Bill Bejeck <bill@confluent.io>, Lucas Brutschy
<lbrutschy@confluent.io>
We make three main changes in this PR:
- Disallowing null values for most LIST-type configurations makes sense,
since users cannot explicitly set a configuration to null in a
properties file. Therefore, only configurations with a default value of
null should be allowed to accept null.
- Disallowing duplicate values is reasonable, as there are currently no
known configurations in Kafka that require specifying the same value
multiple times. Allowing duplicates is both rare in practice and
potentially confusing to users.
- Disallowing empty lists, even though many configurations currently
accept them. In practice, setting an empty list for several of these
configurations can lead to server startup failures or unexpected
behavior. Therefore, enforcing non-empty lists helps prevent
misconfiguration and improves system robustness.
These changes may introduce some backward incompatibility, but this
trade-off is justified by the significant improvements in safety,
consistency, and overall user experience.
Additionally, we introduce two minor adjustments:
- Reclassify some STRING-type configurations as LIST-type, particularly
those using comma-separated values to represent multiple entries. This
change reflects the actual semantics used in Kafka.
- Update the default values for some configurations to better align with
other configs.
These changes will not introduce any compatibility issues.
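As a rough illustration of the three validation rules above, a validator along these lines could enforce them (the class name is illustrative, not the exact code added by this change):

```java
import java.util.HashSet;
import java.util.List;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

// Rejects null (for configs whose default is not null), empty lists,
// and duplicate entries in LIST-type configurations.
public class NonEmptyDistinctListValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(String name, Object value) {
        if (value == null) {
            throw new ConfigException(name, null, "List configuration must not be null");
        }
        List<?> values = (List<?>) value;
        if (values.isEmpty()) {
            throw new ConfigException(name, values, "List configuration must not be empty");
        }
        if (new HashSet<>(values).size() != values.size()) {
            throw new ConfigException(name, values, "List configuration must not contain duplicates");
        }
    }
}
```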
Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
The original implementation uses a linear search to find the least
loaded process in O(n); we can replace this with look-ups in a heap in
O(log(n)), as described below.
Active tasks: For active tasks, we can do exactly the same assignment as
in the original algorithm by first building a heap (by load) of all
processes. When we assign a task, we pick the head off the heap, assign
the task to it, update the load, and re-insert it into the heap in
O(log(n)).
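A minimal sketch of that active-task loop (hypothetical names; load modeled simply as the number of assigned tasks):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class ProcessLoad {
    final String processId;
    int assignedTasks; // load = number of assigned tasks
    ProcessLoad(String processId) { this.processId = processId; }
}

public class HeapAssignmentSketch {
    public static void assignActiveTasks(List<String> tasks, List<ProcessLoad> processes) {
        // Build the heap once, ordered by current load.
        PriorityQueue<ProcessLoad> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.assignedTasks, b.assignedTasks));
        heap.addAll(processes);

        for (String task : tasks) {
            ProcessLoad least = heap.poll(); // least-loaded process, O(log n)
            least.assignedTasks++;           // assign the task, update the load
            System.out.println(task + " -> " + least.processId);
            heap.add(least);                 // re-insert with the new load, O(log n)
        }
    }

    public static void main(String[] args) {
        List<ProcessLoad> processes = new ArrayList<>();
        processes.add(new ProcessLoad("p1"));
        processes.add(new ProcessLoad("p2"));
        assignActiveTasks(List.of("0_0", "0_1", "1_0", "1_1"), processes);
    }
}
```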
Standby tasks: For standby tasks, we cannot do this optimization
directly, because of the order in which we assign tasks:
1. We first try to assign task A to a process that previously owned A.
2. If we did not find such a process, we assign A to the least loaded
node.
3. We now try to assign task B to a process that previously owned B
4. If we did not find such a process, we assign B to the least loaded
node
...
The problem is that we cannot efficiently keep a heap (by load)
throughout this process, because finding and removing the process that
previously owned A (and B and …) in the heap is O(n). We therefore need
to change the order of evaluation to be able to use a heap:
1. Try to assign all tasks A, B.. to a process that previously owned the
task
2. Build a heap.
3. Assign all remaining tasks to the least-loaded process that does not
yet own the task. Since at most NumStandbyReplicas already own the task,
we can do it by removing up to NumStandbyReplicas from the top of the
heap in O(log(n)), so we get O(log(NumProcesses)*NumStandbyReplicas).
Note that the change in order changes the resulting standby assignments
(although this difference does not show up in the existing unit tests).
I would argue that the new order of assignment will actually yield
better assignments, since the assignment will be more sticky, which has
the potential to reduce the amount of state we have to restore from the
changelog topic after assignments.
In our worst-performing benchmark, this improves the runtime by ~107x.
Reviewers: Bill Bejeck<bbejeck@apache.org>
In KAFKA-19570 we implemented offline migration between groups, that is,
the following integration test or system test should be possible:
Test A:
- Start a streams application with the classic protocol, process up to
a certain offset, commit the offset, and shut down.
- Start the same streams application with the streams protocol (same
app ID!).
- Make sure that the offsets before the one committed in the first run
are not reprocessed in the second run.
Test B:
- Start a streams application with the streams protocol, process up to
a certain offset, commit the offset, and shut down.
- Start the same streams application with the classic protocol (same
app ID!).
- Make sure that the offsets before the one committed in the first run
are not reprocessed in the second run.
We have unit tests that make sure that non-empty groups will not be
converted. This should be enough.
Reviewers: Bill Bejeck <bbejeck@apache.org>
`shouldCallOldImplementationExceptionHandler` should be a test case, but
somehow misses the `@Test` annotation.
Reviewers: Ken Huang <s7133700@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
Chia-Ping Tsai <chia7712@gmail.com>
What I observed is that if I run both combinations useNewProtocol=true
and useNewProtocol=false, the test often fails the second time; but if
I only run the second variation useNewProtocol=false it works, and
running only the first variation useNewProtocol=true also works. So
this points to some state that is not cleared between the tests - and
indeed, the test creates a topic "inputTopic" and produces to it, but
doesn't delete it, so the second variation produces to it again and
runs with twice the data.
I also reduced the heartbeat interval and session timeout, since some
of the tests need to wait for the old consumer to leave, which (sigh)
Kafka Streams doesn't do, so we have to wait until it gets kicked out
by the session timeout. Previously we waited for 45 seconds; now we
wait only 1 second.
Reviewers: Bill Bejeck <bbejeck@apache.org>, Chia-Ping Tsai
<chia7712@gmail.com>
The test uses a regular consumer to commit offsets. The new protocol
requires a streams consumer, since we are using streams groups;
otherwise we run into group ID conflicts.
Following the addition of the KafkaAdmin interface for setting offsets,
a Kafka Admin client is created and uses that interface to set the
committed offsets instead of instantiating a consumer.
Also enables all tests for the new streams protocol.
Reviewers: Alieh Saeedi<asaeedi@confluent.io>, Kirk True
<ktrue@confluent.io>, Matthias Sax <mjsax@apache.org>, Bill Bejeck
<bbejeck@apache.org>
Implements a timeout mechanism (using maxPollTimeMs) that waits for
missing source topics to be created before failing, instead of
immediately throwing exceptions in the new Streams protocol.
Additionally, a TopologyException is thrown when a partition count
mismatch is detected.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Alieh Saeedi
<asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
All state updater threads use the same metrics instance, but do not use
unique names for their sensors. This can have the following symptoms:
1) Data inserted into one sensor by one thread can affect the metrics of
all state updater threads.
2) If one state updater thread is shut down, the metrics associated
with all state updater threads are removed.
3) If one state updater thread is started, while another one is removed,
it can happen that a metric is registered with the `Metrics` instance,
but not associated with any `Sensor` (because it is concurrently removed),
which means that the metric will not be removed upon shutdown. If a
thread with the same name later tries to register the same metric, we
may run into a `java.lang.IllegalArgumentException: A metric named ...
already exists`, as described in the ticket.
This change fixes the bug by giving unique names to the sensors. A test
is added to verify that there is no interference between the removal of
sensors and metrics during shutdown.
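A sketch of the idea (the sensor name here is illustrative): prefixing sensor names with the owning thread's name keeps the threads' sensors independent even though they share one `Metrics` instance:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

public class UniqueSensorNameSketch {
    public static Sensor restoreSensor(Metrics metrics, String threadName) {
        // Without the thread-name prefix, every state updater thread would
        // share (and, on shutdown, remove) the very same sensor.
        return metrics.sensor(threadName + ".restore-records");
    }

    public static void main(String[] args) {
        try (Metrics metrics = new Metrics()) {
            Sensor s1 = restoreSensor(metrics, "state-updater-1");
            Sensor s2 = restoreSensor(metrics, "state-updater-2");
            System.out.println(s1 != s2); // true: independent sensors
        }
    }
}
```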
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR fixes an issue where the thread name shown in log messages did
not match the actual execution context. Previously, log entries
displayed the context of the newly created thread, while the logger
reflected the current executing thread. This mismatch led to confusion
and made log tracing more difficult.
Changes:
- Use a logger without context, so that no misleading context is
attached
- Updated log messages to explicitly describe the thread being created
- Fixed instances where the log context reflected the current thread
instead of the newly created one
Reviewers: Matthias J. Sax <matthias@confluent.io>
Implements KIP-1195.
The BrokerNotFoundException exception has been unused since 2.8.
Marking it deprecated so that it can be removed in the next major
release.
Reviewers: Matthias J. Sax <matthias@confluent.io>
With the 4.0 release, we removed the pageview demo because it depends
on `:connect:json`, which requires JDK 17. This PR removes the connect
dependency and adds a customized serializer and deserializer, to make
the pageview demo work with JDK 11.
Reviewers: Matthias J. Sax <matthias@confluent.io>
There is a typo in the unit test: it calls
`runOnceWithoutProcessingThreads` while it should call
`runOnceWithProcessingThreads` instead.
Reviewers: Lucas Brutschy <lucasbru@apache.org>
Implements KIP-1034 to add support for Dead Letter
Queues in Kafka Streams.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Bruno Cadonna
<cadonna@apache.org>
Co-authored-by: Sebastien Viale <sebastien.viale@michelin.com>
Temporarily fix it by disabling the new protocol; will take a deeper
look at it in the consumer protocol.
Reviewers: Matthias J. Sax <matthias@confluent.io>
The new "streams" protocol behaves slightly different to the "classic"
protocol, and thus we need to update the test to avoid race conditions.
In particular, the first call to `poll()` won't "block" and return after
task assignment completed if we need to create internal topics, but
returns early without a task assignment, and only a consecutive
rebalance will assign tasks.
This implies that KafkaStreams transitions to RUNNING state even if the
group is still in NOT_READY state broker-side; this NOT_READY state is
not reflected in the client-side state machine.
Disabling the combination of "complex-topology + streams" for now,
until this difference in behavior of the client state machine is fixed.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
In the old protocol, Kafka Streams used to throw a
`MissingSourceTopicException` when a source topic is missing. In the
new protocol, it doesn't do that anymore, and only logs the status
returned from the broker, which indicates that a source topic is
missing.
This change:
1. Throws a `MissingSourceTopicException` when a source topic is missing
2. Adds unit tests
3. Modifies integration tests to fit both old and new protocols
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
When building Streams I get this warning:
```
> Task :streams:compileTestJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note:
<PATH>/kafka/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
uses unchecked or unsafe operations.
```
Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
When writing HTML, it's recommended to use the <code> element instead of
backticks for inline code formatting.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
<frankvicky@apache.org>
We can remove the explicit counter for open iterators, and just use
size() on the set of open iterators we track anyway.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
It looks like we are checking for properties that are not guaranteed
under at_least_once, for example, exact counting (not allowing for
overcounting).
This change relaxes the validation constraint:
The TAGG topic effectively contains count-by-count results. For
example, with the input without duplication
0 -> 1,2,3 we will get in TAGG 3 -> 1, since 1 key had 3 values.
With duplication
0 -> 1,1,2,3 we will get in TAGG 4 -> 1, since 1 key had 4 values.
This makes the result difficult to compare. Since we also run the smoke
test with Exactly_Once, I propose to disable validation of TAGG under
ALOS.
Similarly, the topic AVG may overcount or undercount. The test case is
extremely similar to DIF, both performing a join of two streams, the
only difference being the mathematical operation performed, so we can
also disable this validation under ALOS with minimal loss of coverage.
Finally, the change fixes a bug that would throw an NPE when validation
of a windowed stream fails.
Reviewers: Kirk True <kirk@kirktrue.pro>, Matthias J. Sax
<matthias@confluent.io>
StreamsProducer may time out in the sendOffsetsToTransaction() or
commitTransaction() call. To distinguish the two cases, we should make
both calls in individual try-catch blocks, as sketched below.
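A sketch of the split, using the public producer API (the surrounding error translation in StreamsProducer is more involved):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class CommitSketch {
    public static void commit(Producer<byte[], byte[]> producer,
                              Map<TopicPartition, OffsetAndMetadata> offsets,
                              ConsumerGroupMetadata groupMetadata) {
        // Separate try-catch blocks make it possible to tell which of the
        // two transactional calls timed out.
        try {
            producer.sendOffsetsToTransaction(offsets, groupMetadata);
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out adding offsets to the transaction", e);
        }
        try {
            producer.commitTransaction();
        } catch (TimeoutException e) {
            throw new RuntimeException("Timed out committing the transaction", e);
        }
    }
}
```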
Reviewers: Bill Bejeck<bbejeck@apache.org>
Ensure the config.providers configuration is documented for all
components supporting it
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Greg Harris
<gharris1727@gmail.com>, Matthias J. Sax <mjsax@apache.org>
The metric for oldest-iterator-open-since-ms might report a null value
if there is no open iterator.
This PR changes the behavior to dynamically register/deregister the
entire metric instead of allowing it to return a null value.
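A sketch of the register/deregister idea with hypothetical names (the group and metric names are illustrative): the metric only exists while at least one iterator is open, so it can never report null.

```java
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

class DynamicMetricSketch {
    private final Metrics metrics;
    private final MetricName name;
    private int openIterators;
    private long oldestOpenSinceMs;

    DynamicMetricSketch(Metrics metrics) {
        this.metrics = metrics;
        this.name = metrics.metricName("oldest-iterator-open-since-ms", "stream-state-metrics");
    }

    synchronized void iteratorOpened(long nowMs) {
        if (openIterators++ == 0) {
            oldestOpenSinceMs = nowMs;
            // Register the metric together with the first open iterator.
            metrics.addMetric(name, (Gauge<Long>) (config, now) -> oldestOpenSinceMs);
        }
    }

    synchronized void iteratorClosed() {
        if (--openIterators == 0) {
            // Deregister the metric instead of ever returning null.
            metrics.removeMetric(name);
        }
    }
}
```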
Reviewers: Bill Bejeck <bbejeck@apache.org>