mirror of https://github.com/apache/kafka.git
MINOR: fix typos for streams (#13888)
Reviewers: Divij Vaidya <diviv@amazon.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>
This commit is contained in: parent 39a47c8999, commit 474053d297
@@ -36,7 +36,7 @@ import java.util.concurrent.CountDownLatch;
 /**
  * In this example, we implement a simple WordCount program using the high-level Streams DSL
  * that reads from a source topic "streams-plaintext-input", where the values of messages represent lines of text,
- * split each text line into words and then compute the word occurence histogram, write the continuous updated histogram
+ * split each text line into words and then compute the word occurrence histogram, write the continuous updated histogram
  * into a topic "streams-wordcount-output" where each record is an updated count of a single word.
  */
 public class WordCount {
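Note: the topology this javadoc describes boils down to a few DSL calls. A minimal, hypothetical sketch of the same WordCount pattern (not the file's exact code; assumes String default serdes are configured):

    final StreamsBuilder builder = new StreamsBuilder();
    final KStream<String, String> lines = builder.stream("streams-plaintext-input");
    lines.flatMapValues(line -> Arrays.asList(line.toLowerCase(Locale.getDefault()).split("\\W+")))
         .groupBy((key, word) -> word)
         .count()   // continuously updated KTable<String, Long> of word counts
         .toStream()
         .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));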
@@ -20,7 +20,7 @@ import java.util.Objects;
 
 /**
  * Encapsulates information about lag, at a store partition replica (active or standby). This information is constantly changing as the
- * tasks process records and thus, they should be treated as simply instantaenous measure of lag.
+ * tasks process records and thus, they should be treated as simply instantaneous measure of lag.
  */
 public class LagInfo {
 
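Note: since each LagInfo is only an instantaneous measure, callers typically re-poll it. A hedged sketch using the KafkaStreams#allLocalStorePartitionLags() API (variable names illustrative):

    // Point-in-time snapshot of lag, keyed by store name and partition; values go stale immediately.
    final Map<String, Map<Integer, LagInfo>> lags = kafkaStreams.allLocalStorePartitionLags();
    lags.forEach((store, byPartition) ->
        byPartition.forEach((partition, lag) ->
            System.out.printf("%s[%d]: offsetLag=%d%n", store, partition, lag.offsetLag())));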
@@ -849,7 +849,7 @@ public interface KStream<K, V> {
      * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
      * @deprecated since 2.6; use {@link #repartition()} instead
      */
-    // TODO: when removed, update `StreamsResetter` decription of --intermediate-topics
+    // TODO: when removed, update `StreamsResetter` description of --intermediate-topics
     @Deprecated
     KStream<K, V> through(final String topic);
 
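Note: per the @deprecated tag, through() is replaced by repartition(). A minimal migration sketch (topic and repartition names are illustrative):

    // Before (deprecated): routes records through a user-managed topic.
    final KStream<String, String> viaThrough = stream.through("user-managed-topic");

    // After: Kafka Streams creates and manages the repartition topic itself.
    final KStream<String, String> viaRepartition = stream.repartition(Repartitioned.as("my-repartition"));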
@@ -142,7 +142,7 @@ public class Produced<K, V> implements NamedOperation<Produced<K, V>> {
      * Produce records using the provided partitioner.
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
-     *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultStreamPartitioner} wil be used
+     *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultStreamPartitioner} will be used
      * @return this
      */
     public Produced<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
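Note: a hedged sketch of wiring a custom partitioner through this method (the lambda implements StreamPartitioner#partition; topic and variable names are illustrative):

    final StreamPartitioner<String, Long> partitioner =
        (topic, key, value, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);

    wordCounts.to("output-topic",
        Produced.with(Serdes.String(), Serdes.Long()).withStreamPartitioner(partitioner));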
@@ -162,7 +162,7 @@ public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>>
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
-     *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultStreamPartitioner} wil be used
+     *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultStreamPartitioner} will be used
      * @return a new {@code Repartitioned} instance configured with provided partitioner
      */
     public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
@@ -165,7 +165,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
         // problem:
         //
         // Say we have a window size of 5 seconds
-        // 1. A non-joined record wth time T10 is seen in the left-topic (maxLeftStreamTime: 10)
+        // 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10)
         //    The record is not processed yet, and is added to the outer-join store
         // 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2)
         //    The record is not processed yet, and is added to the outer-join store
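Note: the scenario in these comments arises in windowed outer joins. A hedged sketch of the user-facing operation these internals implement, using the 5-second window from the comment (stream names illustrative):

    final KStream<String, String> joined = leftStream.outerJoin(
        rightStream,
        (leftValue, rightValue) -> leftValue + "/" + rightValue,  // the missing side arrives as null
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(5)));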
@@ -174,8 +174,8 @@ public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {
             return this;
         }
 
-        public TableSourceNodeBuilder<K, V> isGlobalKTable(final boolean isGlobaKTable) {
-            this.isGlobalKTable = isGlobaKTable;
+        public TableSourceNodeBuilder<K, V> isGlobalKTable(final boolean isGlobalKTable) {
+            this.isGlobalKTable = isGlobalKTable;
             return this;
         }
 
@@ -43,7 +43,7 @@ public final class ForwardingDisabledProcessorContext implements ProcessorContex
     private static final String EXPLANATION = "ProcessorContext#forward() is not supported from this context, "
         + "as the framework must ensure the key is not changed (#forward allows changing the key on "
         + "messages which are sent). Try another function, which doesn't allow the key to be changed "
-        + "(for example - #tranformValues).";
+        + "(for example - #transformValues).";
 
     public ForwardingDisabledProcessorContext(final ProcessorContext delegate) {
         this.delegate = Objects.requireNonNull(delegate, "delegate");
@@ -149,7 +149,7 @@ public class InternalTopologyBuilder {
 
     // The name of the topology this builder belongs to, or null if this is not a NamedTopology
     private final String topologyName;
-    // TODO KAFKA-13336: we can remove this referance once we make the Topology/NamedTopology class into an interface and implement it
+    // TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it
     private NamedTopology namedTopology;
 
     // TODO KAFKA-13283: once we enforce all configs be passed in when constructing the topology builder then we can set
@@ -161,7 +161,7 @@ public class ProcessorRecordContext implements RecordContext, RecordMetadata {
         final int partition = buffer.getInt();
         final int headerCount = buffer.getInt();
         final Headers headers;
-        if (headerCount == -1) { // keep for backward compatibilty
+        if (headerCount == -1) { // keep for backward compatibility
             headers = new RecordHeaders();
         } else {
             final Header[] headerArr = new Header[headerCount];
@@ -426,7 +426,7 @@ public final class Stores {
      * Create an in-memory {@link SessionBytesStoreSupplier}.
      *
      * @param name            name of the store (cannot be {@code null})
-     * @param retentionPeriod length ot time to retain data in the store (cannot be negative)
+     * @param retentionPeriod length of time to retain data in the store (cannot be negative)
      *                        (note that the retention period must be at least as long enough to
      *                        contain the inactivity gap of the session and the entire grace period.)
      * @return an instance of a {@link SessionBytesStoreSupplier}
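Note: a hedged sketch of satisfying that retention constraint when building the supplier (durations and names illustrative; `grouped` is an assumed KGroupedStream; retention >= inactivity gap + grace):

    // 30m inactivity gap + 5m grace, so retention must be at least 35m.
    final SessionBytesStoreSupplier supplier =
        Stores.inMemorySessionStore("session-counts", Duration.ofMinutes(35));

    grouped.windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(30), Duration.ofMinutes(5)))
           .count(Materialized.as(supplier));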
@@ -190,7 +190,7 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
         // since the session window length can vary, we define the search boundary as:
         // lower: [0, ESET]
         // upper: [LSST, INF]
-        // and by puting the end time first and then the start time, the serialized search boundary
+        // and by putting the end time first and then the start time, the serialized search boundary
         // is: [(ESET-0), (INF-LSST)]
         buf.put(key.get());
         buf.putLong(endTime);
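Note: in other words, the serialized search key is laid out as [ key bytes | end time | start time ], so byte-wise range scans order sessions by end time first. An illustrative completion of the snippet above (buffer sizing and the trailing start-time write are assumptions):

    final ByteBuffer buf = ByteBuffer.allocate(key.get().length + 2 * Long.BYTES);
    buf.put(key.get());       // the record key
    buf.putLong(endTime);     // end time first, giving the boundary [(ESET-0), (INF-LSST)]
    buf.putLong(startTime);   // start time last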
@@ -204,7 +204,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
+    public void testGetMainConsumerConfigsWithMainConsumerOverriddenPrefix() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
         props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
         props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.GROUP_ID_CONFIG), "another-id");
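Note: this test (and the two below) pins down the documented precedence rule: a main.consumer.-prefixed setting beats the generic consumer.-prefixed one, but only for the main consumer. A minimal sketch:

    final Properties props = new Properties();
    // Baseline for all consumers (main, restore, global):
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
    // More specific override; only the main consumer sees 50:
    props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");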
@@ -463,7 +463,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
+    public void testGetRestoreConsumerConfigsWithRestoreConsumerOverriddenPrefix() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
         props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -511,7 +511,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
+    public void testGetGlobalConsumerConfigsWithGlobalConsumerOverriddenPrefix() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
         props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -599,14 +599,14 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldAcceptExactlyOnce() {
-        // don't use `StreamsConfig.EXACLTY_ONCE` to actually do a useful test
+        // don't use `StreamsConfig.EXACTLY_ONCE` to actually do a useful test
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
         new StreamsConfig(props);
     }
 
     @Test
     public void shouldAcceptExactlyOnceBeta() {
-        // don't use `StreamsConfig.EXACLTY_ONCE_BETA` to actually do a useful test
+        // don't use `StreamsConfig.EXACTLY_ONCE_BETA` to actually do a useful test
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_beta");
         new StreamsConfig(props);
     }
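Note: both string values exercised above ("exactly_once" and "exactly_once_beta") are deprecated since Kafka 3.0 in favor of exactly-once v2; a current-API sketch:

    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);  // "exactly_once_v2"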
@@ -112,7 +112,7 @@ public class JoinStoreIntegrationTest {
     }
 
     @Test
-    public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws InterruptedException {
+    public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueryable() throws InterruptedException {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-no-store-access");
         final StreamsBuilder builder = new StreamsBuilder();
 
@@ -713,7 +713,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                 queryableStoreName == null ?
                     Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) :
                     // not actually going to query this store, but we need to force materialization here
-                    // to really test this confuguration
+                    // to really test this configuration
                     Materialized.<String, String>as(Stores.inMemoryKeyValueStore(queryableStoreName + "-rejoin"))
                         .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
                         // the cache suppresses some of the unnecessary tombstones we want to make assertions about
@@ -162,7 +162,7 @@ public class RepartitionTopicNamingTest {
     }
 
     @Test
-    public void shouldNotReuseRepartitionNodeWithUnamedRepartitionTopics() {
+    public void shouldNotReuseRepartitionNodeWithUnnamedRepartitionTopics() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic")
             .selectKey((k, v) -> k)
@@ -174,7 +174,7 @@ public class RepartitionTopicNamingTest {
     }
 
     @Test
-    public void shouldNotReuseRepartitionNodeWithUnamedRepartitionTopicsKGroupedTable() {
+    public void shouldNotReuseRepartitionNodeWithUnnamedRepartitionTopicsKGroupedTable() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KGroupedTable<String, String> kGroupedTable = builder.<String, String>table("topic").groupBy(KeyValue::pair);
         kGroupedTable.count().toStream().to("output-count");
@@ -53,7 +53,7 @@ public class TransformerSupplierAdapterTest {
     final String value = "World";
 
     @Test
-    public void shouldCallInitOfAdapteeTransformer() {
+    public void shouldCallInitOfAdaptedTransformer() {
         when(transformerSupplier.get()).thenReturn(transformer);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
@@ -65,7 +65,7 @@ public class TransformerSupplierAdapterTest {
     }
 
     @Test
-    public void shouldCallCloseOfAdapteeTransformer() {
+    public void shouldCallCloseOfAdaptedTransformer() {
         when(transformerSupplier.get()).thenReturn(transformer);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
@@ -77,7 +77,7 @@ public class TransformerSupplierAdapterTest {
     }
 
     @Test
-    public void shouldCallStoresOfAdapteeTransformerSupplier() {
+    public void shouldCallStoresOfAdaptedTransformerSupplier() {
         when(transformerSupplier.stores()).thenReturn(stores);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
@@ -86,7 +86,7 @@ public class TransformerSupplierAdapterTest {
     }
 
     @Test
-    public void shouldCallTransformOfAdapteeTransformerAndReturnSingletonIterable() {
+    public void shouldCallTransformOfAdaptedTransformerAndReturnSingletonIterable() {
         when(transformerSupplier.get()).thenReturn(transformer);
         when(transformer.transform(key, value)).thenReturn(KeyValue.pair(0, 1));
 
@@ -101,7 +101,7 @@ public class TransformerSupplierAdapterTest {
     }
 
     @Test
-    public void shouldCallTransformOfAdapteeTransformerAndReturnEmptyIterable() {
+    public void shouldCallTransformOfAdaptedTransformerAndReturnEmptyIterable() {
         when(transformerSupplier.get()).thenReturn(transformer);
         when(transformer.transform(key, value)).thenReturn(null);
 
@@ -60,7 +60,7 @@ public class StreamsGraphTest {
     private Initializer<String> initializer;
     private Aggregator<String, String, String> aggregator;
 
-    // Test builds topology in succesive manner but only graph node not yet processed written to topology
+    // Test builds topology in successive manner but only graph node not yet processed written to topology
 
     @Test
     public void shouldBeAbleToBuildTopologyIncrementally() {
@@ -200,11 +200,11 @@ public class StreamsGraphTest {
     @Test
     public void shouldNotOptimizeWhenAThroughOperationIsDone() {
         final Topology attemptedOptimize = getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE);
-        final Topology noOptimziation = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION);
+        final Topology noOptimization = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION);
 
-        assertEquals(attemptedOptimize.describe().toString(), noOptimziation.describe().toString());
+        assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
         assertEquals(0, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
-        assertEquals(0, getCountOfRepartitionTopicsFound(noOptimziation.describe().toString()));
+        assertEquals(0, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
 
     }
 
@@ -233,11 +233,11 @@ public class StreamsGraphTest {
     @Test
     public void shouldNotOptimizeWhenRepartitionOperationIsDone() {
         final Topology attemptedOptimize = getTopologyWithRepartitionOperation(StreamsConfig.OPTIMIZE);
-        final Topology noOptimziation = getTopologyWithRepartitionOperation(StreamsConfig.NO_OPTIMIZATION);
+        final Topology noOptimization = getTopologyWithRepartitionOperation(StreamsConfig.NO_OPTIMIZATION);
 
-        assertEquals(attemptedOptimize.describe().toString(), noOptimziation.describe().toString());
+        assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString());
         assertEquals(2, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
-        assertEquals(2, getCountOfRepartitionTopicsFound(noOptimziation.describe().toString()));
+        assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
     }
 
     private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) {
@@ -109,7 +109,7 @@ public class CopartitionedTopicsEnforcerTest {
     }
 
     @Test
-    public void shouldThrowAnExceptionIfRepartitionTopicConfigsWithEnforcedNumOfPartitionsHaveDifferentNumOfPartitiones() {
+    public void shouldThrowAnExceptionIfRepartitionTopicConfigsWithEnforcedNumOfPartitionsHaveDifferentNumOfPartitions() {
         final InternalTopicConfig topic1 = createRepartitionTopicConfigWithEnforcedNumberOfPartitions("repartitioned-1", 10);
         final InternalTopicConfig topic2 = createRepartitionTopicConfigWithEnforcedNumberOfPartitions("repartitioned-2", 5);
 
@@ -2180,7 +2180,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
+    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALSO() {
         final ProcessorStateManager stateManager = EasyMock.createStrictMock(ProcessorStateManager.class);
         stateManager.markChangelogAsCorrupted(taskId00Partitions);
         replay(stateManager);
@@ -26,7 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class TopicPartitionMetadataTest {
 
     @Test
-    public void shouldGetPartitonTimeAndProcessorMeta() {
+    public void shouldGetPartitionTimeAndProcessorMeta() {
         final ProcessorMetadata metadata = new ProcessorMetadata();
         final String key = "some_key";
         final long value = 100L;
@@ -38,7 +38,7 @@ public class BufferValueTest {
     }
 
     @Test
-    public void shouldDeduplicateIndenticalValues() {
+    public void shouldDeduplicateIdenticalValues() {
         final byte[] bytes = {(byte) 0};
         final BufferValue bufferValue = new BufferValue(bytes, bytes, null, null);
         assertSame(bufferValue.priorValue(), bufferValue.oldValue());
@@ -64,7 +64,7 @@ public class CompositeReadOnlySessionStoreTest {
     }
 
     @Test
-    public void shouldFetchResulstFromUnderlyingSessionStore() {
+    public void shouldFetchResultsFromUnderlyingSessionStore() {
         underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
         underlyingSessionStore.put(new Windowed<>("a", new SessionWindow(10, 10)), 2L);
 
@@ -126,7 +126,7 @@ public class FilteredCacheIteratorTest {
     }
 
     @Test
-    public void shouldThrowUnsupportedOperationExeceptionOnRemove() {
+    public void shouldThrowUnsupportedOperationExceptionOnRemove() {
         assertThrows(UnsupportedOperationException.class, () -> allIterator.remove());
     }
 
@@ -315,7 +315,7 @@ public class SessionKeySchemaTest {
     }
 
     @Test
-    public void shouldDeSerializeEmtpyByteArrayToNull() {
+    public void shouldDeSerializeEmptyByteArrayToNull() {
         assertNull(keySerde.deserializer().deserialize(topic, new byte[0]));
     }
 
@@ -122,7 +122,7 @@ public class MockRestoreConsumer<K, V> extends MockConsumer<byte[], byte[]> {
             throw new IllegalArgumentException("RestoreConsumer: offset should not be negative");
 
         if (seekOffset >= 0)
-            throw new IllegalStateException("RestoreConsumer: offset already seeked");
+            throw new IllegalStateException("RestoreConsumer: offset already sought");
 
         seekOffset = offset;
         currentOffset = offset;