KAFKA-8880: Add overloaded function of Consumer.committed (#7304)

1. Add the overloaded functions (a caller-side sketch of the migration is shown below).
2. Update the code in Streams to use the batch API for better latency (this applies both to active StreamTasks when initializing offsets and to StandbyTasks when updating offset limits).
3. Update all unit tests to replace the deprecated APIs.

Reviewers: Christopher Pettitt <cpettitt@confluent.io>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Bill Bejeck <bill@confluent.io>
Guozhang Wang 2019-09-24 13:23:27 -07:00 committed by GitHub
parent 1ae0956892
commit bcc023773f
17 changed files with 236 additions and 128 deletions
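
Illustrative only: the sketch below contrasts the now-deprecated per-partition committed(TopicPartition) with the new batched committed(Set, Duration) added in this commit. The broker address, topic name, and group id are made-up placeholders; note that the batched call's returned map simply omits partitions that have no committed offset.

import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittedMigrationSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");               // placeholder broker
        props.put("group.id", "committed-demo");                        // placeholder group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        final TopicPartition tp0 = new TopicPartition("demo-topic", 0); // placeholder topic
        final TopicPartition tp1 = new TopicPartition("demo-topic", 1);
        final Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(tp0, tp1));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(partitions);

            // Old style, deprecated by this commit: one blocking remote lookup per partition.
            @SuppressWarnings("deprecation")
            final OffsetAndMetadata oldStyle = consumer.committed(tp0);

            // New style: one batched lookup for every partition of interest.
            final Map<TopicPartition, OffsetAndMetadata> newStyle =
                    consumer.committed(partitions, Duration.ofSeconds(5));

            // Partitions with no committed offset are simply absent from the returned map.
            for (final TopicPartition tp : partitions) {
                final OffsetAndMetadata oam = newStyle.get(tp);
                System.out.println(tp + " -> " + (oam == null ? "no committed offset" : oam.offset()));
            }
            System.out.println("single-partition lookup returned: " + oldStyle);
        }
    }
}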

Consumer.java

@ -154,13 +154,25 @@ public interface Consumer<K, V> extends Closeable {
/**
* @see KafkaConsumer#committed(TopicPartition)
*/
@Deprecated
OffsetAndMetadata committed(TopicPartition partition);
/**
* @see KafkaConsumer#committed(TopicPartition, Duration)
*/
@Deprecated
OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);
/**
* @see KafkaConsumer#committed(Set)
*/
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
/**
* @see KafkaConsumer#committed(Set, Duration)
*/
Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, final Duration timeout);
/**
* @see KafkaConsumer#metrics()
*/

KafkaConsumer.java

@ -1735,7 +1735,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* the timeout specified by {@code default.api.timeout.ms} expires.
*
* @deprecated since 2.4 Use {@link #committed(Set)} instead
*/
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
@ -1745,7 +1748,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
* This call will block until the position can be determined, an unrecoverable error is
* encountered (in which case it is thrown to the caller), or the timeout expires.
*
* @param partition The partition to check
* @param timeout The maximum amount of time to await the current committed offset
@ -1760,21 +1764,85 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* expiration of the timeout
*
* @deprecated since 2.4 Use {@link #committed(Set, Duration)} instead
*/
@Deprecated
@Override
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
return committed(Collections.singleton(partition), timeout).get(partition);
}
/**
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* Partitions that do not have a committed offset would not be included in the returned map.
* <p>
* If any of the partitions requested do not exist, an exception would be thrown.
* <p>
* This call will do a remote call to get the latest committed offsets from the server, and will block until the
* committed offsets are gotten successfully, an unrecoverable error is encountered (in which case it is thrown to
* the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a
* {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partitions The partitions to check
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
* would not be included in the returned result
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* the timeout specified by {@code default.api.timeout.ms} expires.
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
* Get the last committed offsets for the given partitions (whether the commit happened by this process or
* another). The returned offsets will be used as the position for the consumer in the event of a failure.
* <p>
* Partitions that do not have a committed offset would not be included in the returned map.
* <p>
* If any of the partitions requested do not exist, an exception would be thrown.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
*
* @param partitions The partitions to check
* @param timeout The maximum amount of time to await the latest committed offsets
* @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets
* would not be included in the returned result
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
* expiration of the timeout
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
Collections.singleton(partition), time.timer(timeout));
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
if (offsets == null) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
"committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
"committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " +
"larger to relax the threshold.");
} else {
offsets.forEach(this::updateLastSeenEpochIfNewer);
return offsets.get(partition);
return offsets;
}
} finally {
release();
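
As the Javadoc above spells out, the batched lookup omits partitions that have no committed offset, and the timed variant throws org.apache.kafka.common.errors.TimeoutException if the coordinator does not answer in time. Below is a minimal sketch of defensive caller code; the class name, method name, and 5-second timeout are illustrative, not part of this patch. It maps missing partitions to a caller-supplied default, much like the Streams change further down defaults them to 0.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public final class CommittedOffsetLookup {

    private CommittedOffsetLookup() { }

    // Fetches committed offsets for the given partitions in one round trip.
    // Partitions that have never been committed (absent from the returned map)
    // and partitions we could not look up in time both fall back to defaultOffset.
    public static Map<TopicPartition, Long> committedOrDefault(final Consumer<?, ?> consumer,
                                                               final Set<TopicPartition> partitions,
                                                               final long defaultOffset) {
        final Map<TopicPartition, Long> result = new HashMap<>();
        try {
            final Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(partitions, Duration.ofSeconds(5));
            committed.forEach((tp, oam) -> result.put(tp, oam.offset()));
        } catch (final TimeoutException e) {
            // Coordinator did not respond within the timeout; treat all offsets as unknown.
        }
        for (final TopicPartition tp : partitions) {
            result.putIfAbsent(tp, defaultOffset);
        }
        return result;
    }
}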

MockConsumer.java

@ -42,6 +42,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@ -290,18 +291,31 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
subscriptions.seek(partition, offsetAndMetadata.offset());
}
@Deprecated
@Override
public synchronized OffsetAndMetadata committed(TopicPartition partition) {
ensureNotClosed();
if (subscriptions.isAssigned(partition)) {
return committed.get(partition);
}
return new OffsetAndMetadata(0);
public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
return committed(Collections.singleton(partition)).get(partition);
}
@Deprecated
@Override
public OffsetAndMetadata committed(final TopicPartition partition, final Duration timeout) {
return committed(partition);
}
@Override
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
return committed(partition);
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
ensureNotClosed();
return partitions.stream()
.filter(committed::containsKey)
.collect(Collectors.toMap(tp -> tp, tp -> subscriptions.isAssigned(tp) ?
committed.get(tp) : new OffsetAndMetadata(0)));
}
@Override
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) {
return committed(partitions);
}
@Override
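
MockConsumer now serves the batched lookup by filtering to partitions that actually have a committed entry, as shown above. The following is a small self-contained sketch of how test code might exercise it (topic name, key, and value are made up); it follows the same assign-poll-commit pattern the updated MockConsumerTest below uses.

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockCommittedSketch {
    public static void main(final String[] args) {
        final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic

        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        consumer.seek(tp, 0L);                                         // mock consumers need an explicit position
        consumer.addRecord(new ConsumerRecord<>("demo-topic", 0, 0L, "key", "value"));

        consumer.poll(Duration.ZERO);   // drains the record, advancing the position to 1
        consumer.commitSync();          // commits the current position

        final Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(tp));
        System.out.println(tp + " committed at offset " + committed.get(tp).offset()); // expected: 1
    }
}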

KafkaConsumerTest.java

@ -696,7 +696,7 @@ public class KafkaConsumerTest {
// fetch offset for one topic
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator);
assertEquals(offset1, consumer.committed(tp0).offset());
assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
consumer.assign(Arrays.asList(tp0, tp1));
@ -704,12 +704,12 @@ public class KafkaConsumerTest {
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(tp0, offset1);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(offset1, consumer.committed(tp0).offset());
assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
offsets.remove(tp0);
offsets.put(tp1, offset2);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(offset2, consumer.committed(tp1).offset());
assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
consumer.close(Duration.ofMillis(0));
}
@ -1137,7 +1137,7 @@ public class KafkaConsumerTest {
// fetch offset for one topic
client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator);
assertEquals(0, consumer.committed(tp0).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
// verify that assignment immediately changes
assertTrue(consumer.assignment().equals(singleton(tp0)));
@ -1195,7 +1195,7 @@ public class KafkaConsumerTest {
client.prepareResponseFrom(
offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE),
coordinator);
assertEquals(0, consumer.committed(tp0).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
// verify that assignment immediately changes
assertTrue(consumer.assignment().equals(singleton(tp0)));
@ -1256,12 +1256,12 @@ public class KafkaConsumerTest {
offsets.put(tp1, 0L);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(0, consumer.committed(tp0).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
offsets.remove(tp0);
offsets.put(tp1, 0L);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
assertEquals(0, consumer.committed(tp1).offset());
assertEquals(0, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
// fetch and verify consumer's position in the two partitions
final Map<TopicPartition, Long> offsetResponse = new HashMap<>();
@ -1356,7 +1356,7 @@ public class KafkaConsumerTest {
}
try {
newConsumer((String) null).committed(tp0);
newConsumer((String) null).committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
@ -1383,7 +1383,7 @@ public class KafkaConsumerTest {
consumer.assign(singleton(tp0));
try {
consumer.committed(tp0);
consumer.committed(Collections.singleton(tp0)).get(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
@ -1636,7 +1636,7 @@ public class KafkaConsumerTest {
@Test(expected = AuthenticationException.class)
public void testCommittedAuthenticationFaiure() {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError();
consumer.committed(tp0);
consumer.committed(Collections.singleton(tp0)).get(tp0);
}
@Test

MockConsumerTest.java

@ -55,9 +55,10 @@ public class MockConsumerTest {
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
final TopicPartition tp = new TopicPartition("test", 0);
assertEquals(2L, consumer.position(tp));
consumer.commitSync();
assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset());
}
@SuppressWarnings("deprecation")
@ -81,9 +82,10 @@ public class MockConsumerTest {
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
final TopicPartition tp = new TopicPartition("test", 0);
assertEquals(2L, consumer.position(tp));
consumer.commitSync();
assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset());
}
@Test

ConsumerBounceTest.scala

@ -108,7 +108,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
if (records.nonEmpty) {
consumer.commitSync()
assertEquals(consumer.position(tp), consumer.committed(tp).offset)
assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
if (consumer.position(tp) == numRecords) {
consumer.seekToBeginning(Collections.emptyList())
@ -153,7 +153,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
} else if (coin == 2) {
info("Committing offset.")
consumer.commitSync()
assertEquals(consumer.position(tp), consumer.committed(tp).offset)
assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset)
}
}
}
@ -485,7 +485,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
consumer.poll(time.Duration.ofSeconds(3L))
assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
if (committedRecords > 0)
assertEquals(committedRecords, consumer.committed(tp).offset)
assertEquals(committedRecords, consumer.committed(Set(tp).asJava).get(tp).offset)
consumer.close()
}

PlaintextConsumerTest.scala

@ -278,8 +278,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// now we should see the committed positions from another consumer
val anotherConsumer = createConsumer()
assertEquals(300, anotherConsumer.committed(tp).offset)
assertEquals(500, anotherConsumer.committed(tp2).offset)
assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
}
@Test
@ -305,8 +305,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// now we should see the committed positions from another consumer
val anotherConsumer = createConsumer()
assertEquals(300, anotherConsumer.committed(tp).offset)
assertEquals(500, anotherConsumer.committed(tp2).offset)
assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
}
@Test
@ -480,17 +480,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// sync commit
val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
consumer.commitSync(Map((tp, syncMetadata)).asJava)
assertEquals(syncMetadata, consumer.committed(tp))
assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
// async commit
val asyncMetadata = new OffsetAndMetadata(10, "bar")
sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
assertEquals(asyncMetadata, consumer.committed(tp))
assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
// handle null metadata
val nullMetadata = new OffsetAndMetadata(5, null)
consumer.commitSync(Map(tp -> nullMetadata).asJava)
assertEquals(nullMetadata, consumer.committed(tp))
assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp))
}
@Test
@ -509,7 +509,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(None, callback.lastError)
assertEquals(count, callback.successCount)
assertEquals(new OffsetAndMetadata(count), consumer.committed(tp))
assertEquals(new OffsetAndMetadata(count), consumer.committed(Set(tp).asJava).get(tp))
}
@Test
@ -623,7 +623,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
sendRecords(producer, numRecords = 5, tp)
val consumer = createConsumer()
assertNull(consumer.committed(new TopicPartition(topic, 15)))
assertTrue(consumer.committed(Set(new TopicPartition(topic, 15)).asJava).isEmpty)
// position() on a partition that we aren't subscribed to throws an exception
intercept[IllegalStateException] {
@ -634,12 +634,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, consumer.position(tp))
consumer.commitSync()
assertEquals(0L, consumer.committed(tp).offset)
assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0)
assertEquals("After consuming 5 records, position should be 5", 5L, consumer.position(tp))
consumer.commitSync()
assertEquals("Committed offset should be returned", 5L, consumer.committed(tp).offset)
assertEquals("Committed offset should be returned", 5L, consumer.committed(Set(tp).asJava).get(tp).offset)
sendRecords(producer, numRecords = 1, tp)
@ -1024,12 +1024,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// commit sync and verify onCommit is called
val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue
testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava)
assertEquals(2, testConsumer.committed(tp).offset)
assertEquals(2, testConsumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
// commit async and verify onCommit is called
sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new OffsetAndMetadata(5L))))
assertEquals(5, testConsumer.committed(tp).offset)
assertEquals(5, testConsumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
testConsumer.close()
@ -1076,8 +1076,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
rebalanceListener)
// after rebalancing, we should have reset to the committed positions
assertEquals(10, testConsumer.committed(tp).offset)
assertEquals(20, testConsumer.committed(tp2).offset)
assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset)
assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance)
// verify commits are intercepted on close
@ -1321,19 +1321,19 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val pos1 = consumer.position(tp)
val pos2 = consumer.position(tp2)
consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
assertEquals(3, consumer.committed(tp).offset)
assertNull(consumer.committed(tp2))
assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
assertNull(consumer.committed(Set(tp2).asJava).get(tp2))
// Positions should not change
assertEquals(pos1, consumer.position(tp))
assertEquals(pos2, consumer.position(tp2))
consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
assertEquals(3, consumer.committed(tp).offset)
assertEquals(5, consumer.committed(tp2).offset)
assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(5, consumer.committed(Set(tp2).asJava).get(tp2).offset)
// Using async should pick up the committed changes after commit completes
sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new OffsetAndMetadata(7L))))
assertEquals(7, consumer.committed(tp2).offset)
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
@Test
@ -1371,8 +1371,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
awaitAssignment(consumer, newAssignment)
// after rebalancing, we should have reset to the committed positions
assertEquals(300, consumer.committed(tp).offset)
assertEquals(500, consumer.committed(tp2).offset)
assertEquals(300, consumer.committed(Set(tp).asJava).get(tp).offset)
assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
@Test
@ -1808,7 +1808,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
try {
consumer2.committed(tp)
consumer2.committed(Set(tp).asJava)
fail("Expected committed offset fetch to fail due to null group id")
} catch {
case e: InvalidGroupIdException => // OK

TransactionsTest.scala

@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val producer2 = transactionalProducers(1)
producer2.initTransactions()
assertEquals(offsetAndMetadata, consumer.committed(tp))
assertEquals(offsetAndMetadata, consumer.committed(Set(tp).asJava).get(tp))
}
@Test

ConsumerGroupCommandTest.scala

@ -27,7 +27,7 @@ import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.{After, Before}
@ -70,14 +70,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
props.put("group.id", group)
val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
try {
consumer.partitionsFor(topic).asScala.flatMap { partitionInfo =>
val tp = new TopicPartition(partitionInfo.topic, partitionInfo.partition)
val committed = consumer.committed(tp)
if (committed == null)
None
else
Some(tp -> committed.offset)
}.toMap
val partitions: Set[TopicPartition] = consumer.partitionsFor(topic)
.asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)}
consumer.committed(partitions.asJava).asScala.mapValues(_.offset()).toMap
} finally {
consumer.close()
}

TestUtils.scala

@ -1432,11 +1432,12 @@ object TestUtils extends Logging {
offsetsToCommit.toMap
}
def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = {
def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {
val committed = consumer.committed(consumer.assignment).asScala.mapValues(_.offset)
consumer.assignment.asScala.foreach { topicPartition =>
val offset = consumer.committed(topicPartition)
if (offset != null)
consumer.seek(topicPartition, offset.offset)
if (committed.contains(topicPartition))
consumer.seek(topicPartition, committed(topicPartition))
else
consumer.seekToBeginning(Collections.singletonList(topicPartition))
}

AbstractTask.java

@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
@ -35,7 +34,9 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public abstract class AbstractTask implements Task {
@ -250,16 +251,23 @@ public abstract class AbstractTask implements Task {
return stateMgr.changelogPartitions();
}
long committedOffsetForPartition(final TopicPartition partition) {
Map<TopicPartition, Long> committedOffsetForPartitions(final Set<TopicPartition> partitions) {
try {
final OffsetAndMetadata metadata = consumer.committed(partition);
return metadata != null ? metadata.offset() : 0L;
final Map<TopicPartition, Long> results = consumer.committed(partitions)
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
// those do not have a committed offset would default to 0
for (final TopicPartition tp : partitions) {
results.putIfAbsent(tp, 0L);
}
return results;
} catch (final AuthorizationException e) {
throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e);
throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partitions), e);
} catch (final WakeupException e) {
throw e;
} catch (final KafkaException e) {
throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e);
throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partitions), e);
}
}

ProcessorStateManager.java

@ -247,13 +247,12 @@ public class ProcessorStateManager implements StateManager {
standbyRestoredOffsets.put(storePartition, lastOffset + 1);
}
void putOffsetLimit(final TopicPartition partition,
final long limit) {
log.trace("Updating store offset limit for partition {} to {}", partition, limit);
offsetLimits.put(partition, limit);
void putOffsetLimits(final Map<TopicPartition, Long> offsets) {
log.trace("Updating store offset limit with {}", offsets);
offsetLimits.putAll(offsets);
}
long offsetLimit(final TopicPartition partition) {
private long offsetLimit(final TopicPartition partition) {
final Long limit = offsetLimits.get(partition);
return limit != null ? limit : Long.MAX_VALUE;
}

StandbyTask.java

@ -37,10 +37,10 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
* A StandbyTask
*/
public class StandbyTask extends AbstractTask {
private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
private boolean updateOffsetLimits;
private final Sensor closeTaskSensor;
private final Map<TopicPartition, Long> offsetLimits = new HashMap<>();
private final Set<TopicPartition> updateableOffsetLimits = new HashSet<>();
private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
/**
* Create {@link StandbyTask} with its assigned partitions
@ -69,10 +69,8 @@ public class StandbyTask extends AbstractTask {
final Set<String> changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values());
partitions.stream()
.filter(tp -> changelogTopicNames.contains(tp.topic()))
.forEach(tp -> {
offsetLimits.put(tp, 0L);
updateableOffsetLimits.add(tp);
});
.forEach(tp -> offsetLimits.put(tp, 0L));
updateOffsetLimits = true;
}
@Override
@ -193,7 +191,7 @@ public class StandbyTask extends AbstractTask {
// Check if we're unable to process records due to an offset limit (e.g. when our
// partition is both a source and a changelog). If we're limited then try to refresh
// the offset limit if possible.
if (record.offset() >= limit && updateableOffsetLimits.contains(partition)) {
if (record.offset() >= limit && updateOffsetLimits) {
limit = updateOffsetLimits(partition);
}
@ -222,18 +220,24 @@ public class StandbyTask extends AbstractTask {
throw new IllegalArgumentException("Topic is not both a source and a changelog: " + partition);
}
updateableOffsetLimits.remove(partition);
final Map<TopicPartition, Long> newLimits = committedOffsetForPartitions(offsetLimits.keySet());
for (final Map.Entry<TopicPartition, Long> newlimit : newLimits.entrySet()) {
final Long previousLimit = offsetLimits.get(newlimit.getKey());
if (previousLimit != null && previousLimit > newlimit.getValue()) {
throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " +
"New limit: " + newlimit.getValue() + ". Previous limit: " + previousLimit);
}
final long newLimit = committedOffsetForPartition(partition);
final long previousLimit = offsetLimits.put(partition, newLimit);
if (previousLimit > newLimit) {
throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " +
"New limit: " + newLimit + ". Previous limit: " + previousLimit);
}
return newLimit;
offsetLimits.putAll(newLimits);
updateOffsetLimits = false;
return offsetLimits.get(partition);
}
void allowUpdateOfOffsetLimit() {
updateableOffsetLimits.addAll(offsetLimits.keySet());
updateOffsetLimits = true;
}
}

StreamTask.java

@ -31,6 +31,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -249,13 +251,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
// partitions of topics that are both sources and changelogs and set the consumer committed
// offset via stateMgr as there is not a more direct route.
final Set<String> changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values());
partitions.stream()
.filter(tp -> changelogTopicNames.contains(tp.topic()))
.forEach(tp -> {
final long offset = committedOffsetForPartition(tp);
stateMgr.putOffsetLimit(tp, offset);
log.trace("Updating store offset limits {} for changelog {}", offset, tp);
});
final Set<TopicPartition> sourcePartitionsAsChangelog = new HashSet<>(partitions)
.stream().filter(tp -> changelogTopicNames.contains(tp.topic())).collect(Collectors.toSet());
final Map<TopicPartition, Long> committedOffsets = committedOffsetForPartitions(sourcePartitionsAsChangelog);
stateMgr.putOffsetLimits(committedOffsets);
registerStateStores();
return changelogPartitions().isEmpty();
@ -481,7 +481,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final long offset = entry.getValue() + 1;
final long partitionTime = partitionTimes.get(partition);
consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime)));
stateMgr.putOffsetLimit(partition, offset);
}
try {
@ -735,25 +734,30 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
taskClosed = true;
}
private void initializeCommittedTimestamp(final TopicPartition partition) {
final OffsetAndMetadata metadata = consumer.committed(partition);
if (metadata != null) {
final long committedTimestamp = decodeTimestamp(metadata.metadata());
partitionGroup.setPartitionTime(partition, committedTimestamp);
log.debug("A committed timestamp was detected: setting the partition time of partition {}"
+ " to {} in stream task {}", partition, committedTimestamp, this);
} else {
log.debug("No committed timestamp was found in metadata for partition {}", partition);
}
}
/**
* Retrieves formerly committed timestamps and updates the local queue's partition time.
*/
public void initializeTaskTime() {
for (final TopicPartition partition : partitionGroup.partitions()) {
initializeCommittedTimestamp(partition);
final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitionGroup.partitions());
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
final TopicPartition partition = entry.getKey();
final OffsetAndMetadata metadata = entry.getValue();
if (metadata != null) {
final long committedTimestamp = decodeTimestamp(metadata.metadata());
partitionGroup.setPartitionTime(partition, committedTimestamp);
log.debug("A committed timestamp was detected: setting the partition time of partition {}"
+ " to {} in stream task {}", partition, committedTimestamp, this);
} else {
log.debug("No committed timestamp was found in metadata for partition {}", partition);
}
}
final Set<TopicPartition> nonCommitted = new HashSet<>(partitionGroup.partitions());
nonCommitted.removeAll(committed.keySet());
for (final TopicPartition partition : nonCommitted) {
log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition);
}
}

StandbyTaskTest.java

@ -563,9 +563,9 @@ public class StandbyTaskTest {
final Consumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
committedCallCount.getAndIncrement();
return super.committed(partition);
return super.committed(partitions);
}
};
@ -596,9 +596,9 @@ public class StandbyTaskTest {
final Consumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
public synchronized Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
committedCallCount.getAndIncrement();
return super.committed(partition);
return super.committed(partitions);
}
};

StreamTaskTest.java

@ -657,7 +657,7 @@ public class StreamTaskTest {
public void shouldRestorePartitionTimeAfterRestartWithEosDisabled() {
createTaskWithProcessAndCommit(false);
assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(consumer.committed(partition1).metadata()));
assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(consumer.committed(Collections.singleton(partition1)).get(partition1).metadata()));
// reset times here by creating a new task
task = createStatelessTask(createConfig(false));
@ -1585,7 +1585,7 @@ public class StreamTaskTest {
private Consumer<byte[], byte[]> mockConsumerWithCommittedException(final RuntimeException toThrow) {
return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public OffsetAndMetadata committed(final TopicPartition partition) {
public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
throw toThrow;
}
};

TransactionalMessageCopier.java

@ -195,13 +195,13 @@ public class TransactionalMessageCopier {
}
private static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) {
for (TopicPartition topicPartition : consumer.assignment()) {
OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
committed.forEach((tp, offsetAndMetadata) -> {
if (offsetAndMetadata != null)
consumer.seek(topicPartition, offsetAndMetadata.offset());
consumer.seek(tp, offsetAndMetadata.offset());
else
consumer.seekToBeginning(singleton(topicPartition));
}
consumer.seekToBeginning(singleton(tp));
});
}
private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) {