KAFKA-12449: Remove deprecated WindowStore#put (#10293)

Removes `WindowStore#put(K,V)` that was deprecated via KIP-474.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Jorge Esteban Quilcate Otoya 2021-04-09 19:49:37 +01:00 committed by GitHub
parent e4b6ffae5d
commit db0323e9ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 119 additions and 316 deletions

View File

@ -111,6 +111,7 @@
<li> Overloaded <code>KStream#join, leftJoin, outerJoin</code> with <code>KStream</code> and <code>Joined</code> parameters: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join">KIP-479</a>).</li>
<li> Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance">KIP-535</a>).</li>
<li> Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance">KIP-562</a>).</li>
<li> <code>WindowStore#put(K key, V value)</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545">KIP-474</a>).</li>
</ul>
<p>
The following dependencies were removed from Kafka Streams:

View File

@ -160,13 +160,6 @@ abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends Wra
super(inner);
}
@Deprecated
@Override
public void put(final K key,
final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void put(final K key,
final V value,

View File

@ -154,13 +154,6 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
super(inner);
}
@Deprecated
@Override
public void put(final K key,
final V value) {
wrapped().put(key, value);
}
@Override
public void put(final K key,
final V value,

View File

@ -35,24 +35,6 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
*/
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
/**
* Use the current record timestamp as the {@code windowStartTimestamp} and
* delegate to {@link WindowStore#put(Object, Object, long)}.
* <p>
* It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp
* is unlikely to be the correct windowStartTimestamp in general.
*
* @param key The key to associate the value to
* @param value The value to update, it can be null;
* if the serialized bytes are also null it is interpreted as delete
* @throws NullPointerException if the given key is {@code null}
* @deprecated as timestamp is not provided for the key-value pair, this causes inconsistency
* to identify the window frame to which the key belongs.
* Use {@link #put(Object, Object, long)} instead.
*/
@Deprecated
void put(K key, V value);
/**
* Put a key-value pair into the window with given window start timestamp
* <p>

View File

@ -142,12 +142,6 @@ class CachingWindowStore
return true;
}
@Deprecated
@Override
public synchronized void put(final Bytes key,
final byte[] value) {
put(key, value, context.timestamp());
}
@Override
public synchronized void put(final Bytes key,

View File

@ -131,16 +131,6 @@ class ChangeLoggingWindowBytesStore
return wrapped().backwardFetchAll(timeFrom, timeTo);
}
@Deprecated
@Override
public void put(final Bytes key, final byte[] value) {
// Note: It's incorrect to bypass the wrapped store here by delegating to another method,
// but we have no alternative. We must send a timestamped key to the changelog, which means
// we need to know what timestamp gets used for the record. Hopefully, we can deprecate this
// method in the future to resolve the situation.
put(key, value, context.timestamp());
}
@Override
public void put(final Bytes key,
final byte[] value,

View File

@ -108,12 +108,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
open = true;
}
@Deprecated
@Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context.timestamp());
}
@Override
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
removeExpiredSegments();

View File

@ -157,13 +157,6 @@ public class MeteredWindowStore<K, V>
return false;
}
@Deprecated
@Override
public void put(final K key,
final V value) {
put(key, value, context != null ? context.timestamp() : 0L);
}
@Override
public void put(final K key,
final V value,

View File

@ -18,10 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@ -33,7 +29,6 @@ public class RocksDBWindowStore
private final boolean retainDuplicates;
private final long windowSize;
private InternalProcessorContext context;
private int seqnum = 0;
RocksDBWindowStore(final SegmentedBytesStore bytesStore,
@ -44,25 +39,6 @@ public class RocksDBWindowStore
this.windowSize = windowSize;
}
@Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
super.init(context, root);
}
@Override
public void init(final StateStoreContext context, final StateStore root) {
this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
super.init(context, root);
}
@Deprecated
@Override
public void put(final Bytes key, final byte[] value) {
put(key, value, context != null ? context.timestamp() : 0L);
}
@Override
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
// Skip if value is null and duplicates are allowed since this delete is a no-op

View File

@ -122,13 +122,6 @@ public class TimestampedWindowStoreBuilder<K, V>
wrapped.init(context, root);
}
@Deprecated
@Override
public void put(final Bytes key,
final byte[] value) {
wrapped.put(key, value);
}
@Override
public void put(final Bytes key,
final byte[] value,

View File

@ -40,13 +40,6 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
this.store = store;
}
@Deprecated
@Override
public void put(final Bytes key,
final byte[] valueWithTimestamp) {
store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
}
@Override
public void put(final Bytes key,
final byte[] valueWithTimestamp,

View File

@ -227,7 +227,6 @@ public class ProcessorContextImplTest {
checkThrowsUnsupportedOperation(store::flush, "flush()");
checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()");
checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
assertEquals(iters.get(0), store.fetchAll(0L, 0L));
assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@ -246,7 +245,6 @@ public class ProcessorContextImplTest {
checkThrowsUnsupportedOperation(store::flush, "flush()");
checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L), 1L), "put() [with timestamp]");
checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L)), "put() [no timestamp]");
assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L));
assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@ -335,7 +333,7 @@ public class ProcessorContextImplTest {
store.flush();
assertTrue(flushExecuted);
store.put("1", 1L);
store.put("1", 1L, 1L);
assertTrue(putExecuted);
assertEquals(iters.get(0), store.fetchAll(0L, 0L));
@ -355,7 +353,7 @@ public class ProcessorContextImplTest {
store.flush();
assertTrue(flushExecuted);
store.put("1", ValueAndTimestamp.make(1L, 1L));
store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
assertTrue(putExecuted);
store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
@ -639,7 +637,7 @@ public class ProcessorContextImplTest {
expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE);
expect(windowStore.all()).andReturn(iters.get(2));
windowStore.put(anyString(), anyLong());
windowStore.put(anyString(), anyLong(), anyLong());
expectLastCall().andAnswer(() -> {
putExecuted = true;
return null;
@ -662,7 +660,7 @@ public class ProcessorContextImplTest {
expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE_AND_TIMESTAMP);
expect(windowStore.all()).andReturn(timestampedIters.get(2));
windowStore.put(anyString(), anyObject(ValueAndTimestamp.class));
windowStore.put(anyString(), anyObject(ValueAndTimestamp.class), anyLong());
expectLastCall().andAnswer(() -> {
putExecuted = true;
return null;

View File

@ -782,24 +782,22 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testPutSameKeyTimestamp() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
final long startTime = SEGMENT_INTERVAL - 4L;
setCurrentTime(startTime);
windowStore.put(0, "zero");
windowStore.put(0, "zero", startTime);
assertEquals(
new HashSet<>(Collections.singletonList("zero")),
valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE),
ofEpochMilli(startTime + WINDOW_SIZE))));
windowStore.put(0, "zero");
windowStore.put(0, "zero+");
windowStore.put(0, "zero++");
windowStore.put(0, "zero", startTime);
windowStore.put(0, "zero+", startTime);
windowStore.put(0, "zero++", startTime);
assertEquals(
new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
@ -847,7 +845,6 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
setCurrentTime(0);
windowStore.put(1, "one", 1L);
windowStore.put(1, "two", 2L);
windowStore.put(1, "three", 3L);
@ -905,18 +902,16 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testDeleteAndUpdate() {
final long currentTime = 0;
setCurrentTime(currentTime);
windowStore.put(1, "one");
windowStore.put(1, "one v2");
windowStore.put(1, "one", currentTime);
windowStore.put(1, "one v2", currentTime);
WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
assertEquals(new KeyValue<>(currentTime, "one v2"), iterator.next());
windowStore.put(1, null);
windowStore.put(1, null, currentTime);
iterator = windowStore.fetch(1, 0, currentTime);
assertFalse(iterator.hasNext());
}
@ -927,9 +922,8 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldThrowNullPointerExceptionOnPutNullKey() {
assertThrows(NullPointerException.class, () -> windowStore.put(null, "anyValue"));
assertThrows(NullPointerException.class, () -> windowStore.put(null, "anyValue", 0L));
}
@Test
@ -1120,11 +1114,9 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testWindowIteratorPeek() {
final long currentTime = 0;
setCurrentTime(currentTime);
windowStore.put(1, "one");
windowStore.put(1, "one", currentTime);
final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
@ -1151,25 +1143,20 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotThrowConcurrentModificationException() {
long currentTime = 0;
setCurrentTime(currentTime);
windowStore.put(1, "one");
windowStore.put(1, "one", currentTime);
currentTime += WINDOW_SIZE * 10;
setCurrentTime(currentTime);
windowStore.put(1, "two");
windowStore.put(1, "two", currentTime);
final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all();
currentTime += WINDOW_SIZE * 10;
setCurrentTime(currentTime);
windowStore.put(1, "three");
windowStore.put(1, "three", currentTime);
currentTime += WINDOW_SIZE * 10;
setCurrentTime(currentTime);
windowStore.put(2, "four");
windowStore.put(2, "four", currentTime);
// Iterator should return all records in store and not throw exception b/c some were added after fetch
assertEquals(windowedPair(1, "one", 0), iterator.next());
@ -1180,25 +1167,21 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testFetchDuplicates() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
long currentTime = 0;
setCurrentTime(currentTime);
windowStore.put(1, "one");
windowStore.put(1, "one-2");
windowStore.put(1, "one", currentTime);
windowStore.put(1, "one-2", currentTime);
currentTime += WINDOW_SIZE * 10;
setCurrentTime(currentTime);
windowStore.put(1, "two");
windowStore.put(1, "two-2");
windowStore.put(1, "two", currentTime);
windowStore.put(1, "two-2", currentTime);
currentTime += WINDOW_SIZE * 10;
setCurrentTime(currentTime);
windowStore.put(1, "three");
windowStore.put(1, "three-2");
windowStore.put(1, "three", currentTime);
windowStore.put(1, "three-2", currentTime);
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, WINDOW_SIZE * 10);
@ -1210,38 +1193,26 @@ public abstract class AbstractWindowBytesStoreTest {
}
@SuppressWarnings("deprecation")
private void putFirstBatch(final WindowStore<Integer, String> store,
@SuppressWarnings("SameParameterValue") final long startTime,
final InternalMockProcessorContext context) {
context.setRecordContext(createRecordContext(startTime));
store.put(0, "zero");
context.setRecordContext(createRecordContext(startTime + 1L));
store.put(1, "one");
context.setRecordContext(createRecordContext(startTime + 2L));
store.put(2, "two");
context.setRecordContext(createRecordContext(startTime + 4L));
store.put(4, "four");
context.setRecordContext(createRecordContext(startTime + 5L));
store.put(5, "five");
store.put(0, "zero", startTime);
store.put(1, "one", startTime + 1L);
store.put(2, "two", startTime + 2L);
store.put(4, "four", startTime + 4L);
store.put(5, "five", startTime + 5L);
}
@SuppressWarnings("deprecation")
private void putSecondBatch(final WindowStore<Integer, String> store,
@SuppressWarnings("SameParameterValue") final long startTime,
final InternalMockProcessorContext context) {
context.setRecordContext(createRecordContext(startTime + 3L));
store.put(2, "two+1");
context.setRecordContext(createRecordContext(startTime + 4L));
store.put(2, "two+2");
context.setRecordContext(createRecordContext(startTime + 5L));
store.put(2, "two+3");
context.setRecordContext(createRecordContext(startTime + 6L));
store.put(2, "two+4");
context.setRecordContext(createRecordContext(startTime + 7L));
store.put(2, "two+5");
context.setRecordContext(createRecordContext(startTime + 8L));
store.put(2, "two+6");
store.put(2, "two+1", startTime + 3L);
store.put(2, "two+2", startTime + 4L);
store.put(2, "two+3", startTime + 5L);
store.put(2, "two+4", startTime + 6L);
store.put(2, "two+5", startTime + 7L);
store.put(2, "two+6", startTime + 8L);
}
long extractStoreTimestamp(final byte[] binaryKey) {
@ -1278,10 +1249,6 @@ public abstract class AbstractWindowBytesStoreTest {
return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value);
}
protected void setCurrentTime(final long currentTime) {
context.setRecordContext(createRecordContext(currentTime));
}
private ProcessorRecordContext createRecordContext(final long time) {
return new ProcessorRecordContext(time, 0, 0, "topic", null);
}

View File

@ -236,8 +236,8 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldPutFetchFromCache() {
cachingStore.put(bytesKey("a"), bytesValue("a"));
cachingStore.put(bytesKey("b"), bytesValue("b"));
cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
assertThat(cachingStore.fetch(bytesKey("a"), 10), equalTo(bytesValue("a")));
assertThat(cachingStore.fetch(bytesKey("b"), 10), equalTo(bytesValue("b")));
@ -275,8 +275,8 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldPutFetchRangeFromCache() {
cachingStore.put(bytesKey("a"), bytesValue("a"));
cachingStore.put(bytesKey("b"), bytesValue("b"));
cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10));
@ -293,16 +293,15 @@ public class CachingPersistentWindowStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldGetAllFromCache() {
cachingStore.put(bytesKey("a"), bytesValue("a"));
cachingStore.put(bytesKey("b"), bytesValue("b"));
cachingStore.put(bytesKey("c"), bytesValue("c"));
cachingStore.put(bytesKey("d"), bytesValue("d"));
cachingStore.put(bytesKey("e"), bytesValue("e"));
cachingStore.put(bytesKey("f"), bytesValue("f"));
cachingStore.put(bytesKey("g"), bytesValue("g"));
cachingStore.put(bytesKey("h"), bytesValue("h"));
cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
@ -318,14 +317,14 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldGetAllBackwardFromCache() {
cachingStore.put(bytesKey("a"), bytesValue("a"));
cachingStore.put(bytesKey("b"), bytesValue("b"));
cachingStore.put(bytesKey("c"), bytesValue("c"));
cachingStore.put(bytesKey("d"), bytesValue("d"));
cachingStore.put(bytesKey("e"), bytesValue("e"));
cachingStore.put(bytesKey("f"), bytesValue("f"));
cachingStore.put(bytesKey("g"), bytesValue("g"));
cachingStore.put(bytesKey("h"), bytesValue("h"));
cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.backwardAll();
final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
@ -343,8 +342,7 @@ public class CachingPersistentWindowStoreTest {
public void shouldFetchAllWithinTimestampRange() {
final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
for (int i = 0; i < array.length; i++) {
context.setTime(i);
cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
}
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
@ -386,8 +384,7 @@ public class CachingPersistentWindowStoreTest {
public void shouldFetchAllBackwardWithinTimestampRange() {
final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
for (int i = 0; i < array.length; i++) {
context.setTime(i);
cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
}
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
@ -444,7 +441,7 @@ public class CachingPersistentWindowStoreTest {
public void shouldForwardDirtyItemsWhenFlushCalled() {
final Windowed<String> windowedKey =
new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
@ -462,24 +459,24 @@ public class CachingPersistentWindowStoreTest {
cachingStore.setFlushListener(cacheListener, true);
final Windowed<String> windowedKey =
new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.put(bytesKey("1"), bytesValue("b"));
cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cacheListener.forwarded.clear();
cachingStore.put(bytesKey("1"), bytesValue("c"));
cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue);
cachingStore.put(bytesKey("1"), null);
cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey).newValue);
assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue);
cacheListener.forwarded.clear();
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.put(bytesKey("1"), bytesValue("b"));
cachingStore.put(bytesKey("1"), null);
cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey));
cacheListener.forwarded.clear();
@ -490,23 +487,23 @@ public class CachingPersistentWindowStoreTest {
public void shouldForwardOldValuesWhenDisabled() {
final Windowed<String> windowedKey =
new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.put(bytesKey("1"), bytesValue("b"));
cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cachingStore.put(bytesKey("1"), bytesValue("c"));
cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cachingStore.put(bytesKey("1"), null);
cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cacheListener.forwarded.clear();
cachingStore.put(bytesKey("1"), bytesValue("a"));
cachingStore.put(bytesKey("1"), bytesValue("b"));
cachingStore.put(bytesKey("1"), null);
cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey));
cacheListener.forwarded.clear();
@ -619,7 +616,7 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldClearNamespaceCacheOnClose() {
cachingStore.put(bytesKey("a"), bytesValue("a"));
cachingStore.put(bytesKey("a"), bytesValue("a"), 0L);
assertEquals(1, cache.size());
cachingStore.close();
assertEquals(0, cache.size());
@ -641,7 +638,7 @@ public class CachingPersistentWindowStoreTest {
@SuppressWarnings("deprecation")
public void shouldThrowIfTryingToWriteToClosedCachingStore() {
cachingStore.close();
assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(bytesKey("a"), bytesValue("a")));
assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(bytesKey("a"), bytesValue("a"), 0L));
}
@Test
@ -789,13 +786,13 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldThrowNullPointerExceptionOnPutNullKey() {
assertThrows(NullPointerException.class, () -> cachingStore.put(null, bytesValue("anyValue")));
assertThrows(NullPointerException.class, () -> cachingStore.put(null, bytesValue("anyValue"), 0L));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
cachingStore.put(bytesKey("a"), null);
cachingStore.put(bytesKey("a"), null, 0L);
}
@Test
@ -919,13 +916,12 @@ public class CachingPersistentWindowStoreTest {
bytesValue(value));
}
@SuppressWarnings("deprecation")
private int addItemsToCache() {
int cachedSize = 0;
int i = 0;
while (cachedSize < MAX_CACHE_SIZE_BYTES) {
final String kv = String.valueOf(i++);
cachingStore.put(bytesKey(kv), bytesValue(kv));
cachingStore.put(bytesKey(kv), bytesValue(kv), DEFAULT_TIMESTAMP);
cachedSize += memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), TOPIC) +
8 + // timestamp
4; // sequenceNumber

View File

@ -99,7 +99,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
context.logChange(store.name(), key, value, 42);
EasyMock.replay(context);
store.put(bytesKey, valueAndTimestamp);
store.put(bytesKey, valueAndTimestamp, context.timestamp());
EasyMock.verify(inner, context);
}
@ -146,8 +146,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
EasyMock.replay(context);
store.put(bytesKey, valueAndTimestamp);
store.put(bytesKey, valueAndTimestamp);
store.put(bytesKey, valueAndTimestamp, context.timestamp());
store.put(bytesKey, valueAndTimestamp, context.timestamp());
EasyMock.verify(inner, context);
}

View File

@ -84,7 +84,6 @@ public class ChangeLoggingWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldLogPuts() {
inner.put(bytesKey, value, 0);
EasyMock.expectLastCall();
@ -98,7 +97,7 @@ public class ChangeLoggingWindowBytesStoreTest {
context.logChange(store.name(), key, value, 0L);
EasyMock.replay(context);
store.put(bytesKey, value);
store.put(bytesKey, value, context.timestamp());
EasyMock.verify(inner, context);
}
@ -152,7 +151,6 @@ public class ChangeLoggingWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldRetainDuplicatesWhenSet() {
store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary);
@ -171,8 +169,8 @@ public class ChangeLoggingWindowBytesStoreTest {
EasyMock.replay(context);
store.put(bytesKey, value);
store.put(bytesKey, value);
store.put(bytesKey, value, context.timestamp());
store.put(bytesKey, value, context.timestamp());
EasyMock.verify(inner, context);
}

View File

@ -126,37 +126,30 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testExpiration() {
long currentTime = 0;
setCurrentTime(currentTime);
windowStore.put(1, "one");
windowStore.put(1, "one", currentTime);
currentTime += RETENTION_PERIOD / 4;
setCurrentTime(currentTime);
windowStore.put(1, "two");
windowStore.put(1, "two", currentTime);
currentTime += RETENTION_PERIOD / 4;
setCurrentTime(currentTime);
windowStore.put(1, "three");
windowStore.put(1, "three", currentTime);
currentTime += RETENTION_PERIOD / 4;
setCurrentTime(currentTime);
windowStore.put(1, "four");
windowStore.put(1, "four", currentTime);
// increase current time to the full RETENTION_PERIOD to expire first record
currentTime = currentTime + RETENTION_PERIOD / 4;
setCurrentTime(currentTime);
windowStore.put(1, "five");
windowStore.put(1, "five", currentTime);
KeyValueIterator<Windowed<Integer>, String> iterator = windowStore
.fetchAll(0L, currentTime);
// effect of this put (expires next oldest record, adds new one) should not be reflected in the already fetched results
currentTime = currentTime + RETENTION_PERIOD / 4;
setCurrentTime(currentTime);
windowStore.put(1, "six");
windowStore.put(1, "six", currentTime);
// should only have middle 4 values, as (only) the first record was expired at the time of the fetch
// and the last was inserted after the fetch

View File

@ -202,7 +202,6 @@ public class MeteredTimestampedWindowStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
EasyMock.replay(innerStoreMock);
@ -217,7 +216,7 @@ public class MeteredTimestampedWindowStoreTest {
store.init((StateStoreContext) context, innerStoreMock);
try {
store.put("key", ValueAndTimestamp.make(42L, 60000));
store.put("key", ValueAndTimestamp.make(42L, 60000), 60000L);
} catch (final StreamsException exception) {
if (exception.getCause() instanceof ClassCastException) {
fail("Serdes are not correctly set from processor context.");
@ -227,7 +226,6 @@ public class MeteredTimestampedWindowStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
EasyMock.replay(innerStoreMock);
@ -242,7 +240,7 @@ public class MeteredTimestampedWindowStoreTest {
store.init((StateStoreContext) context, innerStoreMock);
try {
store.put("key", ValueAndTimestamp.make(42L, 60000));
store.put("key", ValueAndTimestamp.make(42L, 60000), 60000L);
} catch (final StreamsException exception) {
if (exception.getCause() instanceof ClassCastException) {
fail("Serdes are not correctly set from constructor parameters.");

View File

@ -290,7 +290,6 @@ public class MeteredWindowStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldRecordPutLatency() {
final byte[] bytes = "a".getBytes();
innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(), eq(context.timestamp()));
@ -298,7 +297,7 @@ public class MeteredWindowStoreTest {
replay(innerStoreMock);
store.init((StateStoreContext) context, store);
store.put("a", "a");
store.put("a", "a", context.timestamp());
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
assertEquals(1.0, getMetricByNameFilterByTags(

View File

@ -81,26 +81,21 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldOnlyIterateOpenSegments() {
long currentTime = 0;
setCurrentTime(currentTime);
windowStore.put(1, "one");
windowStore.put(1, "one", currentTime);
currentTime = currentTime + SEGMENT_INTERVAL;
setCurrentTime(currentTime);
windowStore.put(1, "two");
windowStore.put(1, "two", currentTime);
currentTime = currentTime + SEGMENT_INTERVAL;
setCurrentTime(currentTime);
windowStore.put(1, "three");
windowStore.put(1, "three", currentTime);
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, currentTime);
// roll to the next segment that will close the first
currentTime = currentTime + SEGMENT_INTERVAL;
setCurrentTime(currentTime);
windowStore.put(1, "four");
windowStore.put(1, "four", currentTime);
// should only have 2 values as the first segment is no longer open
assertEquals(new KeyValue<>(SEGMENT_INTERVAL, "two"), iterator.next());
@ -109,22 +104,18 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testRolling() {
// to validate segments
final long startTime = SEGMENT_INTERVAL * 2;
final long increment = SEGMENT_INTERVAL / 2;
setCurrentTime(startTime);
windowStore.put(0, "zero");
windowStore.put(0, "zero", startTime);
assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
setCurrentTime(startTime + increment);
windowStore.put(1, "one");
windowStore.put(1, "one", startTime + increment);
assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
setCurrentTime(startTime + increment * 2);
windowStore.put(2, "two");
windowStore.put(2, "two", startTime + increment * 2);
assertEquals(
Utils.mkSet(
segments.segmentName(2),
@ -133,8 +124,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
setCurrentTime(startTime + increment * 4);
windowStore.put(4, "four");
windowStore.put(4, "four", startTime + increment * 4);
assertEquals(
Utils.mkSet(
segments.segmentName(2),
@ -144,8 +134,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
setCurrentTime(startTime + increment * 5);
windowStore.put(5, "five");
windowStore.put(5, "five", startTime + increment * 5);
assertEquals(
Utils.mkSet(
segments.segmentName(2),
@ -192,8 +181,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment * 5 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 5 + WINDOW_SIZE))));
setCurrentTime(startTime + increment * 6);
windowStore.put(6, "six");
windowStore.put(6, "six", startTime + increment * 6);
assertEquals(
Utils.mkSet(
segments.segmentName(3),
@ -246,8 +234,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment * 6 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 6 + WINDOW_SIZE))));
setCurrentTime(startTime + increment * 7);
windowStore.put(7, "seven");
windowStore.put(7, "seven", startTime + increment * 7);
assertEquals(
Utils.mkSet(
segments.segmentName(3),
@ -306,8 +293,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment * 7 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 7 + WINDOW_SIZE))));
setCurrentTime(startTime + increment * 8);
windowStore.put(8, "eight");
windowStore.put(8, "eight", startTime + increment * 8);
assertEquals(
Utils.mkSet(
segments.segmentName(4),
@ -385,7 +371,6 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testSegmentMaintenance() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(),
@ -393,23 +378,20 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
windowStore.init((StateStoreContext) context, windowStore);
context.setTime(0L);
setCurrentTime(0);
windowStore.put(0, "v");
windowStore.put(0, "v", 0);
assertEquals(
Utils.mkSet(segments.segmentName(0L)),
segmentDirs(baseDir)
);
setCurrentTime(SEGMENT_INTERVAL - 1);
windowStore.put(0, "v");
windowStore.put(0, "v");
windowStore.put(0, "v", SEGMENT_INTERVAL - 1);
windowStore.put(0, "v", SEGMENT_INTERVAL - 1);
assertEquals(
Utils.mkSet(segments.segmentName(0L)),
segmentDirs(baseDir)
);
setCurrentTime(SEGMENT_INTERVAL);
windowStore.put(0, "v");
windowStore.put(0, "v", SEGMENT_INTERVAL);
assertEquals(
Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
segmentDirs(baseDir)
@ -431,8 +413,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
setCurrentTime(SEGMENT_INTERVAL * 3);
windowStore.put(0, "v");
windowStore.put(0, "v", SEGMENT_INTERVAL * 3);
iter = windowStore.fetch(0, ofEpochMilli(0L), ofEpochMilli(SEGMENT_INTERVAL * 4));
fetchedCount = 0;
@ -447,8 +428,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
setCurrentTime(SEGMENT_INTERVAL * 5);
windowStore.put(0, "v");
windowStore.put(0, "v", SEGMENT_INTERVAL * 5);
iter = windowStore.fetch(0, ofEpochMilli(SEGMENT_INTERVAL * 4), ofEpochMilli(SEGMENT_INTERVAL * 10));
fetchedCount = 0;
@ -512,29 +492,19 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
@SuppressWarnings("deprecation")
public void testRestore() throws Exception {
final long startTime = SEGMENT_INTERVAL * 2;
final long increment = SEGMENT_INTERVAL / 2;
setCurrentTime(startTime);
windowStore.put(0, "zero");
setCurrentTime(startTime + increment);
windowStore.put(1, "one");
setCurrentTime(startTime + increment * 2);
windowStore.put(2, "two");
setCurrentTime(startTime + increment * 3);
windowStore.put(3, "three");
setCurrentTime(startTime + increment * 4);
windowStore.put(4, "four");
setCurrentTime(startTime + increment * 5);
windowStore.put(5, "five");
setCurrentTime(startTime + increment * 6);
windowStore.put(6, "six");
setCurrentTime(startTime + increment * 7);
windowStore.put(7, "seven");
setCurrentTime(startTime + increment * 8);
windowStore.put(8, "eight");
windowStore.put(0, "zero", startTime);
windowStore.put(1, "one", startTime + increment);
windowStore.put(2, "two", startTime + increment * 2);
windowStore.put(3, "three", startTime + increment * 3);
windowStore.put(4, "four", startTime + increment * 4);
windowStore.put(5, "five", startTime + increment * 5);
windowStore.put(6, "six", startTime + increment * 6);
windowStore.put(7, "seven", startTime + increment * 7);
windowStore.put(8, "eight", startTime + increment * 8);
windowStore.flush();
windowStore.close();

View File

@ -1360,13 +1360,6 @@ public class TopologyTestDriver implements Closeable {
inner.init(context, root);
}
@Deprecated
@Override
public void put(final K key,
final V value) {
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
}
@Override
public void put(final K key,
final V value,

View File

@ -70,17 +70,6 @@ public class WindowStoreFacadeTest {
verify(mockedWindowTimestampStore);
}
@Test
@SuppressWarnings("deprecation")
public void shouldPutWithUnknownTimestamp() {
mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
expectLastCall();
replay(mockedWindowTimestampStore);
windowStoreFacade.put("key", "value");
verify(mockedWindowTimestampStore);
}
@Test
public void shouldPutWindowStartTimestampWithUnknownTimestamp() {
mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP), 21L);