KAFKA-6265: Add #queryableStoreName() to GlobalKTable

A spinoff of original pull request #4340 for resolving conflicts.

Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4413 from ConcurrencyPractitioner/kafka-6265-2
This commit is contained in:
Richard Yu 2018-01-11 09:54:02 -08:00 committed by Guozhang Wang
parent 4d544d4f08
commit b8aa1761c3
5 changed files with 56 additions and 1 deletions

View File

@ -71,6 +71,13 @@
to distinguish them from configurations of other clients that share the same config names.
</p>
<p>
New method in <code>GlobalKTable</code>
</p>
<ul>
<li> A method has been provided such that it will return the store name associated with the <code>GlobalKTable</code> or <code>null</code> if the store name is non-queryable. </li>
</ul>
<p>
New methods in <code>KafkaStreams</code>:
</p>

View File

@ -67,4 +67,10 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
*/
@InterfaceStability.Evolving
public interface GlobalKTable<K, V> {
/**
* Get the name of the local state store that can be used to query this {@code GlobalKTable}.
*
* @return the underlying state store name, or {@code null} if this {@code GlobalKTable} cannot be queried.
*/
String queryableStoreName();
}

View File

@ -21,13 +21,29 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final boolean queryable;
public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier) {
this.valueGetterSupplier = valueGetterSupplier;
this.queryable = true;
}
public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
final boolean queryable) {
this.valueGetterSupplier = valueGetterSupplier;
this.queryable = queryable;
}
KTableValueGetterSupplier<K, V> valueGetterSupplier() {
return valueGetterSupplier;
}
@Override
public String queryableStoreName() {
if (!queryable) {
return null;
}
return valueGetterSupplier.storeNames()[0];
}
}

View File

@ -158,7 +158,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
topic,
processorName,
tableSource);
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()), materialized.isQueryable());
}
@Override

View File

@ -156,6 +156,32 @@ public class InternalStreamsBuilderTest {
assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
assertNull(table1.queryableStoreName());
}
@Test
public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
final GlobalKTable<String, String> table1 = builder.globalTable(
"topic2",
consumed,
new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
builder,
storePrefix));
assertNull(table1.queryableStoreName());
}
@Test
public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
final GlobalKTable<String, String> table1 = builder.globalTable(
"topic2",
consumed,
new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
builder,
storePrefix));
assertEquals("globalTable", table1.queryableStoreName());
}
@Test
public void shouldBuildSimpleGlobalTableTopology() throws Exception {