mirror of https://github.com/apache/kafka.git
MINOR: Log encountered exception during rebalance
Some other minor changes:

1. Do not throw the exception from the callback, as it would only be swallowed by the consumer coordinator; remembering it and re-throwing it in the next loop is good enough.
2. Change "Creating" to "Defining" in Stores to avoid the confusion that the stores have already been successfully created at that time.
3. unAssignChangeLogPartitions is not needed, as the restore consumer will already be unassigned inside the changelog reader.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3769 from guozhangwang/KMinor-logging-before-throwing
parent 212bce6e3f
commit 3f155eaa23
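The heart of item 1 above, as a minimal standalone sketch (hypothetical class and method names, not the actual StreamThread code): a rebalance callback that records a failure instead of throwing it, because a throw from the callback would only be swallowed by the consumer coordinator; the poll loop then re-throws it on its next iteration.

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the pattern, not the actual StreamThread code:
// the callback only records the failure; the processing loop surfaces it.
public class RememberingRebalanceListener implements ConsumerRebalanceListener {
    private final AtomicReference<Throwable> rebalanceException = new AtomicReference<>();

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
        try {
            // stand-in for the real work, e.g. taskManager.createTasks(assignment)
            createTasks(assignment);
        } catch (final Throwable t) {
            rebalanceException.set(t); // remembered, not thrown
        }
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { }

    // Called by the processing loop right after consumer.poll(...) returns.
    public void maybeRethrow() {
        final Throwable t = rebalanceException.getAndSet(null);
        if (t != null) {
            throw new RuntimeException("Rebalance failed, shutting down", t);
        }
    }

    private void createTasks(final Collection<TopicPartition> assignment) {
        // task creation elided in this sketch
    }
}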
StreamThread.java
@@ -256,14 +256,11 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                     return;
                 }
                 taskManager.createTasks(assignment);
-                final RuntimeException exception = streamThread.unAssignChangeLogPartitions();
-                if (exception != null) {
-                    throw exception;
-                }
                 streamThread.refreshMetadataState();
             } catch (final Throwable t) {
+                log.error("{} Error caught during partition assignment, " +
+                        "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
                 streamThread.setRebalanceException(t);
-                throw t;
             } finally {
                 log.info("{} partition assignment took {} ms.\n" +
                         "\tcurrent active tasks: {}\n" +
@@ -294,8 +291,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                 // suspend active tasks
                 taskManager.suspendTasksAndState();
             } catch (final Throwable t) {
+                log.error("{} Error caught during partition revocation, " +
+                        "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
                 streamThread.setRebalanceException(t);
-                throw t;
             } finally {
                 streamThread.refreshMetadataState();
                 streamThread.clearStandbyRecords();
@@ -1163,17 +1161,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         log.info("{} Shutdown complete", logPrefix);
     }
 
-    private RuntimeException unAssignChangeLogPartitions() {
-        try {
-            // un-assign the change log partitions
-            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-        } catch (final RuntimeException e) {
-            log.error("{} Failed to un-assign change log partitions due to the following error:", logPrefix, e);
-            return e;
-        }
-        return null;
-    }
-
     private void clearStandbyRecords() {
         standbyRecords.clear();
     }
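Item 3 works because the changelog reader already drops the restore consumer's assignment when its own state is reset, making the deleted unAssignChangeLogPartitions() above redundant. A hedged sketch of that responsibility (hypothetical class and method, not the actual StoreChangelogReader code):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

// Hypothetical sketch, not the actual StoreChangelogReader: the reader owns
// the restore consumer's changelog assignment and clears it itself.
class ChangelogReaderSketch {
    private final Consumer<byte[], byte[]> restoreConsumer;

    ChangelogReaderSketch(final Consumer<byte[], byte[]> restoreConsumer) {
        this.restoreConsumer = restoreConsumer;
    }

    void reset() {
        // un-assign the changelog partitions once restoration is done
        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
    }
}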
Stores.java
@@ -90,7 +90,7 @@ public class Stores {
 
                 @Override
                 public StateStoreSupplier build() {
-                    log.trace("Creating InMemory Store name={} capacity={} logged={}", name, capacity, logged);
+                    log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged);
                     if (capacity < Integer.MAX_VALUE) {
                         return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
                     }
@@ -154,7 +154,7 @@ public class Stores {
 
                 @Override
                 public StateStoreSupplier build() {
-                    log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
+                    log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
                     if (sessionWindows) {
                         return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
                     } else if (numSegments > 0) {
StreamThreadTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -72,7 +71,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class StreamThreadTest {
 
@@ -105,25 +103,11 @@ public class StreamThreadTest {
     private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
 
-    private final List<PartitionInfo> infos = Arrays.asList(
-            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
-    );
-
-
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(0, 3);
-    private final TaskId task4 = new TaskId(1, 1);
-    private final TaskId task5 = new TaskId(1, 2);
+    private final TaskId task3 = new TaskId(1, 1);
+    private final TaskId task4 = new TaskId(1, 2);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -281,10 +265,10 @@ public class StreamThreadTest {
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
 
-        assertTrue(thread.tasks().containsKey(task4));
-        assertTrue(thread.tasks().containsKey(task5));
-        assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
+        assertTrue(thread.tasks().containsKey(task3));
+        assertTrue(thread.tasks().containsKey(task4));
+        assertEquals(expectedGroup1, thread.tasks().get(task3).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke four partitions and assign three partitions of both subtopologies
@@ -300,9 +284,9 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all three partitions and reassign the same three partitions (from different subtopologies)
@@ -315,9 +299,9 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all partitions and assign nothing
@@ -905,11 +889,8 @@ public class StreamThreadTest {
 
         thread.rebalanceListener.onPartitionsRevoked(null);
         clientSupplier.producers.get(0).fenceProducer();
-        try {
-            thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
-            thread.runOnce(-1);
-            fail("should have thrown " + ProducerFencedException.class.getSimpleName());
-        } catch (final ProducerFencedException e) { }
+        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread.runOnce(-1);
 
         assertTrue(thread.tasks().isEmpty());
     }