For debugging it is useful to see the actual state directory when
an exception regarding the state directory is thrown.
Reviewer: Bill Bejeck <bbejeck@apache.org>
Previously we were only verifying the new query could be added after we had already inserted it into the TopologyMetadata, so we need to move the validation upfront.
Also adds a test case for this and improves handling of NPE in case of future or undiscovered bugs.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
RocksDB v6.27.3 has been released and it is the first release to support s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle without s390x support.
RocksDB v6.27.3 has added some new options that require an update to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java but no other changes are needed to upgrade.
I have run the unit/integration tests locally on s390x and also the :streams tests on x86_64 and they pass.
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Co-authored-by: Kurt Ostfeld <kurt@samba.tv>
Reviewers: Kvicii <Karonazaba@gmail.com>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
This PR is an implementation of: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390. The following changes have been made:
* Adding a new config input.buffer.max.bytes applicable at a topology level.
* Adding new config statestore.cache.max.bytes.
* Adding new metric called input-buffer-bytes-total.
* The per partition config buffered.records.per.partition is deprecated.
* The config cache.max.bytes.buffering is deprecated.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>
There are cases in which a state store neither has an in-memory position built up nor has it gone through the state restoration process. If a store is persistent (i.e., RocksDB), and we stop and restart Streams, we will have neither of those continuity mechanisms available.
This patch:
* adds a test to verify that all stores correctly recover their position after a restart
* implements storage and recovery of the position for persistent stores alongside on-disk state
Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
Currently, IQv2 forwards all queries to the underlying store. We add this bypass to allow handling of key queries in the cache. If a key exists in the cache, it will get answered from there.
As part of this PR, we realized we need access to the position of the underlying stores. So, I added the method getPosition to the public API and ensured all state stores implement it. Only the "leaf" stores (Rocks*, InMemory*) have an actual position, all wrapping stores access their wrapped store's position.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, John Roesler <vvcephei@apache.org>
There is a brief window between when the store is registered and when
it is initialized when it might handle a query, but there is no context.
We treat this condition just like a store that hasn't caught up to the
desired position yet.
Reviewers: Guozhang Wang <guozhang@apache.org>, Matthias J. Sax <mjsax@apache.org>, A. Sophie Blee-Goldman <ableegoldman@apache.org>, Patrick Stuedi <pstuedi@apache.org>
Followup to #11600 to invoke the streams exception handler on the MissingSourceTopicException, without killing/replacing the thread
Reviewers: Guozhang Wang <guozhang@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
Fixes some issues with the NamedTopology version of the IQ methods that accept a topologyName argument, and adds tests for all.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Avoid throwing a MissingSourceTopicException inside the #assign method when named topologies are used, and just remove those topologies which are missing any of their input topics from the assignment.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
During some recent reviews, @mjsax pointed out that StateStore layers
are constructed differently the stores are added via the PAPI vs. the DSL.
This PR adds PAPI construction for Window and Session stores to the
IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
possible state store.
Reviewer: Guozhang Wang <guozhang@apache.org>
During some recent reviews, @mjsax pointed out that StateStore layers
are constructed differently the stores are added via the PAPI vs. the DSL.
This PR adds KeyValueStore PAPI construction to the
IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
possible state store.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, Guozhang Wang <guozhang@apache.org>
Implement the KeyQuery as proposed in KIP-796
Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>
When this was made I didn't expect deleteOffsetsResult to be set if an exception was thrown. But it is and to retry we need to reset it to null. Changing the KafkaStreamsNamedTopologyWrapper for remove topology when resetting offsets to retry upon GroupSubscribedToTopicException and swallow/complete upon GroupIdNotFoundException
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@ache.>
Update an unclear log message and method name in TopologyMetadata
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>, Luke Chen <showuon@confluent.io>
* Fill in the Position response in the IQv2 result.
* Enforce PositionBound in IQv2 queries.
* Update integration testing approach to leverage consistent queries.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Guozhang Wang <guozhang@apache.org>
Fix for one of the causes of failure in the NamedTopologyIntegrationTest: org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: Must initialize prevActiveTasks from ownedPartitions before initializing remaining tasks.
This exception could occur if a member sent in a subscription where all of its ownedPartitions were from a named topology that is no longer recognized by the group leader, eg because it was just removed from the client. We should filter each ClientState based on the current topology only so the assignor only processes the partitions/tasks it can identify. The member with the out-of-date tasks will eventually clean them up when the #removeNamedTopology API is invoked on them
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Use the name specified via consumed parameter in InternalStreamsBuilder#addGlobalStore method for initializing the source name and processor name. If not specified, the names are generated.
Reviewers: Luke Chen <showuon@gmail.com>, Bill Bejeck <bbejeck@apache.org>
In the new NamedTopology API being worked on, state store names and their uniqueness requirement is going to be scoped only to the owning topology, rather than to the entire app. In other words, two different named topologies can have different state stores with the same name.
This is going to cause problems for some of the existing IQ APIs which require only a name to resolve the underlying state store. We're now going to need to take in the topology name in addition to the state store name to be able to locate the specific store a user wants to query
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Introduces changelog headers to pass position information
to standby and restoring stores. The headers are guarded by an internal
config, which defaults to `false` for backward compatibility. Once IQv2
is finalized, we will make that flag a public config.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, John Roesler <vvcephei@apache.org>
When the unit tests of the internal topic manager test
are executed on a slow machine (like sometimes in automatic builds)
they sometimes fail with a timeout exception instead of the expected
exception. To fix this behavior, this commit replaces the use of
system time with mock time.
Reviewer: John Roesler <vvcephei@apache.org>
Implements the major part of the IQv2 framework as proposed in KIP-796.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Bruno Cadonnna <cadonna@apache.org>
Log messages were changed in the AssignorConfiguration (#11490) that are
also used for verification in system test
StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance.
This commit fixes the test and adds comments to the log messages
that point to the test that needs to be updated in case of
changes to the log messages.
Reviewers: John Roesler <vvcephei@apache.org>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
With a nice mock in RocksDBMetricsRecorderTest#shouldCorrectlyHandleHitRatioRecordingsWithZeroHitsAndMisses() and RocksDBMetricsRecorderTest#shouldCorrectlyHandleAvgRecordingsWithZeroSumAndCount() were green although getTickerCount() was never called. The tests were green because EasyMock returns 0 for a numerical return value by default if no expectation is specified. Thus, commenting out the expectation for getTickerCount() did not change the result of the test.
This commit changes the mock to a default mock and fixes the expectation to expect getAndResetTickerCount(). Now, commenting out the expectation leads to a test failure.
Reviewers: Luizfrf3 <lf.fonseca@hotmail.com>, Guozhang Wang <wangguoz@gmail.com>
Deprecate eager rebalancing protocol in kafka streams and log warning message when upgrade.from is set to 2.3 or lower. Also add a note in upgrade doc to prepare users for the removal of eager rebalancing support
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>