KAFKA-6157; Fix repeated words words in JavaDoc and comments.

Author: Adem Efe Gencer <agencer@linkedin.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>

Closes #4170 from efeg/bug/typoFix
Authored by Adem Efe Gencer on 2017-11-05 18:00:43 -08:00; committed by Jiangjie Qin
parent 520b313628
commit 86062e9a78
40 changed files with 55 additions and 54 deletions


@@ -60,7 +60,7 @@ final class InFlightRequests {
     }
 
     /**
-     * Get the oldest request (the one that that will be completed next) for the given node
+     * Get the oldest request (the one that will be completed next) for the given node
      */
     public NetworkClient.InFlightRequest completeNext(String node) {
         return requestQueue(node).pollLast();


@@ -369,7 +369,7 @@ public class NetworkClient implements KafkaClient {
         if (!isInternalRequest) {
             // If this request came from outside the NetworkClient, validate
             // that we can send data. If the request is internal, we trust
-            // that that internal code has done this validation. Validation
+            // that internal code has done this validation. Validation
             // will be slightly different for some internal requests (for
             // example, ApiVersionsRequests can be sent prior to being in
             // READY state.)


@@ -423,7 +423,7 @@ import java.util.regex.Pattern;
  * <p>
  * Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
  * In order for this to work, consumers reading from these partitions should be configured to only read committed data.
- * This can be achieved by by setting the {@code isolation.level=read_committed} in the consumer's configuration.
+ * This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration.
  *
  * <p>
  * In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
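
For context on the JavaDoc fixed above, here is a minimal sketch of a consumer configured to read only committed data. The broker address, group id, and topic below are assumptions for illustration, not taken from the commit.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "read-committed-demo");     // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The setting the JavaDoc refers to: skip records from aborted transactions.
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // hypothetical topic
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
    }
}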


@@ -951,7 +951,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     * </p>
     * <p>
     * Applications don't need to call this method for transactional producers, since the {@link #commitTransaction()} will
-    * flush all buffered records before performing the commit. This ensures that all the the {@link #send(ProducerRecord)}
+    * flush all buffered records before performing the commit. This ensures that all the {@link #send(ProducerRecord)}
     * calls made since the previous {@link #beginTransaction()} are completed before the commit.
     * </p>
     *
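
To make the surrounding JavaDoc concrete, a minimal sketch of the transactional flow it describes, where commitTransaction() itself flushes any buffered records. The broker address, transactional.id, and topic are assumed for illustration.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // assumed broker address
        props.put("transactional.id", "demo-transactional-id"); // hypothetical id
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value")); // hypothetical topic
            // No explicit flush() needed: the commit flushes the buffered send() calls.
            producer.commitTransaction();
        }
    }
}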


@@ -384,7 +384,7 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Append a record and return its checksum for message format v0 and v1, or null for for v2 and above.
+     * Append a record and return its checksum for message format v0 and v1, or null for v2 and above.
      */
     private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                   ByteBuffer value, Header[] headers) {


@@ -186,7 +186,7 @@ public class SaslClientAuthenticator implements Authenticator {
                 if (authenticateVersion != null)
                     saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion()));
                 setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
-                // Fall through to send send handshake request with the latest supported version
+                // Fall through to send handshake request with the latest supported version
             }
         case SEND_HANDSHAKE_REQUEST:
             SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion);


@@ -1213,7 +1213,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
             log.info("Rebalance started");
 
-            // Note that since we don't reset the assignment, we we don't revoke leadership here. During a rebalance,
+            // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
             // it is still important to have a leader that can write configs, offsets, etc.
             if (rebalanceResolved) {


@@ -316,7 +316,7 @@ public class KafkaBasedLog<K, V> {
                 synchronized (KafkaBasedLog.this) {
                     // Only invoke exactly the number of callbacks we found before triggering the read to log end
-                    // since it is possible for another write + readToEnd to sneak in in the meantime
+                    // since it is possible for another write + readToEnd to sneak in the meantime
                     for (int i = 0; i < numCallbacks; i++) {
                         Callback<Void> cb = readLogEndOffsetCallbacks.poll();
                         cb.onCompletion(null, null);


@@ -282,7 +282,7 @@ public class KafkaConfigBackingStoreTest {
         assertNull(configState.taskConfig(TASK_IDS.get(0)));
         assertNull(configState.taskConfig(TASK_IDS.get(1)));
 
-        // Writing task task configs should block until all the writes have been performed and the root record update
+        // Writing task configs should block until all the writes have been performed and the root record update
         // has completed
         List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
         configStorage.putTaskConfigs("connector1", taskConfigs);
@@ -335,7 +335,7 @@ public class KafkaConfigBackingStoreTest {
         ClusterConfigState configState = configStorage.snapshot();
         assertEquals(-1, configState.offset());
 
-        // Writing task task configs should block until all the writes have been performed and the root record update
+        // Writing task configs should block until all the writes have been performed and the root record update
         // has completed
         List<Map<String, String>> taskConfigs = Collections.emptyList();
         configStorage.putTaskConfigs("connector1", taskConfigs);


@@ -325,7 +325,7 @@ class Partition(val topic: String,
   }
 
   /**
-   * Update the the follower's state in the leader based on the last fetch request. See
+   * Update the follower's state in the leader based on the last fetch request. See
    * [[kafka.cluster.Replica#updateLogReadResult]] for details.
    *
    * @return true if the leader's log start offset or high watermark have been updated


@@ -380,7 +380,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     pendingOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
       if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
         throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
-          s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.")
+          s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")
 
       val currentOffsetOpt = offsets.get(topicPartition)
       if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {
@@ -405,6 +405,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
     topicPartitions.flatMap { topicPartition =>
       pendingOffsetCommits.remove(topicPartition)
+
       pendingTransactionalOffsetCommits.foreach { case (_, pendingOffsets) =>
         pendingOffsets.remove(topicPartition)


@@ -601,7 +601,7 @@ class TransactionStateManager(brokerId: Int,
         val append: Boolean = metadata.inLock {
           if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
-            // the coordinator epoch has changed, reply to client immediately with with NOT_COORDINATOR
+            // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
             responseCallback(Errors.NOT_COORDINATOR)
             false
           } else {


@@ -62,7 +62,7 @@ class DelayedDeleteRecords(delayMs: Long,
 /**
  * The delayed delete records operation can be completed if every partition specified in the request satisfied one of the following:
  *
- * 1) There was an error while checking if all replicas have caught up to to the deleteRecordsOffset: set an error in response
+ * 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in response
  * 2) The low watermark of the partition has caught up to the deleteRecordsOffset. set the low watermark in response
  *
  */


@@ -67,7 +67,7 @@ object JmxTool extends Logging {
       .describedAs("format")
       .ofType(classOf[String])
     val jmxServiceUrlOpt =
-      parser.accepts("jmx-url", "The url to connect to to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
+      parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.")
       .withRequiredArg
       .describedAs("service-url")
       .ofType(classOf[String])
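
For readers unfamiliar with the JMXServiceURL format mentioned in the option's help text, a small Java sketch of connecting to such a URL. The host and port are assumptions (a broker started with JMX remoting enabled on port 9999).

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxUrlSketch {
    public static void main(String[] args) throws Exception {
        // Assumed value: a process exposing JMX via com.sun.management.jmxremote.port=9999.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            System.out.println("MBean count: " + connection.getMBeanCount());
        } finally {
            connector.close();
        }
    }
}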


@@ -60,7 +60,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
   private val topic1 = "topic-1"
 
   /**
-   * With replication, producer should able able to find new leader after it detects broker failure
+   * With replication, producer should able to find new leader after it detects broker failure
    */
   @Ignore // To be re-enabled once we can make it less flaky (KAFKA-2837)
   @Test


@@ -141,7 +141,7 @@ class DeleteConsumerGroupTest extends KafkaServerTestHarness {
     AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkUtils, topicToDelete)
 
     TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist),
-      "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
+      "Consumer group info on deleted topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
     TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist),
       "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK")
   }


@@ -211,7 +211,7 @@ class LogManagerTest {
       log.appendAsLeader(set, leaderEpoch = 0)
     }
     time.sleep(logManager.InitialTaskDelayMs)
-    assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
+    assertTrue("Time based flush should have been triggered", lastFlush != log.lastFlushTime)
   }
 
   /**


@@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite {
     TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed")
     TestUtils.waitUntilTrue(() => openOrClosingChannel.isDefined, "Channel removed without processing staged receives")
 
-    // Create new connection with same id when when `channel1` is in Selector.closingChannels
+    // Create new connection with same id when `channel1` is in Selector.closingChannels
     // Check that new connection is closed and openOrClosingChannel still contains `channel1`
     connectAndWaitForConnectionRegister()
     TestUtils.waitUntilTrue(() => connectionCount == 1, "Failed to close channel")


@@ -1431,7 +1431,7 @@ object TestUtils extends Logging {
   private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8)
 
-  // Verifies that the record was intended to be committed by checking the the headers for an expected transaction status
+  // Verifies that the record was intended to be committed by checking the headers for an expected transaction status
   // If true, this will return the value as a string. It is expected that the record in question should have been created
   // by the `producerRecordWithExpectedTransactionStatus` method.
   def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {


@@ -329,7 +329,7 @@
     }
     </pre>
 
-    <p>These are slightly simplified versions, but show that that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
+    <p>These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.</p>
     <p>Next, we implement the main functionality of the task, the <code>poll()</code> method which gets events from the input system and returns a <code>List&lt;SourceRecord&gt;</code>:</p>
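
Since the paragraph fixed above summarizes what start(), stop(), and poll() should do, here is a skeletal SourceTask along the same lines. The class and field names are hypothetical, and offset handling is omitted, as in the guide's simplified version.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class SketchSourceTask extends SourceTask {
    private volatile boolean running;

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        // Only allocate resources here; resuming from a previous offset is omitted.
        running = true;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (!running)
            return Collections.emptyList();
        Thread.sleep(100); // stand-in for blocking on the input system
        return Collections.emptyList(); // a real task would return the records it read
    }

    @Override
    public synchronized void stop() {
        // Called from a different thread in the Worker; must let poll() unblock.
        running = false;
    }
}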


@@ -15,5 +15,5 @@
   limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/architecture.html" -->


@@ -15,5 +15,5 @@
   limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/core-concepts.html" -->


@@ -15,5 +15,5 @@
   limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/developer-guide.html" -->


@@ -15,5 +15,5 @@
   limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/index.html" -->


@@ -15,5 +15,5 @@
   limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/quickstart.html" -->


@@ -15,5 +15,5 @@
   limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/tutorial.html" -->


@@ -15,5 +15,5 @@
   limitations under the License.
 -->
 
-<!-- should always link the the latest release's documentation -->
+<!-- should always link the latest release's documentation -->
 <!--#include virtual="../../streams/upgrade-guide.html" -->


@@ -99,7 +99,7 @@
         headerValueLength: varint
         Value: byte[]
     </pre></p>
 
-    <p>We use the the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
+    <p>We use the same varint encoding as Protobuf. More information on the latter can be found <a href="https://developers.google.com/protocol-buffers/docs/encoding#varints">here</a>. The count of headers in a record
     is also encoded as a varint.</p>
 
     <h3><a id="log" href="#log">5.4 Log</a></h3>
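
As an aside on the varint encoding referenced above, here is a standalone sketch of the base-128 scheme (seven payload bits per byte, continuation bit set on all but the last byte). This is only an illustration, not Kafka's own implementation.

import java.io.ByteArrayOutputStream;

public class VarintSketch {
    // Encode an unsigned int as a Protobuf-style varint.
    static byte[] writeUnsignedVarint(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80); // set the continuation bit
            value >>>= 7;
        }
        out.write(value); // last byte, continuation bit clear
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // 300 encodes as 0xAC 0x02, the example used in the Protobuf documentation.
        for (byte b : writeUnsignedVarint(300))
            System.out.printf("0x%02X ", b);
        System.out.println();
    }
}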


@@ -237,7 +237,7 @@ public class StreamsBuilder {
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     *
@@ -258,7 +258,7 @@ public class StreamsBuilder {
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     *
@@ -312,7 +312,7 @@ public class StreamsBuilder {
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -343,7 +343,7 @@ public class StreamsBuilder {
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
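
A short sketch of the API these JavaDoc blocks document: StreamsBuilder#table materializes an input topic into a KTable backed by an internally named store. The topic name below is an assumption for illustration.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class TableSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // The backing store gets an internally generated name, which may not be
        // queriable through Interactive Queries, as the JavaDoc notes.
        KTable<String, String> table = builder.table("input-topic"); // hypothetical topic
        // Likely prints null here, since no queryable store name was supplied.
        System.out.println("queryable store name: " + table.queryableStoreName());
        System.out.println(builder.build().describe());
    }
}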


@@ -444,7 +444,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * @param topic the topic name; cannot be {@code null}
@@ -537,7 +537,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -714,7 +714,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -908,7 +908,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     * If this is not the case the returned {@link KTable} will be corrupted.
     * <p>
     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -1007,7 +1007,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>
@@ -1196,7 +1196,7 @@ public class KStreamBuilder extends org.apache.kafka.streams.processor.TopologyB
     * Input {@link KeyValue records} with {@code null} key will be dropped.
     * <p>
     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-    * store name. Note that that store name may not be queriable through Interactive Queries.
+    * store name. Note that store name may not be queriable through Interactive Queries.
     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
     * <p>


@@ -74,7 +74,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
             T newAgg = oldAgg;
 
-            // try to add the new new value
+            // try to add the new value
             if (value != null) {
                 newAgg = aggregator.apply(key, value, newAgg);
             }


@@ -68,7 +68,7 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 
-            // try to add the new new value
+            // try to add the new value
             if (value != null) {
                 if (newAgg == null) {
                     newAgg = value;


@@ -104,7 +104,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
                 if (oldAgg == null)
                     oldAgg = initializer.apply();
 
-                // try to add the new new value (there will never be old value)
+                // try to add the new value (there will never be old value)
                 T newAgg = aggregator.apply(key, value, oldAgg);
 
                 // update the store with the new value


@@ -98,7 +98,7 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
                 V oldAgg = entry.value;
                 V newAgg = oldAgg;
 
-                // try to add the new new value (there will never be old value)
+                // try to add the new value (there will never be old value)
                 if (newAgg == null) {
                     newAgg = value;
                 } else {


@@ -84,7 +84,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
                 newAgg = remove.apply(key, value.oldValue, newAgg);
             }
 
-            // then try to add the new new value
+            // then try to add the new value
             if (value.newValue != null) {
                 newAgg = add.apply(key, value.newValue, newAgg);
             }


@@ -72,7 +72,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
             V oldAgg = store.get(key);
             V newAgg = oldAgg;
 
-            // first try to add the new new value
+            // first try to add the new value
             if (value.newValue != null) {
                 if (newAgg == null) {
                     newAgg = value.newValue;


@@ -469,7 +469,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 transactionInFlight = false;
             } catch (final ProducerFencedException ignore) {
                 /* TODO
-                 * this should actually never happen atm as we we guard the call to #abortTransaction
+                 * this should actually never happen atm as we guard the call to #abortTransaction
                  * -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException
                  * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got
                  * fixed and fall-back to this catch-and-swallow code


@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# pytest configuration (can also be defined in in tox.ini or pytest.ini file)
+# pytest configuration (can also be defined in tox.ini or pytest.ini file)
 #
 # This file defines naming convention and root search directory for autodiscovery of
 # pytest unit tests for the system test service classes.


@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# pytest configuration (can also be defined in in tox.ini or pytest.ini file)
+# pytest configuration (can also be defined in tox.ini or pytest.ini file)
 #
 # To ease possible confusion, prefix muckrake *unit* tests with 'check' instead of 'test', since
 # many muckrake files, classes, and methods have 'test' somewhere in the name


@@ -289,7 +289,7 @@ public class ProducerPerformance {
                 .metavar("TRANSACTION-DURATION")
                 .dest("transactionDurationMs")
                 .setDefault(0L)
-                .help("The max age of each transaction. The commitTransaction will be called after this this time has elapsed. Transactions are only enabled if this value is positive.");
+                .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. Transactions are only enabled if this value is positive.");
 
         return parser;