MINOR: fixing JavaDocs and other cleanup (#17207)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Matthias J. Sax, 2024-09-22 20:46:40 -07:00 (committed by GitHub)
parent 9685aa7547
commit d063443825
9 changed files with 136 additions and 77 deletions

ClientInstanceIds.java

@@ -32,6 +32,7 @@ public interface ClientInstanceIds {
      *
      * @throws IllegalStateException If telemetry is disabled on the admin client.
      */
+    @SuppressWarnings("unused")
     Uuid adminInstanceId();
 
     /**
@@ -39,6 +40,7 @@ public interface ClientInstanceIds {
      *
      * @return a map from thread key to {@code client instance id}
      */
+    @SuppressWarnings("unused")
     Map<String, Uuid> consumerInstanceIds();
 
     /**
@@ -46,5 +48,6 @@ public interface ClientInstanceIds {
      *
      * @return a map from thread key to {@code client instance id}
      */
+    @SuppressWarnings("unused")
     Map<String, Uuid> producerInstanceIds();
 }
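
For context: `@SuppressWarnings("unused")` on these accessors silences IDE/static-analysis warnings for public API methods that are never called from inside the Streams codebase itself. As a usage illustration only — a minimal sketch assuming the `KafkaStreams#clientInstanceIds(Duration)` accessor from KIP-714, with an arbitrary 30-second timeout:

    import java.time.Duration;
    import java.util.Map;

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.streams.ClientInstanceIds;
    import org.apache.kafka.streams.KafkaStreams;

    public final class ClientInstanceIdsSketch {
        // Fetches the telemetry client-instance ids of a running Streams client.
        public static void printInstanceIds(final KafkaStreams streams) {
            // Blocks for up to the given timeout while the ids are looked up.
            final ClientInstanceIds ids = streams.clientInstanceIds(Duration.ofSeconds(30));
            final Uuid adminId = ids.adminInstanceId(); // throws IllegalStateException if telemetry is disabled
            final Map<String, Uuid> consumerIds = ids.consumerInstanceIds(); // thread key -> instance id
            System.out.println("admin: " + adminId + ", consumers: " + consumerIds);
        }
    }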

KafkaStreams.java

@@ -323,11 +323,11 @@ public class KafkaStreams implements AutoCloseable {
         if (state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING) {
             // when the state is already in PENDING_SHUTDOWN, all other transitions than NOT_RUNNING (due to thread dying) will be
-            // refused but we do not throw exception here, to allow appropriate error handling
+            // refused, but we do not throw exception here, to allow appropriate error handling
             return false;
         } else if (state == State.NOT_RUNNING && (newState == State.PENDING_SHUTDOWN || newState == State.NOT_RUNNING)) {
             // when the state is already in NOT_RUNNING, its transition to PENDING_SHUTDOWN or NOT_RUNNING (due to consecutive close calls)
-            // will be refused but we do not throw exception here, to allow idempotent close calls
+            // will be refused, but we do not throw exception here, to allow idempotent close calls
             return false;
         } else if (state == State.REBALANCING && newState == State.REBALANCING) {
             // when the state is already in REBALANCING, it should not transit to REBALANCING again
@@ -337,7 +337,7 @@ public class KafkaStreams implements AutoCloseable {
             return false;
         } else if (state == State.PENDING_ERROR && newState != State.ERROR) {
             // when the state is already in PENDING_ERROR, all other transitions than ERROR (due to thread dying) will be
-            // refused but we do not throw exception here, to allow appropriate error handling
+            // refused, but we do not throw exception here, to allow appropriate error handling
             return false;
         } else if (!state.isValidTransition(newState)) {
             throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState);
@@ -539,9 +539,12 @@ public class KafkaStreams implements AutoCloseable {
                     }
                     break;
                 case SHUTDOWN_CLIENT:
-                    log.error("Encountered the following exception during processing " +
-                            "and the registered exception handler opted to " + action + "." +
-                            " The streams client is going to shut down now. ", throwable);
+                    log.error(
+                        "Encountered the following exception during processing and the registered exception handler " +
+                            "opted to {}. The streams client is going to shut down now.",
+                        action,
+                        throwable
+                    );
                     closeToError();
                     break;
                 case SHUTDOWN_APPLICATION:
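
This hunk, like several below, swaps string concatenation for SLF4J's parameterized logging. The `{}` form avoids building the message when the level is disabled, and a `Throwable` passed as the last argument is logged with its stack trace while the earlier arguments fill the placeholders. A self-contained sketch of the difference (class name and message text are illustrative only):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class LoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(final String[] args) {
            final Exception cause = new IllegalStateException("boom");
            // Concatenation: the message string is built even if ERROR is disabled.
            log.error("Handler opted to " + "SHUTDOWN_CLIENT" + ". Shutting down now.", cause);
            // Parameterized: formatting is deferred; the trailing Throwable is
            // printed with its stack trace, earlier args fill the {} slots.
            log.error("Handler opted to {}. Shutting down now.", "SHUTDOWN_CLIENT", cause);
        }
    }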
@@ -979,7 +982,7 @@ public class KafkaStreams implements AutoCloseable {
         // The application ID is a required config and hence should always have value
         final String userClientId = applicationConfigs.getString(StreamsConfig.CLIENT_ID_CONFIG);
         final String applicationId = applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        if (userClientId.length() <= 0) {
+        if (userClientId.isEmpty()) {
             clientId = applicationId + "-" + processId;
         } else {
             clientId = userClientId;
@@ -1194,7 +1197,7 @@ public class KafkaStreams implements AutoCloseable {
         for (final StreamThread streamThread : new ArrayList<>(threads)) {
             final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
             if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
-                log.info("Removing StreamThread " + streamThread.getName());
+                log.info("Removing StreamThread {}", streamThread.getName());
                 final Optional<String> groupInstanceID = streamThread.groupInstanceID();
                 streamThread.requestLeaveGroupDuringShutdown();
                 streamThread.shutdown();
@@ -1268,7 +1271,7 @@ public class KafkaStreams implements AutoCloseable {
             }
             log.warn("There are no threads eligible for removal");
         } else {
-            log.warn("Cannot remove a stream thread when Kafka Streams client is in state " + state());
+            log.warn("Cannot remove a stream thread when Kafka Streams client is in state {}", state());
         }
         return Optional.empty();
     }
@@ -1276,9 +1279,10 @@ public class KafkaStreams implements AutoCloseable {
     /*
      * Takes a snapshot and counts the number of stream threads which are not in PENDING_SHUTDOWN or DEAD
      *
-     * note: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
-     * require other locks when looping threads and it could cause deadlock. Hence, we create a copy to avoid holding
+     * Note: iteration over SynchronizedList is not thread safe, so it must be manually synchronized. However, we may
+     * require other locks when looping threads, and it could cause deadlock. Hence, we create a copy to avoid holding
      * threads lock when looping threads.
+     *
      * @return number of alive stream threads
      */
     private int numLiveStreamThreads() {
@@ -1305,7 +1309,7 @@ public class KafkaStreams implements AutoCloseable {
         final AtomicInteger maxThreadId = new AtomicInteger(1);
         synchronized (threads) {
             processStreamThread(thread -> {
-                // trim any DEAD threads from the list so we can reuse the thread.id
+                // trim any DEAD threads from the list, so we can reuse the thread.id
                 // this is only safe to do once the thread has fully completed shutdown
                 if (thread.state() == StreamThread.State.DEAD) {
                     threads.remove(thread);
@@ -1452,7 +1456,7 @@ public class KafkaStreams implements AutoCloseable {
      * This will block until all threads have stopped.
      */
     public void close() {
-        close(Long.MAX_VALUE, false);
+        close(Optional.empty(), false);
     }
 
     private Thread shutdownHelper(final boolean error, final long timeoutMs, final boolean leaveGroup) {
@@ -1531,7 +1535,15 @@ public class KafkaStreams implements AutoCloseable {
         }, clientId + "-CloseThread");
     }
 
-    private boolean close(final long timeoutMs, final boolean leaveGroup) {
+    private boolean close(final Optional<Long> timeout, final boolean leaveGroup) {
+        final long timeoutMs;
+        if (timeout.isPresent()) {
+            timeoutMs = timeout.get();
+            log.debug("Stopping Streams client with timeout = {}ms.", timeoutMs);
+        } else {
+            timeoutMs = Long.MAX_VALUE;
+        }
+
         if (state.hasCompletedShutdown()) {
             log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
             return true;
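
The signature change above replaces the `Long.MAX_VALUE` sentinel with `Optional<Long>`: "wait forever" becomes explicit absence, and the debug line fires only when a caller actually supplied a timeout. A minimal sketch of the same pattern, independent of the Streams internals (names are illustrative):

    import java.util.Optional;

    public final class TimeoutSketch {
        // Absence means "wait forever"; a present value is an explicit bound.
        static long resolveTimeoutMs(final Optional<Long> timeout) {
            timeout.ifPresent(t -> System.out.println("stopping with timeout = " + t + "ms"));
            return timeout.orElse(Long.MAX_VALUE);
        }

        public static void main(final String[] args) {
            System.out.println(resolveTimeoutMs(Optional.empty()));    // 9223372036854775807
            System.out.println(resolveTimeoutMs(Optional.of(5_000L))); // logs, then prints 5000
        }
    }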
@@ -1574,7 +1586,7 @@ public class KafkaStreams implements AutoCloseable {
     private void closeToError() {
         if (!setState(State.PENDING_ERROR)) {
-            log.info("Skipping shutdown since we are already in " + state());
+            log.info("Skipping shutdown since we are already in {}", state());
         } else {
             final Thread shutdownThread = shutdownHelper(true, -1, false);
@@ -1586,7 +1598,7 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
      * threads to join.
-     * A {@code timeout} of Duration.ZERO (or any other zero duration) makes the close operation asynchronous.
+     * A {@code timeout} of {@link Duration#ZERO} (or any other zero duration) makes the close operation asynchronous.
      * Negative-duration timeouts are rejected.
      *
      * @param timeout how long to wait for the threads to shut down
@@ -1602,9 +1614,7 @@ public class KafkaStreams implements AutoCloseable {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
 
-        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
-
-        return close(timeoutMs, false);
+        return close(Optional.of(timeoutMs), false);
     }
 
     /**
@@ -1624,8 +1634,8 @@ public class KafkaStreams implements AutoCloseable {
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
 
-        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
-        return close(timeoutMs, options.leaveGroup);
+        return close(Optional.of(timeoutMs), options.leaveGroup);
     }
 
     private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
@@ -1679,7 +1689,7 @@ public class KafkaStreams implements AutoCloseable {
      * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
      * the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
      * <p>
-     * Note: this is a point in time view and it may change due to partition reassignment.
+     * Note: this is a point in time view, and it may change due to partition reassignment.
      *
      * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
      */
@@ -1697,10 +1707,10 @@ public class KafkaStreams implements AutoCloseable {
      * </ul>
      * and return {@link StreamsMetadata} for each discovered instance.
      * <p>
-     * Note: this is a point in time view and it may change due to partition reassignment.
+     * Note: this is a point in time view, and it may change due to partition reassignment.
      *
      * @param storeName the {@code storeName} to find metadata for
-     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provide {@code storeName} of
+     * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances with the provided {@code storeName} of
      * this application
      */
     public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName) {
@@ -1730,9 +1740,9 @@ public class KafkaStreams implements AutoCloseable {
      *
      * @param storeName the {@code storeName} to find metadata for
      * @param key the key to find metadata for
-     * @param partitioner the partitioner to be use to locate the host for the key
+     * @param partitioner the partitioner to be used to locate the host for the key
      * @param <K> key type
-     * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using the
+     * Returns {@link KeyQueryMetadata} containing all metadata about hosting the given key for the given store, using
      * the supplied partitioner, or {@code null} if no matching metadata could be found.
      */
     public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
@@ -1772,7 +1782,7 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * This method pauses processing for the KafkaStreams instance.
      *
-     * <p>Paused topologies will only skip over a) processing, b) punctuation, and c) standby tasks.
+     * <p>Paused topologies will only skip over (a) processing, (b) punctuation, and (c) standby tasks.
      * Notably, paused topologies will still poll Kafka consumers, and commit offsets.
      * This method sets transient state that is not maintained or managed among instances.
      * Note that pause() can be called before start() in order to start a KafkaStreams instance
@@ -1817,8 +1827,8 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * handle each stream thread in a snapshot of threads.
-     * noted: iteration over SynchronizedList is not thread safe so it must be manually synchronized. However, we may
-     * require other locks when looping threads and it could cause deadlock. Hence, we create a copy to avoid holding
+     * noted: iteration over SynchronizedList is not thread safe, so it must be manually synchronized. However, we may
+     * require other locks when looping threads, and it could cause deadlock. Hence, we create a copy to avoid holding
      * threads lock when looping threads.
      * @param consumer handler
      */
@@ -2060,7 +2070,7 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * Run an interactive query against a state store.
      * <p>
-     * This method allows callers outside of the Streams runtime to access the internal state of
+     * This method allows callers outside the Streams runtime to access the internal state of
      * stateful processors. See <a href="https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html">IQ docs</a>
      * for more information.
      * <p>

KeyQueryMetadata.java

@@ -61,7 +61,7 @@ public class KeyQueryMetadata {
     /**
      * Get the Kafka Streams instances that host the key as standbys.
      *
-     * @return set of standby {@link HostInfo} or a empty set, if no standbys are configured
+     * @return set of standby {@link HostInfo} or an empty set, if no standbys are configured
      */
     public Set<HostInfo> standbyHosts() {
         return standbyHosts;

StreamsConfig.java

@@ -697,7 +697,7 @@ public class StreamsConfig extends AbstractConfig {
     @Deprecated
     public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take <code>client.rack</code> and <code>racks</code> of <code>TopicPartition</code> into account when assigning"
         + " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code> (default), which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
-        + "</code>, which will compute minimum cross rack traffic assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY + "</code>, which will compute minimum cross rack traffic and try to balance the tasks of same subtopolgies across different clients";
+        + "</code>, which will compute minimum cross rack traffic assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY + "</code>, which will compute minimum cross rack traffic and try to balance the tasks of same subtopologies across different clients";
 
     /** {@code rack.aware.assignment.tags} */
     @SuppressWarnings("WeakerAccess")
@@ -1281,7 +1281,11 @@ public class StreamsConfig extends AbstractConfig {
         } else if (value instanceof String) {
             return Boolean.parseBoolean((String) value);
         } else {
-            log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a true/false value.");
+            log.warn(
+                "Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.",
+                value,
+                key
+            );
             return defaultValue;
         }
     }
@@ -1293,7 +1297,11 @@ public class StreamsConfig extends AbstractConfig {
         } else if (value instanceof String) {
             return Long.parseLong((String) value);
         } else {
-            log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a numeric value.");
+            log.warn(
+                "Invalid value ({}) on internal configuration '{}'. Please specify a numeric value.",
+                value,
+                key
+            );
             return defaultValue;
         }
     }
@@ -1303,7 +1311,11 @@ public class StreamsConfig extends AbstractConfig {
         if (value instanceof String) {
             return (String) value;
         } else {
-            log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a String value.");
+            log.warn(
+                "Invalid value ({}) on internal configuration '{}'. Please specify a String value.",
+                value,
+                key
+            );
             return defaultValue;
         }
     }
@@ -1444,12 +1456,15 @@ public class StreamsConfig extends AbstractConfig {
             DEFAULT_TRANSACTION_TIMEOUT;
 
         if (transactionTimeout < commitInterval) {
-            throw new IllegalArgumentException(String.format("Transaction timeout %d was set lower than " +
+            throw new IllegalArgumentException(String.format(
+                "Transaction timeout %d was set lower than " +
                 "streams commit interval %d. This will cause ongoing transaction always timeout due to inactivity " +
                 "caused by long commit interval. Consider reconfiguring commit interval to match " +
                 "transaction timeout by tuning 'commit.interval.ms' config, or increase the transaction timeout to match " +
                 "commit interval by tuning `producer.transaction.timeout.ms` config.",
-                transactionTimeout, commitInterval));
+                transactionTimeout,
+                commitInterval
+            ));
         }
     }
@@ -1525,36 +1540,61 @@ public class StreamsConfig extends AbstractConfig {
         // Streams does not allow users to configure certain consumer/producer configurations, for example,
         // enable.auto.commit. In cases where user tries to override such non-configurable
         // consumer/producer configurations, log a warning and remove the user defined value from the Map.
-        // Thus the default values for these consumer/producer configurations that are suitable for
+        // Thus, the default values for these consumer/producer configurations that are suitable for
         // Streams will be used instead.
-        final String nonConfigurableConfigMessage = "Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ";
-        final String eosMessage = PROCESSING_GUARANTEE_CONFIG + " is set to " + getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
+        final String nonConfigurableConfigMessage = "Unexpected user-specified {} config '{}' found. {} setting ({}) will be ignored and the Streams default setting ({}) will be used.";
+        final String eosMessage = "'" + PROCESSING_GUARANTEE_CONFIG + "' is set to \"" + getString(PROCESSING_GUARANTEE_CONFIG) + "\". Hence, user";
 
         for (final String config: nonConfigurableConfigs) {
             if (clientProvidedProps.containsKey(config)) {
                 if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
                     if (!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config))) {
-                        log.warn(String.format(nonConfigurableConfigMessage, "consumer", config, "", clientProvidedProps.get(config), CONSUMER_DEFAULT_OVERRIDES.get(config)));
+                        log.error(
+                            nonConfigurableConfigMessage,
+                            "consumer",
+                            config,
+                            "User",
+                            clientProvidedProps.get(config),
+                            CONSUMER_DEFAULT_OVERRIDES.get(config)
+                        );
                         clientProvidedProps.remove(config);
                     }
                 } else if (eosEnabled) {
                     if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
                         if (!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) {
-                            log.warn(String.format(nonConfigurableConfigMessage,
-                                    "consumer", config, eosMessage, clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
+                            log.warn(
+                                nonConfigurableConfigMessage,
+                                "consumer",
+                                config,
+                                eosMessage,
+                                clientProvidedProps.get(config),
+                                CONSUMER_EOS_OVERRIDES.get(config)
+                            );
                             clientProvidedProps.remove(config);
                         }
                     } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
                         if (!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) {
-                            log.warn(String.format(nonConfigurableConfigMessage,
-                                    "producer", config, eosMessage, clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
+                            log.warn(
+                                nonConfigurableConfigMessage,
+                                "producer",
+                                config,
+                                eosMessage,
+                                clientProvidedProps.get(config),
+                                PRODUCER_EOS_OVERRIDES.get(config)
+                            );
                             clientProvidedProps.remove(config);
                         }
                     } else if (ProducerConfig.TRANSACTIONAL_ID_CONFIG.equals(config)) {
-                        log.warn(String.format(nonConfigurableConfigMessage,
-                                "producer", config, eosMessage, clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
+                        log.warn(
+                            nonConfigurableConfigMessage,
+                            "producer",
+                            config,
+                            eosMessage,
+                            clientProvidedProps.get(config),
+                            "<appId>-<generatedSuffix>"
+                        );
                         clientProvidedProps.remove(config);
                     }
                 }
             }
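
Note the subtlety in this hunk: `nonConfigurableConfigMessage` switches from `String.format`-style `%s` placeholders to SLF4J-style `{}` placeholders, so it must now be handed to the logger directly; passing a `{}` template through `String.format` (or a `%s` template to SLF4J) silently leaves the placeholders unfilled. A short sketch of the two styles (logger name and config values are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class TemplateSketch {
        private static final Logger log = LoggerFactory.getLogger(TemplateSketch.class);

        public static void main(final String[] args) {
            // Old style: %s template, pre-rendered via String.format(...).
            final String oldTemplate = "Unexpected user-specified %s config: %s found.";
            log.warn(String.format(oldTemplate, "consumer", "enable.auto.commit"));

            // New style: {} template, handed to SLF4J for deferred formatting.
            final String newTemplate = "Unexpected user-specified {} config '{}' found.";
            log.warn(newTemplate, "consumer", "enable.auto.commit");
        }
    }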
@@ -1651,9 +1691,11 @@ public class StreamsConfig extends AbstractConfig {
         final int batchSize = Integer.parseInt(producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG).toString());
 
         if (segmentSize < batchSize) {
-            throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
+            throw new IllegalArgumentException(String.format(
+                "Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic",
                 segmentSize,
-                batchSize));
+                batchSize
+            ));
         }
     }
@@ -1867,7 +1909,9 @@ public class StreamsConfig extends AbstractConfig {
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(
-                String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
+                String.format("Failed to configure key serde %s", keySerdeConfigSetting),
+                e
+            );
         }
     }
@@ -1889,7 +1933,9 @@ public class StreamsConfig extends AbstractConfig {
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(
-                String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
+                String.format("Failed to configure value serde %s", valueSerdeConfigSetting),
+                e
+            );
         }
     }

StreamsMetrics.java

@@ -42,7 +42,7 @@ public interface StreamsMetrics {
      * <li>invocation rate (num.operations / seconds)</li>
      * <li>total invocation count</li>
      * </ol>
-     * Whenever a user records this sensor via {@link Sensor#record(double)} etc, it will be counted as one invocation
+     * Whenever a user records this sensor via {@link Sensor#record(double)} etc., it will be counted as one invocation
      * of the operation, and hence the rate / count metrics will be updated accordingly; and the recorded latency value
      * will be used to update the average / max latency as well.
      *
@@ -73,7 +73,7 @@ public interface StreamsMetrics {
      * <li>invocation rate (num.operations / time unit)</li>
      * <li>total invocation count</li>
      * </ol>
-     * Whenever a user records this sensor via {@link Sensor#record(double)} etc,
+     * Whenever a user records this sensor via {@link Sensor#record(double)} etc.,
      * it will be counted as one invocation of the operation, and hence the rate / count metrics will be updated accordingly.
      *
      * <p>Note that you can add more metrics to this sensor after you created it, which can then be updated upon

TopologyConfig.java

@@ -68,7 +68,7 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSize;
  * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
  * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will
  * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
- * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig)} StreamsBuilder(TopologyConfig)} method.
+ * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method.
  */
 @SuppressWarnings("deprecation")
 public class TopologyConfig extends AbstractConfig {

TopologyDescription.java

@@ -32,7 +32,7 @@ import java.util.regex.Pattern;
  * In contrast, two sub-topologies are not connected but can be linked to each other via topics, i.e., if one
  * sub-topology {@link Topology#addSink(String, String, String...) writes} into a topic and another sub-topology
  * {@link Topology#addSource(String, String...) reads} from the same topic.
- * Message {@link ProcessorContext#forward(Record, String)} forwards} using custom Processors and Transformers are not considered in the topology graph.
+ * Message {@link ProcessorContext#forward(Record, String) forwards} using custom Processors and Transformers are not considered in the topology graph.
  * <p>
  * When {@link KafkaStreams#start()} is called, different sub-topologies will be constructed and executed as independent
  * {@link StreamTask tasks}.

StreamsConfigTest.java

@@ -236,8 +236,7 @@ public class StreamsConfigTest {
         final Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", StandardCharsets.UTF_8.name());
         serializerConfigs.put("value.serializer.encoding", StandardCharsets.UTF_16.name());
-        final Serializer<String> serializer = new StringSerializer();
-
+        try (final Serializer<String> serializer = new StringSerializer()) {
             final String str = "my string for testing";
             final String topic = "my topic";
@@ -255,6 +254,7 @@ public class StreamsConfigTest {
                 "Should get the original string after serialization and deserialization with the configured encoding"
             );
         }
+    }
 
     @Test
     public void shouldSupportMultipleBootstrapServers() {
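
The test change wraps the serializer in try-with-resources: Kafka's `Serializer` extends `Closeable`, so the block guarantees `close()` runs even if an assertion throws. A runnable sketch of the idiom (topic and payload are arbitrary):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public final class TryWithResourcesSketch {
        public static void main(final String[] args) {
            // Serializer extends Closeable; close() is invoked automatically
            // when the block exits, normally or exceptionally.
            try (Serializer<String> serializer = new StringSerializer()) {
                final byte[] bytes = serializer.serialize("my-topic", "hello");
                System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints "hello"
            }
        }
    }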