diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index bd1c984b639..e285adeb1e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -249,17 +249,6 @@ public class ConsumerConfig extends AbstractConfig { "be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic."; public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true; - /** - * internal.leave.group.on.close - * Whether or not the consumer should leave the group on close. If set to false then a rebalance - * won't occur until session.timeout.ms expires. - * - *

- * Note: this is an internal configuration and could be changed in the future in a backward incompatible way - * - */ - static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; - /** isolation.level */ public static final String ISOLATION_LEVEL_CONFIG = "isolation.level"; public static final String ISOLATION_LEVEL_DOC = "

Controls how to read messages written transactionally. If set to read_committed, consumer.poll() will only return" + @@ -469,10 +458,6 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC) - .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG, - Type.BOOLEAN, - true, - Importance.LOW) .define(ISOLATION_LEVEL_CONFIG, Type.STRING, DEFAULT_ISOLATION_LEVEL, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9030308d6d1..5c8c1dca051 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -788,8 +788,7 @@ public class KafkaConsumer implements Consumer { retryBackoffMs, enableAutoCommit, config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors, - config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); + this.interceptors); this.fetcher = new Fetcher<>( logContext, this.client, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 2cf39103f2a..69d4928a7ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -110,7 +110,6 @@ public abstract class AbstractCoordinator implements Closeable { private final Logger log; private final int sessionTimeoutMs; - private final boolean leaveGroupOnClose; private final GroupCoordinatorMetrics sensors; private final Heartbeat heartbeat; protected final int rebalanceTimeoutMs; @@ -144,8 +143,7 @@ public abstract class AbstractCoordinator implements Closeable { Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs, - boolean leaveGroupOnClose) { + long retryBackoffMs) { this.log = logContext.logger(AbstractCoordinator.class); this.client = client; this.time = time; @@ -154,7 +152,6 @@ public abstract class AbstractCoordinator implements Closeable { this.groupInstanceId = groupInstanceId; this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.sessionTimeoutMs = sessionTimeoutMs; - this.leaveGroupOnClose = leaveGroupOnClose; this.heartbeat = heartbeat; this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; @@ -170,11 +167,10 @@ public abstract class AbstractCoordinator implements Closeable { Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs, - boolean leaveGroupOnClose) { + long retryBackoffMs) { this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs, new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs), - metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose); + metrics, metricGrpPrefix, time, retryBackoffMs); } /** @@ -814,9 +810,7 @@ public abstract class AbstractCoordinator implements Closeable { // Synchronize after closing the heartbeat thread since heartbeat thread // needs this lock to complete and terminate after close flag is set. synchronized (this) { - if (leaveGroupOnClose) { - maybeLeaveGroup(); - } + maybeLeaveGroup(); // At this point, there may be pending commits (async commits or sync commits that were // interrupted using wakeup) and the leave group request which have been queued, but not diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index deed257df13..829d9dcc521 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -134,8 +134,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { long retryBackoffMs, boolean autoCommitEnabled, int autoCommitIntervalMs, - ConsumerInterceptors interceptors, - final boolean leaveGroupOnClose) { + ConsumerInterceptors interceptors) { super(logContext, client, groupId, @@ -146,8 +145,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { metrics, metricGrpPrefix, time, - retryBackoffMs, - leaveGroupOnClose); + retryBackoffMs); this.log = logContext.logger(ConsumerCoordinator.class); this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index ccd9e94aba9..524ee25528c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1894,8 +1894,7 @@ public class KafkaConsumerTest { retryBackoffMs, autoCommitEnabled, autoCommitIntervalMs, - interceptors, - true); + interceptors); Fetcher fetcher = new Fetcher<>( loggerFactory, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 5aaf4764384..93c074b1e46 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -809,7 +809,7 @@ public class AbstractCoordinatorTest { int retryBackoffMs, Optional groupInstanceId) { super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs, - SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent()); + SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index a60316f976d..a83df5e3d22 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -2178,8 +2178,7 @@ public class ConsumerCoordinatorTest { retryBackoffMs, autoCommitEnabled, autoCommitIntervalMs, - null, - true + null ); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index a5882e8d6ed..968855abf9a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -86,8 +86,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos metrics, metricGrpPrefix, time, - retryBackoffMs, - true); + retryBackoffMs); this.log = logContext.logger(WorkerCoordinator.class); this.restUrl = restUrl; this.configStorage = configStorage; diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java index 888edf3c714..4ed277018f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java @@ -50,7 +50,7 @@ public interface KafkaClientSupplier { /** * Create a {@link Consumer} which is used to read records of source topics. * - * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String) consumer config} which is + * @param config {@link StreamsConfig#getMainConsumerConfigs(String, String, int) consumer config} which is * supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance * @return an instance of Kafka consumer */ diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index f3e63863115..5024c281433 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -139,6 +139,8 @@ public class StreamsConfig extends AbstractConfig { private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L; private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L; + public final static int DUMMY_THREAD_INDEX = 1; + /** * Prefix used to provide default topic configs to be applied when creating internal topics. * These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}. @@ -718,7 +720,6 @@ public class StreamsConfig extends AbstractConfig { tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false); CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides); } @@ -943,12 +944,12 @@ public class StreamsConfig extends AbstractConfig { * @param groupId consumer groupId * @param clientId clientId * @return Map of the consumer configuration. - * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)} + * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String, int)} */ @SuppressWarnings("WeakerAccess") @Deprecated public Map getConsumerConfigs(final String groupId, final String clientId) { - return getMainConsumerConfigs(groupId, clientId); + return getMainConsumerConfigs(groupId, clientId, DUMMY_THREAD_INDEX); } /** @@ -963,10 +964,11 @@ public class StreamsConfig extends AbstractConfig { * * @param groupId consumer groupId * @param clientId clientId + * @param threadIdx stream thread index * @return Map of the consumer configuration. */ @SuppressWarnings("WeakerAccess") - public Map getMainConsumerConfigs(final String groupId, final String clientId) { + public Map getMainConsumerConfigs(final String groupId, final String clientId, final int threadIdx) { final Map consumerProps = getCommonConsumerConfigs(); // Get main consumer override configs @@ -977,9 +979,15 @@ public class StreamsConfig extends AbstractConfig { // this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting consumerProps.put(APPLICATION_ID_CONFIG, groupId); - // add client id with stream client id prefix, and group id + + // add group id, client id with stream client id prefix, and group instance id consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId); + final String groupInstanceId = (String) consumerProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG); + // Suffix each thread consumer with thread.id to enforce uniqueness of group.instance.id. + if (groupInstanceId != null) { + consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId + "-" + threadIdx); + } // add configs required for stream partition assignor consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 46612e5d18d..419e1811963 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -661,7 +661,7 @@ public class StreamThread extends Thread { log.info("Creating consumer client"); final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); - final Map consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadClientId)); + final Map consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadClientId), threadIdx); consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager); final AtomicInteger assignmentErrorCode = new AtomicInteger(); consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode); @@ -721,7 +721,8 @@ public class StreamThread extends Thread { this.assignmentErrorCode = assignmentErrorCode; this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); - this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId")) + final int dummyThreadIdx = 1; + this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx)) .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 3e55f2922cd..2cd31ba7362 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -106,7 +106,6 @@ public class KafkaStreamsTest { props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); - props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); globalStreams = new KafkaStreams(builder.build(), props); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 2c9a97b4e49..c202c93cec7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; -import org.hamcrest.CoreMatchers; import org.junit.Before; import org.junit.Test; @@ -62,6 +61,10 @@ public class StreamsConfigTest { private final Properties props = new Properties(); private StreamsConfig streamsConfig; + private final String groupId = "example-application"; + private final String clientId = "client"; + private final int threadIdx = 1; + @Before public void setUp() { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test"); @@ -87,7 +90,6 @@ public class StreamsConfigTest { @Test public void testGetProducerConfigs() { - final String clientId = "client"; final Map returnedProps = streamsConfig.getProducerConfigs(clientId); assertThat(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), equalTo(clientId)); assertThat(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), equalTo("100")); @@ -95,12 +97,19 @@ public class StreamsConfigTest { @Test public void testGetConsumerConfigs() { - final String groupId = "example-application"; - final String clientId = "client"; - final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId); + final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertThat(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), equalTo(clientId)); assertThat(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), equalTo(groupId)); assertThat(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), equalTo("1000")); + assertNull(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG)); + } + + @Test + public void testGetGroupInstanceIdConfigs() { + props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "group-instance-id"); + final StreamsConfig streamsConfig = new StreamsConfig(props); + final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + assertThat(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), equalTo("group-instance-id-1")); } @Test @@ -113,10 +122,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5); props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100); final StreamsConfig streamsConfig = new StreamsConfig(props); - - final String groupId = "example-application"; - final String clientId = "client"; - final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId); + final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)); assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); @@ -135,10 +141,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.RETRIES_CONFIG, 10); props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L); final StreamsConfig streamsConfig = new StreamsConfig(props); - - final String groupId = "example-application"; - final String clientId = "client"; - final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId); + final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG))); assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG))); @@ -148,15 +151,12 @@ public class StreamsConfigTest { public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5"); props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50"); - final String groupId = "example-application"; - final String clientId = "client"; - final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId); + final Map returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } @Test public void testGetRestoreConsumerConfigs() { - final String clientId = "client"; final Map returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId); assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); @@ -199,7 +199,7 @@ public class StreamsConfigTest { props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -208,7 +208,7 @@ public class StreamsConfigTest { public void shouldSupportPrefixedRestoreConsumerConfigs() { props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -216,7 +216,7 @@ public class StreamsConfigTest { public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(consumerPrefix("interceptor.statsd.host"), "host"); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertEquals("host", consumerConfigs.get("interceptor.statsd.host")); } @@ -224,7 +224,7 @@ public class StreamsConfigTest { public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(consumerPrefix("interceptor.statsd.host"), "host"); - final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals("host", consumerConfigs.get("interceptor.statsd.host")); } @@ -232,7 +232,7 @@ public class StreamsConfigTest { public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(producerPrefix("interceptor.statsd.host"), "host"); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); assertEquals("host", producerConfigs.get("interceptor.statsd.host")); } @@ -241,7 +241,7 @@ public class StreamsConfigTest { props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10); props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map configs = streamsConfig.getProducerConfigs("clientId"); + final Map configs = streamsConfig.getProducerConfigs(clientId); assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)); assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -251,7 +251,7 @@ public class StreamsConfigTest { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -260,7 +260,7 @@ public class StreamsConfigTest { public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() { props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs("groupId"); + final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs(groupId); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -269,7 +269,7 @@ public class StreamsConfigTest { props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map configs = streamsConfig.getProducerConfigs("clientId"); + final Map configs = streamsConfig.getProducerConfigs(clientId); assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)); assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -278,10 +278,10 @@ public class StreamsConfigTest { public void shouldForwardCustomConfigsWithNoPrefixToAllClients() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put("custom.property.host", "host"); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); - final Map restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); - final Map adminConfigs = streamsConfig.getAdminConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + final Map restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); + final Map adminConfigs = streamsConfig.getAdminConfigs(clientId); assertEquals("host", consumerConfigs.get("custom.property.host")); assertEquals("host", restoreConsumerConfigs.get("custom.property.host")); assertEquals("host", producerConfigs.get("custom.property.host")); @@ -295,10 +295,10 @@ public class StreamsConfigTest { props.put(consumerPrefix("custom.property.host"), "host1"); props.put(producerPrefix("custom.property.host"), "host2"); props.put(adminClientPrefix("custom.property.host"), "host3"); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); - final Map restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); - final Map adminConfigs = streamsConfig.getAdminConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + final Map restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); + final Map adminConfigs = streamsConfig.getAdminConfigs(clientId); assertEquals("host1", consumerConfigs.get("custom.property.host")); assertEquals("host1", restoreConsumerConfigs.get("custom.property.host")); assertEquals("host2", producerConfigs.get("custom.property.host")); @@ -309,7 +309,7 @@ public class StreamsConfigTest { public void shouldSupportNonPrefixedAdminConfigs() { props.put(AdminClientConfig.RETRIES_CONFIG, 10); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map configs = streamsConfig.getAdminConfigs("clientId"); + final Map configs = streamsConfig.getAdminConfigs(clientId); assertEquals(10, configs.get(AdminClientConfig.RETRIES_CONFIG)); } @@ -332,7 +332,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } @@ -341,7 +341,7 @@ public class StreamsConfigTest { public void shouldOverrideStreamsDefaultProducerConfigs() { props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG)); } @@ -349,7 +349,7 @@ public class StreamsConfigTest { public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } @@ -357,7 +357,7 @@ public class StreamsConfigTest { public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b", threadIdx); assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); } @@ -365,7 +365,7 @@ public class StreamsConfigTest { public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client"); + final Map consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); } @@ -373,13 +373,12 @@ public class StreamsConfigTest { public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5"); props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50"); - final Map returnedProps = streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } @Test public void testGetGlobalConsumerConfigs() { - final String clientId = "client"; final Map returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer"); assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); @@ -389,7 +388,7 @@ public class StreamsConfigTest { public void shouldSupportPrefixedGlobalConsumerConfigs() { props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -397,7 +396,7 @@ public class StreamsConfigTest { public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() { final StreamsConfig streamsConfig = new StreamsConfig(props); props.put(consumerPrefix("interceptor.statsd.host"), "host"); - final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId); assertEquals("host", consumerConfigs.get("interceptor.statsd.host")); } @@ -405,7 +404,7 @@ public class StreamsConfigTest { public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() { props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs("groupId"); + final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs(groupId); assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -413,7 +412,7 @@ public class StreamsConfigTest { public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs("client"); + final Map consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId); assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); } @@ -421,17 +420,10 @@ public class StreamsConfigTest { public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5"); props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50"); - final Map returnedProps = streamsConfig.getGlobalConsumerConfigs("clientId"); + final Map returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId); assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } - @Test - public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() { - final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); - assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false)); - } - @Test public void shouldAcceptAtLeastOnce() { // don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test @@ -457,7 +449,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))); } @@ -465,7 +457,7 @@ public class StreamsConfigTest { public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT))); } @@ -475,7 +467,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); } @@ -483,7 +475,7 @@ public class StreamsConfigTest { public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() { props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false)); } @@ -492,8 +484,8 @@ public class StreamsConfigTest { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId"); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))); assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); @@ -508,7 +500,7 @@ public class StreamsConfigTest { props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map producerConfigs = streamsConfig.getProducerConfigs(clientId); assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries)); } @@ -589,7 +581,7 @@ public class StreamsConfigTest { props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7); final StreamsConfig streamsConfig = new StreamsConfig(props); try { - streamsConfig.getProducerConfigs("clientId"); + streamsConfig.getProducerConfigs(clientId); fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5"); } catch (final ConfigException e) { assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage()); @@ -601,7 +593,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3"); - new StreamsConfig(props).getProducerConfigs("clientId"); + new StreamsConfig(props).getProducerConfigs(clientId); } @Test @@ -610,7 +602,7 @@ public class StreamsConfigTest { props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number"); try { - new StreamsConfig(props).getProducerConfigs("clientId"); + new StreamsConfig(props).getProducerConfigs(clientId); fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer"); } catch (final ConfigException e) { assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", e.getMessage()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index badaa36cd55..7fa108d59a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -139,7 +139,6 @@ public abstract class AbstractJoinIntegrationTest { RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index aeb581eb185..c9ae1bbcb85 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -156,7 +156,6 @@ public abstract class AbstractResetIntegrationTest { streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); - streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfig.putAll(commonClientConfig); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index f43b3969210..4f5455b4b15 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -160,7 +160,6 @@ public class EosIntegrationTest { properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); - properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); for (int i = 0; i < numberOfRestarts; ++i) { final Properties config = StreamsTestUtils.getStreamsConfig( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java index 6a724ec000f..87d6a169309 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java @@ -143,7 +143,6 @@ public class FineGrainedAutoResetIntegrationTest { props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "testAutoOffsetId", diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 787cb2921f1..1aa99f6752b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -95,7 +95,6 @@ public class GlobalKTableEOSIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 5007fa90f11..6617512c4cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -89,7 +89,6 @@ public class GlobalKTableIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.>as(globalStore) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java index 7c16927fcac..7c2b009f642 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -88,7 +88,6 @@ public class GlobalThreadShutDownOrderTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); final Consumed stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 345b581f228..0ee02786234 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -93,7 +93,6 @@ public class InternalTopicIntegrationTest { streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 7493b0691a8..e1c9b5b1096 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -92,7 +92,6 @@ public class KStreamAggregationDedupIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final KeyValueMapper mapper = MockMapper.selectValueMapper(); stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String())); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 8e05d9e7aa1..a13408e4eed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -129,7 +129,6 @@ public class KStreamAggregationIntegrationTest { streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index 32d77c0d5ae..3ec239fab91 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -78,7 +78,6 @@ public class KTableSourceTopicRestartIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java index 4c7859bd86f..0cfa97a1f9d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java @@ -160,7 +160,6 @@ public class PurgeRepartitionTopicIntegrationTest { streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS); streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES); streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2); // we cannot allow batch size larger than segment size - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); final StreamsBuilder builder = new StreamsBuilder(); builder.stream(INPUT_TOPIC) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 6889525b4ab..14d9e5d80d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -188,8 +188,6 @@ public class QueryableStateIntegrationTest { streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - // override this to make the rebalances happen quickly - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); stringComparator = Comparator.comparing((KeyValue o) -> o.key).thenComparing(o -> o.value); stringLongComparator = Comparator.comparing((KeyValue o) -> o.key).thenComparingLong(o -> o.value); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 1cb9e0c934d..f74487b7eb3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -122,7 +122,6 @@ public class RegexSourceIntegrationTest { properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", CLUSTER.bootstrapServers(), diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java index 6fa1bff0cf1..2d996b6a686 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java @@ -91,7 +91,6 @@ public class RepartitionOptimizingIntegrationTest { final Properties props = new Properties(); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); - props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "maybe-optimized-test-app", diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java index bf652646846..473a626dd35 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java @@ -79,7 +79,6 @@ public class RepartitionWithMergeOptimizingIntegrationTest { final Properties props = new Properties(); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000); - props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamsConfiguration = StreamsTestUtils.getStreamsConfig( "maybe-optimized-with-merge-test-app", diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index f21dbfc7bf1..2d88ff3dc7a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -104,7 +104,6 @@ public class RestoreIntegrationTest { streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); return streamsConfiguration; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 74cac060175..e786a449be8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -74,7 +74,6 @@ import static org.hamcrest.Matchers.equalTo; public class IntegrationTestUtils { public static final long DEFAULT_TIMEOUT = 60 * 1000L; - public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close"; /* * Records state transition for StreamThread