MINOR: remove get prefix for internal IQ methods (#16954)

Reviewers: Bill Bejeck <bill@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
Matthias J. Sax, 2024-08-25 16:02:49 -07:00, committed by GitHub
parent 9eb7e1a23d
commit 4ae0ab38dd
16 changed files with 114 additions and 114 deletions

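For orientation, a minimal usage sketch (not part of this patch) of the public interactive-query entry points, whose names are unchanged; the commit only drops the get prefix from the internal helpers they delegate to (StreamsMetadataState, QueryableStoreProvider, Task, StreamThread). The surrounding class, the streams instance, and the store name "counts-store" are hypothetical placeholders.

    import java.util.Collection;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyQueryMetadata;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.StreamsMetadata;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class IqUsageSketch {

        // The public API below is untouched by this commit; internally it now calls
        // streamsMetadataState.allMetadata()/allMetadataForStore()/keyQueryMetadataForKey()
        // and queryableStoreProvider.store() instead of the get-prefixed variants.
        static void query(final KafkaStreams streams) {
            final Collection<StreamsMetadata> clients = streams.metadataForAllStreamsClients();
            final Collection<StreamsMetadata> hosts = streams.streamsMetadataForStore("counts-store");
            final KeyQueryMetadata where = streams.queryMetadataForKey(
                "counts-store", "some-key", Serdes.String().serializer());
            final ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("counts-store", QueryableStoreTypes.keyValueStore()));
        }
    }
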
KafkaStreams.java

@@ -1195,7 +1195,7 @@ public class KafkaStreams implements AutoCloseable {
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
log.info("Removing StreamThread " + streamThread.getName());
final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
final Optional<String> groupInstanceID = streamThread.groupInstanceID();
streamThread.requestLeaveGroupDuringShutdown();
streamThread.shutdown();
if (!streamThread.getName().equals(Thread.currentThread().getName())) {
@@ -1630,7 +1630,7 @@ public class KafkaStreams implements AutoCloseable {
private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
return thread -> {
final Optional<String> groupInstanceId = thread.getGroupInstanceID();
final Optional<String> groupInstanceId = thread.groupInstanceID();
if (groupInstanceId.isPresent()) {
log.debug("Sending leave group trigger to removing instance from consumer group: {}.",
groupInstanceId.get());
@@ -1685,7 +1685,7 @@ public class KafkaStreams implements AutoCloseable {
*/
public Collection<StreamsMetadata> metadataForAllStreamsClients() {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata();
return streamsMetadataState.allMetadata();
}
/**
@@ -1705,7 +1705,7 @@ public class KafkaStreams implements AutoCloseable {
*/
public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName);
return streamsMetadataState.allMetadataForStore(storeName);
}
/**
@@ -1722,7 +1722,7 @@ public class KafkaStreams implements AutoCloseable {
final K key,
final Serializer<K> keySerializer) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer);
return streamsMetadataState.keyQueryMetadataForKey(storeName, key, keySerializer);
}
/**
@@ -1739,7 +1739,7 @@ public class KafkaStreams implements AutoCloseable {
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, partitioner);
return streamsMetadataState.keyQueryMetadataForKey(storeName, key, partitioner);
}
/**
@@ -1766,7 +1766,7 @@ public class KafkaStreams implements AutoCloseable {
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
}
return queryableStoreProvider.getStore(storeQueryParameters);
return queryableStoreProvider.store(storeQueryParameters);
}
/**
@@ -2049,7 +2049,7 @@ public class KafkaStreams implements AutoCloseable {
final long changelogPosition = allChangelogPositions.getOrDefault(entry.getKey(), earliestOffset);
final long latestOffset = entry.getValue().offset();
final LagInfo lagInfo = new LagInfo(changelogPosition == Task.LATEST_OFFSET ? latestOffset : changelogPosition, latestOffset);
final String storeName = streamsMetadataState.getStoreForChangelogTopic(entry.getKey().topic());
final String storeName = streamsMetadataState.storeForChangelogTopic(entry.getKey().topic());
localStorePartitionLags.computeIfAbsent(storeName, ignored -> new TreeMap<>())
.put(entry.getKey().partition(), lagInfo);
}
@@ -2116,7 +2116,7 @@ public class KafkaStreams implements AutoCloseable {
final TaskId taskId = task.id();
final int partition = taskId.partition();
if (request.isAllPartitions() || request.getPartitions().contains(partition)) {
final StateStore store = task.getStore(storeName);
final StateStore store = task.store(storeName);
if (store != null) {
final StreamThread.State state = thread.state();
final boolean active = task.isActive();

AbstractTask.java

@@ -124,7 +124,7 @@ public abstract class AbstractTask implements Task {
}
@Override
public StateStore getStore(final String name) {
public StateStore store(final String name) {
return stateMgr.getStore(name);
}

ReadOnlyTask.java

@@ -213,8 +213,8 @@ public class ReadOnlyTask implements Task {
}
@Override
public StateStore getStore(final String name) {
return task.getStore(name);
public StateStore store(final String name) {
return task.store(name);
}
@Override

StreamThread.java

@@ -318,7 +318,7 @@ public class StreamThread extends Thread implements ProcessingThread {
private volatile State state = State.CREATED;
private volatile ThreadMetadata threadMetadata;
private StreamThread.StateListener stateListener;
private final Optional<String> getGroupInstanceID;
private final Optional<String> groupInstanceID;
private final ChangelogReader changelogReader;
private final ConsumerRebalanceListener rebalanceListener;
@@ -629,7 +629,7 @@ public class StreamThread extends Thread implements ProcessingThread {
this.originalReset = originalReset;
this.nextProbingRebalanceMs = nextProbingRebalanceMs;
this.nonFatalExceptionsToHandle = nonFatalExceptionsToHandle;
this.getGroupInstanceID = mainConsumer.groupMetadata().groupInstanceId();
this.groupInstanceID = mainConsumer.groupMetadata().groupInstanceId();
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
final int dummyThreadIdx = 1;
@@ -1586,8 +1586,8 @@ public class StreamThread extends Thread implements ProcessingThread {
return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
}
public Optional<String> getGroupInstanceID() {
return getGroupInstanceID;
public Optional<String> groupInstanceID() {
return groupInstanceID;
}
public void requestLeaveGroupDuringShutdown() {

StreamsMetadataState.java

@@ -102,7 +102,7 @@ public class StreamsMetadataState {
*
* @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
*/
public Collection<StreamsMetadata> getAllMetadata() {
public Collection<StreamsMetadata> allMetadata() {
return Collections.unmodifiableList(allMetadata);
}
@@ -112,10 +112,10 @@ public class StreamsMetadataState {
* @param storeName the storeName to find metadata for
* @return A collection of {@link StreamsMetadata} that have the provided storeName
*/
public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
public synchronized Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
Objects.requireNonNull(storeName, "storeName cannot be null");
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalArgumentException("Cannot invoke the getAllMetadataForStore(storeName) method when"
throw new IllegalArgumentException("Cannot invoke the allMetadataForStore(storeName) method when"
+ "using named topologies, please use the overload that accepts"
+ "a topologyName parameter to identify the correct store");
}
@@ -149,7 +149,7 @@ public class StreamsMetadataState {
* @param topologyName the storeName to find metadata for
* @return A collection of {@link StreamsMetadata} that have the provided storeName
*/
public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName, final String topologyName) {
public synchronized Collection<StreamsMetadata> allMetadataForStore(final String storeName, final String topologyName) {
Objects.requireNonNull(storeName, "storeName cannot be null");
Objects.requireNonNull(topologyName, "topologyName cannot be null");
@@ -193,7 +193,7 @@ public class StreamsMetadataState {
/**
* Find the {@link KeyQueryMetadata}s for a given storeName and key. This method will use the
* {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used
* please use {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)} instead.
* please use {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, StreamPartitioner)} instead.
*
* Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
* this method provides a way of finding which {@link KeyQueryMetadata} it would exist on.
@@ -206,29 +206,29 @@ public class StreamsMetadataState {
* if streams is (re-)initializing or {@code null} if the corresponding topic cannot be found,
* or null if no matching metadata could be found.
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
Objects.requireNonNull(keySerializer, "keySerializer can't be null");
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer)"
throw new IllegalArgumentException("Cannot invoke the KeyQueryMetadataForKey(storeName, key, keySerializer)"
+ "method when using named topologies, please use the overload that"
+ "accepts a topologyName parameter to identify the correct store");
}
return getKeyQueryMetadataForKey(storeName,
return keyQueryMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer));
}
/**
* See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, Serializer)}
* See {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, Serializer)}
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer,
final String topologyName) {
Objects.requireNonNull(keySerializer, "keySerializer can't be null");
return getKeyQueryMetadataForKey(storeName,
return keyQueryMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer),
topologyName);
@@ -247,14 +247,14 @@ public class StreamsMetadataState {
* @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE}
* if streams is (re-)initializing, or {@code null} if no matching metadata could be found.
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
Objects.requireNonNull(storeName, "storeName can't be null");
Objects.requireNonNull(key, "key can't be null");
Objects.requireNonNull(partitioner, "partitioner can't be null");
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, partitioner)"
throw new IllegalArgumentException("Cannot invoke the keyQueryMetadataForKey(storeName, key, partitioner)"
+ "method when using named topologies, please use the overload that"
+ "accepts a topologyName parameter to identify the correct store");
}
@@ -276,13 +276,13 @@ public class StreamsMetadataState {
if (sourceTopicsInfo == null) {
return null;
}
return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
return keyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
}
/**
* See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)}
* See {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, StreamPartitioner)}
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final String topologyName) {
@@ -300,7 +300,7 @@ public class StreamsMetadataState {
if (sourceTopicsInfo == null) {
return null;
}
return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo, topologyName);
return keyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo, topologyName);
}
/**
@@ -478,7 +478,7 @@ public class StreamsMetadataState {
return maybeMulticastPartitions.get().iterator().next();
};
private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
private <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo) {
@@ -511,7 +511,7 @@ public class StreamsMetadataState {
return new KeyQueryMetadata(activeHost, standbyHosts, partition);
}
private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
private <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo,
@@ -566,7 +566,7 @@ public class StreamsMetadataState {
return partitionsByTopic != null && !partitionsByTopic.isEmpty() && localMetadata.get() != null;
}
public String getStoreForChangelogTopic(final String topicName) {
public String storeForChangelogTopic(final String topicName) {
return topologyMetadata.storeForChangelogTopic(topicName);
}

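Per the javadoc above, keyQueryMetadataForKey() returns KeyQueryMetadata.NOT_AVAILABLE while Streams is (re-)initializing and null when no matching metadata exists. Below is a hedged sketch of how a caller of the unchanged public wrapper KafkaStreams#queryMetadataForKey might handle the three outcomes; the surrounding class and the store name "counts-store" are hypothetical.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyQueryMetadata;

    public class KeyQueryMetadataSketch {

        // Resolve which host currently owns the partition a key maps to.
        static String activeHostFor(final KafkaStreams streams, final String key) {
            final KeyQueryMetadata metadata =
                streams.queryMetadataForKey("counts-store", key, Serdes.String().serializer());
            if (metadata == null) {
                return "no matching metadata (unknown store)";
            }
            if (KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
                return "streams is (re-)initializing, retry later";
            }
            // Route the interactive query to the active host; standby hosts are also exposed.
            return metadata.activeHost().host() + ":" + metadata.activeHost().port();
        }
    }
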
Task.java

@@ -246,7 +246,7 @@ public interface Task {
// IQ related methods
StateStore getStore(final String name);
StateStore store(final String name);
/**
* @return the offsets of all the changelog partitions associated with this task,

KafkaStreamsNamedTopologyWrapper.java

@@ -404,7 +404,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName, final String topologyName) {
verifyTopologyStateStore(topologyName, storeName);
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName, topologyName);
return streamsMetadataState.allMetadataForStore(storeName, topologyName);
}
/**
@@ -424,7 +424,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
final String topologyName) {
verifyTopologyStateStore(topologyName, storeName);
validateIsRunningOrRebalancing();
return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer, topologyName);
return streamsMetadataState.keyQueryMetadataForKey(storeName, key, keySerializer, topologyName);
}
/**

QueryableStoreProvider.java

@@ -51,7 +51,7 @@ public class QueryableStoreProvider {
* @param <T> The expected type of the returned store
* @return A composite object that wraps the store instances.
*/
public <T> T getStore(final StoreQueryParameters<T> storeQueryParameters) {
public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
final String storeName = storeQueryParameters.storeName();
final QueryableStoreType<T> queryableStoreType = storeQueryParameters.queryableStoreType();
final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);

StreamThreadStateStoreProvider.java

@@ -62,9 +62,9 @@ public class StreamThreadStateStoreProvider {
for (final Task task : tasks) {
if (task.id().partition() == storeQueryParams.partition() &&
(topologyName == null || topologyName.equals(task.id().topologyName())) &&
task.getStore(storeName) != null &&
storeName.equals(task.getStore(storeName).name())) {
final T typedStore = validateAndCastStores(task.getStore(storeName), queryableStoreType, storeName, task.id());
task.store(storeName) != null &&
storeName.equals(task.store(storeName).name())) {
final T typedStore = validateAndCastStores(task.store(storeName), queryableStoreType, storeName, task.id());
return Collections.singletonList(typedStore);
}
}
@@ -72,7 +72,7 @@ public class StreamThreadStateStoreProvider {
} else {
final List<T> list = new ArrayList<>();
for (final Task task : tasks) {
final StateStore store = task.getStore(storeName);
final StateStore store = task.store(storeName);
if (store == null) {
// then this task doesn't have that store
} else {

KafkaStreamsTest.java

@@ -734,7 +734,7 @@ public class KafkaStreamsTest {
final AtomicReference<StreamThread.State> state2 = prepareStreamThread(streamThreadTwo, 2);
prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2);
when(streamThreadOne.getGroupInstanceID()).thenReturn(Optional.empty());
when(streamThreadOne.groupInstanceID()).thenReturn(Optional.empty());
when(streamThreadOne.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true);
when(streamThreadOne.isThreadAlive()).thenReturn(true);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

ReadOnlyTaskTest.java

@@ -50,7 +50,7 @@ class ReadOnlyTaskTest {
add("changelogOffsets");
add("state");
add("id");
add("getStore");
add("store");
}
};

StreamThreadTest.java

@@ -2065,8 +2065,8 @@ public class StreamThreadTest {
assertEquals(task1, standbyTask1.id());
assertEquals(task3, standbyTask2.id());
final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.store(storeName1);
final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.store(storeName2);
assertEquals(0L, store1.approximateNumEntries());
assertEquals(0L, store2.approximateNumEntries());
@@ -2189,10 +2189,10 @@ public class StreamThreadTest {
assertEquals(task1, standbyTask1.id());
assertEquals(task3, standbyTask2.id());
final KeyValueStore<Object, Long> activeStore = (KeyValueStore<Object, Long>) activeTask1.getStore(storeName1);
final KeyValueStore<Object, Long> activeStore = (KeyValueStore<Object, Long>) activeTask1.store(storeName1);
final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.store(storeName1);
final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.store(storeName2);
assertEquals(0L, activeStore.approximateNumEntries());
assertEquals(0L, store1.approximateNumEntries());

StreamsMetadataStateTest.java

@@ -157,7 +157,7 @@ public class StreamsMetadataStateTest {
new TopologyMetadata(TopologyWrapper.getInternalTopologyBuilder(builder.build()), new DummyStreamsConfig()),
hostOne,
logContext
).getAllMetadataForStore("store");
).allMetadataForStore("store");
assertEquals(0, metadata.size());
}
@@ -179,7 +179,7 @@ public class StreamsMetadataStateTest {
mkSet("table-one", "table-two", "merged-table"),
mkSet(topic1P0, topic2P1, topic4P0));
final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
final Collection<StreamsMetadata> actual = metadataState.allMetadata();
assertEquals(3, actual.size());
assertTrue(actual.contains(one), "expected " + actual + " to contain " + one);
assertTrue(actual.contains(two), "expected " + actual + " to contain " + two);
@@ -199,7 +199,7 @@ public class StreamsMetadataStateTest {
final StreamsMetadata expected = new StreamsMetadataImpl(hostFour, Collections.singleton(globalTable),
Collections.singleton(tp5), Collections.emptySet(), Collections.emptySet());
final Collection<StreamsMetadata> actual = metadataState.getAllMetadata();
final Collection<StreamsMetadata> actual = metadataState.allMetadata();
assertTrue(actual.contains(expected), "expected " + actual + " to contain " + expected);
}
@@ -215,7 +215,7 @@ public class StreamsMetadataStateTest {
mkSet(topic2P0, topic1P1),
mkSet("table-three"),
mkSet(topic3P0));
final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("table-one");
final Collection<StreamsMetadata> actual = metadataState.allMetadataForStore("table-one");
final Map<HostInfo, StreamsMetadata> actualAsMap = actual.stream()
.collect(Collectors.toMap(StreamsMetadata::hostInfo, Function.identity()));
assertEquals(3, actual.size());
@@ -227,12 +227,12 @@ public class StreamsMetadataStateTest {
@Test
public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() {
assertThrows(NullPointerException.class, () -> metadataState.getAllMetadataForStore(null));
assertThrows(NullPointerException.class, () -> metadataState.allMetadataForStore(null));
}
@Test
public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() {
final Collection<StreamsMetadata> actual = metadataState.getAllMetadataForStore("not-a-store");
final Collection<StreamsMetadata> actual = metadataState.allMetadataForStore("not-a-store");
assertTrue(actual.isEmpty());
}
@@ -245,7 +245,7 @@ public class StreamsMetadataStateTest {
Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)));
final KeyQueryMetadata expected = new KeyQueryMetadata(hostThree, mkSet(hostTwo), 0);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("table-three",
"the-key",
Serdes.String().serializer());
assertEquals(expected, actual);
@@ -261,7 +261,7 @@ public class StreamsMetadataStateTest {
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, Collections.emptySet(), 1);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("table-three",
final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("table-three",
"the-key",
partitioner);
assertEquals(expected, actual);
@@ -277,7 +277,7 @@ public class StreamsMetadataStateTest {
Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)));
assertThrows(IllegalArgumentException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three",
assertThrows(IllegalArgumentException.class, () -> metadataState.keyQueryMetadataForKey("table-three",
"the-key",
new MultiValuedPartitioner()));
}
@@ -285,7 +285,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldReturnNotAvailableWhenClusterIsEmpty() {
metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
final KeyQueryMetadata result = metadataState.getKeyQueryMetadataForKey("table-one", "a", Serdes.String().serializer());
final KeyQueryMetadata result = metadataState.keyQueryMetadataForKey("table-one", "a", Serdes.String().serializer());
assertEquals(KeyQueryMetadata.NOT_AVAILABLE, result);
}
@@ -299,7 +299,7 @@ public class StreamsMetadataStateTest {
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("merged-table", "the-key",
final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key",
(topic, key, value, numPartitions) -> 2);
assertEquals(expected, actual);
@@ -307,7 +307,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() {
final KeyQueryMetadata actual = metadataState.getKeyQueryMetadataForKey("not-a-store",
final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("not-a-store",
"key",
Serdes.String().serializer());
assertNull(actual);
@@ -315,28 +315,28 @@ public class StreamsMetadataStateTest {
@Test
public void shouldThrowWhenKeyIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three", null, Serdes.String().serializer()));
assertThrows(NullPointerException.class, () -> metadataState.keyQueryMetadataForKey("table-three", null, Serdes.String().serializer()));
}
@Test
public void shouldThrowWhenSerializerIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey("table-three", "key", (Serializer<Object>) null));
assertThrows(NullPointerException.class, () -> metadataState.keyQueryMetadataForKey("table-three", "key", (Serializer<Object>) null));
}
@Test
public void shouldThrowIfStoreNameIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey(null, "key", Serdes.String().serializer()));
assertThrows(NullPointerException.class, () -> metadataState.keyQueryMetadataForKey(null, "key", Serdes.String().serializer()));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowIfStreamPartitionerIsNull() {
assertThrows(NullPointerException.class, () -> metadataState.getKeyQueryMetadataForKey(null, "key", (StreamPartitioner) null));
assertThrows(NullPointerException.class, () -> metadataState.keyQueryMetadataForKey(null, "key", (StreamPartitioner) null));
}
@Test
public void shouldHaveGlobalStoreInAllMetadata() {
final Collection<StreamsMetadata> metadata = metadataState.getAllMetadataForStore(globalTable);
final Collection<StreamsMetadata> metadata = metadataState.allMetadataForStore(globalTable);
assertEquals(3, metadata.size());
for (final StreamsMetadata streamsMetadata : metadata) {
assertTrue(streamsMetadata.stateStoreNames().contains(globalTable));
@@ -355,7 +355,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldGetQueryMetadataForGlobalStoreWithKey() {
final KeyQueryMetadata metadata = metadataState.getKeyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer());
final KeyQueryMetadata metadata = metadataState.keyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer());
assertEquals(hostOne, metadata.activeHost());
assertTrue(metadata.standbyHosts().isEmpty());
}
@@ -368,12 +368,12 @@ public class StreamsMetadataStateTest {
logContext
);
streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer()));
assertNotNull(streamsMetadataState.keyQueryMetadataForKey(globalTable, "key", Serdes.String().serializer()));
}
@Test
public void shouldGetQueryMetadataForGlobalStoreWithKeyAndPartitioner() {
final KeyQueryMetadata metadata = metadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner);
final KeyQueryMetadata metadata = metadataState.keyQueryMetadataForKey(globalTable, "key", partitioner);
assertEquals(hostOne, metadata.activeHost());
assertTrue(metadata.standbyHosts().isEmpty());
}
@@ -386,12 +386,12 @@ public class StreamsMetadataStateTest {
logContext
);
streamsMetadataState.onChange(hostToActivePartitions, hostToStandbyPartitions, partitionInfos);
assertNotNull(streamsMetadataState.getKeyQueryMetadataForKey(globalTable, "key", partitioner));
assertNotNull(streamsMetadataState.keyQueryMetadataForKey(globalTable, "key", partitioner));
}
@Test
public void shouldReturnAllMetadataThatRemainsValidAfterChange() {
final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
final Collection<StreamsMetadata> allMetadata = metadataState.allMetadata();
final Collection<StreamsMetadata> copy = new ArrayList<>(allMetadata);
assertFalse(allMetadata.isEmpty(), "invalid test");
metadataState.onChange(Collections.emptyMap(), Collections.emptyMap(), partitionInfos);
@@ -400,7 +400,7 @@ public class StreamsMetadataStateTest {
@Test
public void shouldNotReturnMutableReferenceToInternalAllMetadataCollection() {
final Collection<StreamsMetadata> allMetadata = metadataState.getAllMetadata();
final Collection<StreamsMetadata> allMetadata = metadataState.allMetadata();
assertFalse(allMetadata.isEmpty(), "invalid test");
try {
@@ -410,6 +410,6 @@ public class StreamsMetadataStateTest {
// Or should fail.
}
assertFalse(metadataState.getAllMetadata().isEmpty(), "encapsulation broken");
assertFalse(metadataState.allMetadata().isEmpty(), "encapsulation broken");
}
}

TaskManagerTest.java

@@ -4883,7 +4883,7 @@ public class TaskManagerTest {
}
@Override
public StateStore getStore(final String name) {
public StateStore store(final String name) {
return null;
}

QueryableStoreProviderTest.java

@@ -60,54 +60,54 @@ public class QueryableStoreProviderTest {
@Test
public void shouldThrowExceptionIfKVStoreDoesntExist() {
assertThrows(InvalidStateStoreException.class, () -> storeProvider.getStore(
assertThrows(InvalidStateStoreException.class, () -> storeProvider.store(
StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())).get("1"));
}
@Test
public void shouldThrowExceptionIfWindowStoreDoesntExist() {
assertThrows(InvalidStateStoreException.class, () -> storeProvider.getStore(
assertThrows(InvalidStateStoreException.class, () -> storeProvider.store(
StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()));
}
@Test
public void shouldReturnKVStoreWhenItExists() {
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())));
assertNotNull(storeProvider.store(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())));
}
@Test
public void shouldReturnWindowStoreWhenItExists() {
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore())));
assertNotNull(storeProvider.store(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore())));
}
@Test
public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() {
assertThrows(InvalidStateStoreException.class, () -> storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore,
assertThrows(InvalidStateStoreException.class, () -> storeProvider.store(StoreQueryParameters.fromNameAndType(windowStore,
QueryableStoreTypes.keyValueStore())).get("1"));
}
@Test
public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() {
assertThrows(InvalidStateStoreException.class, () -> storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore,
assertThrows(InvalidStateStoreException.class, () -> storeProvider.store(StoreQueryParameters.fromNameAndType(keyValueStore,
QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()));
}
@Test
public void shouldFindGlobalStores() {
globalStateStores.put("global", new NoOpReadOnlyStore<>());
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
assertNotNull(storeProvider.store(StoreQueryParameters.fromNameAndType("global", QueryableStoreTypes.keyValueStore())));
}
@Test
public void shouldReturnKVStoreWithPartitionWhenItExists() {
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));
assertNotNull(storeProvider.store(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)));
}
@Test
public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() {
final int partition = numStateStorePartitions + 1;
final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () ->
storeProvider.getStore(
storeProvider.store(
StoreQueryParameters
.fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore())
.withPartition(partition)).get("1")
@@ -117,14 +117,14 @@ public class QueryableStoreProviderTest {
@Test
public void shouldReturnWindowStoreWithPartitionWhenItExists() {
assertNotNull(storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore()).withPartition(numStateStorePartitions - 1)));
assertNotNull(storeProvider.store(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.windowStore()).withPartition(numStateStorePartitions - 1)));
}
@Test
public void shouldThrowExceptionWhenWindowStoreWithPartitionDoesntExists() {
final int partition = numStateStorePartitions + 1;
final InvalidStateStoreException thrown = assertThrows(InvalidStateStoreException.class, () ->
storeProvider.getStore(
storeProvider.store(
StoreQueryParameters
.fromNameAndType(windowStore, QueryableStoreTypes.windowStore())
.withPartition(partition)).fetch("1", System.currentTimeMillis())

StreamThreadStateStoreProviderTest.java

@@ -316,7 +316,7 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
mockThread(true);
taskOne.getStore("kv-store").close();
taskOne.store("kv-store").close();
assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("kv-store",
QueryableStoreTypes.keyValueStore())));
}
@@ -324,7 +324,7 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() {
mockThread(true);
taskOne.getStore("timestamped-kv-store").close();
taskOne.store("timestamped-kv-store").close();
assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store",
QueryableStoreTypes.timestampedKeyValueStore())));
}
@@ -332,7 +332,7 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() {
mockThread(true);
taskOne.getStore("window-store").close();
taskOne.store("window-store").close();
assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("window-store",
QueryableStoreTypes.windowStore())));
}
@@ -340,7 +340,7 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() {
mockThread(true);
taskOne.getStore("timestamped-window-store").close();
taskOne.store("timestamped-window-store").close();
assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store",
QueryableStoreTypes.timestampedWindowStore())));
}
@@ -348,7 +348,7 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldThrowInvalidStoreExceptionIfSessionStoreClosed() {
mockThread(true);
taskOne.getStore("session-store").close();
taskOne.store("session-store").close();
assertThrows(InvalidStateStoreException.class, () -> provider.stores(StoreQueryParameters.fromNameAndType("session-store",
QueryableStoreTypes.sessionStore())));
}