mirror of https://github.com/apache/kafka.git
KAFKA-8284: enable static membership on KStream (#6673)
Part of KIP-345 effort. The strategy is to extract user passed in group.instance.id config and pass it in with given thread-id (because consumer is currently per-thread level). Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
cc097e909c
commit
b0e82a68b3
|
@ -249,17 +249,6 @@ public class ConsumerConfig extends AbstractConfig {
|
||||||
"be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
|
"be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.";
|
||||||
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
|
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
|
||||||
|
|
||||||
/**
|
|
||||||
* <code>internal.leave.group.on.close</code>
|
|
||||||
* Whether or not the consumer should leave the group on close. If set to <code>false</code> then a rebalance
|
|
||||||
* won't occur until <code>session.timeout.ms</code> expires.
|
|
||||||
*
|
|
||||||
* <p>
|
|
||||||
* Note: this is an internal configuration and could be changed in the future in a backward incompatible way
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
|
|
||||||
|
|
||||||
/** <code>isolation.level</code> */
|
/** <code>isolation.level</code> */
|
||||||
public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
|
public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
|
||||||
public static final String ISOLATION_LEVEL_DOC = "<p>Controls how to read messages written transactionally. If set to <code>read_committed</code>, consumer.poll() will only return" +
|
public static final String ISOLATION_LEVEL_DOC = "<p>Controls how to read messages written transactionally. If set to <code>read_committed</code>, consumer.poll() will only return" +
|
||||||
|
@ -469,10 +458,6 @@ public class ConsumerConfig extends AbstractConfig {
|
||||||
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
|
DEFAULT_EXCLUDE_INTERNAL_TOPICS,
|
||||||
Importance.MEDIUM,
|
Importance.MEDIUM,
|
||||||
EXCLUDE_INTERNAL_TOPICS_DOC)
|
EXCLUDE_INTERNAL_TOPICS_DOC)
|
||||||
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
|
|
||||||
Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
Importance.LOW)
|
|
||||||
.define(ISOLATION_LEVEL_CONFIG,
|
.define(ISOLATION_LEVEL_CONFIG,
|
||||||
Type.STRING,
|
Type.STRING,
|
||||||
DEFAULT_ISOLATION_LEVEL,
|
DEFAULT_ISOLATION_LEVEL,
|
||||||
|
|
|
@ -788,8 +788,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
|
||||||
retryBackoffMs,
|
retryBackoffMs,
|
||||||
enableAutoCommit,
|
enableAutoCommit,
|
||||||
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
|
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
|
||||||
this.interceptors,
|
this.interceptors);
|
||||||
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
|
|
||||||
this.fetcher = new Fetcher<>(
|
this.fetcher = new Fetcher<>(
|
||||||
logContext,
|
logContext,
|
||||||
this.client,
|
this.client,
|
||||||
|
|
|
@ -110,7 +110,6 @@ public abstract class AbstractCoordinator implements Closeable {
|
||||||
|
|
||||||
private final Logger log;
|
private final Logger log;
|
||||||
private final int sessionTimeoutMs;
|
private final int sessionTimeoutMs;
|
||||||
private final boolean leaveGroupOnClose;
|
|
||||||
private final GroupCoordinatorMetrics sensors;
|
private final GroupCoordinatorMetrics sensors;
|
||||||
private final Heartbeat heartbeat;
|
private final Heartbeat heartbeat;
|
||||||
protected final int rebalanceTimeoutMs;
|
protected final int rebalanceTimeoutMs;
|
||||||
|
@ -144,8 +143,7 @@ public abstract class AbstractCoordinator implements Closeable {
|
||||||
Metrics metrics,
|
Metrics metrics,
|
||||||
String metricGrpPrefix,
|
String metricGrpPrefix,
|
||||||
Time time,
|
Time time,
|
||||||
long retryBackoffMs,
|
long retryBackoffMs) {
|
||||||
boolean leaveGroupOnClose) {
|
|
||||||
this.log = logContext.logger(AbstractCoordinator.class);
|
this.log = logContext.logger(AbstractCoordinator.class);
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.time = time;
|
this.time = time;
|
||||||
|
@ -154,7 +152,6 @@ public abstract class AbstractCoordinator implements Closeable {
|
||||||
this.groupInstanceId = groupInstanceId;
|
this.groupInstanceId = groupInstanceId;
|
||||||
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
|
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
|
||||||
this.sessionTimeoutMs = sessionTimeoutMs;
|
this.sessionTimeoutMs = sessionTimeoutMs;
|
||||||
this.leaveGroupOnClose = leaveGroupOnClose;
|
|
||||||
this.heartbeat = heartbeat;
|
this.heartbeat = heartbeat;
|
||||||
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
|
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
|
||||||
this.retryBackoffMs = retryBackoffMs;
|
this.retryBackoffMs = retryBackoffMs;
|
||||||
|
@ -170,11 +167,10 @@ public abstract class AbstractCoordinator implements Closeable {
|
||||||
Metrics metrics,
|
Metrics metrics,
|
||||||
String metricGrpPrefix,
|
String metricGrpPrefix,
|
||||||
Time time,
|
Time time,
|
||||||
long retryBackoffMs,
|
long retryBackoffMs) {
|
||||||
boolean leaveGroupOnClose) {
|
|
||||||
this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
|
this(logContext, client, groupId, groupInstanceId, rebalanceTimeoutMs, sessionTimeoutMs,
|
||||||
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
|
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs),
|
||||||
metrics, metricGrpPrefix, time, retryBackoffMs, leaveGroupOnClose);
|
metrics, metricGrpPrefix, time, retryBackoffMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -814,9 +810,7 @@ public abstract class AbstractCoordinator implements Closeable {
|
||||||
// Synchronize after closing the heartbeat thread since heartbeat thread
|
// Synchronize after closing the heartbeat thread since heartbeat thread
|
||||||
// needs this lock to complete and terminate after close flag is set.
|
// needs this lock to complete and terminate after close flag is set.
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (leaveGroupOnClose) {
|
maybeLeaveGroup();
|
||||||
maybeLeaveGroup();
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point, there may be pending commits (async commits or sync commits that were
|
// At this point, there may be pending commits (async commits or sync commits that were
|
||||||
// interrupted using wakeup) and the leave group request which have been queued, but not
|
// interrupted using wakeup) and the leave group request which have been queued, but not
|
||||||
|
|
|
@ -134,8 +134,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
||||||
long retryBackoffMs,
|
long retryBackoffMs,
|
||||||
boolean autoCommitEnabled,
|
boolean autoCommitEnabled,
|
||||||
int autoCommitIntervalMs,
|
int autoCommitIntervalMs,
|
||||||
ConsumerInterceptors<?, ?> interceptors,
|
ConsumerInterceptors<?, ?> interceptors) {
|
||||||
final boolean leaveGroupOnClose) {
|
|
||||||
super(logContext,
|
super(logContext,
|
||||||
client,
|
client,
|
||||||
groupId,
|
groupId,
|
||||||
|
@ -146,8 +145,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
|
||||||
metrics,
|
metrics,
|
||||||
metricGrpPrefix,
|
metricGrpPrefix,
|
||||||
time,
|
time,
|
||||||
retryBackoffMs,
|
retryBackoffMs);
|
||||||
leaveGroupOnClose);
|
|
||||||
this.log = logContext.logger(ConsumerCoordinator.class);
|
this.log = logContext.logger(ConsumerCoordinator.class);
|
||||||
this.metadata = metadata;
|
this.metadata = metadata;
|
||||||
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
|
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
|
||||||
|
|
|
@ -1894,8 +1894,7 @@ public class KafkaConsumerTest {
|
||||||
retryBackoffMs,
|
retryBackoffMs,
|
||||||
autoCommitEnabled,
|
autoCommitEnabled,
|
||||||
autoCommitIntervalMs,
|
autoCommitIntervalMs,
|
||||||
interceptors,
|
interceptors);
|
||||||
true);
|
|
||||||
|
|
||||||
Fetcher<String, String> fetcher = new Fetcher<>(
|
Fetcher<String, String> fetcher = new Fetcher<>(
|
||||||
loggerFactory,
|
loggerFactory,
|
||||||
|
|
|
@ -809,7 +809,7 @@ public class AbstractCoordinatorTest {
|
||||||
int retryBackoffMs,
|
int retryBackoffMs,
|
||||||
Optional<String> groupInstanceId) {
|
Optional<String> groupInstanceId) {
|
||||||
super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
|
super(new LogContext(), client, GROUP_ID, groupInstanceId, rebalanceTimeoutMs,
|
||||||
SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs, !groupInstanceId.isPresent());
|
SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, METRIC_GROUP_PREFIX, time, retryBackoffMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -2178,8 +2178,7 @@ public class ConsumerCoordinatorTest {
|
||||||
retryBackoffMs,
|
retryBackoffMs,
|
||||||
autoCommitEnabled,
|
autoCommitEnabled,
|
||||||
autoCommitIntervalMs,
|
autoCommitIntervalMs,
|
||||||
null,
|
null
|
||||||
true
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -86,8 +86,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
|
||||||
metrics,
|
metrics,
|
||||||
metricGrpPrefix,
|
metricGrpPrefix,
|
||||||
time,
|
time,
|
||||||
retryBackoffMs,
|
retryBackoffMs);
|
||||||
true);
|
|
||||||
this.log = logContext.logger(WorkerCoordinator.class);
|
this.log = logContext.logger(WorkerCoordinator.class);
|
||||||
this.restUrl = restUrl;
|
this.restUrl = restUrl;
|
||||||
this.configStorage = configStorage;
|
this.configStorage = configStorage;
|
||||||
|
|
|
@ -50,7 +50,7 @@ public interface KafkaClientSupplier {
|
||||||
/**
|
/**
|
||||||
* Create a {@link Consumer} which is used to read records of source topics.
|
* Create a {@link Consumer} which is used to read records of source topics.
|
||||||
*
|
*
|
||||||
* @param config {@link StreamsConfig#getMainConsumerConfigs(String, String) consumer config} which is
|
* @param config {@link StreamsConfig#getMainConsumerConfigs(String, String, int) consumer config} which is
|
||||||
* supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance
|
* supplied by the {@link java.util.Properties} given to the {@link KafkaStreams} instance
|
||||||
* @return an instance of Kafka consumer
|
* @return an instance of Kafka consumer
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -139,6 +139,8 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
|
private final static long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
|
||||||
private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
|
private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
|
||||||
|
|
||||||
|
public final static int DUMMY_THREAD_INDEX = 1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prefix used to provide default topic configs to be applied when creating internal topics.
|
* Prefix used to provide default topic configs to be applied when creating internal topics.
|
||||||
* These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}.
|
* These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}.
|
||||||
|
@ -718,7 +720,6 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
|
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
|
||||||
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
||||||
tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
|
|
||||||
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
|
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -943,12 +944,12 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
* @param groupId consumer groupId
|
* @param groupId consumer groupId
|
||||||
* @param clientId clientId
|
* @param clientId clientId
|
||||||
* @return Map of the consumer configuration.
|
* @return Map of the consumer configuration.
|
||||||
* @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)}
|
* @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String, int)}
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("WeakerAccess")
|
@SuppressWarnings("WeakerAccess")
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public Map<String, Object> getConsumerConfigs(final String groupId, final String clientId) {
|
public Map<String, Object> getConsumerConfigs(final String groupId, final String clientId) {
|
||||||
return getMainConsumerConfigs(groupId, clientId);
|
return getMainConsumerConfigs(groupId, clientId, DUMMY_THREAD_INDEX);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -963,10 +964,11 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
*
|
*
|
||||||
* @param groupId consumer groupId
|
* @param groupId consumer groupId
|
||||||
* @param clientId clientId
|
* @param clientId clientId
|
||||||
|
* @param threadIdx stream thread index
|
||||||
* @return Map of the consumer configuration.
|
* @return Map of the consumer configuration.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("WeakerAccess")
|
@SuppressWarnings("WeakerAccess")
|
||||||
public Map<String, Object> getMainConsumerConfigs(final String groupId, final String clientId) {
|
public Map<String, Object> getMainConsumerConfigs(final String groupId, final String clientId, final int threadIdx) {
|
||||||
final Map<String, Object> consumerProps = getCommonConsumerConfigs();
|
final Map<String, Object> consumerProps = getCommonConsumerConfigs();
|
||||||
|
|
||||||
// Get main consumer override configs
|
// Get main consumer override configs
|
||||||
|
@ -977,9 +979,15 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
|
|
||||||
// this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting
|
// this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting
|
||||||
consumerProps.put(APPLICATION_ID_CONFIG, groupId);
|
consumerProps.put(APPLICATION_ID_CONFIG, groupId);
|
||||||
// add client id with stream client id prefix, and group id
|
|
||||||
|
// add group id, client id with stream client id prefix, and group instance id
|
||||||
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||||
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
|
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
|
||||||
|
final String groupInstanceId = (String) consumerProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
|
||||||
|
// Suffix each thread consumer with thread.id to enforce uniqueness of group.instance.id.
|
||||||
|
if (groupInstanceId != null) {
|
||||||
|
consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId + "-" + threadIdx);
|
||||||
|
}
|
||||||
|
|
||||||
// add configs required for stream partition assignor
|
// add configs required for stream partition assignor
|
||||||
consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
|
consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
|
||||||
|
|
|
@ -661,7 +661,7 @@ public class StreamThread extends Thread {
|
||||||
|
|
||||||
log.info("Creating consumer client");
|
log.info("Creating consumer client");
|
||||||
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
|
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
|
||||||
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadClientId));
|
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadClientId), threadIdx);
|
||||||
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
|
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
|
||||||
final AtomicInteger assignmentErrorCode = new AtomicInteger();
|
final AtomicInteger assignmentErrorCode = new AtomicInteger();
|
||||||
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
|
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
|
||||||
|
@ -721,7 +721,8 @@ public class StreamThread extends Thread {
|
||||||
this.assignmentErrorCode = assignmentErrorCode;
|
this.assignmentErrorCode = assignmentErrorCode;
|
||||||
|
|
||||||
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
|
this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
|
||||||
this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId"))
|
final int dummyThreadIdx = 1;
|
||||||
|
this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx))
|
||||||
.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
|
.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
|
||||||
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
|
this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
|
||||||
|
|
||||||
|
|
|
@ -106,7 +106,6 @@ public class KafkaStreamsTest {
|
||||||
props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
|
props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
|
||||||
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
|
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
|
||||||
props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
globalStreams = new KafkaStreams(builder.build(), props);
|
globalStreams = new KafkaStreams(builder.build(), props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.kafka.streams.errors.StreamsException;
|
||||||
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
|
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
|
||||||
import org.apache.kafka.streams.processor.TimestampExtractor;
|
import org.apache.kafka.streams.processor.TimestampExtractor;
|
||||||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
||||||
import org.hamcrest.CoreMatchers;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -62,6 +61,10 @@ public class StreamsConfigTest {
|
||||||
private final Properties props = new Properties();
|
private final Properties props = new Properties();
|
||||||
private StreamsConfig streamsConfig;
|
private StreamsConfig streamsConfig;
|
||||||
|
|
||||||
|
private final String groupId = "example-application";
|
||||||
|
private final String clientId = "client";
|
||||||
|
private final int threadIdx = 1;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
|
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
|
||||||
|
@ -87,7 +90,6 @@ public class StreamsConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetProducerConfigs() {
|
public void testGetProducerConfigs() {
|
||||||
final String clientId = "client";
|
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
|
final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
|
||||||
assertThat(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), equalTo(clientId));
|
assertThat(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), equalTo(clientId));
|
||||||
assertThat(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), equalTo("100"));
|
assertThat(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), equalTo("100"));
|
||||||
|
@ -95,12 +97,19 @@ public class StreamsConfigTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetConsumerConfigs() {
|
public void testGetConsumerConfigs() {
|
||||||
final String groupId = "example-application";
|
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
final String clientId = "client";
|
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
|
|
||||||
assertThat(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), equalTo(clientId));
|
assertThat(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), equalTo(clientId));
|
||||||
assertThat(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), equalTo(groupId));
|
assertThat(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), equalTo(groupId));
|
||||||
assertThat(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), equalTo("1000"));
|
assertThat(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), equalTo("1000"));
|
||||||
|
assertNull(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetGroupInstanceIdConfigs() {
|
||||||
|
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "group-instance-id");
|
||||||
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
|
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
|
assertThat(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), equalTo("group-instance-id-1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -113,10 +122,7 @@ public class StreamsConfigTest {
|
||||||
props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
|
props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 5);
|
||||||
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
|
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
|
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
final String groupId = "example-application";
|
|
||||||
final String clientId = "client";
|
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
|
|
||||||
|
|
||||||
assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
|
assertEquals(42, returnedProps.get(StreamsConfig.REPLICATION_FACTOR_CONFIG));
|
||||||
assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
|
assertEquals(1, returnedProps.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
|
||||||
|
@ -135,10 +141,7 @@ public class StreamsConfigTest {
|
||||||
props.put(StreamsConfig.RETRIES_CONFIG, 10);
|
props.put(StreamsConfig.RETRIES_CONFIG, 10);
|
||||||
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
|
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 100L);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
|
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
final String groupId = "example-application";
|
|
||||||
final String clientId = "client";
|
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
|
|
||||||
|
|
||||||
assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
|
assertEquals(20, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG)));
|
||||||
assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG)));
|
assertEquals(200L, returnedProps.get(StreamsConfig.adminClientPrefix(StreamsConfig.RETRY_BACKOFF_MS_CONFIG)));
|
||||||
|
@ -148,15 +151,12 @@ public class StreamsConfigTest {
|
||||||
public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
|
public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
|
||||||
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
|
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
|
||||||
final String groupId = "example-application";
|
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
final String clientId = "client";
|
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
|
|
||||||
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetRestoreConsumerConfigs() {
|
public void testGetRestoreConsumerConfigs() {
|
||||||
final String clientId = "client";
|
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
|
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId);
|
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId);
|
||||||
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
|
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
|
||||||
|
@ -199,7 +199,7 @@ public class StreamsConfigTest {
|
||||||
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
|
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
|
||||||
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
|
assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
|
||||||
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
@ -208,7 +208,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldSupportPrefixedRestoreConsumerConfigs() {
|
public void shouldSupportPrefixedRestoreConsumerConfigs() {
|
||||||
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,7 +216,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
|
public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
props.put(consumerPrefix("interceptor.statsd.host"), "host");
|
props.put(consumerPrefix("interceptor.statsd.host"), "host");
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
|
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,7 +224,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
|
public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
props.put(consumerPrefix("interceptor.statsd.host"), "host");
|
props.put(consumerPrefix("interceptor.statsd.host"), "host");
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
|
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
|
public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
props.put(producerPrefix("interceptor.statsd.host"), "host");
|
props.put(producerPrefix("interceptor.statsd.host"), "host");
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
|
assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,7 +241,7 @@ public class StreamsConfigTest {
|
||||||
props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
|
props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
|
||||||
props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> configs = streamsConfig.getProducerConfigs(clientId);
|
||||||
assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
|
assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
|
||||||
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
@ -251,7 +251,7 @@ public class StreamsConfigTest {
|
||||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
|
assertEquals("earliest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
|
||||||
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
@ -260,7 +260,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
|
public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
|
||||||
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("groupId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(groupId);
|
||||||
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,7 +269,7 @@ public class StreamsConfigTest {
|
||||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
|
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
|
||||||
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> configs = streamsConfig.getProducerConfigs(clientId);
|
||||||
assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
|
assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
|
||||||
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
@ -278,10 +278,10 @@ public class StreamsConfigTest {
|
||||||
public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
|
public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
props.put("custom.property.host", "host");
|
props.put("custom.property.host", "host");
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
|
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
|
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs(clientId);
|
||||||
assertEquals("host", consumerConfigs.get("custom.property.host"));
|
assertEquals("host", consumerConfigs.get("custom.property.host"));
|
||||||
assertEquals("host", restoreConsumerConfigs.get("custom.property.host"));
|
assertEquals("host", restoreConsumerConfigs.get("custom.property.host"));
|
||||||
assertEquals("host", producerConfigs.get("custom.property.host"));
|
assertEquals("host", producerConfigs.get("custom.property.host"));
|
||||||
|
@ -295,10 +295,10 @@ public class StreamsConfigTest {
|
||||||
props.put(consumerPrefix("custom.property.host"), "host1");
|
props.put(consumerPrefix("custom.property.host"), "host1");
|
||||||
props.put(producerPrefix("custom.property.host"), "host2");
|
props.put(producerPrefix("custom.property.host"), "host2");
|
||||||
props.put(adminClientPrefix("custom.property.host"), "host3");
|
props.put(adminClientPrefix("custom.property.host"), "host3");
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
|
final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs("clientId");
|
final Map<String, Object> adminConfigs = streamsConfig.getAdminConfigs(clientId);
|
||||||
assertEquals("host1", consumerConfigs.get("custom.property.host"));
|
assertEquals("host1", consumerConfigs.get("custom.property.host"));
|
||||||
assertEquals("host1", restoreConsumerConfigs.get("custom.property.host"));
|
assertEquals("host1", restoreConsumerConfigs.get("custom.property.host"));
|
||||||
assertEquals("host2", producerConfigs.get("custom.property.host"));
|
assertEquals("host2", producerConfigs.get("custom.property.host"));
|
||||||
|
@ -309,7 +309,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldSupportNonPrefixedAdminConfigs() {
|
public void shouldSupportNonPrefixedAdminConfigs() {
|
||||||
props.put(AdminClientConfig.RETRIES_CONFIG, 10);
|
props.put(AdminClientConfig.RETRIES_CONFIG, 10);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> configs = streamsConfig.getAdminConfigs("clientId");
|
final Map<String, Object> configs = streamsConfig.getAdminConfigs(clientId);
|
||||||
assertEquals(10, configs.get(AdminClientConfig.RETRIES_CONFIG));
|
assertEquals(10, configs.get(AdminClientConfig.RETRIES_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,7 +332,7 @@ public class StreamsConfigTest {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
|
assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
|
||||||
assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
||||||
}
|
}
|
||||||
|
@ -341,7 +341,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldOverrideStreamsDefaultProducerConfigs() {
|
public void shouldOverrideStreamsDefaultProducerConfigs() {
|
||||||
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
|
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
|
assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,7 +349,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
|
public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,7 +357,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
|
public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b", threadIdx);
|
||||||
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
|
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -365,7 +365,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
|
public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
|
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
|
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,13 +373,12 @@ public class StreamsConfigTest {
|
||||||
public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
|
public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
|
||||||
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
|
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("clientId");
|
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
|
||||||
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetGlobalConsumerConfigs() {
|
public void testGetGlobalConsumerConfigs() {
|
||||||
final String clientId = "client";
|
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
|
final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
|
||||||
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer");
|
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-global-consumer");
|
||||||
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
|
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
|
||||||
|
@ -389,7 +388,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldSupportPrefixedGlobalConsumerConfigs() {
|
public void shouldSupportPrefixedGlobalConsumerConfigs() {
|
||||||
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);
|
||||||
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,7 +396,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
|
public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
props.put(consumerPrefix("interceptor.statsd.host"), "host");
|
props.put(consumerPrefix("interceptor.statsd.host"), "host");
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);
|
||||||
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
|
assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -405,7 +404,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
|
public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
|
||||||
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("groupId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(groupId);
|
||||||
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
assertEquals(1, consumerConfigs.get(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -413,7 +412,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
|
public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs("client");
|
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);
|
||||||
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
|
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -421,17 +420,10 @@ public class StreamsConfigTest {
|
||||||
public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
|
public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
|
||||||
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
|
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
|
||||||
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
|
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "50");
|
||||||
final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs("clientId");
|
final Map<String, Object> returnedProps = streamsConfig.getGlobalConsumerConfigs(clientId);
|
||||||
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
assertEquals("50", returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
|
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
|
||||||
assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.equalTo(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldAcceptAtLeastOnce() {
|
public void shouldAcceptAtLeastOnce() {
|
||||||
// don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test
|
// don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test
|
||||||
|
@ -457,7 +449,7 @@ public class StreamsConfigTest {
|
||||||
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
||||||
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
|
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
|
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -465,7 +457,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
|
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
|
||||||
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
|
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientrId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
|
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -475,7 +467,7 @@ public class StreamsConfigTest {
|
||||||
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
||||||
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
|
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
|
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -483,7 +475,7 @@ public class StreamsConfigTest {
|
||||||
public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
|
public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
|
||||||
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
|
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
|
assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -492,8 +484,8 @@ public class StreamsConfigTest {
|
||||||
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
|
|
||||||
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("groupId", "clientId");
|
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
|
|
||||||
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
|
assertThat(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
|
||||||
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
|
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
|
||||||
|
@ -508,7 +500,7 @@ public class StreamsConfigTest {
|
||||||
props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
|
props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
|
|
||||||
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
|
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs(clientId);
|
||||||
|
|
||||||
assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
|
assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
|
||||||
}
|
}
|
||||||
|
@ -589,7 +581,7 @@ public class StreamsConfigTest {
|
||||||
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
|
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
|
||||||
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
final StreamsConfig streamsConfig = new StreamsConfig(props);
|
||||||
try {
|
try {
|
||||||
streamsConfig.getProducerConfigs("clientId");
|
streamsConfig.getProducerConfigs(clientId);
|
||||||
fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5");
|
fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5");
|
||||||
} catch (final ConfigException e) {
|
} catch (final ConfigException e) {
|
||||||
assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage());
|
assertEquals("Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled", e.getMessage());
|
||||||
|
@ -601,7 +593,7 @@ public class StreamsConfigTest {
|
||||||
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
||||||
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
|
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
|
||||||
|
|
||||||
new StreamsConfig(props).getProducerConfigs("clientId");
|
new StreamsConfig(props).getProducerConfigs(clientId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -610,7 +602,7 @@ public class StreamsConfigTest {
|
||||||
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number");
|
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new StreamsConfig(props).getProducerConfigs("clientId");
|
new StreamsConfig(props).getProducerConfigs(clientId);
|
||||||
fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
|
fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
|
||||||
} catch (final ConfigException e) {
|
} catch (final ConfigException e) {
|
||||||
assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", e.getMessage());
|
assertEquals("Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer", e.getMessage());
|
||||||
|
|
|
@ -139,7 +139,6 @@ public abstract class AbstractJoinIntegrationTest {
|
||||||
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
|
||||||
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
|
||||||
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
|
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
|
||||||
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||||
|
|
|
@ -156,7 +156,6 @@ public abstract class AbstractResetIntegrationTest {
|
||||||
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
|
streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
|
||||||
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
|
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
|
||||||
streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
streamsConfig.putAll(commonClientConfig);
|
streamsConfig.putAll(commonClientConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -160,7 +160,6 @@ public class EosIntegrationTest {
|
||||||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
|
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
|
||||||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
|
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
|
||||||
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
|
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
|
||||||
properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
for (int i = 0; i < numberOfRestarts; ++i) {
|
for (int i = 0; i < numberOfRestarts; ++i) {
|
||||||
final Properties config = StreamsTestUtils.getStreamsConfig(
|
final Properties config = StreamsTestUtils.getStreamsConfig(
|
||||||
|
|
|
@ -143,7 +143,6 @@ public class FineGrainedAutoResetIntegrationTest {
|
||||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
||||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
|
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
|
||||||
"testAutoOffsetId",
|
"testAutoOffsetId",
|
||||||
|
|
|
@ -95,7 +95,6 @@ public class GlobalKTableEOSIntegrationTest {
|
||||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
|
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
|
||||||
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
|
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
|
||||||
|
|
|
@ -89,7 +89,6 @@ public class GlobalKTableIntegrationTest {
|
||||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
|
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
|
||||||
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
|
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
|
||||||
|
|
|
@ -88,7 +88,6 @@ public class GlobalThreadShutDownOrderTest {
|
||||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
|
|
||||||
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
|
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
|
||||||
|
|
|
@ -93,7 +93,6 @@ public class InternalTopicIntegrationTest {
|
||||||
streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||||
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
|
@ -92,7 +92,6 @@ public class KStreamAggregationDedupIntegrationTest {
|
||||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
|
||||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
|
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
|
final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
|
||||||
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
|
stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
|
||||||
|
|
|
@ -129,7 +129,6 @@ public class KStreamAggregationIntegrationTest {
|
||||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||||
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||||
|
|
|
@ -78,7 +78,6 @@ public class KTableSourceTopicRestartIntegrationTest {
|
||||||
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
|
||||||
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
|
||||||
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
|
||||||
STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
|
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
|
||||||
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
|
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
|
||||||
|
|
||||||
|
|
|
@ -160,7 +160,6 @@ public class PurgeRepartitionTopicIntegrationTest {
|
||||||
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS);
|
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS);
|
||||||
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES);
|
streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES);
|
||||||
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2); // we cannot allow batch size larger than segment size
|
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2); // we cannot allow batch size larger than segment size
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
final StreamsBuilder builder = new StreamsBuilder();
|
final StreamsBuilder builder = new StreamsBuilder();
|
||||||
builder.stream(INPUT_TOPIC)
|
builder.stream(INPUT_TOPIC)
|
||||||
|
|
|
@ -188,8 +188,6 @@ public class QueryableStateIntegrationTest {
|
||||||
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
// override this to make the rebalances happen quickly
|
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
stringComparator = Comparator.comparing((KeyValue<String, String> o) -> o.key).thenComparing(o -> o.value);
|
stringComparator = Comparator.comparing((KeyValue<String, String> o) -> o.key).thenComparing(o -> o.value);
|
||||||
stringLongComparator = Comparator.comparing((KeyValue<String, Long> o) -> o.key).thenComparingLong(o -> o.value);
|
stringLongComparator = Comparator.comparing((KeyValue<String, Long> o) -> o.key).thenComparingLong(o -> o.value);
|
||||||
|
|
|
@ -122,7 +122,6 @@ public class RegexSourceIntegrationTest {
|
||||||
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||||
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
|
||||||
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
|
streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test",
|
||||||
CLUSTER.bootstrapServers(),
|
CLUSTER.bootstrapServers(),
|
||||||
|
|
|
@ -91,7 +91,6 @@ public class RepartitionOptimizingIntegrationTest {
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
|
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
|
||||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
|
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
|
||||||
props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
|
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
|
||||||
"maybe-optimized-test-app",
|
"maybe-optimized-test-app",
|
||||||
|
|
|
@ -79,7 +79,6 @@ public class RepartitionWithMergeOptimizingIntegrationTest {
|
||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
|
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
|
||||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
|
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
|
||||||
props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
|
|
||||||
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
|
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
|
||||||
"maybe-optimized-with-merge-test-app",
|
"maybe-optimized-with-merge-test-app",
|
||||||
|
|
|
@ -104,7 +104,6 @@ public class RestoreIntegrationTest {
|
||||||
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||||
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
|
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
|
||||||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
|
|
||||||
return streamsConfiguration;
|
return streamsConfiguration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,6 @@ import static org.hamcrest.Matchers.equalTo;
|
||||||
public class IntegrationTestUtils {
|
public class IntegrationTestUtils {
|
||||||
|
|
||||||
public static final long DEFAULT_TIMEOUT = 60 * 1000L;
|
public static final long DEFAULT_TIMEOUT = 60 * 1000L;
|
||||||
public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Records state transition for StreamThread
|
* Records state transition for StreamThread
|
||||||
|
|
Loading…
Reference in New Issue