mirror of https://github.com/apache/kafka.git
KAFKA-14491: [18/N] Update versioned store to check latest value on timestamped get (#13409)
Part of KIP-889. Prior to this PR, versioned stores always returned null for get(key, timestamp) calls where the timestamp has exceeded the store's history retention, even if the latest value for the key (i.e., the one returned from get(key)) satisfies the timestamp bound. This was an oversight from the earlier implementation -- get(key, timestamp) should still return a record in this situation since the record exists in the store. This PR updates both the javadocs and the implementation accordingly. Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
bfd15299b1
commit
1560c5bd7e
|
|
@ -59,12 +59,24 @@ public interface VersionedKeyValueStore<K, V> extends StateStore {
|
|||
* Delete the value associated with this key from the store, at the specified timestamp
|
||||
* (if there is such a value), and return the deleted value.
|
||||
* <p>
|
||||
* This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
|
||||
* followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}.
|
||||
* <p>
|
||||
* If the timestamp associated with this deletion is older than the store's grace period
|
||||
* (i.e., history retention) relative to the current observed stream time, then the deletion
|
||||
* will not be performed.
|
||||
* will not be performed and {@code null} will be returned.
|
||||
* <p>
|
||||
* As a consequence of the above, the way to delete a record version is <it>not</it>
|
||||
* to first call {@link #get(Object) #get(key)} or {@link #get(Object, long) #get(key, timestamp)}
|
||||
* and use the returned {@link VersionedRecord#timestamp()} in a call to this
|
||||
* {@code delete(key, timestamp)} method, as the returned timestamp may be older than
|
||||
* the store's grace period (i.e., history retention) and will therefore not take place.
|
||||
* Instead, you should pass a business logic inferred timestamp that specifies when
|
||||
* the delete actually happens. For example, it could be the timestamp of the currently
|
||||
* processed input record or the current stream time.
|
||||
* <p>
|
||||
* This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
|
||||
* followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}, with
|
||||
* a caveat that if the deletion timestamp is older than the store's grace period
|
||||
* (i.e., history retention) then the return value is always {@code null}, regardless
|
||||
* of what {@link #get(Object, long) #get(key, timestamp)} would return.
|
||||
*
|
||||
* @param key The key
|
||||
* @param timestamp The timestamp for this delete
|
||||
|
|
@ -105,7 +117,9 @@ public interface VersionedKeyValueStore<K, V> extends StateStore {
|
|||
* retention time, i.e., the store no longer contains data for the provided
|
||||
* timestamp). Note that the record timestamp {@code r.timestamp()} of the
|
||||
* returned {@link VersionedRecord} may be smaller than the provided timestamp
|
||||
* bound.
|
||||
* bound. Additionally, if the latest record version for the key is eligible
|
||||
* for the provided timestamp bound, then that record will be returned even if
|
||||
* the timestamp bound is older than the store's history retention.
|
||||
* @throws NullPointerException If null is used for key.
|
||||
* @throws InvalidStateStoreException if the store is not initialized
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -133,8 +133,23 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
|
||||
@Override
|
||||
public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
|
||||
if (timestamp < observedStreamTime - gracePeriod) {
|
||||
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||
LOG.warn("Skipping record for expired delete.");
|
||||
return null;
|
||||
}
|
||||
|
||||
final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
|
||||
put(key, null, timestamp);
|
||||
|
||||
observedStreamTime = Math.max(observedStreamTime, timestamp);
|
||||
doPut(
|
||||
versionedStoreClient,
|
||||
observedStreamTime,
|
||||
key,
|
||||
null,
|
||||
timestamp
|
||||
);
|
||||
|
||||
return existingRecord;
|
||||
}
|
||||
|
||||
|
|
@ -156,9 +171,25 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
|
||||
|
||||
if (asOfTimestamp < observedStreamTime - historyRetention) {
|
||||
// history retention exceeded. we still check the latest value store in case the
|
||||
// latest record version satisfies the timestamp bound, in which case it should
|
||||
// still be returned (i.e., the latest record version per key never expires).
|
||||
final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key);
|
||||
if (rawLatestValueAndTimestamp != null) {
|
||||
final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
|
||||
if (latestTimestamp <= asOfTimestamp) {
|
||||
// latest value satisfies timestamp bound
|
||||
return new VersionedRecord<>(
|
||||
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
|
||||
latestTimestamp
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// history retention has elapsed and the latest record version (if present) does
|
||||
// not satisfy the timestamp bound. return null for predictability, even if data
|
||||
// is still present in segments.
|
||||
LOG.warn("Returning null for expired get.");
|
||||
// history retention has elapsed. return null for predictability, even if data
|
||||
// is still present in store.
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -374,6 +374,26 @@ public class RocksDBVersionedStoreTest {
|
|||
verifyExpiredRecordSensor(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotDeleteExpired() {
|
||||
putToStore("k1", "v1", 1L);
|
||||
putToStore("k2", "v2", 1L);
|
||||
putToStore("kother", "vother", HISTORY_RETENTION + 10); // use separate key to advance stream time
|
||||
|
||||
// grace period has not elapsed
|
||||
VersionedRecord<String> deleted = deleteFromStore("k1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
|
||||
assertThat(deleted.value(), equalTo("v1"));
|
||||
assertThat(deleted.timestamp(), equalTo(1L));
|
||||
verifyGetNullFromStore("k1");
|
||||
|
||||
// grace period has elapsed, so this delete does not take place
|
||||
deleted = deleteFromStore("k2", HISTORY_RETENTION + 9 - GRACE_PERIOD);
|
||||
assertThat(deleted, nullValue()); // return value is null even though record exists because delete did not take place
|
||||
verifyGetValueFromStore("k2", "v2", 1L);
|
||||
|
||||
verifyExpiredRecordSensor(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetFromOlderSegments() {
|
||||
// use a different key to create three different segments
|
||||
|
|
@ -421,6 +441,19 @@ public class RocksDBVersionedStoreTest {
|
|||
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 11);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldGetExpiredIfLatestValue() {
|
||||
putToStore("k", "v", 1);
|
||||
putToStore("ko", "vo_old", 1);
|
||||
putToStore("ko", "vo_new", HISTORY_RETENTION + 12);
|
||||
|
||||
// expired get on key where latest satisfies timestamp bound still returns data
|
||||
verifyTimestampedGetValueFromStore("k", 10, "v", 1);
|
||||
|
||||
// same expired get on key where latest value does not satisfy timestamp bound does not return data
|
||||
verifyTimestampedGetNullFromStore("ko", 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldDistinguishEmptyAndNull() {
|
||||
putToStore("k", null, SEGMENT_INTERVAL + 20);
|
||||
|
|
|
|||
Loading…
Reference in New Issue