From 197a5d5a6d160e9f3f1642caf85bad968a35f109 Mon Sep 17 00:00:00 2001 From: Sachin Mittal Date: Mon, 20 Mar 2017 21:56:15 -0700 Subject: [PATCH] KAFKA-4848: Fix retryWithBackoff deadlock issue Fixes related to handling of MAX_POLL_INTERVAL_MS_CONFIG during deadlock and CommitFailedException on partition revoked. Author: Sachin Mittal Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang Closes #2642 from sjmittal/trunk --- clients/.gitignore | 1 + connect/api/.gitignore | 1 + connect/json/.gitignore | 1 + core/.gitignore | 3 +++ streams/.gitignore | 1 + .../apache/kafka/streams/StreamsConfig.java | 1 + .../processor/internals/StreamThread.java | 22 ++++++++++++------- 7 files changed, 22 insertions(+), 8 deletions(-) create mode 100644 clients/.gitignore create mode 100644 connect/api/.gitignore create mode 100644 connect/json/.gitignore create mode 100644 core/.gitignore create mode 100644 streams/.gitignore diff --git a/clients/.gitignore b/clients/.gitignore new file mode 100644 index 00000000000..ae3c1726048 --- /dev/null +++ b/clients/.gitignore @@ -0,0 +1 @@ +/bin/ diff --git a/connect/api/.gitignore b/connect/api/.gitignore new file mode 100644 index 00000000000..ae3c1726048 --- /dev/null +++ b/connect/api/.gitignore @@ -0,0 +1 @@ +/bin/ diff --git a/connect/json/.gitignore b/connect/json/.gitignore new file mode 100644 index 00000000000..ae3c1726048 --- /dev/null +++ b/connect/json/.gitignore @@ -0,0 +1 @@ +/bin/ diff --git a/core/.gitignore b/core/.gitignore new file mode 100644 index 00000000000..0d7e8b05ea8 --- /dev/null +++ b/core/.gitignore @@ -0,0 +1,3 @@ +.cache-main +.cache-tests +/bin/ diff --git a/streams/.gitignore b/streams/.gitignore new file mode 100644 index 00000000000..ae3c1726048 --- /dev/null +++ b/streams/.gitignore @@ -0,0 +1 @@ +/bin/ diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 0eb3f7b2481..d2ba063d942 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -401,6 +401,7 @@ public class StreamsConfig extends AbstractConfig { tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 6a6b508166e..7cd4b930368 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -197,6 +199,7 @@ public class StreamThread extends Thread { private final Map suspendedTasks; private final Map suspendedStandbyTasks; private final Time time; + private final int rebalanceTimeoutMs; private final long pollTimeMs; private final long cleanTimeMs; private final long commitTimeMs; @@ -290,6 +293,8 @@ public class StreamThread extends Thread { this.standbyRecords = new HashMap<>(); this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time); + final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); + this.rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT); this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); @@ -855,7 +860,7 @@ public class StreamThread extends Thread { } } - private void addStreamTasks(Collection assignment) { + private void addStreamTasks(Collection assignment, final long start) { if (partitionAssignor == null) throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen."); @@ -893,7 +898,7 @@ public class StreamThread extends Thread { // create all newly assigned tasks (guard against race condition with other thread via backoff and retry) // -> other thread will call removeSuspendedTasks(); eventually - taskCreator.retryWithBackoff(newTasks); + taskCreator.retryWithBackoff(newTasks, start); } StandbyTask createStandbyTask(TaskId id, Collection partitions) { @@ -910,7 +915,7 @@ public class StreamThread extends Thread { } } - private void addStandbyTasks() { + private void addStandbyTasks(final long start) { if (partitionAssignor == null) throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen."); @@ -937,7 +942,7 @@ public class StreamThread extends Thread { // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry) // -> other thread will call removeSuspendedStandbyTasks(); eventually - new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks); + new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start); restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet())); @@ -1126,7 +1131,7 @@ public class StreamThread extends Thread { } abstract class AbstractTaskCreator { - void retryWithBackoff(final Map> tasksToBeCreated) { + void retryWithBackoff(final Map> tasksToBeCreated, final long start) { long backoffTimeMs = 50L; while (true) { final Iterator>> it = tasksToBeCreated.entrySet().iterator(); @@ -1138,13 +1143,14 @@ public class StreamThread extends Thread { try { createTask(taskId, partitions); it.remove(); + backoffTimeMs = 50L; } catch (final LockException e) { // ignore and retry log.warn("Could not create task {}. Will retry.", taskId, e); } } - if (tasksToBeCreated.isEmpty()) { + if (tasksToBeCreated.isEmpty() || time.milliseconds() - start > rebalanceTimeoutMs) { break; } @@ -1207,9 +1213,9 @@ public class StreamThread extends Thread { // will become active or vice versa closeNonAssignedSuspendedStandbyTasks(); closeNonAssignedSuspendedTasks(); - addStreamTasks(assignment); + addStreamTasks(assignment, start); storeChangelogReader.restore(); - addStandbyTasks(); + addStandbyTasks(start); streamsMetadataState.onChange(partitionAssignor.getPartitionsByHostState(), partitionAssignor.clusterMetadata()); lastCleanMs = time.milliseconds(); // start the cleaning cycle setStateWhenNotInPendingShutdown(State.RUNNING);