mirror of https://github.com/apache/kafka.git
MINOR: Small cleanups in streams tests (#19446)
- Fixed typos
- Fixed IDEA code inspection warnings
- Removed unused fields and methods

Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi <frankvicky@apache.org>, Matthias J. Sax <matthias@confluent.io>
parent 6f783f8536
commit 988fa3f272
@@ -41,7 +41,7 @@ class AutoOffsetResetTest {
     }
 
     @Test
-    void shouldThrowExceptionOnDurationForLastetReset() {
+    void shouldThrowExceptionOnDurationForLatestReset() {
         final AutoOffsetResetInternal latest = new AutoOffsetResetInternal(AutoOffsetReset.latest());
         assertThrows(IllegalStateException.class, latest::duration, "Latest should not have a duration.");
     }
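The hunk above renames a test to fix the "Lastet" typo. The assertion style it keeps — assertThrows with a bound method reference as the Executable — is worth a standalone look; here is a minimal, self-contained sketch (class and method names are hypothetical, not the Kafka sources):

    import static org.junit.jupiter.api.Assertions.assertThrows;

    import org.junit.jupiter.api.Test;

    class DurationResetSketchTest {
        // Stand-in for AutoOffsetResetInternal: duration() is only legal for by-duration resets.
        static class Reset {
            long duration() {
                throw new IllegalStateException("Latest should not have a duration.");
            }
        }

        @Test
        void shouldThrowOnDurationForLatest() {
            final Reset latest = new Reset();
            // The method reference serves as the Executable; its return value is ignored.
            assertThrows(IllegalStateException.class, latest::duration, "Latest should not have a duration.");
        }
    }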
@@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsNotStartedException;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.errors.UnknownStateStoreException;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
@@ -345,7 +344,7 @@ public class KafkaStreamsTest {
         }).when(thread).start();
     }
 
-    private CountDownLatch terminableThreadBlockingLatch = new CountDownLatch(1);
+    private final CountDownLatch terminableThreadBlockingLatch = new CountDownLatch(1);
 
     private void prepareTerminableThread(final StreamThread thread) throws InterruptedException {
         doAnswer(invocation -> {
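Making terminableThreadBlockingLatch final (hunk above) documents single assignment without limiting the latch, whose count still changes through countDown(). A hedged sketch of the blocking-thread pattern such a field supports (hypothetical class, not the Kafka test):

    import java.util.concurrent.CountDownLatch;

    class TerminableWorker {
        // final: the reference is assigned once; await()/countDown() still mutate the latch's count.
        private final CountDownLatch blockingLatch = new CountDownLatch(1);

        void runUntilReleased() throws InterruptedException {
            blockingLatch.await(); // park the thread until another thread releases it
        }

        void release() {
            blockingLatch.countDown(); // wakes runUntilReleased() during shutdown
        }
    }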
@@ -561,7 +560,7 @@ public class KafkaStreamsTest {
 
         try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
             assertEquals(NUM_THREADS, streams.threads.size());
-            assertEquals(streams.state(), KafkaStreams.State.CREATED);
+            assertEquals(KafkaStreams.State.CREATED, streams.state());
 
             streams.start();
             waitForCondition(
@@ -620,7 +619,7 @@ public class KafkaStreamsTest {
             );
 
             streams.close();
-            assertEquals(streams.state(), KafkaStreams.State.ERROR, "KafkaStreams should remain in ERROR state after close.");
+            assertEquals(KafkaStreams.State.ERROR, streams.state(), "KafkaStreams should remain in ERROR state after close.");
             assertThat(appender.getMessages(), hasItem(containsString("State transition from RUNNING to PENDING_ERROR")));
             assertThat(appender.getMessages(), hasItem(containsString("State transition from PENDING_ERROR to ERROR")));
             assertThat(appender.getMessages(), hasItem(containsString("Streams client is already in the terminal ERROR state")));
@@ -644,7 +643,7 @@ public class KafkaStreamsTest {
             streams.start();
             final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
             streams.close();
-            assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+            assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
             assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
         }
     }
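The assertEquals swaps in this file all restore JUnit's documented (expected, actual) order. The order is invisible while tests pass but decides how failures read; a self-contained illustration with hypothetical values:

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import org.junit.jupiter.api.Test;

    class AssertOrderSketchTest {
        enum State { CREATED, RUNNING }

        State state() {
            return State.RUNNING; // pretend the client already started
        }

        @Test
        void expectedComesFirst() {
            // If we expected CREATED but wrote assertEquals(state(), State.CREATED),
            // a failure would print "expected: <RUNNING> but was: <CREATED>" — backwards.
            // With (expected, actual) the failure message reads correctly:
            assertEquals(State.RUNNING, state());
        }
    }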
@@ -875,7 +874,7 @@ public class KafkaStreamsTest {
         prepareThreadState(streamThreadTwo, state2);
         try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+            assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler(null));
         }
     }
 
@@ -886,7 +885,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadTwo, 2);
         try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+            assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler(null));
         }
 
     }
@@ -896,7 +895,7 @@ public class KafkaStreamsTest {
         prepareStreamThread(streamThreadOne, 1);
         prepareStreamThread(streamThreadTwo, 2);
         try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-            assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+            assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler(null));
         }
     }
 
@@ -1369,7 +1368,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void shouldGetClientSupplierFromConfigForConstructorWithTime() throws Exception {
+    public void shouldGetClientSupplierFromConfigForConstructorWithTime() {
         prepareStreams();
         final AtomicReference<StreamThread.State> state1 = prepareStreamThread(streamThreadOne, 1);
         final AtomicReference<StreamThread.State> state2 = prepareStreamThread(streamThreadTwo, 2);
@@ -1548,7 +1547,7 @@ public class KafkaStreamsTest {
         try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
 
             assertThat(streams.threads.size(), equalTo(0));
-            assertEquals(streams.state(), KafkaStreams.State.CREATED);
+            assertEquals(KafkaStreams.State.CREATED, streams.state());
 
             streams.start();
             waitForCondition(
@@ -1796,7 +1795,7 @@ public class KafkaStreamsTest {
         final AtomicBoolean didAssertGlobalThread = new AtomicBoolean(false);
 
         when(streamThreadOne.clientInstanceIds(any()))
-            .thenReturn(Collections.singletonMap("any-client-1", new KafkaFutureImpl<Uuid>() {
+            .thenReturn(Collections.singletonMap("any-client-1", new KafkaFutureImpl<>() {
                 @Override
                 public Uuid get(final long timeout, final TimeUnit timeUnit) {
                     didAssertThreadOne.set(true);
@@ -1806,7 +1805,7 @@ public class KafkaStreamsTest {
                 }
             }));
         when(streamThreadTwo.clientInstanceIds(any()))
-            .thenReturn(Collections.singletonMap("any-client-2", new KafkaFutureImpl<Uuid>() {
+            .thenReturn(Collections.singletonMap("any-client-2", new KafkaFutureImpl<>() {
                 @Override
                 public Uuid get(final long timeout, final TimeUnit timeUnit) {
                     didAssertThreadTwo.set(true);
@@ -1823,7 +1822,7 @@ public class KafkaStreamsTest {
             streams.start();
 
             when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any()))
-                .thenReturn(new KafkaFutureImpl<Uuid>() {
+                .thenReturn(new KafkaFutureImpl<>() {
                     @Override
                     public Uuid get(final long timeout, final TimeUnit timeUnit) {
                         didAssertGlobalThread.set(true);
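Every KafkaFutureImpl<Uuid> to KafkaFutureImpl<>() change above relies on Java 9's extension of the diamond operator to anonymous classes, legal whenever the inferred type argument is denotable. A minimal sketch using a JDK type instead of Kafka's future:

    import java.util.concurrent.CompletableFuture;

    class DiamondOnAnonymousClasses {
        // Pre-Java 9: the type argument had to be spelled out on the anonymous subclass.
        final CompletableFuture<String> verbose = new CompletableFuture<String>() { };

        // Java 9+: the diamond infers <String> for the anonymous subclass as well.
        final CompletableFuture<String> concise = new CompletableFuture<>() { };
    }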
@@ -145,7 +145,7 @@ public class StreamsBuilderTest {
             ),
             "topic",
             Consumed.with(Serdes.String(), Serdes.String()),
-            () -> new Processor<String, String, Void, Void>() {
+            () -> new Processor<>() {
                 private KeyValueStore<String, String> store;
 
                 @Override
@@ -454,7 +454,7 @@ public class StreamsBuilderTest {
 
         builder.stream(topic)
             .groupByKey()
-            .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store"))
+            .count(Materialized.as("store"))
             .toStream();
 
         builder.build();
@@ -474,7 +474,7 @@ public class StreamsBuilderTest {
 
         builder.stream(topic)
             .groupByKey()
-            .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store"))
+            .count(Materialized.as("store"))
             .toStream();
 
         builder.build();
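Dropping the Materialized type witness (two hunks above) works because count(...) supplies a target type from which the compiler infers the key, value, and store parameters; the witness is only needed where inference fails. A sketch of the two equivalent forms, assuming standard Kafka Streams dependencies:

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    class MaterializedInferenceSketch {
        void build() {
            final StreamsBuilder builder = new StreamsBuilder();

            // Explicit witness, verbose but always valid:
            builder.stream("input-a")
                .groupByKey()
                .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("store-a"))
                .toStream();

            // Inferred from count()'s parameter type, as in the cleanup above:
            builder.stream("input-b")
                .groupByKey()
                .count(Materialized.as("store-b"))
                .toStream();
        }
    }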
@@ -855,7 +855,7 @@ public class StreamsConfigTest {
 
         try {
             new StreamsConfig(props).getProducerConfigs(clientId);
-            fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
+            fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be parsed into an integer");
         } catch (final ConfigException e) {
             assertEquals(
                 "Invalid value not-a-number for configuration max.in.flight.requests.per.connection:" +
@@ -875,8 +875,8 @@ public class StreamsConfigTest {
     @Test
     public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
         final String expectedOptimizeConfig = "none";
-        final String actualOptimizedConifig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
-        assertEquals(expectedOptimizeConfig, actualOptimizedConifig, "Optimization should be \"none\"");
+        final String actualOptimizedConfig = streamsConfig.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
+        assertEquals(expectedOptimizeConfig, actualOptimizedConfig, "Optimization should be \"none\"");
     }
 
     @Test
@@ -884,8 +884,8 @@ public class StreamsConfigTest {
         final String expectedOptimizeConfig = "all";
         props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "all");
         final StreamsConfig config = new StreamsConfig(props);
-        final String actualOptimizedConifig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
-        assertEquals(expectedOptimizeConfig, actualOptimizedConifig, "Optimization should be \"all\"");
+        final String actualOptimizedConfig = config.getString(TOPOLOGY_OPTIMIZATION_CONFIG);
+        assertEquals(expectedOptimizeConfig, actualOptimizedConfig, "Optimization should be \"all\"");
     }
 
     @Test
@@ -1216,13 +1216,13 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldtSetMinTrafficRackAwareAssignmentConfig() {
+    public void shouldSetMinTrafficRackAwareAssignmentConfig() {
         props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC);
         assertEquals("min_traffic", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
     }
 
     @Test
-    public void shouldtSetBalanceSubtopologyRackAwareAssignmentConfig() {
+    public void shouldSetBalanceSubtopologyRackAwareAssignmentConfig() {
         props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY);
         assertEquals("balance_subtopology", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
     }
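The try/fail/catch hunk above corrects only the misspelled fail() message, leaving the pre-JUnit-5 idiom intact. For reference, assertThrows expresses the same intent more compactly and hands back the exception for further assertions; a self-contained sketch with a hypothetical parser, not the StreamsConfig API:

    import static org.junit.jupiter.api.Assertions.assertThrows;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    import org.junit.jupiter.api.Test;

    class AssertThrowsSketchTest {
        static class ConfigException extends RuntimeException {
            ConfigException(final String message) {
                super(message);
            }
        }

        static int parseMaxInFlight(final String value) {
            try {
                return Integer.parseInt(value);
            } catch (final NumberFormatException e) {
                throw new ConfigException("Invalid value " + value);
            }
        }

        @Test
        void rejectsNonNumericValue() {
            // Replaces: try { parse...; fail(...); } catch (ConfigException e) { assert... }
            final ConfigException e = assertThrows(ConfigException.class, () -> parseMaxInFlight("not-a-number"));
            assertTrue(e.getMessage().contains("not-a-number"));
        }
    }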
@@ -57,7 +57,7 @@ public class ReadOnlyStoreTest {
             new StringDeserializer(),
             "storeTopic",
             "readOnlyProcessor",
-            () -> new Processor<Integer, String, Void, Void>() {
+            () -> new Processor<>() {
                 KeyValueStore<Integer, String> store;
 
                 @Override
@@ -124,8 +124,8 @@ public class CopartitionedTopicsEnforcerTest {
         );
 
         final TreeMap<String, Integer> sorted = new TreeMap<>(
-            Utils.mkMap(Utils.mkEntry(topic1.name(), topic1.numberOfPartitions().get()),
-                Utils.mkEntry(topic2.name(), topic2.numberOfPartitions().get()))
+            Utils.mkMap(Utils.mkEntry(topic1.name(), topic1.numberOfPartitions().orElseThrow()),
+                Utils.mkEntry(topic2.name(), topic2.numberOfPartitions().orElseThrow()))
         );
 
         assertEquals(String.format("Invalid topology: thread " +
@@ -161,7 +161,7 @@ public class CopartitionedTopicsEnforcerTest {
         assertEquals(String.format("Invalid topology: thread Number of partitions [%s] " +
             "of repartition topic [%s] " +
             "doesn't match number of partitions [%s] of the source topic.",
-            topic1.numberOfPartitions().get(), topic1.name(), 2), ex.getMessage());
+            topic1.numberOfPartitions().orElseThrow(), topic1.name(), 2), ex.getMessage());
     }
 
     @Test
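The orElseThrow() substitutions above are behavior-preserving: since Java 10, Optional.orElseThrow() is the recommended synonym for the vaguely named get(), throwing the same NoSuchElementException on an empty Optional. A quick demonstration:

    import java.util.NoSuchElementException;
    import java.util.Optional;

    class OptionalOrElseThrowDemo {
        public static void main(final String[] args) {
            final Optional<Integer> partitions = Optional.of(3);

            // Same behavior; the second name states its failure mode.
            System.out.println(partitions.get());          // 3
            System.out.println(partitions.orElseThrow());  // 3

            try {
                Optional.empty().orElseThrow();
            } catch (final NoSuchElementException e) {
                System.out.println("empty Optional: " + e.getMessage());
            }
        }
    }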
@@ -203,7 +203,7 @@ class DefaultStateUpdaterTest {
         verifyRestoredActiveTasks(restoredTask);
         stateUpdater.shutdown(Duration.ofMinutes(1));
 
-        final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start());
+        final IllegalStateException exception = assertThrows(IllegalStateException.class, stateUpdater::start);
 
         assertEquals("State updater started with non-empty output queues."
             + " This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the"
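stateUpdater::start above (and stateUpdater::isIdle further down) replace zero-argument lambdas with bound method references; both compile to the same functional-interface instance, the reference just names the target directly. Sketch with a hypothetical type:

    import java.util.function.Supplier;

    class MethodReferenceSketch {
        static class Updater {
            boolean isIdle() {
                return true;
            }
        }

        public static void main(final String[] args) {
            final Updater stateUpdater = new Updater();

            final Supplier<Boolean> viaLambda = () -> stateUpdater.isIdle();
            final Supplier<Boolean> viaReference = stateUpdater::isIdle;

            System.out.println(viaLambda.get() && viaReference.get()); // true
        }
    }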
@@ -220,7 +220,7 @@ class DefaultStateUpdaterTest {
         verifyExceptionsAndFailedTasks(new ExceptionAndTask(taskCorruptedException, failedTask));
         stateUpdater.shutdown(Duration.ofMinutes(1));
 
-        final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> stateUpdater.start());
+        final IllegalStateException exception = assertThrows(IllegalStateException.class, stateUpdater::start);
 
         assertEquals("State updater started with non-empty output queues."
             + " This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the"
@@ -1785,7 +1785,7 @@ class DefaultStateUpdaterTest {
 
     private void verifyIdle() throws Exception {
         waitForCondition(
-            () -> stateUpdater.isIdle(),
+            stateUpdater::isIdle,
             VERIFICATION_TIMEOUT,
             "State updater did not enter an idling state!"
         );
@@ -1864,26 +1864,4 @@ class DefaultStateUpdaterTest {
         assertFalse(stateUpdater.hasExceptionsAndFailedTasks());
         assertTrue(stateUpdater.drainExceptionsAndFailedTasks().isEmpty());
     }
-
-    private void verifyRemovedTasks(final Task... tasks) throws Exception {
-        if (tasks.length == 0) {
-            waitForCondition(
-                () -> stateUpdater.removedTasks().isEmpty(),
-                VERIFICATION_TIMEOUT,
-                "Did not get empty removed task within the given timeout!"
-            );
-        } else {
-            final Set<Task> expectedRemovedTasks = Set.of(tasks);
-            final Set<Task> removedTasks = new HashSet<>();
-            waitForCondition(
-                () -> {
-                    removedTasks.addAll(stateUpdater.removedTasks());
-                    return removedTasks.containsAll(expectedRemovedTasks)
-                        && removedTasks.size() == expectedRemovedTasks.size();
-                },
-                VERIFICATION_TIMEOUT,
-                "Did not get all removed task within the given timeout!"
-            );
-        }
-    }
 }
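The file's final hunk deletes the unused verifyRemovedTasks helper. Its shape — poll a condition until it holds or a timeout expires — is the generic pattern behind the waitForCondition calls throughout these tests; a simplified, self-contained sketch of such a helper (hypothetical, not Kafka's TestUtils implementation):

    import java.util.function.BooleanSupplier;

    final class WaitUtil {
        private WaitUtil() {
        }

        // Re-check the condition every 10 ms until it holds or timeoutMs elapses.
        static void waitForCondition(final BooleanSupplier condition,
                                     final long timeoutMs,
                                     final String message) throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (!condition.getAsBoolean()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new AssertionError(message);
                }
                Thread.sleep(10L);
            }
        }
    }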
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -110,7 +109,6 @@ public class GlobalStateManagerImplTest {
     private ProcessorTopology topology;
     private InternalMockProcessorContext processorContext;
     private Optional<InternalTopologyBuilder.ReprocessFactory<?, ?, ?, ?>> optionalMockReprocessFactory;
-    private DeserializationExceptionHandler deserializationExceptionHandler;
 
     static ProcessorTopology withGlobalStores(final List<StateStore> stateStores,
                                               final Map<String, String> storeToChangelogTopic,
@@ -307,7 +305,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize();
         stateManager.registerStore(
-            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store1) {
+            new WrappedStateStore<>(store1) {
             },
             stateRestoreCallback,
             null);
@@ -335,7 +333,7 @@ public class GlobalStateManagerImplTest {
 
         stateManager.initialize();
         stateManager.registerStore(
-            new WrappedStateStore<NoOpReadOnlyStore<Object, Object>, Object, Object>(store2) {
+            new WrappedStateStore<>(store2) {
             },
             stateRestoreCallback,
             null);
@@ -424,7 +422,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
+        stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) {
             @Override
             public void flush() {
                 throw new RuntimeException("KABOOM!");
@@ -434,7 +432,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldCloseStateStores() throws IOException {
+    public void shouldCloseStateStores() {
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
@@ -451,7 +449,7 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
+        stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
@@ -476,7 +474,7 @@ public class GlobalStateManagerImplTest {
     public void shouldNotCloseStoresIfCloseAlreadyCalled() {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>("t1-store") {
+        stateManager.registerStore(new NoOpReadOnlyStore<>("t1-store") {
             @Override
             public void close() {
                 if (!isOpen()) {
@@ -494,7 +492,7 @@ public class GlobalStateManagerImplTest {
     public void shouldAttemptToCloseAllStoresEvenWhenSomeException() {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<Object, Object>("t1-store") {
+        final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<>("t1-store") {
             @Override
             public void close() {
                 super.close();
@@ -598,7 +596,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                 numberOfCalls.incrementAndGet();
@@ -634,13 +632,13 @@ public class GlobalStateManagerImplTest {
         assertThat(cause, instanceOf(TimeoutException.class));
         assertThat(cause.getMessage(), equalTo("KABOOM!"));
 
-        assertEquals(numberOfCalls.get(), 1);
+        assertEquals(1, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                 time.sleep(100L);
@@ -675,13 +673,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 2);
+        assertEquals(2, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryWhenEndOffsetsThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                 time.sleep(100L);
@@ -716,13 +714,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 11);
+        assertEquals(11, numberOfCalls.get());
     }
 
     @Test
     public void shouldNotFailOnSlowProgressWhenEndOffsetsThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                 time.sleep(1L);
@@ -764,7 +762,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldNotRetryWhenPartitionsForThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 numberOfCalls.incrementAndGet();
@@ -800,13 +798,13 @@ public class GlobalStateManagerImplTest {
         assertThat(cause, instanceOf(TimeoutException.class));
         assertThat(cause.getMessage(), equalTo("KABOOM!"));
 
-        assertEquals(numberOfCalls.get(), 1);
+        assertEquals(1, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryAtLeastOnceWhenPartitionsForThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 time.sleep(100L);
@@ -841,13 +839,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 2);
+        assertEquals(2, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryWhenPartitionsForThrowsTimeoutExceptionUntilTaskTimeoutExpires() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 time.sleep(100L);
@@ -882,13 +880,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 11);
+        assertEquals(11, numberOfCalls.get());
     }
 
     @Test
     public void shouldNotFailOnSlowProgressWhenPartitionForThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 time.sleep(1L);
@@ -930,7 +928,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldNotRetryWhenPositionThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
            @Override
            public synchronized long position(final TopicPartition partition) {
                numberOfCalls.incrementAndGet();
@@ -966,13 +964,13 @@ public class GlobalStateManagerImplTest {
         assertThat(cause, instanceOf(TimeoutException.class));
         assertThat(cause.getMessage(), equalTo("KABOOM!"));
 
-        assertEquals(numberOfCalls.get(), 1);
+        assertEquals(1, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryAtLeastOnceWhenPositionThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized long position(final TopicPartition partition) {
                 time.sleep(100L);
@@ -1007,13 +1005,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 2);
+        assertEquals(2, numberOfCalls.get());
     }
 
     @Test
     public void shouldRetryWhenPositionThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized long position(final TopicPartition partition) {
                 time.sleep(100L);
@@ -1048,13 +1046,13 @@ public class GlobalStateManagerImplTest {
         );
         assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed."));
 
-        assertEquals(numberOfCalls.get(), 11);
+        assertEquals(11, numberOfCalls.get());
     }
 
     @Test
     public void shouldNotFailOnSlowProgressWhenPositionThrowsTimeoutException() {
         final AtomicInteger numberOfCalls = new AtomicInteger(0);
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized long position(final TopicPartition partition) {
                 time.sleep(1L);
@@ -1090,7 +1088,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldUsePollMsPlusRequestTimeoutInPollDuringRestoreAndTimeoutWhenNoProgressDuringRestore() {
-        consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
                 time.sleep(timeout.toMillis());
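All GlobalStateManagerImplTest hunks above make the same two mechanical changes: diamond on anonymous MockConsumer subclasses and (expected, actual) argument order. The underlying technique — overriding a single MockConsumer method to inject a fault — looks roughly like this sketch (the String constructor argument mirrors the diff's AutoOffsetResetStrategy.EARLIEST.name(); surrounding names are hypothetical):

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    class FaultInjectingConsumerSketch {
        // Anonymous subclass with diamond (Java 9+); endOffsets() always times out,
        // which drives the state manager's retry/timeout code path in the tests above.
        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>("earliest") {
            @Override
            public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
                throw new TimeoutException("KABOOM!");
            }
        };
    }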
@@ -103,7 +103,7 @@ public class GlobalStreamThreadTest {
         );
 
         final ProcessorSupplier<Object, Object, Void, Void> processorSupplier = () ->
-            new ContextualProcessor<Object, Object, Void, Void>() {
+            new ContextualProcessor<>() {
                 @Override
                 public void process(final Record<Object, Object> record) {
                 }
@@ -163,7 +163,7 @@ public class GlobalStreamThreadTest {
 
     @Test
     public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception {
-        final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
+        final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
                 throw new RuntimeException("KABOOM!");
@@ -96,7 +96,7 @@ import static org.mockito.Mockito.when;
 public class InternalTopicManagerTest {
     private final Node broker1 = new Node(0, "dummyHost-1", 1234);
     private final Node broker2 = new Node(1, "dummyHost-2", 1234);
-    private final List<Node> cluster = new ArrayList<Node>(2) {
+    private final List<Node> cluster = new ArrayList<>(2) {
         {
             add(broker1);
             add(broker2);
@@ -115,7 +115,7 @@ public class InternalTopicManagerTest {
     private InternalTopicManager internalTopicManager;
     private final MockTime time = new MockTime(0);
 
-    private final Map<String, Object> config = new HashMap<String, Object>() {
+    private final Map<String, Object> config = new HashMap<>() {
         {
             put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
             put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker1.host() + ":" + broker1.port());
@@ -361,8 +361,8 @@ public class InternalTopicManagerTest {
         final InternalTopicManager internalTopicManager =
             new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
         try {
-            final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1));
-            final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2));
+            final Set<String> topic1set = new HashSet<>(Collections.singletonList(topic1));
+            final Set<String> topic2set = new HashSet<>(Collections.singletonList(topic2));
 
             internalTopicManager.getNumPartitions(topic1set, topic2set);
 
@@ -373,8 +373,8 @@ public class InternalTopicManagerTest {
         mockAdminClient.timeoutNextRequest(1);
 
         try {
-            final Set<String> topic1set = new HashSet<String>(Collections.singletonList(topic1));
-            final Set<String> topic2set = new HashSet<String>(Collections.singletonList(topic2));
+            final Set<String> topic1set = new HashSet<>(Collections.singletonList(topic1));
+            final Set<String> topic2set = new HashSet<>(Collections.singletonList(topic2));
 
             internalTopicManager.getNumPartitions(topic1set, topic2set);
 
@@ -710,22 +710,22 @@ public class InternalTopicManagerTest {
         internalTopicManager.makeReady(Collections.singletonMap(topic4, topicConfig4));
 
         assertEquals(Set.of(topic1, topic2, topic3, topic4), mockAdminClient.listTopics().names().get());
-        assertEquals(new TopicDescription(topic1, false, new ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic1, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
         }), mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get());
-        assertEquals(new TopicDescription(topic2, false, new ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic2, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
         }), mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get());
-        assertEquals(new TopicDescription(topic3, false, new ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic3, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
         }), mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get());
-        assertEquals(new TopicDescription(topic4, false, new ArrayList<TopicPartitionInfo>() {
+        assertEquals(new TopicDescription(topic4, false, new ArrayList<>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
             }
@@ -804,7 +804,7 @@ public class InternalTopicManagerTest {
         mockAdminClient.addTopic(
             false,
             topic1,
-            new ArrayList<TopicPartitionInfo>() {
+            new ArrayList<>() {
                 {
                     add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList()));
                     add(new TopicPartitionInfo(1, broker1, singleReplica, Collections.emptyList()));
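The ArrayList and HashMap hunks above keep this test's double-brace initialization, merely adding the diamond. Double-brace init allocates an anonymous subclass per use site and captures the enclosing instance, so outside legacy tests the Java 9+ factories are usually the better choice; a comparison sketch:

    import java.util.ArrayList;
    import java.util.List;

    class InitStyleSketch {
        // Double-brace: an anonymous ArrayList subclass with an instance initializer.
        final List<String> doubleBrace = new ArrayList<>() {
            {
                add("dummyHost-1:1234");
                add("dummyHost-2:1234");
            }
        };

        // Java 9+ factory: immutable, no extra class, no captured outer instance.
        final List<String> factory = List.of("dummyHost-1:1234", "dummyHost-2:1234");
    }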
@@ -116,7 +116,7 @@ public class InternalTopologyBuilderTest {
         assertThat(builder.offsetResetStrategy(earliestTopic), equalTo(AutoOffsetResetStrategy.EARLIEST));
         assertThat(builder.offsetResetStrategy(latestTopic), equalTo(AutoOffsetResetStrategy.LATEST));
         assertThat(builder.offsetResetStrategy(durationTopic).type(), equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
-        assertThat(builder.offsetResetStrategy(durationTopic).duration().get().toSeconds(), equalTo(42L));
+        assertThat(builder.offsetResetStrategy(durationTopic).duration().orElseThrow().toSeconds(), equalTo(42L));
     }
 
     @Test
@@ -127,7 +127,7 @@ public class InternalTopologyBuilderTest {
         final String durationTopicPattern = "duration.*Topic";
 
         builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.none()), "source0", null, null, null, Pattern.compile(noneTopicPattern));
-        builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.earliest()), "sourc1", null, null, null, Pattern.compile(earliestTopicPattern));
+        builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.earliest()), "source1", null, null, null, Pattern.compile(earliestTopicPattern));
         builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.latest()), "source2", null, null, null, Pattern.compile(latestTopicPattern));
         builder.addSource(new AutoOffsetResetInternal(AutoOffsetReset.byDuration(Duration.ofSeconds(42))), "source3", null, null, null, Pattern.compile(durationTopicPattern));
 
@@ -137,7 +137,7 @@ public class InternalTopologyBuilderTest {
         assertThat(builder.offsetResetStrategy("earliestTestTopic"), equalTo(AutoOffsetResetStrategy.EARLIEST));
         assertThat(builder.offsetResetStrategy("latestTestTopic"), equalTo(AutoOffsetResetStrategy.LATEST));
         assertThat(builder.offsetResetStrategy("durationTestTopic").type(), equalTo(AutoOffsetResetStrategy.StrategyType.BY_DURATION));
-        assertThat(builder.offsetResetStrategy("durationTestTopic").duration().get().toSeconds(), equalTo(42L));
+        assertThat(builder.offsetResetStrategy("durationTestTopic").duration().orElseThrow().toSeconds(), equalTo(42L));
     }
 
     @Test
@@ -107,9 +107,9 @@ public class NamedTopologyTest {
         final NamedTopology topology2 = builder2.build();
         final NamedTopology topology3 = builder3.build();
         streams.start(asList(topology1, topology2, topology3));
-        assertThat(streams.getTopologyByName("topology-1").get(), equalTo(topology1));
-        assertThat(streams.getTopologyByName("topology-2").get(), equalTo(topology2));
-        assertThat(streams.getTopologyByName("topology-3").get(), equalTo(topology3));
+        assertThat(streams.getTopologyByName("topology-1").orElseThrow(), equalTo(topology1));
+        assertThat(streams.getTopologyByName("topology-2").orElseThrow(), equalTo(topology2));
+        assertThat(streams.getTopologyByName("topology-3").orElseThrow(), equalTo(topology3));
     }
 
     @Test
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ProcessorMetadataTest {
 
     @Test
-    public void shouldAddandGetKeyValueWithEmptyConstructor() {
+    public void shouldAddAndGetKeyValueWithEmptyConstructor() {
         final ProcessorMetadata metadata = new ProcessorMetadata();
         final String key = "some_key";
         final long value = 100L;
@@ -46,7 +46,7 @@ public class ProcessorMetadataTest {
     }
 
     @Test
-    public void shouldAddandGetKeyValueWithExistingMeta() {
+    public void shouldAddAndGetKeyValueWithExistingMeta() {
         final Map<String, Long> map = new HashMap<>();
         map.put("key1", 1L);
         map.put("key2", 2L);
@@ -106,7 +106,7 @@ public class ProcessorNodeTest {
         final FailedProcessingException failedProcessingException = assertThrows(FailedProcessingException.class,
             () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
 
-        assertTrue(failedProcessingException.getCause() instanceof RuntimeException);
+        assertInstanceOf(RuntimeException.class, failedProcessingException.getCause());
         assertEquals("Processing exception should be caught and handled by the processing exception handler.",
             failedProcessingException.getCause().getMessage());
         assertEquals(NAME, failedProcessingException.failedProcessorNodeName());
@@ -310,7 +310,7 @@ public class ProcessorNodeTest {
             StreamsException.class,
             () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP))
         );
-        assertTrue(se.getCause() instanceof ClassCastException);
+        assertInstanceOf(ClassCastException.class, se.getCause());
         assertTrue(se.getMessage().contains("default Serdes"));
         assertTrue(se.getMessage().contains("input types"));
         assertTrue(se.getMessage().contains("pname"));
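assertInstanceOf (both hunks above) improves on assertTrue(x instanceof T) twice over: a failure reports the actual runtime type rather than a bare "expected true", and the method returns the value already cast. Self-contained sketch:

    import static org.junit.jupiter.api.Assertions.assertInstanceOf;

    import org.junit.jupiter.api.Test;

    class AssertInstanceOfSketchTest {
        @Test
        void checksAndCastsInOneStep() {
            final Throwable cause = new ClassCastException("value is not a String");

            // On mismatch JUnit prints the expected and actual types;
            // on success the return value is usable without an explicit cast.
            final ClassCastException cce = assertInstanceOf(ClassCastException.class, cause);
            System.out.println(cce.getMessage());
        }
    }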
@@ -41,7 +41,7 @@ import static org.mockito.Mockito.verify;
 
 class ReadOnlyTaskTest {
 
-    private final List<String> readOnlyMethods = new LinkedList<String>() {
+    private final List<String> readOnlyMethods = new LinkedList<>() {
         {
             add("needsInitializationOrRestoration");
             add("inputPartitions");
@ -56,7 +56,7 @@ class ReadOnlyTaskTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private final List<String> objectMethods = new LinkedList<String>() {
|
private final List<String> objectMethods = new LinkedList<>() {
|
||||||
{
|
{
|
||||||
add("wait");
|
add("wait");
|
||||||
add("equals");
|
add("equals");
|
||||||
|
|
|
||||||
|
|
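
Note: the LinkedList<String>() -> LinkedList<>() changes are possible because Java 9 extended diamond inference to anonymous classes. A small sketch of the before/after shapes (hypothetical class for illustration):

    import java.util.LinkedList;
    import java.util.List;

    class DiamondOnAnonymousClasses {
        // Pre-Java-9: the type argument had to be spelled out on the anonymous subclass.
        private final List<String> explicit = new LinkedList<String>() {
            { add("wait"); }
        };

        // Java 9+: the compiler infers <String> from the target type.
        private final List<String> inferred = new LinkedList<>() {
            { add("wait"); }
        };
    }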

@ -50,8 +50,6 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Arrays;

@ -72,8 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings("deprecation")
@SuppressWarnings("deprecation")
public class RepartitionOptimizingTest {
public class RepartitionOptimizingTest {

private static final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class);

private static final String INPUT_TOPIC = "input";
private static final String INPUT_TOPIC = "input";
private static final String COUNT_TOPIC = "outputTopic_0";
private static final String COUNT_TOPIC = "outputTopic_0";
private static final String AGGREGATION_TOPIC = "outputTopic_1";
private static final String AGGREGATION_TOPIC = "outputTopic_1";

@ -42,8 +42,6 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Arrays;

@ -60,8 +58,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;

public class RepartitionWithMergeOptimizingTest {
public class RepartitionWithMergeOptimizingTest {

private static final Logger log = LoggerFactory.getLogger(RepartitionWithMergeOptimizingTest.class);

private static final String INPUT_A_TOPIC = "inputA";
private static final String INPUT_A_TOPIC = "inputA";
private static final String INPUT_B_TOPIC = "inputB";
private static final String INPUT_B_TOPIC = "inputB";
private static final String COUNT_TOPIC = "outputTopic_0";
private static final String COUNT_TOPIC = "outputTopic_0";

@ -178,7 +178,7 @@ public class StandbyTaskTest {
}
}

@Test
@Test
public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
stateDirectory = mock(StateDirectory.class);
stateDirectory = mock(StateDirectory.class);
when(stateDirectory.lock(taskId)).thenReturn(false);
when(stateDirectory.lock(taskId)).thenReturn(false);
when(stateManager.taskType()).thenReturn(TaskType.STANDBY);
when(stateManager.taskType()).thenReturn(TaskType.STANDBY);

@ -41,8 +41,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.BufferedWriter;
import java.io.File;
import java.io.File;

@ -95,7 +93,6 @@ import static org.junit.jupiter.api.Assertions.fail;

public class StateDirectoryTest {
public class StateDirectoryTest {

private static final Logger log = LoggerFactory.getLogger(StateDirectoryTest.class);
private final MockTime time = new MockTime();
private final MockTime time = new MockTime();
private File stateDir;
private File stateDir;
private final String applicationId = "applicationId";
private final String applicationId = "applicationId";

@ -184,7 +184,7 @@ public class StateManagerUtilTest {
doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
when(stateManager.baseDir()).thenReturn(randomFile);
when(stateManager.baseDir()).thenReturn(randomFile);

try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
try (MockedStatic<Utils> ignored = mockStatic(Utils.class)) {
assertThrows(ProcessorStateException.class, () ->
assertThrows(ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
}
}
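
Note: renaming the unused MockedStatic handle to `ignored` documents that the resource exists only for its scoping side effect and silences the "unused variable" inspection. A hedged sketch (SomeUtils is a hypothetical stand-in; requires Mockito with static mocking enabled):

    import org.mockito.MockedStatic;

    import static org.mockito.Mockito.mockStatic;

    class StaticMockScopeExample {
        static class SomeUtils {
            static String greet() { return "real"; }
        }

        void example() {
            // The handle is never read; it is held so the static mock is
            // uninstalled by close() when the try block exits.
            try (MockedStatic<SomeUtils> ignored = mockStatic(SomeUtils.class)) {
                // Static calls on SomeUtils are stubbed inside this block.
            }
        }
    }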

@ -389,7 +389,7 @@ public class StoreChangelogReaderTest {

adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));

final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
@Override
public long position(final TopicPartition partition) {
public long position(final TopicPartition partition) {
throw new TimeoutException("KABOOM!");
throw new TimeoutException("KABOOM!");

@ -674,7 +674,7 @@ public class StoreChangelogReaderTest {
when(activeStateManager.taskId()).thenReturn(taskId);
when(activeStateManager.taskId()).thenReturn(taskId);

final AtomicBoolean clearException = new AtomicBoolean(false);
final AtomicBoolean clearException = new AtomicBoolean(false);
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
@Override
public long position(final TopicPartition partition) {
public long position(final TopicPartition partition) {
if (clearException.get()) {
if (clearException.get()) {

@ -720,7 +720,7 @@ public class StoreChangelogReaderTest {
when(activeStateManager.taskId()).thenReturn(taskId);
when(activeStateManager.taskId()).thenReturn(taskId);
when(storeMetadata.offset()).thenReturn(10L);
when(storeMetadata.offset()).thenReturn(10L);

final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
@Override
public long position(final TopicPartition partition) {
public long position(final TopicPartition partition) {
throw kaboom;
throw kaboom;

@ -770,7 +770,7 @@ public class StoreChangelogReaderTest {
};
};
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));

final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
throw new AssertionError("Should not trigger this function");
throw new AssertionError("Should not trigger this function");

@ -928,7 +928,7 @@ public class StoreChangelogReaderTest {

@Test
@Test
public void shouldThrowIfUnsubscribeFail() {
public void shouldThrowIfUnsubscribeFail() {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
@Override
public void unsubscribe() {
public void unsubscribe() {
throw kaboom;
throw kaboom;
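
Note: these StoreChangelogReaderTest hunks all use the same failure-injection pattern: subclass MockConsumer anonymously and override one method to throw. A sketch of the pattern (the "earliest" literal stands in for the AutoOffsetResetStrategy.EARLIEST.name() value the tests pass, assuming the String-based MockConsumer constructor used throughout this diff):

    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    class FailureInjectionSketch {
        final MockConsumer<byte[], byte[]> consumer =
            // Diamond works here too, since the anonymous subclass adds no new types.
            new MockConsumer<>("earliest") {
                @Override
                public long position(final TopicPartition partition) {
                    throw new TimeoutException("injected failure");
                }
            };
    }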

@ -766,7 +766,7 @@ public class StreamTaskTest {
metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);

// Create a processor that only forwards even keys to test the metrics at the source and terminal nodes
// Create a processor that only forwards even keys to test the metrics at the source and terminal nodes
final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer>(intDeserializer, intDeserializer) {
final MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<>(intDeserializer, intDeserializer) {
InternalProcessorContext<Integer, Integer> context;
InternalProcessorContext<Integer, Integer> context;

@Override
@Override

@ -2045,7 +2045,7 @@ public class StreamTaskTest {
public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
when(stateManager.taskId()).thenReturn(taskId);
when(stateManager.taskId()).thenReturn(taskId);
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
final Consumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
final Consumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
throw new KafkaException("KABOOM!");
throw new KafkaException("KABOOM!");

@ -3109,7 +3109,7 @@ public class StreamTaskTest {
singletonList(stateStore),
singletonList(stateStore),
emptyMap());
emptyMap());

final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(AutoOffsetResetStrategy.EARLIEST.name()) {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
@Override
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
throw new TimeoutException("KABOOM!");
throw new TimeoutException("KABOOM!");

@ -2238,7 +2238,7 @@ public class StreamThreadTest {
final List<Long> punctuatedStreamTime = new ArrayList<>();
final List<Long> punctuatedStreamTime = new ArrayList<>();
final List<Long> punctuatedWallClockTime = new ArrayList<>();
final List<Long> punctuatedWallClockTime = new ArrayList<>();
final ProcessorSupplier<Object, Object, Void, Void> punctuateProcessor =
final ProcessorSupplier<Object, Object, Void, Void> punctuateProcessor =
() -> new ContextualProcessor<Object, Object, Void, Void>() {
() -> new ContextualProcessor<>() {
@Override
@Override
public void init(final ProcessorContext<Void, Void> context) {
public void init(final ProcessorContext<Void, Void> context) {
context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);

@ -2506,7 +2506,7 @@ public class StreamThreadTest {

if (stateUpdaterEnabled) {
if (stateUpdaterEnabled) {
TestUtils.waitForCondition(
TestUtils.waitForCondition(
() -> mockRestoreConsumer.assignment().size() == 0,
() -> mockRestoreConsumer.assignment().isEmpty(),
"Never get the assignment");
"Never get the assignment");
} else {
} else {
TestUtils.waitForCondition(
TestUtils.waitForCondition(
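
Note: assignment().size() == 0 and assignment().isEmpty() are equivalent for java.util collections; isEmpty() states the intent directly and can be cheaper for collections without an O(1) size. A trivial sketch:

    import java.util.Set;

    class EmptinessCheck {
        boolean restoreDone(final Set<?> assignment) {
            return assignment.isEmpty(); // preferred over assignment.size() == 0
        }
    }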

@ -3444,7 +3444,7 @@ public class StreamThreadTest {

@ParameterizedTest
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
thread.setState(State.STARTING);
thread.setState(State.STARTING);

@ -3560,7 +3560,7 @@ public class StreamThreadTest {

@ParameterizedTest
@ParameterizedTest
@MethodSource("data")
@MethodSource("data")
public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception {
public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
final MockProducer<byte[], byte[]> producer = new MockProducer<>();
final MockProducer<byte[], byte[]> producer = new MockProducer<>();
producer.setClientInstanceId(Uuid.randomUuid());
producer.setClientInstanceId(Uuid.randomUuid());
producer.injectTimeoutException(-1);
producer.injectTimeoutException(-1);

@ -21,7 +21,6 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;

@ -2819,23 +2818,6 @@ public class StreamsPartitionAssignorTest {
return changelogEndOffsets;
return changelogEndOffsets;
}
}

private static Map<String, TopicDescription> getTopicDescriptionMap(final List<String> changelogTopics,
final List<List<TopicPartitionInfo>> topicPartitionInfos) {
if (changelogTopics.size() != topicPartitionInfos.size()) {
throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " +
topicPartitionInfos.size() + " different topicPartitionInfo for the topics");
}
final Map<String, TopicDescription> changeLogTopicDescriptions = new HashMap<>();
for (int i = 0; i < changelogTopics.size(); i++) {
final String topic = changelogTopics.get(i);
final List<TopicPartitionInfo> topicPartitionInfo = topicPartitionInfos.get(i);
changeLogTopicDescriptions.put(topic, new TopicDescription(topic, false, topicPartitionInfo));
}

return changeLogTopicDescriptions;
}

private static SubscriptionInfo getInfoForOlderVersion(final int version,
private static SubscriptionInfo getInfoForOlderVersion(final int version,
final ProcessId processId,
final ProcessId processId,
final Set<TaskId> prevTasks,
final Set<TaskId> prevTasks,

@ -920,8 +920,8 @@ public class StreamsProducerTest {
METADATA_WAIT_TIME;
METADATA_WAIT_TIME;
assertThat(eosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
assertThat(eosStreamsProducer.totalBlockedTime(), equalTo(expectedTotalBlocked));
final long closeStart = 1L;
final long closeStart = 1L;
final long clodeDelay = 1L;
final long closeDelay = 1L;
when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + clodeDelay);
when(mockTime.nanoseconds()).thenReturn(closeStart).thenReturn(closeStart + closeDelay);
eosStreamsProducer.resetProducer(eosMockProducer);
eosStreamsProducer.resetProducer(eosMockProducer);
setProducerMetrics(
setProducerMetrics(
eosMockProducer,
eosMockProducer,

@ -937,7 +937,7 @@ public class StreamsProducerTest {

assertThat(
assertThat(
eosStreamsProducer.totalBlockedTime(),
eosStreamsProducer.totalBlockedTime(),
closeTo(2 * expectedTotalBlocked + clodeDelay, 0.01)
closeTo(2 * expectedTotalBlocked + closeDelay, 0.01)
);
);
}
}

@ -1678,7 +1678,7 @@ public class TaskManagerTest {
);
);

assertEquals(exception, thrown);
assertEquals(exception, thrown);
assertEquals(statefulTask.id(), thrown.taskId().get());
assertEquals(statefulTask.id(), thrown.taskId().orElseThrow());
}
}

@Test
@Test

@ -2149,7 +2149,7 @@ public class TaskManagerTest {
}
}

@Test
@Test
public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
public void shouldCloseActiveTasksWhenHandlingLostTasks() {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager);
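
Note: Optional.get() and Optional.orElseThrow() both throw NoSuchElementException on an empty Optional; the rename makes the failure mode explicit, which is why the inspection prefers it. A minimal sketch:

    import java.util.Optional;

    class OptionalAccessExample {
        void example() {
            final Optional<String> taskId = Optional.of("0_0");
            final String viaGet = taskId.get();           // flagged: "Optional.get() without isPresent()"
            final String viaThrow = taskId.orElseThrow(); // identical semantics, explicit intent
        }
    }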

@ -523,7 +523,7 @@ public final class AssignmentTestUtils {
static <V> Matcher<ClientState> hasProperty(final String propertyName,
static <V> Matcher<ClientState> hasProperty(final String propertyName,
final Function<ClientState, V> propertyExtractor,
final Function<ClientState, V> propertyExtractor,
final V propertyValue) {
final V propertyValue) {
return new BaseMatcher<ClientState>() {
return new BaseMatcher<>() {
@Override
@Override
public void describeTo(final Description description) {
public void describeTo(final Description description) {
description.appendText(propertyName).appendText(":").appendValue(propertyValue);
description.appendText(propertyName).appendText(":").appendValue(propertyValue);

@ -826,7 +826,7 @@ public class RackAwareTaskAssignorTest {

@ParameterizedTest
@ParameterizedTest
@MethodSource("paramStoreType")
@MethodSource("paramStoreType")
public void shouldThrowIfMissingCallcanEnableRackAwareAssignor(final boolean stateful, final String assignmentStrategy) {
public void shouldThrowIfMissingCallCanEnableRackAwareAssignor(final boolean stateful, final String assignmentStrategy) {
setUp(stateful);
setUp(stateful);
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getClusterForAllTopics(),
getClusterForAllTopics(),

@ -1125,28 +1125,28 @@ public class RackAwareTaskAssignorTest {
setUp(stateful);
setUp(stateful);
final int nodeSize = 50;
final int nodeSize = 50;
final int tpSize = 60;
final int tpSize = 60;
final int partionSize = 3;
final int partitionSize = 3;
final int clientSize = 50;
final int clientSize = 50;
final int replicaCount = 3;
final int replicaCount = 3;
final int maxCapacity = 3;
final int maxCapacity = 3;
final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = getTaskTopicPartitionMap(
final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = getTaskTopicPartitionMap(
tpSize, partionSize, false);
tpSize, partitionSize, false);
final AssignmentConfigs assignorConfiguration = getRackAwareEnabledConfigWithStandby(replicaCount, assignmentStrategy);
final AssignmentConfigs assignorConfiguration = getRackAwareEnabledConfigWithStandby(replicaCount, assignmentStrategy);

final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
getRandomCluster(nodeSize, tpSize, partionSize),
getRandomCluster(nodeSize, tpSize, partitionSize),
taskTopicPartitionMap,
taskTopicPartitionMap,
getTaskTopicPartitionMap(tpSize, partionSize, true),
getTaskTopicPartitionMap(tpSize, partitionSize, true),
getTopologyGroupTaskMap(),
getTopologyGroupTaskMap(),
getRandomProcessRacks(clientSize, nodeSize),
getRandomProcessRacks(clientSize, nodeSize),
mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partionSize),
mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partitionSize),
assignorConfiguration,
assignorConfiguration,
time
time
);
);

final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet();
final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet();
final SortedMap<ProcessId, ClientState> clientStateMap = getRandomClientState(clientSize,
final SortedMap<ProcessId, ClientState> clientStateMap = getRandomClientState(clientSize,
tpSize, partionSize, maxCapacity, taskIds);
tpSize, partitionSize, maxCapacity, taskIds);

final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(
final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(
assignorConfiguration, assignor);
assignorConfiguration, assignor);

@ -544,13 +544,11 @@ public class TaskAssignmentUtilsTest {
new TopicPartition(String.format("test-topic-%d", taskId.subtopology()), taskId.partition()),
new TopicPartition(String.format("test-topic-%d", taskId.subtopology()), taskId.partition()),
true,
true,
true,
true,
() -> {
() -> partitions.forEach(partition -> {
partitions.forEach(partition -> {
if (partition != null && rackIds != null) {
if (partition != null && rackIds != null) {
partition.annotateWithRackIds(rackIds);
partition.annotateWithRackIds(rackIds);
}
}
})
});
}
));
));
return mkEntry(
return mkEntry(
taskId,
taskId,

@ -425,7 +425,7 @@ public class TaskAssignorConvergenceTest {
testForConvergence(harness, configs, numStatefulTasks / maxWarmupReplicas + 1);
testForConvergence(harness, configs, numStatefulTasks / maxWarmupReplicas + 1);
verifyValidAssignment(numStandbyReplicas, harness);
verifyValidAssignment(numStandbyReplicas, harness);

// min-cost rack aware assignor doesn't balance subtopolgy
// min-cost rack aware assignor doesn't balance subtopology
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
verifyBalancedAssignment(harness, skewThreshold);
verifyBalancedAssignment(harness, skewThreshold);
}
}

@ -463,7 +463,7 @@ public class TaskAssignorConvergenceTest {

verifyValidAssignment(numStandbyReplicas, harness);
verifyValidAssignment(numStandbyReplicas, harness);

// min-cost rack aware assignor doesn't balance subtopolgy
// min-cost rack aware assignor doesn't balance subtopology
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
verifyBalancedAssignment(harness, skewThreshold);
verifyBalancedAssignment(harness, skewThreshold);
}
}

@ -520,7 +520,7 @@ public class TaskAssignorConvergenceTest {
testForConvergence(harness, configs, 1);
testForConvergence(harness, configs, 1);
verifyValidAssignment(numStandbyReplicas, harness);
verifyValidAssignment(numStandbyReplicas, harness);

// min-cost rack aware assignor doesn't balance subtopolgy
// min-cost rack aware assignor doesn't balance subtopology
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
verifyBalancedAssignment(harness, skewThreshold);
verifyBalancedAssignment(harness, skewThreshold);
}
}

@ -540,7 +540,7 @@ public class TaskAssignorConvergenceTest {
if (!harness.clientStates.isEmpty()) {
if (!harness.clientStates.isEmpty()) {
testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));
testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));
verifyValidAssignment(numStandbyReplicas, harness);
verifyValidAssignment(numStandbyReplicas, harness);
// min-cost rack aware assignor doesn't balance subtopolgy
// min-cost rack aware assignor doesn't balance subtopology
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
verifyBalancedAssignment(harness, skewThreshold);
verifyBalancedAssignment(harness, skewThreshold);
}
}

@ -99,9 +99,7 @@ public class StreamsMetricsImplTest {
private static final String TASK_ID1 = "test-task-1";
private static final String TASK_ID1 = "test-task-1";
private static final String TASK_ID2 = "test-task-2";
private static final String TASK_ID2 = "test-task-2";
private static final String NODE_ID1 = "test-node-1";
private static final String NODE_ID1 = "test-node-1";
private static final String NODE_ID2 = "test-node-2";
private static final String TOPIC_ID1 = "test-topic-1";
private static final String TOPIC_ID1 = "test-topic-1";
private static final String TOPIC_ID2 = "test-topic-2";
private static final String METRIC_NAME1 = "test-metric1";
private static final String METRIC_NAME1 = "test-metric1";
private static final String METRIC_NAME2 = "test-metric2";
private static final String METRIC_NAME2 = "test-metric2";
private static final String THREAD_ID_TAG = "thread-id";
private static final String THREAD_ID_TAG = "thread-id";

@ -236,8 +234,7 @@ public class StreamsMetricsImplTest {
return sensorKeys;
return sensorKeys;
}
}

private ArgumentCaptor<String> setupGetNewSensorTest(final Metrics metrics,
private ArgumentCaptor<String> setupGetNewSensorTest(final Metrics metrics) {
final RecordingLevel recordingLevel) {
final ArgumentCaptor<String> sensorKey = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<String> sensorKey = ArgumentCaptor.forClass(String.class);
when(metrics.getSensor(sensorKey.capture())).thenReturn(null);
when(metrics.getSensor(sensorKey.capture())).thenReturn(null);
final Sensor[] parents = {};
final Sensor[] parents = {};

@ -253,7 +250,7 @@ public class StreamsMetricsImplTest {
public void shouldGetNewThreadLevelSensor() {
public void shouldGetNewThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
setupGetNewSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);

final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel);

@ -277,7 +274,7 @@ public class StreamsMetricsImplTest {
public void shouldGetNewTaskLevelSensor() {
public void shouldGetNewTaskLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
setupGetNewSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);

final Sensor actualSensor = streamsMetrics.taskLevelSensor(
final Sensor actualSensor = streamsMetrics.taskLevelSensor(

@ -311,7 +308,7 @@ public class StreamsMetricsImplTest {
public void shouldGetNewTopicLevelSensor() {
public void shouldGetNewTopicLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
setupGetNewSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);

final Sensor actualSensor = streamsMetrics.topicLevelSensor(
final Sensor actualSensor = streamsMetrics.topicLevelSensor(

@ -349,7 +346,7 @@ public class StreamsMetricsImplTest {
public void shouldGetNewStoreLevelSensorIfNoneExists() {
public void shouldGetNewStoreLevelSensorIfNoneExists() {
final Metrics metrics = mock(Metrics.class);
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final ArgumentCaptor<String> sensorKeys = setupGetNewSensorTest(metrics, recordingLevel);
final ArgumentCaptor<String> sensorKeys = setupGetNewSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);

final Sensor actualSensor = streamsMetrics.storeLevelSensor(
final Sensor actualSensor = streamsMetrics.storeLevelSensor(

@ -477,10 +474,11 @@ public class StreamsMetricsImplTest {
final MetricName metricName =
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final Metrics metrics = new Metrics(metricConfig);
try (Metrics metrics = new Metrics(metricConfig)) {
assertNull(metrics.metric(metricName));
assertNull(metrics.metric(metricName));
metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
assertNotNull(metrics.metric(metricName));
assertNotNull(metrics.metric(metricName));
}
}
}

@Test
@Test

@ -509,10 +507,11 @@ public class StreamsMetricsImplTest {
final MetricName metricName =
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final Metrics metrics = new Metrics(metricConfig);
try (Metrics metrics = new Metrics(metricConfig)) {
assertNull(metrics.metric(metricName));
assertNull(metrics.metric(metricName));
final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER);
assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER));
assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER));
}
}
}

@Test
@Test
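
Note: kafka-clients' Metrics implements Closeable, so wrapping it in try-with-resources guarantees its sensors and reporters are released even when an assertion fails mid-test. A minimal sketch of the lifecycle:

    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;

    class MetricsLifecycleSketch {
        void example() {
            final MetricConfig config = new MetricConfig();
            try (Metrics metrics = new Metrics(config)) {
                // register sensors/metrics and assert on them here;
                // metrics.close() runs automatically, pass or fail.
            }
        }
    }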

@ -520,20 +519,21 @@ public class StreamsMetricsImplTest {
final MetricName metricName =
final MetricName metricName =
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
final Metrics metrics = new Metrics(metricConfig);
try (Metrics metrics = new Metrics(metricConfig)) {
assertNull(metrics.metric(metricName));
assertNull(metrics.metric(metricName));
final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>();
final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>();
final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>();
final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>();

final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));
final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)));

thread1.start();
thread1.start();
thread2.start();
thread2.start();

thread1.join();
thread1.join();
thread2.join();
thread2.join();
assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get());
assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get());
}
}
}

@Test
@Test

@ -561,7 +561,7 @@ public class StreamsMetricsImplTest {
public void shouldGetNewNodeLevelSensor() {
public void shouldGetNewNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
setupGetNewSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);

final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(

@ -598,7 +598,7 @@ public class StreamsMetricsImplTest {
final Metrics metrics = mock(Metrics.class);
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
final String processorCacheName = "processorNodeName";
setupGetNewSensorTest(metrics, recordingLevel);
setupGetNewSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);

final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(

@ -634,7 +634,7 @@ public class StreamsMetricsImplTest {
public void shouldGetNewClientLevelSensor() {
public void shouldGetNewClientLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, recordingLevel);
setupGetNewSensorTest(metrics);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID, time);

final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);
final Sensor actualSensor = streamsMetrics.clientLevelSensor(SENSOR_NAME_1, recordingLevel);

@ -529,7 +529,7 @@ public abstract class AbstractRocksDBWindowStoreTest extends AbstractWindowBytes
}
}

// the latest record has a timestamp > 60k. So, the +1 in actualFrom calculation in
// the latest record has a timestamp > 60k. So, the +1 in actualFrom calculation in
// RocksDbWindowStore shouldn't have an implciation and all stores should return the same fetched counts.
// RocksDbWindowStore shouldn't have an implication and all stores should return the same fetched counts.
assertEquals(1, fetchedCount);
assertEquals(1, fetchedCount);
assertEquals(
assertEquals(
Set.of(segments.segmentName(3L), segments.segmentName(5L)),
Set.of(segments.segmentName(3L), segments.segmentName(5L)),

@ -879,13 +879,13 @@ public class CachingPersistentSessionStoreTest {

public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> {
public static class CacheFlushListenerStub<K, V> implements CacheFlushListener<byte[], byte[]> {
private final Deserializer<K> keyDeserializer;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDesializer;
private final Deserializer<V> valueDeserializer;
private final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>();
private final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<>();

CacheFlushListenerStub(final Deserializer<K> keyDeserializer,
CacheFlushListenerStub(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDesializer) {
final Deserializer<V> valueDeserializer) {
this.keyDeserializer = keyDeserializer;
this.keyDeserializer = keyDeserializer;
this.valueDesializer = valueDesializer;
this.valueDeserializer = valueDeserializer;
}
}

@Override
@Override

@ -894,8 +894,8 @@ public class CachingPersistentSessionStoreTest {
new KeyValueTimestamp<>(
new KeyValueTimestamp<>(
keyDeserializer.deserialize(null, record.key()),
keyDeserializer.deserialize(null, record.key()),
new Change<>(
new Change<>(
valueDesializer.deserialize(null, record.value().newValue),
valueDeserializer.deserialize(null, record.value().newValue),
valueDesializer.deserialize(null, record.value().oldValue)),
valueDeserializer.deserialize(null, record.value().oldValue)),
record.timestamp()
record.timestamp()
)
)
);
);

@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;
import org.apache.kafka.test.GenericInMemoryKeyValueStore;

@ -27,6 +26,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.List;

import static java.util.Arrays.asList;
import static java.util.Arrays.asList;

@ -67,22 +67,12 @@ public class FilteredCacheIteratorTest {
@BeforeEach
@BeforeEach
public void before() {
public void before() {
store.putAll(entries);
store.putAll(entries);
final HasNextCondition allCondition = new HasNextCondition() {
final HasNextCondition allCondition = Iterator::hasNext;
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
return iterator.hasNext();
}
};
allIterator = new FilteredCacheIterator(
allIterator = new FilteredCacheIterator(
new DelegatingPeekingKeyValueIterator<>("",
new DelegatingPeekingKeyValueIterator<>("",
store.all()), allCondition, IDENTITY_FUNCTION);
store.all()), allCondition, IDENTITY_FUNCTION);

final HasNextCondition firstEntryCondition = new HasNextCondition() {
final HasNextCondition firstEntryCondition = iterator -> iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key);
@Override
public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
return iterator.hasNext() && iterator.peekNextKey().equals(firstEntry.key);
}
};
firstEntryIterator = new FilteredCacheIterator(
firstEntryIterator = new FilteredCacheIterator(
new DelegatingPeekingKeyValueIterator<>("",
new DelegatingPeekingKeyValueIterator<>("",
store.all()), firstEntryCondition, IDENTITY_FUNCTION);
store.all()), firstEntryCondition, IDENTITY_FUNCTION);
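
Note: HasNextCondition has a single abstract method, so its anonymous implementations collapse to a method reference (Iterator::hasNext) or a lambda. A sketch with a hypothetical single-method interface of the same shape:

    import java.util.Iterator;

    interface NextCondition { // stand-in for HasNextCondition
        boolean hasNext(Iterator<String> iterator);
    }

    class LambdaCollapseExample {
        // Anonymous-class form:
        NextCondition verbose = new NextCondition() {
            @Override
            public boolean hasNext(final Iterator<String> iterator) {
                return iterator.hasNext();
            }
        };
        // Equivalent method-reference form:
        NextCondition concise = Iterator::hasNext;
    }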

@ -42,10 +42,4 @@ public class MonotonicProcessorRecordContext extends ProcessorRecordContext {
}
}
return ret;
return ret;
}
}

public void kick() {
if (!automatic) {
counter++;
}
}
}
}

@ -44,8 +44,8 @@ public class Murmur3Test {
cases.put(new byte[]{'a', 'b', 'c'}, 461137560);
cases.put(new byte[]{'a', 'b', 'c'}, 461137560);

int seed = 123;
int seed = 123;
for (Map.Entry c : cases.entrySet()) {
for (Map.Entry<byte[], Integer> c : cases.entrySet()) {
byte[] b = (byte[]) c.getKey();
byte[] b = c.getKey();
assertEquals(c.getValue(), Murmur3.hash32(b, b.length, seed));
assertEquals(c.getValue(), Murmur3.hash32(b, b.length, seed));
}
}
}
}

@ -62,10 +62,10 @@ public class Murmur3Test {

int seed = 123;
int seed = 123;

for (Map.Entry c : cases.entrySet()) {
for (Map.Entry<byte[], long[]> c : cases.entrySet()) {
byte[] b = (byte[]) c.getKey();
byte[] b = c.getKey();
long[] result = Murmur3.hash128(b, 0, b.length, seed);
long[] result = Murmur3.hash128(b, 0, b.length, seed);
assertArrayEquals((long[]) c.getValue(), result);
assertArrayEquals(c.getValue(), result);
}
}
}
}
}
}
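
Note: parameterizing the Map.Entry in the for-each removes both the raw-type warning and the casts on getKey()/getValue(). A small sketch:

    import java.util.HashMap;
    import java.util.Map;

    class TypedEntryExample {
        void example() {
            final Map<byte[], Integer> cases = new HashMap<>();
            // Raw entry type forces a cast:
            for (final Map.Entry c : cases.entrySet()) {
                final byte[] raw = (byte[]) c.getKey();
            }
            // Typed entry needs none:
            for (final Map.Entry<byte[], Integer> c : cases.entrySet()) {
                final byte[] typed = c.getKey();
            }
        }
    }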

@ -110,7 +110,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
}
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();

return new KeyValueIterator<Windowed<K>, V>() {
return new KeyValueIterator<>() {
@Override
@Override
public void close() {
public void close() {
}
}

@ -149,7 +149,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
}
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();

return new KeyValueIterator<Windowed<K>, V>() {
return new KeyValueIterator<>() {
@Override
@Override
public void close() {
public void close() {
}
}

@ -191,7 +191,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
}
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();

return new KeyValueIterator<Windowed<K>, V>() {
return new KeyValueIterator<>() {
@Override
@Override
public void close() {
public void close() {
}
}

@ -235,7 +235,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
}
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();

return new KeyValueIterator<Windowed<K>, V>() {
return new KeyValueIterator<>() {
@Override
@Override
public void close() {
public void close() {
}
}

@ -286,7 +286,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
}
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();

return new KeyValueIterator<Windowed<K>, V>() {
return new KeyValueIterator<>() {
@Override
@Override
public void close() {
public void close() {
}
}

@ -342,7 +342,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
}
}
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();
final Iterator<KeyValue<Windowed<K>, V>> iterator = results.iterator();

return new KeyValueIterator<Windowed<K>, V>() {
return new KeyValueIterator<>() {
@Override
@Override
public void close() {
public void close() {
}
}

@ -81,7 +81,7 @@ import static org.mockito.Mockito.mockingDetails;
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {

private final List<String> walRelatedMethods = new LinkedList<String>() {
private final List<String> walRelatedMethods = new LinkedList<>() {
{
{
add("setManualWalFlush");
add("setManualWalFlush");
add("setMaxTotalWalSize");
add("setMaxTotalWalSize");

@ -94,7 +94,7 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
}
}
};
};

private final List<String> ignoreMethods = new LinkedList<String>() {
private final List<String> ignoreMethods = new LinkedList<>() {
{
{
add("isOwningHandle");
add("isOwningHandle");
add("getNativeHandle");
add("getNativeHandle");

@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;

@ -88,8 +87,6 @@ public class WindowStoreFetchTest {
private String innerLowBetween;
private String innerLowBetween;
private String innerHighBetween;
private String innerHighBetween;

private TimeWindowedKStream<String, String> windowedStream;

public void setup(final StoreType storeType,
public void setup(final StoreType storeType,
final boolean enableLogging,
final boolean enableLogging,
final boolean enableCaching,
final boolean enableCaching,

@ -65,7 +65,7 @@ public class WrappingStoreProviderTest {
@Test
@Test
public void shouldFindKeyValueStores() {
public void shouldFindKeyValueStores() {
final List<ReadOnlyKeyValueStore<String, String>> results =
final List<ReadOnlyKeyValueStore<String, String>> results =
wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore());
assertEquals(2, results.size());
assertEquals(2, results.size());
}
}

@ -95,7 +95,7 @@ public class WrappingStoreProviderTest {
public void shouldReturnAllStoreWhenQueryWithoutPartition() {
public void shouldReturnAllStoreWhenQueryWithoutPartition() {
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()));
final List<ReadOnlyKeyValueStore<String, String>> results =
final List<ReadOnlyKeyValueStore<String, String>> results =
wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore());
assertEquals(numStateStorePartitions, results.size());
assertEquals(numStateStorePartitions, results.size());
}
}

@ -103,7 +103,7 @@ public class WrappingStoreProviderTest {
public void shouldReturnSingleStoreWhenQueryWithPartition() {
public void shouldReturnSingleStoreWhenQueryWithPartition() {
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1));
wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.<String, String>keyValueStore()).withPartition(numStateStorePartitions - 1));
final List<ReadOnlyKeyValueStore<String, String>> results =
final List<ReadOnlyKeyValueStore<String, String>> results =
wrappingStoreProvider.stores("kv", QueryableStoreTypes.<String, String>keyValueStore());
wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore());
assertEquals(1, results.size());
assertEquals(1, results.size());
}
}
}
}
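
Note: the explicit type witness in QueryableStoreTypes.<String, String>keyValueStore() is redundant where the compiler can infer the arguments from the assignment target. A sketch with a hypothetical generic factory:

    import java.util.Collections;
    import java.util.Map;

    class TypeWitnessExample {
        static <K, V> Map<K, V> emptyStoreView() { // hypothetical generic factory
            return Collections.emptyMap();
        }

        void example() {
            // Explicit witness: legal but noisy.
            final Map<String, Long> a = TypeWitnessExample.<String, Long>emptyStoreView();
            // Inferred from the target type since Java 8.
            final Map<String, Long> b = emptyStoreView();
        }
    }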
@ -311,7 +311,7 @@ public class RelationalSmokeTest extends SmokeTestUtil {
|
||||||
final long dataStartTime = System.currentTimeMillis() - timeSpan;
|
final long dataStartTime = System.currentTimeMillis() - timeSpan;
|
||||||
final long dataEndTime = System.currentTimeMillis();
|
final long dataEndTime = System.currentTimeMillis();
|
||||||
|
|
||||||
// Explicitly create a seed so we can we can log.
|
// Explicitly create a seed so we can log.
|
||||||
// If we are debugging a failed run, we can deterministically produce the same dataset
|
// If we are debugging a failed run, we can deterministically produce the same dataset
|
||||||
// by plugging in the seed from that run.
|
// by plugging in the seed from that run.
|
||||||
final long seed = new Random().nextLong();
|
final long seed = new Random().nextLong();
|
||||||
|
|
@ -368,7 +368,7 @@ public class RelationalSmokeTest extends SmokeTestUtil {
|
||||||
* data distribution: Zipfian and Normal, while also being efficient to generate.
|
* data distribution: Zipfian and Normal, while also being efficient to generate.
|
||||||
*/
|
*/
|
||||||
private static Iterator<Integer> zipfNormal(final Random random, final int keySpace) {
|
private static Iterator<Integer> zipfNormal(final Random random, final int keySpace) {
|
||||||
return new Iterator<Integer>() {
|
return new Iterator<>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
return true;
|
return true;
|
||||||
|
|
@@ -829,13 +829,13 @@ public class RelationalSmokeTest extends SmokeTestUtil {
             pass,
             report,
             "Expected 1 article, got " + consumedArticles.size(),
-            consumedArticles.size() > 0
+            !consumedArticles.isEmpty()
         );
         assertThat(
             pass,
             report,
             "Expected 1 comment, got " + consumedComments.size(),
-            consumedComments.size() > 0
+            !consumedComments.isEmpty()
         );
 
         assertThat(
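size() > 0 and !isEmpty() return the same truth value, but isEmpty() names the intent and is cheaper on collections whose size() is not O(1). A trivial sketch (variable names illustrative):

    import java.util.concurrent.ConcurrentLinkedQueue;

    public class IsEmptyDemo {
        public static void main(String[] args) {
            final ConcurrentLinkedQueue<String> consumed = new ConcurrentLinkedQueue<>();
            consumed.add("article-1");
            // Equivalent checks, but isEmpty() states the intent directly and,
            // on this queue, is O(1) while size() walks the whole list.
            System.out.println(consumed.size() > 0);  // true
            System.out.println(!consumed.isEmpty());  // true
        }
    }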
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 
 import java.time.Duration;
@@ -49,11 +48,8 @@ public class ShutdownDeadlockTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> source = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
 
-        source.foreach(new ForeachAction<String, String>() {
-            @Override
-            public void apply(final String key, final String value) {
-                throw new RuntimeException("KABOOM!");
-            }
+        source.foreach((key, value) -> {
+            throw new RuntimeException("KABOOM!");
         });
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
         streams.setUncaughtExceptionHandler(e -> {
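ForeachAction is a functional interface, so the anonymous class collapses to a lambda, which is also why the ForeachAction import in the previous hunk became dead. The general shape, sketched with a stand-in interface (Action here is illustrative, not the Kafka type):

    public class LambdaConversionDemo {
        // Stand-in for a single-abstract-method interface like ForeachAction<K, V>.
        interface Action<K, V> {
            void apply(K key, V value);
        }

        public static void main(String[] args) {
            // Anonymous-class form:
            final Action<String, String> verbose = new Action<String, String>() {
                @Override
                public void apply(final String key, final String value) {
                    System.out.println(key + " -> " + value);
                }
            };
            // Equivalent lambda; legal because Action has exactly one abstract method:
            final Action<String, String> concise = (key, value) -> System.out.println(key + " -> " + value);

            verbose.apply("k", "v");
            concise.apply("k", "v");
        }
    }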
@@ -174,7 +174,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
             .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
-            .reduce((l, r) -> l + r);
+            .reduce(Integer::sum);
 
         streamify(smallWindowSum, "sws-raw");
         streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
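(l, r) -> l + r and Integer::sum are interchangeable for an integer reduce; the method reference simply names the operation instead of restating it. A minimal sketch:

    import java.util.function.BinaryOperator;

    public class SumReferenceDemo {
        public static void main(String[] args) {
            final BinaryOperator<Integer> lambda = (l, r) -> l + r;
            final BinaryOperator<Integer> reference = Integer::sum;
            // Both unbox their arguments, add, and rebox the result.
            System.out.println(lambda.apply(2, 3));    // 5
            System.out.println(reference.apply(2, 3)); // 5
        }
    }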
@@ -39,7 +39,7 @@ public class SmokeTestUtil {
     }
 
     static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
-        return () -> new ContextualProcessor<Object, Object, Void, Void>() {
+        return () -> new ContextualProcessor<>() {
             private int numRecordsProcessed = 0;
             private long smallestOffset = Long.MAX_VALUE;
             private long largestOffset = Long.MIN_VALUE;
@@ -93,7 +93,7 @@ public class StreamsBrokerDownResilienceTest {
         final Serde<String> stringSerde = Serdes.String();
 
         builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde))
-            .peek(new ForeachAction<String, String>() {
+            .peek(new ForeachAction<>() {
                 int messagesProcessed = 0;
                 @Override
                 public void apply(final String key, final String value) {
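Unlike the ShutdownDeadlockTest hunk, this callback keeps its anonymous class: a lambda cannot declare the mutable messagesProcessed field, so only the diamond cleanup applies. A sketch of why the counter blocks the lambda conversion (Action is an illustrative stand-in):

    public class StatefulCallbackDemo {
        interface Action<K, V> {
            void apply(K key, V value);
        }

        public static void main(String[] args) {
            // The per-instance counter needs a field, which lambdas cannot declare;
            // the anonymous class stays, but Java 9+ lets it use the diamond.
            final Action<String, String> counting = new Action<>() {
                int messagesProcessed = 0;

                @Override
                public void apply(final String key, final String value) {
                    messagesProcessed++;
                    System.out.println("processed " + messagesProcessed);
                }
            };
            counting.apply("k", "v1");
            counting.apply("k", "v2"); // prints "processed 2"
        }
    }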
@@ -55,7 +55,7 @@ public class StreamsNamedRepartitionTest {
 
         final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
         final String aggregationTopic = (String) (Objects.requireNonNull(streamsProperties.remove("aggregation.topic")));
-        final boolean addOperators = Boolean.valueOf(Objects.requireNonNull((String) streamsProperties.remove("add.operations")));
+        final boolean addOperators = Boolean.parseBoolean(Objects.requireNonNull((String) streamsProperties.remove("add.operations")));
 
 
         final Initializer<Integer> initializer = () -> 0;
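Boolean.valueOf returns a boxed Boolean that is then auto-unboxed to fit the primitive target; Boolean.parseBoolean yields the primitive directly, skipping the box. A sketch of the difference:

    public class ParseBooleanDemo {
        public static void main(String[] args) {
            // valueOf: String -> Boolean (boxed), then auto-unboxed into `boolean`.
            final boolean viaValueOf = Boolean.valueOf("true");
            // parseBoolean: String -> boolean, no intermediate box.
            final boolean viaParse = Boolean.parseBoolean("true");
            System.out.println(viaValueOf && viaParse); // true
        }
    }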
@@ -50,7 +50,7 @@ public class MockInternalTopicManager extends InternalTopicManager {
     public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
         for (final InternalTopicConfig topic : topics.values()) {
             final String topicName = topic.name();
-            final int numberOfPartitions = topic.numberOfPartitions().get();
+            final int numberOfPartitions = topic.numberOfPartitions().orElseThrow();
             readyTopics.put(topicName, numberOfPartitions);
 
             final List<PartitionInfo> partitions = new ArrayList<>();
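Optional.get() and the argument-less Optional.orElseThrow() (Java 10+) behave identically, both throwing NoSuchElementException on an empty Optional; orElseThrow() simply admits that at the call site, which is why code inspections flag get(). A sketch:

    import java.util.NoSuchElementException;
    import java.util.Optional;

    public class OrElseThrowDemo {
        public static void main(String[] args) {
            final Optional<Integer> partitions = Optional.of(3);
            System.out.println(partitions.get());         // 3
            System.out.println(partitions.orElseThrow()); // 3, same semantics

            try {
                Optional.empty().orElseThrow();
            } catch (NoSuchElementException e) {
                System.out.println("empty optional throws, exactly as get() would");
            }
        }
    }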
@@ -23,11 +23,6 @@ public class MockValueJoiner {
     public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = instance("+");
 
     public static <V1, V2> ValueJoiner<V1, V2, String> instance(final String separator) {
-        return new ValueJoiner<V1, V2, String>() {
-            @Override
-            public String apply(final V1 value1, final V2 value2) {
-                return value1 + separator + value2;
-            }
-        };
+        return (value1, value2) -> value1 + separator + value2;
     }
 }
@@ -25,7 +25,7 @@ public class NoOpValueTransformerWithKeySupplier<K, V> implements ValueTransform
 
     @Override
     public ValueTransformerWithKey<K, V, V> get() {
-        return new ValueTransformerWithKey<K, V, V>() {
+        return new ValueTransformerWithKey<>() {
 
             @Override
             public void init(final ProcessorContext context1) {
@@ -103,7 +103,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
         }
         final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = subSessionsMap.values().iterator();
         return new KeyValueIteratorStub<>(
-            new Iterator<KeyValue<Windowed<K>, V>>() {
+            new Iterator<>() {
 
                 Iterator<KeyValue<Windowed<K>, V>> it;
 
@@ -155,7 +155,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
 
         final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = subSessionsMap.descendingMap().values().iterator();
         return new KeyValueIteratorStub<>(
-            new Iterator<KeyValue<Windowed<K>, V>>() {
+            new Iterator<>() {
 
                 Iterator<KeyValue<Windowed<K>, V>> it;
 