mirror of https://github.com/apache/kafka.git
KAFKA-15770: IQv2 must return immutable position (#15219)
ConsistencyVectorIntegrationTest failed frequently because the return Position from IQv2 is not immutable while the test assume immutability. To return a Position with a QueryResult that does not change, we need to deep copy the Position object. Reviewers: John Roesler <john@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
parent
ead2431c37
commit
4c70581eb6
|
@ -193,17 +193,19 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
|
|||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired segment.");
|
||||
} else {
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
synchronized (position) {
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
|
||||
// Put to index first so that if put to base failed, when we iterate index, we will
|
||||
// find no base value. If put to base first but putting to index fails, when we iterate
|
||||
// index, we can't find the key but if we iterate over base store, we can find the key
|
||||
// which lead to inconsistency.
|
||||
if (hasIndex()) {
|
||||
final KeyValue<Bytes, byte[]> indexKeyValue = getIndexKeyValue(rawBaseKey, value);
|
||||
segment.put(indexKeyValue.key, indexKeyValue.value);
|
||||
// Put to index first so that if put to base failed, when we iterate index, we will
|
||||
// find no base value. If put to base first but putting to index fails, when we iterate
|
||||
// index, we can't find the key but if we iterate over base store, we can find the key
|
||||
// which lead to inconsistency.
|
||||
if (hasIndex()) {
|
||||
final KeyValue<Bytes, byte[]> indexKeyValue = getIndexKeyValue(rawBaseKey, value);
|
||||
segment.put(indexKeyValue.key, indexKeyValue.value);
|
||||
}
|
||||
segment.put(rawBaseKey, value);
|
||||
}
|
||||
segment.put(rawBaseKey, value);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -254,11 +256,12 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
|
|||
metrics
|
||||
);
|
||||
|
||||
segments.openExisting(context, observedStreamTime);
|
||||
|
||||
final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
|
||||
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
|
||||
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
|
||||
segments.setPosition(this.position);
|
||||
|
||||
segments.openExisting(context, observedStreamTime);
|
||||
|
||||
// register and possibly restore the state from the logs
|
||||
stateStoreContext.register(
|
||||
|
@ -310,16 +313,18 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
|
|||
|
||||
// Visible for testing
|
||||
void restoreAllInternal(final Collection<ConsumerRecord<byte[], byte[]>> records) {
|
||||
try {
|
||||
final Map<S, WriteBatch> writeBatchMap = getWriteBatches(records);
|
||||
for (final Map.Entry<S, WriteBatch> entry : writeBatchMap.entrySet()) {
|
||||
final S segment = entry.getKey();
|
||||
final WriteBatch batch = entry.getValue();
|
||||
segment.write(batch);
|
||||
batch.close();
|
||||
synchronized (position) {
|
||||
try {
|
||||
final Map<S, WriteBatch> writeBatchMap = getWriteBatches(records);
|
||||
for (final Map.Entry<S, WriteBatch> entry : writeBatchMap.entrySet()) {
|
||||
final S segment = entry.getKey();
|
||||
final WriteBatch batch = entry.getValue();
|
||||
segment.write(batch);
|
||||
batch.close();
|
||||
}
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
|
||||
}
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -266,8 +266,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
|
|||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired segment.");
|
||||
} else {
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
segment.put(key, value);
|
||||
synchronized (position) {
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
segment.put(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -308,11 +310,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
|
|||
metrics
|
||||
);
|
||||
|
||||
segments.openExisting(this.context, observedStreamTime);
|
||||
|
||||
final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
|
||||
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
|
||||
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
|
||||
segments.setPosition(position);
|
||||
segments.openExisting(this.context, observedStreamTime);
|
||||
|
||||
// register and possibly restore the state from the logs
|
||||
stateStoreContext.register(
|
||||
|
@ -363,16 +365,18 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
|
|||
|
||||
// Visible for testing
|
||||
void restoreAllInternal(final Collection<ConsumerRecord<byte[], byte[]>> records) {
|
||||
try {
|
||||
final Map<S, WriteBatch> writeBatchMap = getWriteBatches(records);
|
||||
for (final Map.Entry<S, WriteBatch> entry : writeBatchMap.entrySet()) {
|
||||
final S segment = entry.getKey();
|
||||
final WriteBatch batch = entry.getValue();
|
||||
segment.write(batch);
|
||||
batch.close();
|
||||
synchronized (position) {
|
||||
try {
|
||||
final Map<S, WriteBatch> writeBatchMap = getWriteBatches(records);
|
||||
for (final Map.Entry<S, WriteBatch> entry : writeBatchMap.entrySet()) {
|
||||
final S segment = entry.getKey();
|
||||
final WriteBatch batch = entry.getValue();
|
||||
segment.write(batch);
|
||||
batch.close();
|
||||
}
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
|
||||
}
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
|
|||
|
||||
import org.apache.kafka.streams.errors.ProcessorStateException;
|
||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -43,6 +44,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
|
|||
private final long retentionPeriod;
|
||||
private final long segmentInterval;
|
||||
private final SimpleDateFormat formatter;
|
||||
Position position;
|
||||
|
||||
AbstractSegments(final String name, final long retentionPeriod, final long segmentInterval) {
|
||||
this.name = name;
|
||||
|
@ -53,6 +55,10 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
|
|||
this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
|
||||
}
|
||||
|
||||
public void setPosition(final Position position) {
|
||||
this.position = position;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long segmentId(final long timestamp) {
|
||||
return timestamp / segmentInterval;
|
||||
|
|
|
@ -120,8 +120,13 @@ public class CachingKeyValueStore
|
|||
public Position getPosition() {
|
||||
// We return the merged position since the query uses the merged position as well
|
||||
final Position mergedPosition = Position.emptyPosition();
|
||||
mergedPosition.merge(position);
|
||||
mergedPosition.merge(wrapped().getPosition());
|
||||
final Position wrappedPosition = wrapped().getPosition();
|
||||
synchronized (position) {
|
||||
synchronized (wrappedPosition) {
|
||||
mergedPosition.merge(position);
|
||||
mergedPosition.merge(wrappedPosition);
|
||||
}
|
||||
}
|
||||
return mergedPosition;
|
||||
}
|
||||
|
||||
|
@ -183,25 +188,27 @@ public class CachingKeyValueStore
|
|||
|
||||
final Bytes key = keyQuery.getKey();
|
||||
|
||||
if (context.cache() != null) {
|
||||
final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName, key);
|
||||
if (lruCacheEntry != null) {
|
||||
final byte[] rawValue;
|
||||
if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped()) && !StoreQueryUtils.isAdapter(wrapped())) {
|
||||
rawValue = ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value());
|
||||
} else {
|
||||
rawValue = lruCacheEntry.value();
|
||||
synchronized (mergedPosition) {
|
||||
if (context.cache() != null) {
|
||||
final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName, key);
|
||||
if (lruCacheEntry != null) {
|
||||
final byte[] rawValue;
|
||||
if (timestampedSchema && !WrappedStateStore.isTimestamped(wrapped()) && !StoreQueryUtils.isAdapter(wrapped())) {
|
||||
rawValue = ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value());
|
||||
} else {
|
||||
rawValue = lruCacheEntry.value();
|
||||
}
|
||||
result = (QueryResult<R>) QueryResult.forResult(rawValue);
|
||||
}
|
||||
result = (QueryResult<R>) QueryResult.forResult(rawValue);
|
||||
}
|
||||
}
|
||||
|
||||
// We don't need to check the position at the state store since we already performed the check on
|
||||
// the merged position above
|
||||
if (result == null) {
|
||||
result = wrapped().query(query, PositionBound.unbounded(), config);
|
||||
// We don't need to check the position at the state store since we already performed the check on
|
||||
// the merged position above
|
||||
if (result == null) {
|
||||
result = wrapped().query(query, PositionBound.unbounded(), config);
|
||||
}
|
||||
result.setPosition(mergedPosition.copy());
|
||||
}
|
||||
result.setPosition(mergedPosition);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -276,19 +283,21 @@ public class CachingKeyValueStore
|
|||
|
||||
private void putInternal(final Bytes key,
|
||||
final byte[] value) {
|
||||
context.cache().put(
|
||||
cacheName,
|
||||
key,
|
||||
new LRUCacheEntry(
|
||||
value,
|
||||
context.headers(),
|
||||
true,
|
||||
context.offset(),
|
||||
context.timestamp(),
|
||||
context.partition(),
|
||||
context.topic()));
|
||||
synchronized (position) {
|
||||
context.cache().put(
|
||||
cacheName,
|
||||
key,
|
||||
new LRUCacheEntry(
|
||||
value,
|
||||
context.headers(),
|
||||
true,
|
||||
context.offset(),
|
||||
context.timestamp(),
|
||||
context.partition(),
|
||||
context.topic()));
|
||||
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -79,13 +79,15 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
|
|||
context.register(
|
||||
root,
|
||||
(RecordBatchingStateRestoreCallback) records -> {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(Bytes.wrap(record.key()), record.value());
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
synchronized (position) {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(Bytes.wrap(record.key()), record.value());
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -152,13 +154,15 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
|
|||
|
||||
// the unlocked implementation of put method, to avoid multiple lock/unlock cost in `putAll` method
|
||||
private void putInternal(final Bytes key, final byte[] value) {
|
||||
if (value == null) {
|
||||
map.remove(key);
|
||||
} else {
|
||||
map.put(key, value);
|
||||
}
|
||||
synchronized (position) {
|
||||
if (value == null) {
|
||||
map.remove(key);
|
||||
} else {
|
||||
map.put(key, value);
|
||||
}
|
||||
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -124,13 +124,15 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|||
context.register(
|
||||
root,
|
||||
(RecordBatchingStateRestoreCallback) records -> {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(SessionKeySchema.from(Bytes.wrap(record.key())), record.value());
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
synchronized (position) {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(SessionKeySchema.from(Bytes.wrap(record.key())), record.value());
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -157,25 +159,27 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|||
final long windowEndTimestamp = sessionKey.window().end();
|
||||
observedStreamTime = Math.max(observedStreamTime, windowEndTimestamp);
|
||||
|
||||
if (windowEndTimestamp <= observedStreamTime - retentionPeriod) {
|
||||
// The provided context is not required to implement InternalProcessorContext,
|
||||
// If it doesn't, we can't record this metric (in fact, we wouldn't have even initialized it).
|
||||
if (expiredRecordSensor != null && context != null) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
}
|
||||
LOG.warn("Skipping record for expired segment.");
|
||||
} else {
|
||||
if (aggregate != null) {
|
||||
endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap<>());
|
||||
final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(windowEndTimestamp);
|
||||
keyMap.computeIfAbsent(sessionKey.key(), t -> new ConcurrentSkipListMap<>());
|
||||
keyMap.get(sessionKey.key()).put(sessionKey.window().start(), aggregate);
|
||||
synchronized (position) {
|
||||
if (windowEndTimestamp <= observedStreamTime - retentionPeriod) {
|
||||
// The provided context is not required to implement InternalProcessorContext,
|
||||
// If it doesn't, we can't record this metric (in fact, we wouldn't have even initialized it).
|
||||
if (expiredRecordSensor != null && context != null) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
}
|
||||
LOG.warn("Skipping record for expired segment.");
|
||||
} else {
|
||||
remove(sessionKey);
|
||||
if (aggregate != null) {
|
||||
endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap<>());
|
||||
final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(windowEndTimestamp);
|
||||
keyMap.computeIfAbsent(sessionKey.key(), t -> new ConcurrentSkipListMap<>());
|
||||
keyMap.get(sessionKey.key()).put(sessionKey.window().start(), aggregate);
|
||||
} else {
|
||||
remove(sessionKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -123,17 +123,19 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
|
|||
context.register(
|
||||
root,
|
||||
(RecordBatchingStateRestoreCallback) records -> {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(
|
||||
Bytes.wrap(extractStoreKeyBytes(record.key())),
|
||||
record.value(),
|
||||
extractStoreTimestamp(record.key())
|
||||
);
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
synchronized (position) {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(
|
||||
Bytes.wrap(extractStoreKeyBytes(record.key())),
|
||||
record.value(),
|
||||
extractStoreTimestamp(record.key())
|
||||
);
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
|
@ -158,28 +160,30 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
|
|||
removeExpiredSegments();
|
||||
observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
|
||||
|
||||
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired segment.");
|
||||
} else {
|
||||
if (value != null) {
|
||||
maybeUpdateSeqnumForDups();
|
||||
final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
|
||||
segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
|
||||
segmentMap.get(windowStartTimestamp).put(keyBytes, value);
|
||||
} else if (!retainDuplicates) {
|
||||
// Skip if value is null and duplicates are allowed since this delete is a no-op
|
||||
segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
|
||||
kvMap.remove(key);
|
||||
if (kvMap.isEmpty()) {
|
||||
segmentMap.remove(windowStartTimestamp);
|
||||
}
|
||||
return kvMap;
|
||||
});
|
||||
synchronized (position) {
|
||||
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired segment.");
|
||||
} else {
|
||||
if (value != null) {
|
||||
maybeUpdateSeqnumForDups();
|
||||
final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
|
||||
segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
|
||||
segmentMap.get(windowStartTimestamp).put(keyBytes, value);
|
||||
} else if (!retainDuplicates) {
|
||||
// Skip if value is null and duplicates are allowed since this delete is a no-op
|
||||
segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
|
||||
kvMap.remove(key);
|
||||
if (kvMap.isEmpty()) {
|
||||
segmentMap.remove(windowStartTimestamp);
|
||||
}
|
||||
return kvMap;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
|
|||
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -31,9 +32,11 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment
|
|||
KeyValueSegment(final String segmentName,
|
||||
final String windowName,
|
||||
final long id,
|
||||
final Position position,
|
||||
final RocksDBMetricsRecorder metricsRecorder) {
|
||||
super(segmentName, windowName, metricsRecorder);
|
||||
this.id = id;
|
||||
this.position = position;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
|
|||
return segments.get(segmentId);
|
||||
} else {
|
||||
final KeyValueSegment newSegment =
|
||||
new KeyValueSegment(segmentName(segmentId), name, segmentId, metricsRecorder);
|
||||
new KeyValueSegment(segmentName(segmentId), name, segmentId, position, metricsRecorder);
|
||||
|
||||
if (segments.put(segmentId, newSegment) != null) {
|
||||
throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access.");
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
|
||||
|
||||
/**
|
||||
|
@ -53,6 +54,11 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
|
|||
this.physicalStore = new RocksDBStore(name, parentDir, metricsRecorder, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setPosition(final Position position) {
|
||||
this.physicalStore.position = position;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
|
||||
final ProcessorContext context) {
|
||||
|
|
|
@ -108,13 +108,15 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
|
|||
root,
|
||||
(RecordBatchingStateRestoreCallback) records -> {
|
||||
restoring = true;
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(Bytes.wrap(record.key()), record.value());
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
synchronized (position) {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
put(Bytes.wrap(record.key()), record.value());
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
}
|
||||
}
|
||||
restoring = false;
|
||||
}
|
||||
|
@ -147,12 +149,14 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
|
|||
@Override
|
||||
public synchronized void put(final Bytes key, final byte[] value) {
|
||||
Objects.requireNonNull(key);
|
||||
if (value == null) {
|
||||
delete(key);
|
||||
} else {
|
||||
this.map.put(key, value);
|
||||
synchronized (position) {
|
||||
if (value == null) {
|
||||
delete(key);
|
||||
} else {
|
||||
this.map.put(key, value);
|
||||
}
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
}
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -175,8 +179,10 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
|
|||
@Override
|
||||
public synchronized byte[] delete(final Bytes key) {
|
||||
Objects.requireNonNull(key);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
return this.map.remove(key);
|
||||
synchronized (position) {
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
return this.map.remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -391,9 +391,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
final byte[] value) {
|
||||
Objects.requireNonNull(key, "key cannot be null");
|
||||
validateStoreOpen();
|
||||
cfAccessor.put(dbAccessor, key.get(), value);
|
||||
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
synchronized (position) {
|
||||
cfAccessor.put(dbAccessor, key.get(), value);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -409,12 +411,14 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
|
||||
@Override
|
||||
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
|
||||
try (final WriteBatch batch = new WriteBatch()) {
|
||||
cfAccessor.prepareBatch(entries, batch);
|
||||
write(batch);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error while batch writing to store " + name, e);
|
||||
synchronized (position) {
|
||||
try (final WriteBatch batch = new WriteBatch()) {
|
||||
cfAccessor.prepareBatch(entries, batch);
|
||||
write(batch);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error while batch writing to store " + name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -980,21 +984,22 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
}
|
||||
|
||||
void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
|
||||
try (final WriteBatch batch = new WriteBatch()) {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
// If version headers are not present or version is V0
|
||||
cfAccessor.addToBatch(record.key(), record.value(), batch);
|
||||
synchronized (position) {
|
||||
try (final WriteBatch batch = new WriteBatch()) {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
// If version headers are not present or version is V0
|
||||
cfAccessor.addToBatch(record.key(), record.value(), batch);
|
||||
}
|
||||
write(batch);
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + name, e);
|
||||
}
|
||||
write(batch);
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + name, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// for testing
|
||||
|
|
|
@ -101,32 +101,34 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
|
|||
public void put(final DBAccessor accessor,
|
||||
final byte[] key,
|
||||
final byte[] valueWithTimestamp) {
|
||||
if (valueWithTimestamp == null) {
|
||||
try {
|
||||
accessor.delete(oldColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
try {
|
||||
accessor.delete(newColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
accessor.delete(oldColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
try {
|
||||
accessor.put(newColumnFamily, key, valueWithTimestamp);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
|
||||
synchronized (position) {
|
||||
if (valueWithTimestamp == null) {
|
||||
try {
|
||||
accessor.delete(oldColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
try {
|
||||
accessor.delete(newColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
accessor.delete(oldColumnFamily, key);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while removing key from store " + name, e);
|
||||
}
|
||||
try {
|
||||
accessor.put(newColumnFamily, key, valueWithTimestamp);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
} catch (final RocksDBException e) {
|
||||
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
|
||||
throw new ProcessorStateException("Error while putting key/value into store " + name, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -130,24 +130,27 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
Objects.requireNonNull(key, "key cannot be null");
|
||||
validateStoreOpen();
|
||||
|
||||
if (timestamp < observedStreamTime - gracePeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired put.");
|
||||
return PUT_RETURN_CODE_NOT_PUT;
|
||||
synchronized (position) {
|
||||
if (timestamp < observedStreamTime - gracePeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired put.");
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
return PUT_RETURN_CODE_NOT_PUT;
|
||||
}
|
||||
observedStreamTime = Math.max(observedStreamTime, timestamp);
|
||||
|
||||
final long foundTs = doPut(
|
||||
versionedStoreClient,
|
||||
observedStreamTime,
|
||||
key,
|
||||
value,
|
||||
timestamp
|
||||
);
|
||||
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
|
||||
return foundTs;
|
||||
}
|
||||
observedStreamTime = Math.max(observedStreamTime, timestamp);
|
||||
|
||||
final long foundTs = doPut(
|
||||
versionedStoreClient,
|
||||
observedStreamTime,
|
||||
key,
|
||||
value,
|
||||
timestamp
|
||||
);
|
||||
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
|
||||
return foundTs;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -155,26 +158,28 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
Objects.requireNonNull(key, "key cannot be null");
|
||||
validateStoreOpen();
|
||||
|
||||
if (timestamp < observedStreamTime - gracePeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired delete.");
|
||||
return null;
|
||||
synchronized (position) {
|
||||
if (timestamp < observedStreamTime - gracePeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired delete.");
|
||||
return null;
|
||||
}
|
||||
|
||||
final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
|
||||
|
||||
observedStreamTime = Math.max(observedStreamTime, timestamp);
|
||||
doPut(
|
||||
versionedStoreClient,
|
||||
observedStreamTime,
|
||||
key,
|
||||
null,
|
||||
timestamp
|
||||
);
|
||||
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
|
||||
return existingRecord;
|
||||
}
|
||||
|
||||
final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
|
||||
|
||||
observedStreamTime = Math.max(observedStreamTime, timestamp);
|
||||
doPut(
|
||||
versionedStoreClient,
|
||||
observedStreamTime,
|
||||
key,
|
||||
null,
|
||||
timestamp
|
||||
);
|
||||
|
||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||
|
||||
return existingRecord;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -361,11 +366,11 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
|
||||
metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
|
||||
|
||||
segmentStores.openExisting(context, observedStreamTime);
|
||||
|
||||
final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
|
||||
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
|
||||
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
|
||||
positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
|
||||
position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
|
||||
segmentStores.setPosition(position);
|
||||
segmentStores.openExisting(context, observedStreamTime);
|
||||
|
||||
// register and possibly restore the state from the logs
|
||||
stateStoreContext.register(
|
||||
|
@ -408,36 +413,39 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
// "segment" entry -- restoring a single changelog entry could require loading multiple
|
||||
// records into memory. how high this memory amplification will be is very much dependent
|
||||
// on the specific workload and the value of the "segment interval" parameter.
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
if (record.timestamp() < observedStreamTime - gracePeriod) {
|
||||
// record is older than grace period and was therefore never written to the store
|
||||
continue;
|
||||
}
|
||||
// advance observed stream time as usual, for use in deciding whether records have
|
||||
// exceeded the store's grace period and should be dropped.
|
||||
observedStreamTime = Math.max(observedStreamTime, record.timestamp());
|
||||
synchronized (position) {
|
||||
for (final ConsumerRecord<byte[], byte[]> record : records) {
|
||||
if (record.timestamp() < observedStreamTime - gracePeriod) {
|
||||
// record is older than grace period and was therefore never written to the store
|
||||
continue;
|
||||
}
|
||||
// advance observed stream time as usual, for use in deciding whether records have
|
||||
// exceeded the store's grace period and should be dropped.
|
||||
observedStreamTime = Math.max(observedStreamTime, record.timestamp());
|
||||
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
|
||||
record,
|
||||
consistencyEnabled,
|
||||
position
|
||||
);
|
||||
);
|
||||
|
||||
// put records to write buffer
|
||||
doPut(
|
||||
// put records to write buffer
|
||||
doPut(
|
||||
restoreClient,
|
||||
endOfBatchStreamTime,
|
||||
new Bytes(record.key()),
|
||||
record.value(),
|
||||
record.timestamp()
|
||||
);
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
restoreWriteBuffer.flush();
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + name, e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
restoreWriteBuffer.flush();
|
||||
} catch (final RocksDBException e) {
|
||||
throw new ProcessorStateException("Error restoring batch to store " + name, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void validateStoreOpen() {
|
||||
|
|
|
@ -125,28 +125,30 @@ public final class StoreQueryUtils {
|
|||
final QueryResult<R> result;
|
||||
|
||||
final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());
|
||||
if (handler == null) {
|
||||
result = QueryResult.forUnknownQueryType(query, store);
|
||||
} else if (context == null || !isPermitted(position, positionBound, context.taskId().partition())) {
|
||||
result = QueryResult.notUpToBound(
|
||||
position,
|
||||
positionBound,
|
||||
context == null ? null : context.taskId().partition()
|
||||
);
|
||||
} else {
|
||||
result = (QueryResult<R>) handler.apply(
|
||||
query,
|
||||
positionBound,
|
||||
config,
|
||||
store
|
||||
);
|
||||
synchronized (position) {
|
||||
if (handler == null) {
|
||||
result = QueryResult.forUnknownQueryType(query, store);
|
||||
} else if (context == null || !isPermitted(position, positionBound, context.taskId().partition())) {
|
||||
result = QueryResult.notUpToBound(
|
||||
position,
|
||||
positionBound,
|
||||
context == null ? null : context.taskId().partition()
|
||||
);
|
||||
} else {
|
||||
result = (QueryResult<R>) handler.apply(
|
||||
query,
|
||||
positionBound,
|
||||
config,
|
||||
store
|
||||
);
|
||||
}
|
||||
if (config.isCollectExecutionInfo()) {
|
||||
result.addExecutionInfo(
|
||||
"Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
|
||||
);
|
||||
}
|
||||
result.setPosition(position.copy());
|
||||
}
|
||||
if (config.isCollectExecutionInfo()) {
|
||||
result.addExecutionInfo(
|
||||
"Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
|
||||
);
|
||||
}
|
||||
result.setPosition(position);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
|
|||
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -31,9 +32,11 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<T
|
|||
TimestampedSegment(final String segmentName,
|
||||
final String windowName,
|
||||
final long id,
|
||||
final Position position,
|
||||
final RocksDBMetricsRecorder metricsRecorder) {
|
||||
super(segmentName, windowName, metricsRecorder);
|
||||
this.id = id;
|
||||
this.position = position;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
|
|||
return segments.get(segmentId);
|
||||
} else {
|
||||
final TimestampedSegment newSegment =
|
||||
new TimestampedSegment(segmentName(segmentId), name, segmentId, metricsRecorder);
|
||||
new TimestampedSegment(segmentName(segmentId), name, segmentId, position, metricsRecorder);
|
||||
|
||||
if (segments.put(segmentId, newSegment) != null) {
|
||||
throw new IllegalStateException("TimestampedSegment already exists. Possible concurrent access.");
|
||||
|
|
|
@ -38,8 +38,6 @@ import org.apache.kafka.streams.query.QueryResult;
|
|||
import org.apache.kafka.streams.query.StateQueryRequest;
|
||||
import org.apache.kafka.streams.query.StateQueryResult;
|
||||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
import org.apache.kafka.streams.state.QueryableStoreType;
|
||||
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
|
||||
import org.apache.kafka.test.IntegrationTest;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.After;
|
||||
|
@ -54,7 +52,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -63,7 +60,6 @@ import java.util.stream.IntStream;
|
|||
|
||||
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
|
||||
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
|
||||
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
@ -76,6 +72,8 @@ public class ConsistencyVectorIntegrationTest {
|
|||
private static int port = 0;
|
||||
private static final String INPUT_TOPIC_NAME = "input-topic";
|
||||
private static final String TABLE_NAME = "source-table";
|
||||
private static final int KEY = 1;
|
||||
private static final int NUMBER_OF_MESSAGES = 100;
|
||||
|
||||
public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
|
||||
|
||||
|
@ -101,12 +99,9 @@ public class ConsistencyVectorIntegrationTest {
|
|||
|
||||
@Test
|
||||
public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
|
||||
final int batch1NumMessages = 100;
|
||||
final int key = 1;
|
||||
final Semaphore semaphore = new Semaphore(0);
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
Objects.requireNonNull(TABLE_NAME, "name cannot be null");
|
||||
|
||||
builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
|
||||
Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(TABLE_NAME)
|
||||
|
@ -123,30 +118,30 @@ public class ConsistencyVectorIntegrationTest {
|
|||
try {
|
||||
startApplicationAndWaitUntilRunning(kafkaStreamsList);
|
||||
|
||||
produceValueRange(key, 0, batch1NumMessages);
|
||||
produceValueRange();
|
||||
|
||||
// Assert that all messages in the first batch were processed in a timely manner
|
||||
assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
|
||||
|
||||
final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
|
||||
assertThat(
|
||||
"Did not process all message in time.",
|
||||
semaphore.tryAcquire(NUMBER_OF_MESSAGES, 120, TimeUnit.SECONDS), is(equalTo(true))
|
||||
);
|
||||
|
||||
// Assert that both active and standby have the same position bound
|
||||
final StateQueryRequest<Integer> request =
|
||||
StateQueryRequest
|
||||
.inStore(TABLE_NAME)
|
||||
.withQuery(KeyQuery.<Integer, Integer>withKey(key))
|
||||
.withQuery(KeyQuery.<Integer, Integer>withKey(KEY))
|
||||
.withPositionBound(PositionBound.unbounded());
|
||||
|
||||
checkPosition(batch1NumMessages, request, kafkaStreams1);
|
||||
checkPosition(batch1NumMessages, request, kafkaStreams2);
|
||||
checkPosition(request, kafkaStreams1);
|
||||
checkPosition(request, kafkaStreams2);
|
||||
} finally {
|
||||
kafkaStreams1.close();
|
||||
kafkaStreams2.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkPosition(final int batch1NumMessages,
|
||||
final StateQueryRequest<Integer> request,
|
||||
private void checkPosition(final StateQueryRequest<Integer> request,
|
||||
final KafkaStreams kafkaStreams1) throws InterruptedException {
|
||||
final long maxWaitMs = TestUtils.DEFAULT_MAX_WAIT_MS;
|
||||
final long expectedEnd = System.currentTimeMillis() + maxWaitMs;
|
||||
|
@ -170,7 +165,7 @@ public class ConsistencyVectorIntegrationTest {
|
|||
)
|
||||
);
|
||||
|
||||
if (queryResult.getResult() == batch1NumMessages - 1) {
|
||||
if (queryResult.getResult() == NUMBER_OF_MESSAGES - 1) {
|
||||
// we're at the end of the input.
|
||||
return;
|
||||
}
|
||||
|
@ -187,14 +182,13 @@ public class ConsistencyVectorIntegrationTest {
|
|||
|
||||
}
|
||||
|
||||
|
||||
private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Properties config) {
|
||||
final KafkaStreams streams = new KafkaStreams(builder.build(config), config);
|
||||
streamsToCleanup.add(streams);
|
||||
return streams;
|
||||
}
|
||||
|
||||
private void produceValueRange(final int key, final int start, final int endExclusive) {
|
||||
private void produceValueRange() {
|
||||
final Properties producerProps = new Properties();
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
|
||||
|
@ -202,8 +196,8 @@ public class ConsistencyVectorIntegrationTest {
|
|||
|
||||
IntegrationTestUtils.produceKeyValuesSynchronously(
|
||||
INPUT_TOPIC_NAME,
|
||||
IntStream.range(start, endExclusive)
|
||||
.mapToObj(i -> KeyValue.pair(key, i))
|
||||
IntStream.range(0, NUMBER_OF_MESSAGES)
|
||||
.mapToObj(i -> KeyValue.pair(KEY, i))
|
||||
.collect(Collectors.toList()),
|
||||
producerProps,
|
||||
mockTime
|
||||
|
@ -216,8 +210,8 @@ public class ConsistencyVectorIntegrationTest {
|
|||
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port));
|
||||
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
|
||||
config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
|
||||
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
|
||||
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
|
||||
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
|
||||
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
|
||||
|
|
|
@ -319,28 +319,36 @@ public class IQv2IntegrationTest {
|
|||
|
||||
@Override
|
||||
public void put(final Bytes key, final byte[] value) {
|
||||
map.put(key, value);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
synchronized (position) {
|
||||
map.put(key, value);
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] putIfAbsent(final Bytes key, final byte[] value) {
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
return map.putIfAbsent(key, value);
|
||||
synchronized (position) {
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
return map.putIfAbsent(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
for (final KeyValue<Bytes, byte[]> entry : entries) {
|
||||
map.put(entry.key, entry.value);
|
||||
synchronized (position) {
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
for (final KeyValue<Bytes, byte[]> entry : entries) {
|
||||
map.put(entry.key, entry.value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] delete(final Bytes key) {
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
return map.remove(key);
|
||||
synchronized (position) {
|
||||
StoreQueryUtils.updatePosition(position, context);
|
||||
return map.remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -394,7 +394,8 @@ public class StoreUpgradeIntegrationTest {
|
|||
}
|
||||
},
|
||||
60_000L,
|
||||
"Could not get expected result in time.");
|
||||
5_000L,
|
||||
() -> "Could not get expected result in time.");
|
||||
}
|
||||
|
||||
private <K> void verifyCountWithSurrogateTimestamp(final K key,
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig;
|
|||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.Before;
|
||||
|
@ -60,7 +61,7 @@ public class KeyValueSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldDeleteStateDirectoryOnDestroy() throws Exception {
|
||||
final KeyValueSegment segment = new KeyValueSegment("segment", "window", 0L, metricsRecorder);
|
||||
final KeyValueSegment segment = new KeyValueSegment("segment", "window", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
|
||||
final File directory = new File(directoryPath);
|
||||
|
||||
|
@ -82,10 +83,10 @@ public class KeyValueSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldBeEqualIfIdIsEqual() {
|
||||
final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, metricsRecorder);
|
||||
final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final KeyValueSegment segmentSameId =
|
||||
new KeyValueSegment("someOtherName", "someOtherName", 0L, metricsRecorder);
|
||||
final KeyValueSegment segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, metricsRecorder);
|
||||
new KeyValueSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final KeyValueSegment segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
|
||||
|
||||
assertThat(segment, equalTo(segment));
|
||||
assertThat(segment, equalTo(segmentSameId));
|
||||
|
@ -98,10 +99,10 @@ public class KeyValueSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldHashOnSegmentIdOnly() {
|
||||
final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, metricsRecorder);
|
||||
final KeyValueSegment segment = new KeyValueSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final KeyValueSegment segmentSameId =
|
||||
new KeyValueSegment("someOtherName", "someOtherName", 0L, metricsRecorder);
|
||||
final KeyValueSegment segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, metricsRecorder);
|
||||
new KeyValueSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final KeyValueSegment segmentDifferentId = new KeyValueSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
|
||||
|
||||
final Set<KeyValueSegment> set = new HashSet<>();
|
||||
assertTrue(set.add(segment));
|
||||
|
@ -113,9 +114,9 @@ public class KeyValueSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldCompareSegmentIdOnly() {
|
||||
final KeyValueSegment segment1 = new KeyValueSegment("a", "C", 50L, metricsRecorder);
|
||||
final KeyValueSegment segment2 = new KeyValueSegment("b", "B", 100L, metricsRecorder);
|
||||
final KeyValueSegment segment3 = new KeyValueSegment("c", "A", 0L, metricsRecorder);
|
||||
final KeyValueSegment segment1 = new KeyValueSegment("a", "C", 50L, Position.emptyPosition(), metricsRecorder);
|
||||
final KeyValueSegment segment2 = new KeyValueSegment("b", "B", 100L, Position.emptyPosition(), metricsRecorder);
|
||||
final KeyValueSegment segment3 = new KeyValueSegment("c", "A", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
|
||||
assertThat(segment1.compareTo(segment1), equalTo(0));
|
||||
assertThat(segment1.compareTo(segment2), equalTo(-1));
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.Bytes;
|
|||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.streams.errors.InvalidStateStoreException;
|
||||
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.KeyValueIterator;
|
||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
|
||||
import org.apache.kafka.test.InternalMockProcessorContext;
|
||||
|
@ -69,6 +70,7 @@ public class LogicalKeyValueSegmentsTest {
|
|||
SEGMENT_INTERVAL,
|
||||
new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
|
||||
);
|
||||
segments.setPosition(Position.emptyPosition());
|
||||
segments.openExisting(context, 0L);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.LogContext;
|
|||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
|
||||
import org.apache.kafka.test.InternalMockProcessorContext;
|
||||
import org.apache.kafka.test.MockRecordCollector;
|
||||
|
@ -46,9 +47,9 @@ public class SegmentIteratorTest {
|
|||
private final RocksDBMetricsRecorder rocksDBMetricsRecorder =
|
||||
new RocksDBMetricsRecorder("metrics-scope", "store-name");
|
||||
private final KeyValueSegment segmentOne =
|
||||
new KeyValueSegment("one", "one", 0, rocksDBMetricsRecorder);
|
||||
new KeyValueSegment("one", "one", 0, Position.emptyPosition(), rocksDBMetricsRecorder);
|
||||
private final KeyValueSegment segmentTwo =
|
||||
new KeyValueSegment("two", "window", 1, rocksDBMetricsRecorder);
|
||||
new KeyValueSegment("two", "window", 1, Position.emptyPosition(), rocksDBMetricsRecorder);
|
||||
private final HasNextCondition hasNextCondition = Iterator::hasNext;
|
||||
|
||||
private SegmentIterator<KeyValueSegment> iterator = null;
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.kafka.streams.StreamsConfig;
|
|||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.Before;
|
||||
|
@ -60,7 +61,7 @@ public class TimestampedSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldDeleteStateDirectoryOnDestroy() throws Exception {
|
||||
final TimestampedSegment segment = new TimestampedSegment("segment", "window", 0L, metricsRecorder);
|
||||
final TimestampedSegment segment = new TimestampedSegment("segment", "window", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
|
||||
final File directory = new File(directoryPath);
|
||||
|
||||
|
@ -82,11 +83,11 @@ public class TimestampedSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldBeEqualIfIdIsEqual() {
|
||||
final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, metricsRecorder);
|
||||
final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final TimestampedSegment segmentSameId =
|
||||
new TimestampedSegment("someOtherName", "someOtherName", 0L, metricsRecorder);
|
||||
new TimestampedSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final TimestampedSegment segmentDifferentId =
|
||||
new TimestampedSegment("anyName", "anyName", 1L, metricsRecorder);
|
||||
new TimestampedSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
|
||||
|
||||
assertThat(segment, equalTo(segment));
|
||||
assertThat(segment, equalTo(segmentSameId));
|
||||
|
@ -101,11 +102,11 @@ public class TimestampedSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldHashOnSegmentIdOnly() {
|
||||
final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, metricsRecorder);
|
||||
final TimestampedSegment segment = new TimestampedSegment("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final TimestampedSegment segmentSameId =
|
||||
new TimestampedSegment("someOtherName", "someOtherName", 0L, metricsRecorder);
|
||||
new TimestampedSegment("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
final TimestampedSegment segmentDifferentId =
|
||||
new TimestampedSegment("anyName", "anyName", 1L, metricsRecorder);
|
||||
new TimestampedSegment("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
|
||||
|
||||
final Set<TimestampedSegment> set = new HashSet<>();
|
||||
assertTrue(set.add(segment));
|
||||
|
@ -119,9 +120,9 @@ public class TimestampedSegmentTest {
|
|||
|
||||
@Test
|
||||
public void shouldCompareSegmentIdOnly() {
|
||||
final TimestampedSegment segment1 = new TimestampedSegment("a", "C", 50L, metricsRecorder);
|
||||
final TimestampedSegment segment2 = new TimestampedSegment("b", "B", 100L, metricsRecorder);
|
||||
final TimestampedSegment segment3 = new TimestampedSegment("c", "A", 0L, metricsRecorder);
|
||||
final TimestampedSegment segment1 = new TimestampedSegment("a", "C", 50L, Position.emptyPosition(), metricsRecorder);
|
||||
final TimestampedSegment segment2 = new TimestampedSegment("b", "B", 100L, Position.emptyPosition(), metricsRecorder);
|
||||
final TimestampedSegment segment3 = new TimestampedSegment("c", "A", 0L, Position.emptyPosition(), metricsRecorder);
|
||||
|
||||
assertThat(segment1.compareTo(segment1), equalTo(0));
|
||||
assertThat(segment1.compareTo(segment2), equalTo(-1));
|
||||
|
|
Loading…
Reference in New Issue