MINOR: remove get prefix for internal state methods (#17053)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Matthias J. Sax 2024-08-31 05:02:06 -07:00 committed by GitHub
parent 8f4d856977
commit fc720d33a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 110 additions and 114 deletions

View File

@ -207,7 +207,7 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> impleme
while (it.hasNext()) { while (it.hasNext()) {
final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> nextKeyValue = it.next(); final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> nextKeyValue = it.next();
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key; final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = nextKeyValue.key;
sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp(); sharedTimeTracker.minTime = timestampedKeyAndJoinSide.timestamp();
if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
// if windows are open for both joinSides we can break since there are no more candidates to emit // if windows are open for both joinSides we can break since there are no more candidates to emit
break; break;
@ -250,8 +250,8 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> impleme
private void forwardNonJoinedOuterRecords(final Record<K, VThis> record, private void forwardNonJoinedOuterRecords(final Record<K, VThis> record,
final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide, final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide,
final LeftOrRightValue<VLeft, VRight> leftOrRightValue) { final LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
final K key = timestampedKeyAndJoinSide.getKey(); final K key = timestampedKeyAndJoinSide.key();
final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); final long timestamp = timestampedKeyAndJoinSide.timestamp();
final VThis thisValue = getThisValue(leftOrRightValue); final VThis thisValue = getThisValue(leftOrRightValue);
final VOther otherValue = getOtherValue(leftOrRightValue); final VOther otherValue = getOtherValue(leftOrRightValue);
final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue); final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue);

View File

@ -60,12 +60,12 @@ class KStreamKStreamJoinLeftSide<K, VLeft, VRight, VOut> extends KStreamKStreamJ
@Override @Override
public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) { public VLeft getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getLeftValue(); return leftOrRightValue.leftValue();
} }
@Override @Override
public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) { public VRight getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getRightValue(); return leftOrRightValue.rightValue();
} }
} }
} }

View File

@ -59,12 +59,12 @@ class KStreamKStreamJoinRightSide<K, VLeft, VRight, VOut> extends KStreamKStream
@Override @Override
public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) { public VRight getThisValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getRightValue(); return leftOrRightValue.rightValue();
} }
@Override @Override
public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) { public VLeft getOtherValue(final LeftOrRightValue<? extends VLeft, ? extends VRight> leftOrRightValue) {
return leftOrRightValue.getLeftValue(); return leftOrRightValue.leftValue();
} }
} }
} }

View File

@ -53,7 +53,7 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
throw new UnsupportedOperationException(ERROR_MESSAGE); throw new UnsupportedOperationException(ERROR_MESSAGE);
} }
static StateStore readWriteStore(final StateStore store) { static StateStore wrapWithReadWriteStore(final StateStore store) {
if (store instanceof TimestampedKeyValueStore) { if (store instanceof TimestampedKeyValueStore) {
return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store); return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>) store);
} else if (store instanceof VersionedKeyValueStore) { } else if (store instanceof VersionedKeyValueStore) {

View File

@ -200,7 +200,7 @@ public class DefaultStateUpdater implements StateUpdater {
private void performActionsOnTasks() { private void performActionsOnTasks() {
tasksAndActionsLock.lock(); tasksAndActionsLock.lock();
try { try {
for (final TaskAndAction taskAndAction : getTasksAndActions()) { for (final TaskAndAction taskAndAction : tasksAndActions()) {
final Action action = taskAndAction.action(); final Action action = taskAndAction.action();
switch (action) { switch (action) {
case ADD: case ADD:
@ -458,7 +458,7 @@ public class DefaultStateUpdater implements StateUpdater {
changelogReader.clear(); changelogReader.clear();
} }
private List<TaskAndAction> getTasksAndActions() { private List<TaskAndAction> tasksAndActions() {
final List<TaskAndAction> tasksAndActionsToProcess = new ArrayList<>(tasksAndActions); final List<TaskAndAction> tasksAndActionsToProcess = new ArrayList<>(tasksAndActions);
tasksAndActions.clear(); tasksAndActions.clear();
return tasksAndActionsToProcess; return tasksAndActionsToProcess;

View File

@ -34,7 +34,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListe
import java.time.Duration; import java.time.Duration;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore; import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> { public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object, Object> {
@ -60,7 +60,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object,
@Override @Override
public <S extends StateStore> S getStateStore(final String name) { public <S extends StateStore> S getStateStore(final String name) {
final StateStore store = stateManager.globalStore(name); final StateStore store = stateManager.globalStore(name);
return (S) readWriteStore(store); return (S) wrapWithReadWriteStore(store);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -47,7 +47,7 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTEN
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore; import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore;
import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore; import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore;
public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier { public class ProcessorContextImpl extends AbstractProcessorContext<Object, Object> implements RecordCollector.Supplier {
// the below are null for standby tasks // the below are null for standby tasks
@ -182,7 +182,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext<Object, Objec
} }
final StateStore store = stateManager.store(name); final StateStore store = stateManager.store(name);
return (S) readWriteStore(store); return (S) wrapWithReadWriteStore(store);
} }
@Override @Override

View File

@ -113,7 +113,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
public void remove(final Bytes rawBaseKey) { public void remove(final Bytes rawBaseKey) {
final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey); final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey);
observedStreamTime = Math.max(observedStreamTime, timestamp); observedStreamTime = Math.max(observedStreamTime, timestamp);
final S segment = segments.getSegmentForTimestamp(timestamp); final S segment = segments.segmentForTimestamp(timestamp);
if (segment == null) { if (segment == null) {
return; return;
} }
@ -227,7 +227,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
} }
} }
final S segment = segments.getSegmentForTimestamp(timestampFromRawKey); final S segment = segments.segmentForTimestamp(timestampFromRawKey);
if (segment == null) { if (segment == null) {
return null; return null;
} }

View File

@ -239,7 +239,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
public void remove(final Bytes key) { public void remove(final Bytes key) {
final long timestamp = keySchema.segmentTimestamp(key); final long timestamp = keySchema.segmentTimestamp(key);
observedStreamTime = Math.max(observedStreamTime, timestamp); observedStreamTime = Math.max(observedStreamTime, timestamp);
final S segment = segments.getSegmentForTimestamp(timestamp); final S segment = segments.segmentForTimestamp(timestamp);
if (segment == null) { if (segment == null) {
return; return;
} }
@ -249,7 +249,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
@Override @Override
public void remove(final Bytes key, final long timestamp) { public void remove(final Bytes key, final long timestamp) {
final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp); final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp);
final S segment = segments.getSegmentForTimestamp(timestamp); final S segment = segments.segmentForTimestamp(timestamp);
if (segment != null) { if (segment != null) {
segment.deleteRange(keyBytes, keyBytes); segment.deleteRange(keyBytes, keyBytes);
} }
@ -281,7 +281,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1); key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1);
return null; return null;
} }
final S segment = segments.getSegmentForTimestamp(timestampFromKey); final S segment = segments.segmentForTimestamp(timestampFromKey);
if (segment == null) { if (segment == null) {
return null; return null;
} }

View File

@ -75,7 +75,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
} }
@Override @Override
public S getSegmentForTimestamp(final long timestamp) { public S segmentForTimestamp(final long timestamp) {
return segments.get(segmentId(timestamp)); return segments.get(segmentId(timestamp));
} }

View File

@ -63,11 +63,11 @@ public class LeftOrRightValue<V1, V2> {
return new LeftOrRightValue<>(null, rightValue); return new LeftOrRightValue<>(null, rightValue);
} }
public V1 getLeftValue() { public V1 leftValue() {
return leftValue; return leftValue;
} }
public V2 getRightValue() { public V2 rightValue() {
return rightValue; return rightValue;
} }

View File

@ -65,9 +65,9 @@ public class LeftOrRightValueSerializer<V1, V2> implements WrappingNullableSeria
return null; return null;
} }
final byte[] rawValue = (data.getLeftValue() != null) final byte[] rawValue = (data.leftValue() != null)
? leftSerializer.serialize(topic, data.getLeftValue()) ? leftSerializer.serialize(topic, data.leftValue())
: rightSerializer.serialize(topic, data.getRightValue()); : rightSerializer.serialize(topic, data.rightValue());
if (rawValue == null) { if (rawValue == null) {
return null; return null;
@ -75,7 +75,7 @@ public class LeftOrRightValueSerializer<V1, V2> implements WrappingNullableSeria
return ByteBuffer return ByteBuffer
.allocate(1 + rawValue.length) .allocate(1 + rawValue.length)
.put((byte) (data.getLeftValue() != null ? 1 : 0)) .put((byte) (data.leftValue() != null ? 1 : 0))
.put(rawValue) .put(rawValue)
.array(); .array();
} }

View File

@ -89,7 +89,7 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
+ "an entire store is closed, via the close() method rather than destroy()."); + "an entire store is closed, via the close() method rather than destroy().");
} }
final Bytes keyPrefix = prefixKeyFormatter.getPrefix(); final Bytes keyPrefix = prefixKeyFormatter.prefix();
// this deleteRange() call deletes all entries with the given prefix, because the // this deleteRange() call deletes all entries with the given prefix, because the
// deleteRange() implementation calls Bytes.increment() in order to make keyTo inclusive // deleteRange() implementation calls Bytes.increment() in order to make keyTo inclusive
@ -192,8 +192,8 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
} }
} }
public Snapshot getSnapshot() { public Snapshot snapshot() {
return physicalStore.getSnapshot(); return physicalStore.snapshot();
} }
public void releaseSnapshot(final Snapshot snapshot) { public void releaseSnapshot(final Snapshot snapshot) {
@ -204,14 +204,14 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
// from bound is inclusive. if the provided bound is null, replace with prefix // from bound is inclusive. if the provided bound is null, replace with prefix
final Bytes fromBound = from == null final Bytes fromBound = from == null
? prefixKeyFormatter.getPrefix() ? prefixKeyFormatter.prefix()
: prefixKeyFormatter.addPrefix(from); : prefixKeyFormatter.addPrefix(from);
// to bound is inclusive. if the provided bound is null, replace with the next prefix. // to bound is inclusive. if the provided bound is null, replace with the next prefix.
// this requires potentially filtering out the element corresponding to the next prefix // this requires potentially filtering out the element corresponding to the next prefix
// with empty bytes from the returned iterator. this filtering is accomplished by // with empty bytes from the returned iterator. this filtering is accomplished by
// passing the prefix filter into StrippedPrefixKeyValueIteratorAdapter(). // passing the prefix filter into StrippedPrefixKeyValueIteratorAdapter().
final Bytes toBound = to == null final Bytes toBound = to == null
? incrementWithoutOverflow(prefixKeyFormatter.getPrefix()) ? incrementWithoutOverflow(prefixKeyFormatter.prefix())
: prefixKeyFormatter.addPrefix(to); : prefixKeyFormatter.addPrefix(to);
final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range( final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.range(
fromBound, fromBound,
@ -226,7 +226,7 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
@Override @Override
public synchronized KeyValueIterator<Bytes, byte[]> all() { public synchronized KeyValueIterator<Bytes, byte[]> all() {
final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan( final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes = physicalStore.prefixScan(
prefixKeyFormatter.getPrefix(), prefixKeyFormatter.prefix(),
new BytesSerializer(), new BytesSerializer(),
openIterators); openIterators);
return new StrippedPrefixKeyValueIteratorAdapter( return new StrippedPrefixKeyValueIteratorAdapter(
@ -288,7 +288,7 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
return rawKey; return rawKey;
} }
Bytes getPrefix() { Bytes prefix() {
return Bytes.wrap(prefix); return Bytes.wrap(prefix);
} }

View File

@ -29,7 +29,7 @@ import java.util.Map;
* Regular segments with {@code segmentId >= 0} expire according to the specified * Regular segments with {@code segmentId >= 0} expire according to the specified
* retention period. "Reserved" segments with {@code segmentId < 0} do not expire * retention period. "Reserved" segments with {@code segmentId < 0} do not expire
* and are completely separate from regular segments in that methods such as * and are completely separate from regular segments in that methods such as
* {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)}, * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
* {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)}, * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
* {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)} * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
* only return regular segments and not reserved segments. The methods {@link #flush()} * only return regular segments and not reserved segments. The methods {@link #flush()}

View File

@ -95,16 +95,16 @@ public class LogicalSegmentIterator implements VersionedRecordIterator {
// fact all use the same physical RocksDB under-the-hood. // fact all use the same physical RocksDB under-the-hood.
this.snapshotOwner = segment; this.snapshotOwner = segment;
// take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
this.snapshot = snapshotOwner.getSnapshot(); this.snapshot = snapshotOwner.snapshot();
} }
final byte[] rawSegmentValue = segment.get(key, snapshot); final byte[] rawSegmentValue = segment.get(key, snapshot);
if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (rawSegmentValue != null) { // this segment contains record(s) with the specified key
if (segment.id() == -1) { // this is the latestValueStore if (segment.id() == -1) { // this is the latestValueStore
final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.timestamp(rawSegmentValue);
if (recordTimestamp <= toTime) { if (recordTimestamp <= toTime) {
// latest value satisfies timestamp bound // latest value satisfies timestamp bound
queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.value(rawSegmentValue), recordTimestamp));
} }
} else { } else {
// this segment contains records with the specified key and time range // this segment contains records with the specified key and time range

View File

@ -61,7 +61,6 @@ import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
/** /**
* A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its
@ -263,7 +262,7 @@ public class MeteredKeyValueStore<K, V>
final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator( final KeyValueIterator<K, V> resultIterator = new MeteredKeyValueTimestampedIterator(
iterator, iterator,
getSensor, getSensor,
getDeserializeValue(serdes, wrapped()) StoreQueryUtils.deserializeValue(serdes, wrapped())
); );
final QueryResult<KeyValueIterator<K, V>> typedQueryResult = final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult( InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
@ -289,7 +288,7 @@ public class MeteredKeyValueStore<K, V>
final QueryResult<byte[]> rawResult = final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config); wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) { if (rawResult.isSuccess()) {
final Function<byte[], V> deserializer = getDeserializeValue(serdes, wrapped()); final Function<byte[], V> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
final V value = deserializer.apply(rawResult.getResult()); final V value = deserializer.apply(rawResult.getResult());
final QueryResult<V> typedQueryResult = final QueryResult<V> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);

View File

@ -455,7 +455,7 @@ public class MeteredSessionStore<K, V>
iteratorDurationSensor, iteratorDurationSensor,
streamsMetrics, streamsMetrics,
serdes::keyFrom, serdes::keyFrom,
StoreQueryUtils.getDeserializeValue(serdes, wrapped()), StoreQueryUtils.deserializeValue(serdes, wrapped()),
time, time,
numOpenIterators, numOpenIterators,
openIterators openIterators

View File

@ -44,7 +44,6 @@ import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
/** /**
* A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its * A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
@ -186,7 +185,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
final QueryResult<byte[]> rawResult = final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config); wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) { if (rawResult.isSuccess()) {
final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); final Function<byte[], ValueAndTimestamp<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
final QueryResult<ValueAndTimestamp<V>> typedQueryResult = final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp); InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp);
@ -224,7 +223,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator( final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator = (KeyValueIterator<K, ValueAndTimestamp<V>>) new MeteredTimestampedKeyValueStoreIterator(
iterator, iterator,
getSensor, getSensor,
getDeserializeValue(serdes, wrapped()), StoreQueryUtils.deserializeValue(serdes, wrapped()),
false false
); );
final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult = final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> typedQueryResult =
@ -251,7 +250,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
final QueryResult<byte[]> rawResult = final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config); wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) { if (rawResult.isSuccess()) {
final Function<byte[], ValueAndTimestamp<V>> deserializer = getDeserializeValue(serdes, wrapped()); final Function<byte[], ValueAndTimestamp<V>> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult()); final ValueAndTimestamp<V> valueAndTimestamp = deserializer.apply(rawResult.getResult());
final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value(); final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value();
final QueryResult<V> typedQueryResult = final QueryResult<V> typedQueryResult =
@ -290,7 +289,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator( final KeyValueIterator<K, V> resultIterator = new MeteredTimestampedKeyValueStoreIterator(
iterator, iterator,
getSensor, getSensor,
getDeserializeValue(serdes, wrapped()), StoreQueryUtils.deserializeValue(serdes, wrapped()),
true true
); );
final QueryResult<KeyValueIterator<K, V>> typedQueryResult = final QueryResult<KeyValueIterator<K, V>> typedQueryResult =

View File

@ -267,7 +267,7 @@ public class MeteredVersionedKeyValueStore<K, V>
rawResult.getResult(), rawResult.getResult(),
iteratorDurationSensor, iteratorDurationSensor,
time, time,
StoreQueryUtils.getDeserializeValue(plainValueSerdes), StoreQueryUtils.deserializeValue(plainValueSerdes),
numOpenIterators, numOpenIterators,
openIterators openIterators
); );

View File

@ -56,7 +56,6 @@ import java.util.concurrent.atomic.LongAdder;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue;
public class MeteredWindowStore<K, V> public class MeteredWindowStore<K, V>
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V> extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
@ -417,7 +416,7 @@ public class MeteredWindowStore<K, V>
iteratorDurationSensor, iteratorDurationSensor,
streamsMetrics, streamsMetrics,
serdes::keyFrom, serdes::keyFrom,
getDeserializeValue(serdes, wrapped()), StoreQueryUtils.deserializeValue(serdes, wrapped()),
time, time,
numOpenIterators, numOpenIterators,
openIterators openIterators
@ -469,7 +468,7 @@ public class MeteredWindowStore<K, V>
fetchSensor, fetchSensor,
iteratorDurationSensor, iteratorDurationSensor,
streamsMetrics, streamsMetrics,
getDeserializeValue(serdes, wrapped()), StoreQueryUtils.deserializeValue(serdes, wrapped()),
time, time,
numOpenIterators, numOpenIterators,
openIterators openIterators

View File

@ -365,7 +365,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
} }
} }
public Snapshot getSnapshot() { public Snapshot snapshot() {
return db.getSnapshot(); return db.getSnapshot();
} }

View File

@ -191,8 +191,8 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
if (rawLatestValueAndTimestamp != null) { if (rawLatestValueAndTimestamp != null) {
return new VersionedRecord<>( return new VersionedRecord<>(
LatestValueFormatter.getValue(rawLatestValueAndTimestamp), LatestValueFormatter.value(rawLatestValueAndTimestamp),
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp) LatestValueFormatter.timestamp(rawLatestValueAndTimestamp)
); );
} else { } else {
return null; return null;
@ -210,11 +210,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
// still be returned (i.e., the latest record version per key never expires). // still be returned (i.e., the latest record version per key never expires).
final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
if (rawLatestValueAndTimestamp != null) { if (rawLatestValueAndTimestamp != null) {
final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
if (latestTimestamp <= asOfTimestamp) { if (latestTimestamp <= asOfTimestamp) {
// latest value satisfies timestamp bound // latest value satisfies timestamp bound
return new VersionedRecord<>( return new VersionedRecord<>(
LatestValueFormatter.getValue(rawLatestValueAndTimestamp), LatestValueFormatter.value(rawLatestValueAndTimestamp),
latestTimestamp latestTimestamp
); );
} }
@ -230,9 +230,9 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
// first check the latest value store // first check the latest value store
final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
if (rawLatestValueAndTimestamp != null) { if (rawLatestValueAndTimestamp != null) {
final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
if (latestTimestamp <= asOfTimestamp) { if (latestTimestamp <= asOfTimestamp) {
return new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), latestTimestamp); return new VersionedRecord<>(LatestValueFormatter.value(rawLatestValueAndTimestamp), latestTimestamp);
} }
} }
@ -241,14 +241,14 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
for (final LogicalKeyValueSegment segment : segments) { for (final LogicalKeyValueSegment segment : segments) {
final byte[] rawSegmentValue = segment.get(key); final byte[] rawSegmentValue = segment.get(key);
if (rawSegmentValue != null) { if (rawSegmentValue != null) {
final long nextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); final long nextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
if (nextTs <= asOfTimestamp) { if (nextTs <= asOfTimestamp) {
// this segment contains no data for the queried timestamp, so earlier segments // this segment contains no data for the queried timestamp, so earlier segments
// cannot either // cannot either
return null; return null;
} }
if (RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue) > asOfTimestamp) { if (RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue) > asOfTimestamp) {
// the segment only contains data for after the queried timestamp. skip and // the segment only contains data for after the queried timestamp. skip and
// continue the search to earlier segments. as an optimization, this code // continue the search to earlier segments. as an optimization, this code
// could be updated to skip forward to the segment containing the minTimestamp // could be updated to skip forward to the segment containing the minTimestamp
@ -474,7 +474,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
/** /**
* @return the contents of the latest value store, for the given key * @return the contents of the latest value store, for the given key
*/ */
byte[] getLatestValue(Bytes key); byte[] latestValue(Bytes key);
/** /**
* Puts the provided key and value into the latest value store. * Puts the provided key and value into the latest value store.
@ -496,7 +496,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
* timestamp bound, in reverse order by segment id (and time), i.e., such that * timestamp bound, in reverse order by segment id (and time), i.e., such that
* the most recent segment is first * the most recent segment is first
*/ */
List<T> getReverseSegments(long timestampFrom); List<T> reversedSegments(long timestampFrom);
/** /**
* @return the segment id associated with the provided timestamp * @return the segment id associated with the provided timestamp
@ -510,7 +510,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> { class RocksDBVersionedStoreClient implements VersionedStoreClient<LogicalKeyValueSegment> {
@Override @Override
public byte[] getLatestValue(final Bytes key) { public byte[] latestValue(final Bytes key) {
return latestValueStore.get(key); return latestValueStore.get(key);
} }
@ -530,7 +530,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
} }
@Override @Override
public List<LogicalKeyValueSegment> getReverseSegments(final long timestampFrom) { public List<LogicalKeyValueSegment> reversedSegments(final long timestampFrom) {
return segmentStores.segments(timestampFrom, Long.MAX_VALUE, false); return segmentStores.segments(timestampFrom, Long.MAX_VALUE, false);
} }
@ -668,9 +668,9 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
// that the segment should be inserted into the latest value store. // that the segment should be inserted into the latest value store.
long foundTs = SENTINEL_TIMESTAMP; long foundTs = SENTINEL_TIMESTAMP;
final byte[] rawLatestValueAndTimestamp = versionedStoreClient.getLatestValue(key); final byte[] rawLatestValueAndTimestamp = versionedStoreClient.latestValue(key);
if (rawLatestValueAndTimestamp != null) { if (rawLatestValueAndTimestamp != null) {
final long latestValueStoreTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); final long latestValueStoreTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp);
if (timestamp >= latestValueStoreTimestamp) { if (timestamp >= latestValueStoreTimestamp) {
// new record belongs in the latest value store // new record belongs in the latest value store
if (timestamp > latestValueStoreTimestamp) { if (timestamp > latestValueStoreTimestamp) {
@ -692,7 +692,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
// is expired.) so, there is nothing to do for this step if `segment == null`, // is expired.) so, there is nothing to do for this step if `segment == null`,
// but we do still update the latest value store with the new record below. // but we do still update the latest value store with the new record below.
if (segment != null) { if (segment != null) {
final byte[] rawValueToMove = LatestValueFormatter.getValue(rawLatestValueAndTimestamp); final byte[] rawValueToMove = LatestValueFormatter.value(rawLatestValueAndTimestamp);
final byte[] rawSegmentValue = segment.get(key); final byte[] rawSegmentValue = segment.get(key);
if (rawSegmentValue == null) { if (rawSegmentValue == null) {
segment.put( segment.put(
@ -734,11 +734,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
// initialize with current foundTs value // initialize with current foundTs value
long foundTs = prevFoundTs; long foundTs = prevFoundTs;
final List<T> segments = versionedStoreClient.getReverseSegments(timestamp); final List<T> segments = versionedStoreClient.reversedSegments(timestamp);
for (final T segment : segments) { for (final T segment : segments) {
final byte[] rawSegmentValue = segment.get(key); final byte[] rawSegmentValue = segment.get(key);
if (rawSegmentValue != null) { if (rawSegmentValue != null) {
final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
if (foundNextTs <= timestamp) { if (foundNextTs <= timestamp) {
// this segment (and all earlier segments) does not contain records affected by // this segment (and all earlier segments) does not contain records affected by
// this put. insert into the segment specified by foundTs (i.e., the next // this put. insert into the segment specified by foundTs (i.e., the next
@ -746,7 +746,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
return new PutStatus(false, foundTs); return new PutStatus(false, foundTs);
} }
final long foundMinTs = RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue); final long foundMinTs = RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue);
if (foundMinTs <= timestamp) { if (foundMinTs <= timestamp) {
// the record being inserted belongs in this segment. // the record being inserted belongs in this segment.
// insert and conclude the procedure. // insert and conclude the procedure.
@ -906,7 +906,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
); );
} else { } else {
// insert as latest, since foundTs = sentinel means nothing later exists // insert as latest, since foundTs = sentinel means nothing later exists
if (RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) == timestamp) { if (RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue) == timestamp) {
// next timestamp equal to put() timestamp already represents a tombstone, // next timestamp equal to put() timestamp already represents a tombstone,
// so no additional insertion is needed in this case // so no additional insertion is needed in this case
return foundTs; return foundTs;
@ -914,7 +914,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
final SegmentValue segmentValue final SegmentValue segmentValue
= RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue); = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
segmentValue.insertAsLatest( segmentValue.insertAsLatest(
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue), RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue),
timestamp, timestamp,
null null
); );
@ -948,7 +948,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
.serialize() .serialize()
); );
} else { } else {
final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue);
if (foundNextTs <= timestamp) { if (foundNextTs <= timestamp) {
// insert as latest. this case is possible if the found segment is "degenerate" // insert as latest. this case is possible if the found segment is "degenerate"
// (cf RocksDBVersionedStoreSegmentValueFormatter.java for details) as older // (cf RocksDBVersionedStoreSegmentValueFormatter.java for details) as older
@ -980,7 +980,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
* @return the timestamp, from the latest value store value bytes (representing value * @return the timestamp, from the latest value store value bytes (representing value
* and timestamp) * and timestamp)
*/ */
static long getTimestamp(final byte[] rawLatestValueAndTimestamp) { static long timestamp(final byte[] rawLatestValueAndTimestamp) {
return ByteBuffer.wrap(rawLatestValueAndTimestamp).getLong(); return ByteBuffer.wrap(rawLatestValueAndTimestamp).getLong();
} }
@ -988,7 +988,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
* @return the actual record value, from the latest value store value bytes (representing * @return the actual record value, from the latest value store value bytes (representing
* value and timestamp) * value and timestamp)
*/ */
static byte[] getValue(final byte[] rawLatestValueAndTimestamp) { static byte[] value(final byte[] rawLatestValueAndTimestamp) {
final byte[] rawValue = new byte[rawLatestValueAndTimestamp.length - TIMESTAMP_SIZE]; final byte[] rawValue = new byte[rawLatestValueAndTimestamp.length - TIMESTAMP_SIZE];
System.arraycopy(rawLatestValueAndTimestamp, TIMESTAMP_SIZE, rawValue, 0, rawValue.length); System.arraycopy(rawLatestValueAndTimestamp, TIMESTAMP_SIZE, rawValue, 0, rawValue.length);
return rawValue; return rawValue;

View File

@ -90,7 +90,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
// flush segments first, as this is consistent with the store always writing to // flush segments first, as this is consistent with the store always writing to
// older segments/stores before later ones // older segments/stores before later ones
try (final WriteBatch segmentsBatch = new WriteBatch()) { try (final WriteBatch segmentsBatch = new WriteBatch()) {
final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE); final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.reversedSegments(Long.MIN_VALUE);
if (allSegments.size() > 0) { if (allSegments.size() > 0) {
// collect entries into write batch // collect entries into write batch
for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) { for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
@ -186,12 +186,12 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> { private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
@Override @Override
public byte[] getLatestValue(final Bytes key) { public byte[] latestValue(final Bytes key) {
final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key); final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
if (bufferValue != null) { if (bufferValue != null) {
return bufferValue.orElse(null); return bufferValue.orElse(null);
} }
return dbClient.getLatestValue(key); return dbClient.latestValue(key);
} }
@Override @Override
@ -221,13 +221,13 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
} }
@Override @Override
public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) { public List<WriteBufferSegmentWithDbFallback> reversedSegments(final long timestampFrom) {
// head and not tail because the map is sorted in reverse order // head and not tail because the map is sorted in reverse order
final long segmentFrom = segmentIdForTimestamp(timestampFrom); final long segmentFrom = segmentIdForTimestamp(timestampFrom);
final List<WriteBufferSegmentWithDbFallback> bufferSegments = final List<WriteBufferSegmentWithDbFallback> bufferSegments =
new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values()); new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values());
final List<LogicalKeyValueSegment> dbSegments = dbClient.getReverseSegments(timestampFrom); final List<LogicalKeyValueSegment> dbSegments = dbClient.reversedSegments(timestampFrom);
// merge segments from db with segments from write buffer // merge segments from db with segments from write buffer
final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>(); final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>();

View File

@ -101,14 +101,14 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
/** /**
* @return the validTo timestamp of the latest record in the provided segment * @return the validTo timestamp of the latest record in the provided segment
*/ */
static long getNextTimestamp(final byte[] segmentValue) { static long nextTimestamp(final byte[] segmentValue) {
return ByteBuffer.wrap(segmentValue).getLong(0); return ByteBuffer.wrap(segmentValue).getLong(0);
} }
/** /**
* @return the (validFrom) timestamp of the earliest record in the provided segment. * @return the (validFrom) timestamp of the earliest record in the provided segment.
*/ */
static long getMinTimestamp(final byte[] segmentValue) { static long minTimestamp(final byte[] segmentValue) {
return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
} }
@ -271,9 +271,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
private PartiallyDeserializedSegmentValue(final byte[] segmentValue) { private PartiallyDeserializedSegmentValue(final byte[] segmentValue) {
this.segmentValue = segmentValue; this.segmentValue = segmentValue;
this.nextTimestamp = this.nextTimestamp =
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue);
this.minTimestamp = this.minTimestamp =
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue);
this.isDegenerate = nextTimestamp == minTimestamp; this.isDegenerate = nextTimestamp == minTimestamp;
resetDeserHelpers(); resetDeserHelpers();
} }

View File

@ -26,7 +26,7 @@ interface Segments<S extends Segment> {
String segmentName(final long segmentId); String segmentName(final long segmentId);
S getSegmentForTimestamp(final long timestamp); S segmentForTimestamp(final long timestamp);
S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime); S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime);

View File

@ -410,7 +410,7 @@ public final class StoreQueryUtils {
} }
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes, final StateStore wrapped) { public static <V> Function<byte[], V> deserializeValue(final StateSerdes<?, V> serdes, final StateStore wrapped) {
final Serde<V> valueSerde = serdes.valueSerde(); final Serde<V> valueSerde = serdes.valueSerde();
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isAdapter(wrapped); final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isAdapter(wrapped);
final Deserializer<V> deserializer; final Deserializer<V> deserializer;
@ -435,7 +435,7 @@ public final class StoreQueryUtils {
} }
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> getDeserializeValue(final StateSerdes<?, V> serdes) { public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue(final StateSerdes<?, V> serdes) {
final Serde<V> valueSerde = serdes.valueSerde(); final Serde<V> valueSerde = serdes.valueSerde();
final Deserializer<V> deserializer = valueSerde.deserializer(); final Deserializer<V> deserializer = valueSerde.deserializer();
return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent() ? new VersionedRecord<>(deserializer.deserialize(serdes.topic(), rawVersionedRecord.value()), return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent() ? new VersionedRecord<>(deserializer.deserialize(serdes.topic(), rawVersionedRecord.value()),

View File

@ -65,11 +65,11 @@ public class TimestampedKeyAndJoinSide<K> {
return leftSide; return leftSide;
} }
public K getKey() { public K key() {
return key; return key;
} }
public long getTimestamp() { public long timestamp() {
return timestamp; return timestamp;
} }

View File

@ -56,8 +56,8 @@ public class TimestampedKeyAndJoinSideSerializer<K> implements WrappingNullableS
@Override @Override
public byte[] serialize(final String topic, final TimestampedKeyAndJoinSide<K> data) { public byte[] serialize(final String topic, final TimestampedKeyAndJoinSide<K> data) {
final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0); final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0);
final byte[] keyBytes = keySerializer.serialize(topic, data.getKey()); final byte[] keyBytes = keySerializer.serialize(topic, data.key());
final byte[] timestampBytes = timestampSerializer.serialize(topic, data.getTimestamp()); final byte[] timestampBytes = timestampSerializer.serialize(topic, data.timestamp());
return ByteBuffer return ByteBuffer
.allocate(timestampBytes.length + 1 + keyBytes.length) .allocate(timestampBytes.length + 1 + keyBytes.length)

View File

@ -292,7 +292,6 @@ public class StreamsMetadataStateTest {
final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2); final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2);
final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key", final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key",
(topic, key, value, numPartitions) -> Optional.of(Collections.singleton(2))); (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(2)));

View File

@ -133,7 +133,7 @@ public class KeyValueSegmentsTest {
public void shouldGetSegmentForTimestamp() { public void shouldGetSegmentForTimestamp() {
final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.getOrCreateSegmentIfLive(1, context, -1L); segments.getOrCreateSegmentIfLive(1, context, -1L);
assertEquals(segment, segments.getSegmentForTimestamp(0L)); assertEquals(segment, segments.segmentForTimestamp(0L));
} }
@Test @Test
@ -169,11 +169,11 @@ public class KeyValueSegmentsTest {
segments = new KeyValueSegments("test", METRICS_SCOPE, 4, 1); segments = new KeyValueSegments("test", METRICS_SCOPE, 4, 1);
segments.openExisting(context, -1L); segments.openExisting(context, -1L);
assertTrue(segments.getSegmentForTimestamp(0).isOpen()); assertTrue(segments.segmentForTimestamp(0).isOpen());
assertTrue(segments.getSegmentForTimestamp(1).isOpen()); assertTrue(segments.segmentForTimestamp(1).isOpen());
assertTrue(segments.getSegmentForTimestamp(2).isOpen()); assertTrue(segments.segmentForTimestamp(2).isOpen());
assertTrue(segments.getSegmentForTimestamp(3).isOpen()); assertTrue(segments.segmentForTimestamp(3).isOpen());
assertTrue(segments.getSegmentForTimestamp(4).isOpen()); assertTrue(segments.segmentForTimestamp(4).isOpen());
} }
@Test @Test
@ -342,7 +342,7 @@ public class KeyValueSegmentsTest {
public void shouldClearSegmentsOnClose() { public void shouldClearSegmentsOnClose() {
segments.getOrCreateSegmentIfLive(0, context, -1L); segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.close(); segments.close();
assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); assertThat(segments.segmentForTimestamp(0), is(nullValue()));
} }
private void verifyCorrectSegments(final long first, final int numSegments) { private void verifyCorrectSegments(final long first, final int numSegments) {

View File

@ -167,10 +167,10 @@ public class LogicalKeyValueSegmentsTest {
final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0L); final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0L);
final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL); final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL);
assertEquals(segment1, segments.getSegmentForTimestamp(0L)); assertEquals(segment1, segments.segmentForTimestamp(0L));
assertEquals(segment1, segments.getSegmentForTimestamp(SEGMENT_INTERVAL - 1)); assertEquals(segment1, segments.segmentForTimestamp(SEGMENT_INTERVAL - 1));
assertEquals(segment2, segments.getSegmentForTimestamp(SEGMENT_INTERVAL)); assertEquals(segment2, segments.segmentForTimestamp(SEGMENT_INTERVAL));
assertEquals(segment2, segments.getSegmentForTimestamp(2 * SEGMENT_INTERVAL - 1)); assertEquals(segment2, segments.segmentForTimestamp(2 * SEGMENT_INTERVAL - 1));
} }
@Test @Test
@ -226,7 +226,7 @@ public class LogicalKeyValueSegmentsTest {
segments.close(); segments.close();
assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); assertThat(segments.segmentForTimestamp(0), is(nullValue()));
assertThat(segments.getReservedSegment(-1), is(nullValue())); assertThat(segments.getReservedSegment(-1), is(nullValue()));
// verify iterators closed as well // verify iterators closed as well
assertThrows(InvalidStateStoreException.class, all1::hasNext); assertThrows(InvalidStateStoreException.class, all1::hasNext);

View File

@ -270,8 +270,8 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
public void shouldGetTimestamps(final TestCase testCase) { public void shouldGetTimestamps(final TestCase testCase) {
final byte[] segmentValue = buildSegmentWithInsertLatest(testCase).serialize(); final byte[] segmentValue = buildSegmentWithInsertLatest(testCase).serialize();
assertThat(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue), equalTo(testCase.nextTimestamp)); assertThat(RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue), equalTo(testCase.nextTimestamp));
assertThat(RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue), equalTo(testCase.minTimestamp)); assertThat(RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue), equalTo(testCase.minTimestamp));
} }
@ParameterizedTest @ParameterizedTest

View File

@ -134,7 +134,7 @@ public class TimestampedSegmentsTest {
public void shouldGetSegmentForTimestamp() { public void shouldGetSegmentForTimestamp() {
final TimestampedSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); final TimestampedSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.getOrCreateSegmentIfLive(1, context, -1L); segments.getOrCreateSegmentIfLive(1, context, -1L);
assertEquals(segment, segments.getSegmentForTimestamp(0L)); assertEquals(segment, segments.segmentForTimestamp(0L));
} }
@Test @Test
@ -170,11 +170,11 @@ public class TimestampedSegmentsTest {
segments = new TimestampedSegments("test", METRICS_SCOPE, 4, 1); segments = new TimestampedSegments("test", METRICS_SCOPE, 4, 1);
segments.openExisting(context, -1L); segments.openExisting(context, -1L);
assertTrue(segments.getSegmentForTimestamp(0).isOpen()); assertTrue(segments.segmentForTimestamp(0).isOpen());
assertTrue(segments.getSegmentForTimestamp(1).isOpen()); assertTrue(segments.segmentForTimestamp(1).isOpen());
assertTrue(segments.getSegmentForTimestamp(2).isOpen()); assertTrue(segments.segmentForTimestamp(2).isOpen());
assertTrue(segments.getSegmentForTimestamp(3).isOpen()); assertTrue(segments.segmentForTimestamp(3).isOpen());
assertTrue(segments.getSegmentForTimestamp(4).isOpen()); assertTrue(segments.segmentForTimestamp(4).isOpen());
} }
@Test @Test
@ -343,7 +343,7 @@ public class TimestampedSegmentsTest {
public void shouldClearSegmentsOnClose() { public void shouldClearSegmentsOnClose() {
segments.getOrCreateSegmentIfLive(0, context, -1L); segments.getOrCreateSegmentIfLive(0, context, -1L);
segments.close(); segments.close();
assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); assertThat(segments.segmentForTimestamp(0), is(nullValue()));
} }
private void verifyCorrectSegments(final long first, final int numSegments) { private void verifyCorrectSegments(final long first, final int numSegments) {