From 4c70581eb63fe74494fbabf5a90e87c38e17996d Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 20 Feb 2024 12:24:32 -0800 Subject: [PATCH] KAFKA-15770: IQv2 must return immutable position (#15219) ConsistencyVectorIntegrationTest failed frequently because the returned Position from IQv2 is not immutable while the test assumes immutability. To return a Position with a QueryResult that does not change, we need to deep copy the Position object. Reviewers: John Roesler , Lucas Brutschy --- ...tDualSchemaRocksDBSegmentedBytesStore.java | 45 ++++--- .../AbstractRocksDBSegmentedBytesStore.java | 30 +++-- .../state/internals/AbstractSegments.java | 6 + .../state/internals/CachingKeyValueStore.java | 67 ++++++---- .../internals/InMemoryKeyValueStore.java | 30 +++-- .../state/internals/InMemorySessionStore.java | 50 +++---- .../state/internals/InMemoryWindowStore.java | 66 +++++----- .../state/internals/KeyValueSegment.java | 3 + .../state/internals/KeyValueSegments.java | 2 +- .../internals/LogicalKeyValueSegments.java | 6 + .../state/internals/MemoryLRUCache.java | 34 +++-- .../streams/state/internals/RocksDBStore.java | 47 ++++--- .../internals/RocksDBTimestampedStore.java | 54 ++++---- .../internals/RocksDBVersionedStore.java | 124 ++++++++++-------- .../state/internals/StoreQueryUtils.java | 44 ++++--- .../state/internals/TimestampedSegment.java | 3 + .../state/internals/TimestampedSegments.java | 2 +- .../ConsistencyVectorIntegrationTest.java | 40 +++--- .../integration/IQv2IntegrationTest.java | 26 ++-- .../StoreUpgradeIntegrationTest.java | 3 +- .../state/internals/KeyValueSegmentTest.java | 21 +-- .../LogicalKeyValueSegmentsTest.java | 2 + .../state/internals/SegmentIteratorTest.java | 5 +- .../internals/TimestampedSegmentTest.java | 21 +-- 24 files changed, 405 insertions(+), 326 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index 441c17201b4..284385a47de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -193,17 +193,19 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore indexKeyValue = getIndexKeyValue(rawBaseKey, value); - segment.put(indexKeyValue.key, indexKeyValue.value); + // Put to index first so that if put to base failed, when we iterate index, we will + // find no base value. If put to base first but putting to index fails, when we iterate + // index, we can't find the key but if we iterate over base store, we can find the key + // which lead to inconsistency.
+ if (hasIndex()) { + final KeyValue indexKeyValue = getIndexKeyValue(rawBaseKey, value); + segment.put(indexKeyValue.key, indexKeyValue.value); + } + segment.put(rawBaseKey, value); } - segment.put(rawBaseKey, value); } } @@ -254,11 +256,12 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore> records) { - try { - final Map writeBatchMap = getWriteBatches(records); - for (final Map.Entry entry : writeBatchMap.entrySet()) { - final S segment = entry.getKey(); - final WriteBatch batch = entry.getValue(); - segment.write(batch); - batch.close(); + synchronized (position) { + try { + final Map writeBatchMap = getWriteBatches(records); + for (final Map.Entry entry : writeBatchMap.entrySet()) { + final S segment = entry.getKey(); + final WriteBatch batch = entry.getValue(); + segment.write(batch); + batch.close(); + } + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error restoring batch to store " + this.name, e); } - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error restoring batch to store " + this.name, e); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 79f4f460989..348d6bb1863 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -266,8 +266,10 @@ public class AbstractRocksDBSegmentedBytesStore implements Se expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); LOG.warn("Skipping record for expired segment."); } else { - StoreQueryUtils.updatePosition(position, stateStoreContext); - segment.put(key, value); + synchronized (position) { + StoreQueryUtils.updatePosition(position, stateStoreContext); + segment.put(key, value); + } } } @@ -308,11 +310,11 @@ public class AbstractRocksDBSegmentedBytesStore implements Se metrics ); - segments.openExisting(this.context, observedStreamTime); - final File positionCheckpointFile = new File(context.stateDir(), name() + ".position"); this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile); this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint); + segments.setPosition(position); + segments.openExisting(this.context, observedStreamTime); // register and possibly restore the state from the logs stateStoreContext.register( @@ -363,16 +365,18 @@ public class AbstractRocksDBSegmentedBytesStore implements Se // Visible for testing void restoreAllInternal(final Collection> records) { - try { - final Map writeBatchMap = getWriteBatches(records); - for (final Map.Entry entry : writeBatchMap.entrySet()) { - final S segment = entry.getKey(); - final WriteBatch batch = entry.getValue(); - segment.write(batch); - batch.close(); + synchronized (position) { + try { + final Map writeBatchMap = getWriteBatches(records); + for (final Map.Entry entry : writeBatchMap.entrySet()) { + final S segment = entry.getKey(); + final WriteBatch batch = entry.getValue(); + segment.write(batch); + batch.close(); + } + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error restoring batch to store " + this.name, e); } - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error restoring batch to store " + this.name, e); } } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 5c31894f6e3..495c3423b8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.query.Position; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,7 @@ abstract class AbstractSegments implements Segments { private final long retentionPeriod; private final long segmentInterval; private final SimpleDateFormat formatter; + Position position; AbstractSegments(final String name, final long retentionPeriod, final long segmentInterval) { this.name = name; @@ -53,6 +55,10 @@ abstract class AbstractSegments implements Segments { this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC")); } + public void setPosition(final Position position) { + this.position = position; + } + @Override public long segmentId(final long timestamp) { return timestamp / segmentInterval; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 36f08412828..d89672daff2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -120,8 +120,13 @@ public class CachingKeyValueStore public Position getPosition() { // We return the merged position since the query uses the merged position as well final Position mergedPosition = Position.emptyPosition(); - mergedPosition.merge(position); - mergedPosition.merge(wrapped().getPosition()); + final Position wrappedPosition = wrapped().getPosition(); + synchronized (position) { + synchronized (wrappedPosition) { + mergedPosition.merge(position); + mergedPosition.merge(wrappedPosition); + } + } return mergedPosition; } @@ -183,25 +188,27 @@ public class CachingKeyValueStore final Bytes key = keyQuery.getKey(); - if (context.cache() != null) { - final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName, key); - if (lruCacheEntry != null) { - final byte[] rawValue; - if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped()) && !StoreQueryUtils.isAdapter(wrapped())) { - rawValue = ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value()); - } else { - rawValue = lruCacheEntry.value(); + synchronized (mergedPosition) { + if (context.cache() != null) { + final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName, key); + if (lruCacheEntry != null) { + final byte[] rawValue; + if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped()) && !StoreQueryUtils.isAdapter(wrapped())) { + rawValue = ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value()); + } else { + rawValue = lruCacheEntry.value(); + } + result = (QueryResult) QueryResult.forResult(rawValue); } - result = (QueryResult) QueryResult.forResult(rawValue); } - } - // We don't need to check the position at the state store since we already performed the check on - // the merged position above - if (result == null) { - result = wrapped().query(query, 
PositionBound.unbounded(), config); + // We don't need to check the position at the state store since we already performed the check on + // the merged position above + if (result == null) { + result = wrapped().query(query, PositionBound.unbounded(), config); + } + result.setPosition(mergedPosition.copy()); } - result.setPosition(mergedPosition); return result; } @@ -276,19 +283,21 @@ public class CachingKeyValueStore private void putInternal(final Bytes key, final byte[] value) { - context.cache().put( - cacheName, - key, - new LRUCacheEntry( - value, - context.headers(), - true, - context.offset(), - context.timestamp(), - context.partition(), - context.topic())); + synchronized (position) { + context.cache().put( + cacheName, + key, + new LRUCacheEntry( + value, + context.headers(), + true, + context.offset(), + context.timestamp(), + context.partition(), + context.topic())); - StoreQueryUtils.updatePosition(position, context); + StoreQueryUtils.updatePosition(position, context); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index 7599bff82b3..652db52f0c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -79,13 +79,15 @@ public class InMemoryKeyValueStore implements KeyValueStore { context.register( root, (RecordBatchingStateRestoreCallback) records -> { - for (final ConsumerRecord record : records) { - put(Bytes.wrap(record.key()), record.value()); - ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( - record, - consistencyEnabled, - position - ); + synchronized (position) { + for (final ConsumerRecord record : records) { + put(Bytes.wrap(record.key()), record.value()); + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + record, + consistencyEnabled, + position + ); + } } } ); @@ -152,13 +154,15 @@ public class InMemoryKeyValueStore implements KeyValueStore { // the unlocked implementation of put method, to avoid multiple lock/unlock cost in `putAll` method private void putInternal(final Bytes key, final byte[] value) { - if (value == null) { - map.remove(key); - } else { - map.put(key, value); - } + synchronized (position) { + if (value == null) { + map.remove(key); + } else { + map.put(key, value); + } - StoreQueryUtils.updatePosition(position, context); + StoreQueryUtils.updatePosition(position, context); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 579abc36782..67a6ea46b6c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -124,13 +124,15 @@ public class InMemorySessionStore implements SessionStore { context.register( root, (RecordBatchingStateRestoreCallback) records -> { - for (final ConsumerRecord record : records) { - put(SessionKeySchema.from(Bytes.wrap(record.key())), record.value()); - ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( - record, - consistencyEnabled, - position - ); + synchronized (position) { + for (final ConsumerRecord record : records) { + 
put(SessionKeySchema.from(Bytes.wrap(record.key())), record.value()); + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + record, + consistencyEnabled, + position + ); + } } } ); @@ -157,25 +159,27 @@ public class InMemorySessionStore implements SessionStore { final long windowEndTimestamp = sessionKey.window().end(); observedStreamTime = Math.max(observedStreamTime, windowEndTimestamp); - if (windowEndTimestamp <= observedStreamTime - retentionPeriod) { - // The provided context is not required to implement InternalProcessorContext, - // If it doesn't, we can't record this metric (in fact, we wouldn't have even initialized it). - if (expiredRecordSensor != null && context != null) { - expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); - } - LOG.warn("Skipping record for expired segment."); - } else { - if (aggregate != null) { - endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap<>()); - final ConcurrentNavigableMap> keyMap = endTimeMap.get(windowEndTimestamp); - keyMap.computeIfAbsent(sessionKey.key(), t -> new ConcurrentSkipListMap<>()); - keyMap.get(sessionKey.key()).put(sessionKey.window().start(), aggregate); + synchronized (position) { + if (windowEndTimestamp <= observedStreamTime - retentionPeriod) { + // The provided context is not required to implement InternalProcessorContext, + // If it doesn't, we can't record this metric (in fact, we wouldn't have even initialized it). + if (expiredRecordSensor != null && context != null) { + expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); + } + LOG.warn("Skipping record for expired segment."); } else { - remove(sessionKey); + if (aggregate != null) { + endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap<>()); + final ConcurrentNavigableMap> keyMap = endTimeMap.get(windowEndTimestamp); + keyMap.computeIfAbsent(sessionKey.key(), t -> new ConcurrentSkipListMap<>()); + keyMap.get(sessionKey.key()).put(sessionKey.window().start(), aggregate); + } else { + remove(sessionKey); + } } - } - StoreQueryUtils.updatePosition(position, stateStoreContext); + StoreQueryUtils.updatePosition(position, stateStoreContext); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 2ddeadc3585..2f85a1b4cc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -123,17 +123,19 @@ public class InMemoryWindowStore implements WindowStore { context.register( root, (RecordBatchingStateRestoreCallback) records -> { - for (final ConsumerRecord record : records) { - put( - Bytes.wrap(extractStoreKeyBytes(record.key())), - record.value(), - extractStoreTimestamp(record.key()) - ); - ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( - record, - consistencyEnabled, - position - ); + synchronized (position) { + for (final ConsumerRecord record : records) { + put( + Bytes.wrap(extractStoreKeyBytes(record.key())), + record.value(), + extractStoreTimestamp(record.key()) + ); + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + record, + consistencyEnabled, + position + ); + } } } ); @@ -158,28 +160,30 @@ public class InMemoryWindowStore implements WindowStore { removeExpiredSegments(); observedStreamTime = Math.max(observedStreamTime, 
windowStartTimestamp); - if (windowStartTimestamp <= observedStreamTime - retentionPeriod) { - expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); - LOG.warn("Skipping record for expired segment."); - } else { - if (value != null) { - maybeUpdateSeqnumForDups(); - final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key; - segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>()); - segmentMap.get(windowStartTimestamp).put(keyBytes, value); - } else if (!retainDuplicates) { - // Skip if value is null and duplicates are allowed since this delete is a no-op - segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> { - kvMap.remove(key); - if (kvMap.isEmpty()) { - segmentMap.remove(windowStartTimestamp); - } - return kvMap; - }); + synchronized (position) { + if (windowStartTimestamp <= observedStreamTime - retentionPeriod) { + expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); + LOG.warn("Skipping record for expired segment."); + } else { + if (value != null) { + maybeUpdateSeqnumForDups(); + final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key; + segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>()); + segmentMap.get(windowStartTimestamp).put(keyBytes, value); + } else if (!retainDuplicates) { + // Skip if value is null and duplicates are allowed since this delete is a no-op + segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> { + kvMap.remove(key); + if (kvMap.isEmpty()) { + segmentMap.remove(windowStartTimestamp); + } + return kvMap; + }); + } } - } - StoreQueryUtils.updatePosition(position, stateStoreContext); + StoreQueryUtils.updatePosition(position, stateStoreContext); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java index 66c55fc9d92..1cf631c3724 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import java.io.File; @@ -31,9 +32,11 @@ class KeyValueSegment extends RocksDBStore implements Comparable { return segments.get(segmentId); } else { final KeyValueSegment newSegment = - new KeyValueSegment(segmentName(segmentId), name, segmentId, metricsRecorder); + new KeyValueSegment(segmentName(segmentId), name, segmentId, position, metricsRecorder); if (segments.put(segmentId, newSegment) != null) { throw new IllegalStateException("KeyValueSegment already exists. 
Possible concurrent access."); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java index e099b0b81a6..2b445ed41cd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; /** @@ -53,6 +54,11 @@ public class LogicalKeyValueSegments extends AbstractSegments { root, (RecordBatchingStateRestoreCallback) records -> { restoring = true; - for (final ConsumerRecord record : records) { - put(Bytes.wrap(record.key()), record.value()); - ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( - record, - consistencyEnabled, - position - ); + synchronized (position) { + for (final ConsumerRecord record : records) { + put(Bytes.wrap(record.key()), record.value()); + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + record, + consistencyEnabled, + position + ); + } } restoring = false; } @@ -147,12 +149,14 @@ public class MemoryLRUCache implements KeyValueStore { @Override public synchronized void put(final Bytes key, final byte[] value) { Objects.requireNonNull(key); - if (value == null) { - delete(key); - } else { - this.map.put(key, value); + synchronized (position) { + if (value == null) { + delete(key); + } else { + this.map.put(key, value); + } + StoreQueryUtils.updatePosition(position, context); } - StoreQueryUtils.updatePosition(position, context); } @Override @@ -175,8 +179,10 @@ public class MemoryLRUCache implements KeyValueStore { @Override public synchronized byte[] delete(final Bytes key) { Objects.requireNonNull(key); - StoreQueryUtils.updatePosition(position, context); - return this.map.remove(key); + synchronized (position) { + StoreQueryUtils.updatePosition(position, context); + return this.map.remove(key); + } } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index b09f8e1ddad..0dfae3499c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -391,9 +391,11 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS final byte[] value) { Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); - cfAccessor.put(dbAccessor, key.get(), value); - StoreQueryUtils.updatePosition(position, context); + synchronized (position) { + cfAccessor.put(dbAccessor, key.get(), value); + StoreQueryUtils.updatePosition(position, context); + } } @Override @@ -409,12 +411,14 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS @Override public void putAll(final List> entries) { - try (final WriteBatch batch = new WriteBatch()) { - cfAccessor.prepareBatch(entries, batch); - write(batch); - StoreQueryUtils.updatePosition(position, context); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while batch writing to store " + name, e); + 
synchronized (position) { + try (final WriteBatch batch = new WriteBatch()) { + cfAccessor.prepareBatch(entries, batch); + write(batch); + StoreQueryUtils.updatePosition(position, context); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error while batch writing to store " + name, e); + } } } @@ -980,21 +984,22 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS } void restoreBatch(final Collection> records) { - try (final WriteBatch batch = new WriteBatch()) { - for (final ConsumerRecord record : records) { - ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( - record, - consistencyEnabled, - position - ); - // If version headers are not present or version is V0 - cfAccessor.addToBatch(record.key(), record.value(), batch); + synchronized (position) { + try (final WriteBatch batch = new WriteBatch()) { + for (final ConsumerRecord record : records) { + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + record, + consistencyEnabled, + position + ); + // If version headers are not present or version is V0 + cfAccessor.addToBatch(record.key(), record.value(), batch); + } + write(batch); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error restoring batch to store " + name, e); } - write(batch); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error restoring batch to store " + name, e); } - } // for testing diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index d4ba8ccb072..7b75649ba71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -101,32 +101,34 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped public void put(final DBAccessor accessor, final byte[] key, final byte[] valueWithTimestamp) { - if (valueWithTimestamp == null) { - try { - accessor.delete(oldColumnFamily, key); - } catch (final RocksDBException e) { - // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. - throw new ProcessorStateException("Error while removing key from store " + name, e); - } - try { - accessor.delete(newColumnFamily, key); - } catch (final RocksDBException e) { - // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. - throw new ProcessorStateException("Error while removing key from store " + name, e); - } - } else { - try { - accessor.delete(oldColumnFamily, key); - } catch (final RocksDBException e) { - // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. - throw new ProcessorStateException("Error while removing key from store " + name, e); - } - try { - accessor.put(newColumnFamily, key, valueWithTimestamp); - StoreQueryUtils.updatePosition(position, context); - } catch (final RocksDBException e) { - // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. - throw new ProcessorStateException("Error while putting key/value into store " + name, e); + synchronized (position) { + if (valueWithTimestamp == null) { + try { + accessor.delete(oldColumnFamily, key); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. 
So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while removing key from store " + name, e); + } + try { + accessor.delete(newColumnFamily, key); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while removing key from store " + name, e); + } + } else { + try { + accessor.delete(oldColumnFamily, key); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while removing key from store " + name, e); + } + try { + accessor.put(newColumnFamily, key, valueWithTimestamp); + StoreQueryUtils.updatePosition(position, context); + } catch (final RocksDBException e) { + // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores. + throw new ProcessorStateException("Error while putting key/value into store " + name, e); + } } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index 3379d3cebc9..52ee40b65af 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -130,24 +130,27 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore existingRecord = get(key, timestamp); + + observedStreamTime = Math.max(observedStreamTime, timestamp); + doPut( + versionedStoreClient, + observedStreamTime, + key, + null, + timestamp + ); + + StoreQueryUtils.updatePosition(position, stateStoreContext); + + return existingRecord; } - - final VersionedRecord existingRecord = get(key, timestamp); - - observedStreamTime = Math.max(observedStreamTime, timestamp); - doPut( - versionedStoreClient, - observedStreamTime, - key, - null, - timestamp - ); - - StoreQueryUtils.updatePosition(position, stateStoreContext); - - return existingRecord; } @Override @@ -361,11 +366,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore record : records) { - if (record.timestamp() < observedStreamTime - gracePeriod) { - // record is older than grace period and was therefore never written to the store - continue; - } - // advance observed stream time as usual, for use in deciding whether records have - // exceeded the store's grace period and should be dropped. - observedStreamTime = Math.max(observedStreamTime, record.timestamp()); + synchronized (position) { + for (final ConsumerRecord record : records) { + if (record.timestamp() < observedStreamTime - gracePeriod) { + // record is older than grace period and was therefore never written to the store + continue; + } + // advance observed stream time as usual, for use in deciding whether records have + // exceeded the store's grace period and should be dropped. 
+ observedStreamTime = Math.max(observedStreamTime, record.timestamp()); - ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( + ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition( record, consistencyEnabled, position - ); + ); - // put records to write buffer - doPut( + // put records to write buffer + doPut( restoreClient, endOfBatchStreamTime, new Bytes(record.key()), record.value(), record.timestamp() - ); + ); + } + + try { + restoreWriteBuffer.flush(); + } catch (final RocksDBException e) { + throw new ProcessorStateException("Error restoring batch to store " + name, e); + } } - try { - restoreWriteBuffer.flush(); - } catch (final RocksDBException e) { - throw new ProcessorStateException("Error restoring batch to store " + name, e); - } } private void validateStoreOpen() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index 1609e8b2c5d..fa2081ad25b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -125,28 +125,30 @@ public final class StoreQueryUtils { final QueryResult result; final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass()); - if (handler == null) { - result = QueryResult.forUnknownQueryType(query, store); - } else if (context == null || !isPermitted(position, positionBound, context.taskId().partition())) { - result = QueryResult.notUpToBound( - position, - positionBound, - context == null ? null : context.taskId().partition() - ); - } else { - result = (QueryResult) handler.apply( - query, - positionBound, - config, - store - ); + synchronized (position) { + if (handler == null) { + result = QueryResult.forUnknownQueryType(query, store); + } else if (context == null || !isPermitted(position, positionBound, context.taskId().partition())) { + result = QueryResult.notUpToBound( + position, + positionBound, + context == null ? 
null : context.taskId().partition() + ); + } else { + result = (QueryResult) handler.apply( + query, + positionBound, + config, + store + ); + } + if (config.isCollectExecutionInfo()) { + result.addExecutionInfo( + "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" + ); + } + result.setPosition(position.copy()); } - if (config.isCollectExecutionInfo()) { - result.addExecutionInfo( - "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns" - ); - } - result.setPosition(position); return result; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java index f0e4cf6132e..1bf07afab77 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import java.io.File; @@ -31,9 +32,11 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable { return segments.get(segmentId); } else { final TimestampedSegment newSegment = - new TimestampedSegment(segmentName(segmentId), name, segmentId, metricsRecorder); + new TimestampedSegment(segmentName(segmentId), name, segmentId, position, metricsRecorder); if (segments.put(segmentId, newSegment) != null) { throw new IllegalStateException("TimestampedSegment already exists. Possible concurrent access."); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java index 3e809467b08..e5d8e5ae133 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java @@ -38,8 +38,6 @@ import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.StateQueryRequest; import org.apache.kafka.streams.query.StateQueryResult; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.QueryableStoreType; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -54,7 +52,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Objects; import java.util.Properties; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -63,7 +60,6 @@ import java.util.stream.IntStream; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -76,6 +72,8 @@ public class ConsistencyVectorIntegrationTest { private static int port = 0; private static final 
String INPUT_TOPIC_NAME = "input-topic"; private static final String TABLE_NAME = "source-table"; + private static final int KEY = 1; + private static final int NUMBER_OF_MESSAGES = 100; public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS); @@ -101,12 +99,9 @@ public class ConsistencyVectorIntegrationTest { @Test public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception { - final int batch1NumMessages = 100; - final int key = 1; final Semaphore semaphore = new Semaphore(0); final StreamsBuilder builder = new StreamsBuilder(); - Objects.requireNonNull(TABLE_NAME, "name cannot be null"); builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.>as(TABLE_NAME) @@ -123,30 +118,30 @@ public class ConsistencyVectorIntegrationTest { try { startApplicationAndWaitUntilRunning(kafkaStreamsList); - produceValueRange(key, 0, batch1NumMessages); + produceValueRange(); // Assert that all messages in the first batch were processed in a timely manner - assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - - final QueryableStoreType> queryableStoreType = keyValueStore(); + assertThat( + "Did not process all message in time.", + semaphore.tryAcquire(NUMBER_OF_MESSAGES, 120, TimeUnit.SECONDS), is(equalTo(true)) + ); // Assert that both active and standby have the same position bound final StateQueryRequest request = StateQueryRequest .inStore(TABLE_NAME) - .withQuery(KeyQuery.withKey(key)) + .withQuery(KeyQuery.withKey(KEY)) .withPositionBound(PositionBound.unbounded()); - checkPosition(batch1NumMessages, request, kafkaStreams1); - checkPosition(batch1NumMessages, request, kafkaStreams2); + checkPosition(request, kafkaStreams1); + checkPosition(request, kafkaStreams2); } finally { kafkaStreams1.close(); kafkaStreams2.close(); } } - private void checkPosition(final int batch1NumMessages, - final StateQueryRequest request, + private void checkPosition(final StateQueryRequest request, final KafkaStreams kafkaStreams1) throws InterruptedException { final long maxWaitMs = TestUtils.DEFAULT_MAX_WAIT_MS; final long expectedEnd = System.currentTimeMillis() + maxWaitMs; @@ -170,7 +165,7 @@ public class ConsistencyVectorIntegrationTest { ) ); - if (queryResult.getResult() == batch1NumMessages - 1) { + if (queryResult.getResult() == NUMBER_OF_MESSAGES - 1) { // we're at the end of the input. 
return; } @@ -187,14 +182,13 @@ public class ConsistencyVectorIntegrationTest { } - private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) { final KafkaStreams streams = new KafkaStreams(builder.build(config), config); streamsToCleanup.add(streams); return streams; } - private void produceValueRange(final int key, final int start, final int endExclusive) { + private void produceValueRange() { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); @@ -202,8 +196,8 @@ public class ConsistencyVectorIntegrationTest { IntegrationTestUtils.produceKeyValuesSynchronously( INPUT_TOPIC_NAME, - IntStream.range(start, endExclusive) - .mapToObj(i -> KeyValue.pair(key, i)) + IntStream.range(0, NUMBER_OF_MESSAGES) + .mapToObj(i -> KeyValue.pair(KEY, i)) .collect(Collectors.toList()), producerProps, mockTime @@ -216,8 +210,8 @@ public class ConsistencyVectorIntegrationTest { config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index becbe301ecb..edfc27e458f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -319,28 +319,36 @@ public class IQv2IntegrationTest { @Override public void put(final Bytes key, final byte[] value) { - map.put(key, value); - StoreQueryUtils.updatePosition(position, context); + synchronized (position) { + map.put(key, value); + StoreQueryUtils.updatePosition(position, context); + } } @Override public byte[] putIfAbsent(final Bytes key, final byte[] value) { - StoreQueryUtils.updatePosition(position, context); - return map.putIfAbsent(key, value); + synchronized (position) { + StoreQueryUtils.updatePosition(position, context); + return map.putIfAbsent(key, value); + } } @Override public void putAll(final List> entries) { - StoreQueryUtils.updatePosition(position, context); - for (final KeyValue entry : entries) { - map.put(entry.key, entry.value); + synchronized (position) { + StoreQueryUtils.updatePosition(position, context); + for (final KeyValue entry : entries) { + map.put(entry.key, entry.value); + } } } @Override public byte[] delete(final Bytes key) { - StoreQueryUtils.updatePosition(position, context); - return map.remove(key); + synchronized (position) { + StoreQueryUtils.updatePosition(position, context); + return map.remove(key); + } } @Override diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index c6ed805ac4c..5db747b1b46 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -394,7 +394,8 @@ public class StoreUpgradeIntegrationTest { } }, 60_000L, - "Could not get expected result in time."); + 5_000L, + () -> "Could not get expected result in time."); } private void verifyCountWithSurrogateTimestamp(final K key, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java index e4543d2b56b..a8d2eb2bbce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -60,7 +61,7 @@ public class KeyValueSegmentTest { @Test public void shouldDeleteStateDirectoryOnDestroy() throws Exception { - final KeyValueSegment segment = new KeyValueSegment("segment", "window", 0L, metricsRecorder); + final KeyValueSegment segment = new KeyValueSegment("segment", "window", 0L, Position.emptyPosition(), metricsRecorder); final String directoryPath = TestUtils.tempDirectory().getAbsolutePath(); final File directory = new File(directoryPath); @@ -82,10 +83,10 @@ public class KeyValueSegmentTest { @Test public void shouldBeEqualIfIdIsEqual() { - final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, metricsRecorder); + final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder); final KeyValueSegment segmentSameId = - new KeyValueSegment("someOtherName", "someOtherName", 0L, metricsRecorder); - final KeyValueSegment segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, metricsRecorder); + new KeyValueSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder); + final KeyValueSegment segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder); assertThat(segment, equalTo(segment)); assertThat(segment, equalTo(segmentSameId)); @@ -98,10 +99,10 @@ public class KeyValueSegmentTest { @Test public void shouldHashOnSegmentIdOnly() { - final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, metricsRecorder); + final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder); final KeyValueSegment segmentSameId = - new KeyValueSegment("someOtherName", "someOtherName", 0L, metricsRecorder); - final KeyValueSegment segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, metricsRecorder); + new KeyValueSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder); + final KeyValueSegment 
segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder); final Set set = new HashSet<>(); assertTrue(set.add(segment)); @@ -113,9 +114,9 @@ public class KeyValueSegmentTest { @Test public void shouldCompareSegmentIdOnly() { - final KeyValueSegment segment1 = new KeyValueSegment("a", "C", 50L, metricsRecorder); - final KeyValueSegment segment2 = new KeyValueSegment("b", "B", 100L, metricsRecorder); - final KeyValueSegment segment3 = new KeyValueSegment("c", "A", 0L, metricsRecorder); + final KeyValueSegment segment1 = new KeyValueSegment("a", "C", 50L, Position.emptyPosition(), metricsRecorder); + final KeyValueSegment segment2 = new KeyValueSegment("b", "B", 100L, Position.emptyPosition(), metricsRecorder); + final KeyValueSegment segment3 = new KeyValueSegment("c", "A", 0L, Position.emptyPosition(), metricsRecorder); assertThat(segment1.compareTo(segment1), equalTo(0)); assertThat(segment1.compareTo(segment2), equalTo(-1)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java index edc53e4c700..a28c55f70de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import org.apache.kafka.test.InternalMockProcessorContext; @@ -69,6 +70,7 @@ public class LogicalKeyValueSegmentsTest { SEGMENT_INTERVAL, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME) ); + segments.setPosition(Position.emptyPosition()); segments.openExisting(context, 0L); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 31ce3c69636..70ca80edb4d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRecordCollector; @@ -46,9 +47,9 @@ public class SegmentIteratorTest { private final RocksDBMetricsRecorder rocksDBMetricsRecorder = new RocksDBMetricsRecorder("metrics-scope", "store-name"); private final KeyValueSegment segmentOne = - new KeyValueSegment("one", "one", 0, rocksDBMetricsRecorder); + new KeyValueSegment("one", "one", 0, Position.emptyPosition(), rocksDBMetricsRecorder); private final KeyValueSegment segmentTwo = - new KeyValueSegment("two", "window", 1, rocksDBMetricsRecorder); + new 
KeyValueSegment("two", "window", 1, Position.emptyPosition(), rocksDBMetricsRecorder); private final HasNextCondition hasNextCondition = Iterator::hasNext; private SegmentIterator iterator = null; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java index 062fb808ed8..dd31d294c61 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -60,7 +61,7 @@ public class TimestampedSegmentTest { @Test public void shouldDeleteStateDirectoryOnDestroy() throws Exception { - final TimestampedSegment segment = new TimestampedSegment("segment", "window", 0L, metricsRecorder); + final TimestampedSegment segment = new TimestampedSegment("segment", "window", 0L, Position.emptyPosition(), metricsRecorder); final String directoryPath = TestUtils.tempDirectory().getAbsolutePath(); final File directory = new File(directoryPath); @@ -82,11 +83,11 @@ public class TimestampedSegmentTest { @Test public void shouldBeEqualIfIdIsEqual() { - final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, metricsRecorder); + final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder); final TimestampedSegment segmentSameId = - new TimestampedSegment("someOtherName", "someOtherName", 0L, metricsRecorder); + new TimestampedSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder); final TimestampedSegment segmentDifferentId = - new TimestampedSegment("anyName", "anyName", 1L, metricsRecorder); + new TimestampedSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder); assertThat(segment, equalTo(segment)); assertThat(segment, equalTo(segmentSameId)); @@ -101,11 +102,11 @@ public class TimestampedSegmentTest { @Test public void shouldHashOnSegmentIdOnly() { - final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, metricsRecorder); + final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder); final TimestampedSegment segmentSameId = - new TimestampedSegment("someOtherName", "someOtherName", 0L, metricsRecorder); + new TimestampedSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder); final TimestampedSegment segmentDifferentId = - new TimestampedSegment("anyName", "anyName", 1L, metricsRecorder); + new TimestampedSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder); final Set set = new HashSet<>(); assertTrue(set.add(segment)); @@ -119,9 +120,9 @@ public class TimestampedSegmentTest { @Test public void shouldCompareSegmentIdOnly() { - final TimestampedSegment segment1 = new TimestampedSegment("a", "C", 50L, metricsRecorder); - final TimestampedSegment segment2 = new TimestampedSegment("b", "B", 100L, 
metricsRecorder); - final TimestampedSegment segment3 = new TimestampedSegment("c", "A", 0L, metricsRecorder); + final TimestampedSegment segment1 = new TimestampedSegment("a", "C", 50L, Position.emptyPosition(), metricsRecorder); + final TimestampedSegment segment2 = new TimestampedSegment("b", "B", 100L, Position.emptyPosition(), metricsRecorder); + final TimestampedSegment segment3 = new TimestampedSegment("c", "A", 0L, Position.emptyPosition(), metricsRecorder); assertThat(segment1.compareTo(segment1), equalTo(0)); assertThat(segment1.compareTo(segment2), equalTo(-1));
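The core of this patch is to stop handing out the store's live, mutable Position and instead attach a deep copy to each QueryResult (see result.setPosition(position.copy()) in StoreQueryUtils and CachingKeyValueStore above). Below is a minimal sketch, not part of the patch, of why the copy matters: Position.emptyPosition() and copy() are taken from the hunks above, while withComponent() and getPartitionPositions() are assumed here to be the usual public Position methods for recording and reading offsets.

import org.apache.kafka.streams.query.Position;

public class PositionCopyExample {
    public static void main(final String[] args) {
        // The store's live position, advanced on every write (StoreQueryUtils.updatePosition).
        final Position live = Position.emptyPosition();
        live.withComponent("input-topic", 0, 1L);

        final Position sharedReference = live;   // pre-fix behavior: QueryResult held the live object
        final Position deepCopy = live.copy();   // post-fix behavior: QueryResult gets a snapshot

        // A later write advances the live position ...
        live.withComponent("input-topic", 0, 42L);

        // ... so the shared reference silently changes, while the deep copy keeps reporting
        // offset 1, which is the immutability ConsistencyVectorIntegrationTest relies on.
        System.out.println(sharedReference.getPartitionPositions("input-topic")); // {0=42}
        System.out.println(deepCopy.getPartitionPositions("input-topic"));        // {0=1}
    }
}

The synchronized (position) blocks added throughout the stores address the writer side of the same problem: position updates and the copy taken while answering a query cannot interleave, so the snapshot never reflects a half-applied write or restore batch.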