KAFKA-15629: Support ResultOrder to TimestampedRangeQuery. (#14907)

Update to KIP-992.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Hanyu Zheng 2023-12-07 15:29:29 -08:00 committed by GitHub
parent 02915a2c5e
commit 5ba7bfaa57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 16 deletions

View File

@ -143,7 +143,6 @@
namely <code>TimestampedKeyQuery</code> and <code>TimestampedRangeQuery</code>. Both should be used to query a timestamped key-value store, to retrieve a <code>ValueAndTimestamp</code> result.
The existing <code>KeyQuery</code> and <code>RangeQuery</code> are changed to always return the value only for timestamped key-value stores.
</p>
<p>
The non-null key requirements for Kafka Streams join operators were relaxed as part of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams">KIP-962</a>.
The behavior of the following operators changed.

View File

@ -44,13 +44,12 @@ public final class TimestampedRangeQuery<K, V> implements Query<KeyValueIterator
private final Optional<K> lower;
private final Optional<K> upper;
private final ResultOrder order;
private final boolean isKeyAscending;
private TimestampedRangeQuery(final Optional<K> lower, final Optional<K> upper, final boolean isKeyAscending) {
private TimestampedRangeQuery(final Optional<K> lower, final Optional<K> upper, final ResultOrder order) {
this.lower = lower;
this.upper = upper;
this.isKeyAscending = isKeyAscending;
this.order = order;
}
/**
@ -61,7 +60,7 @@ public final class TimestampedRangeQuery<K, V> implements Query<KeyValueIterator
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withRange(final K lower, final K upper) {
return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), true);
return new TimestampedRangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper), ResultOrder.ANY);
}
/**
@ -72,7 +71,7 @@ public final class TimestampedRangeQuery<K, V> implements Query<KeyValueIterator
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withUpperBound(final K upper) {
return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), true);
return new TimestampedRangeQuery<>(Optional.empty(), Optional.of(upper), ResultOrder.ANY);
}
/**
@ -82,16 +81,16 @@ public final class TimestampedRangeQuery<K, V> implements Query<KeyValueIterator
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withLowerBound(final K lower) {
return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), true);
return new TimestampedRangeQuery<>(Optional.of(lower), Optional.empty(), ResultOrder.ANY);
}
/**
* Determines if the serialized byte[] of the keys in ascending order.
* Determines if the serialized byte[] of the keys in ascending or descending or unordered order.
* Order is based on the serialized byte[] of the keys, not the 'logical' key order.
* @return true if ascending, false otherwise.
* @return return the order of return records base on the serialized byte[] of the keys (can be unordered, or in ascending, or in descending order).
*/
public boolean isKeyAscending() {
return isKeyAscending;
public ResultOrder resultOrder() {
return order;
}
/**
@ -100,9 +99,17 @@ public final class TimestampedRangeQuery<K, V> implements Query<KeyValueIterator
* @return a new RangeQuery instance with descending flag set.
*/
public TimestampedRangeQuery<K, V> withDescendingKeys() {
return new TimestampedRangeQuery<>(this.lower, this.upper, false);
return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.DESCENDING);
}
/**
* Set the query to return the serialized byte[] of the keys in ascending order.
* Order is based on the serialized byte[] of the keys, not the 'logical' key order.
* @return a new RangeQuery instance with ascending flag set.
*/
public TimestampedRangeQuery<K, V> withAscendingKeys() {
return new TimestampedRangeQuery<>(this.lower, this.upper, ResultOrder.ASCENDING);
}
/**
* Interactive scan query that returns all records in the store.
@ -110,7 +117,7 @@ public final class TimestampedRangeQuery<K, V> implements Query<KeyValueIterator
* @param <V> The value type
*/
public static <K, V> TimestampedRangeQuery<K, V> withNoBounds() {
return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), true);
return new TimestampedRangeQuery<>(Optional.empty(), Optional.empty(), ResultOrder.ANY);
}

View File

@ -206,14 +206,17 @@ public class MeteredTimestampedKeyValueStore<K, V>
final QueryResult<R> result;
final TimestampedRangeQuery<K, V> typedQuery = (TimestampedRangeQuery<K, V>) query;
RangeQuery<Bytes, byte[]> rawRangeQuery;
final boolean isKeyAscending = typedQuery.isKeyAscending();
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
keyBytes(typedQuery.lowerBound().orElse(null)),
keyBytes(typedQuery.upperBound().orElse(null))
);
if (!isKeyAscending) {
if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
}
if (order.equals(ResultOrder.ASCENDING)) {
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {