mirror of https://github.com/apache/kafka.git
KAFKA-3548: Use root locale for case transformation of constant strings

For enums and other constant strings, use locale-independent case
conversions so that comparisons work regardless of the default locale.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Manikumar Reddy, Ismael Juma, Guozhang Wang, Gwen Shapira

Closes #1220 from rajinisivaram/KAFKA-3548
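The failure mode being fixed is easiest to see under the Turkish locale, where the letter I has its own case mappings: toUpperCase() turns 'i' into dotted 'İ' (U+0130) and toLowerCase() turns 'I' into dotless 'ı' (U+0131), so any enum lookup built on a default-locale conversion breaks on a Turkish JVM. A minimal, self-contained sketch of the bug and the fix (the enum below is a stand-in for illustration, not the patched class):

    import java.util.Locale;

    public class LocaleBugDemo {
        // Stand-in for the consumer's auto.offset.reset strategies.
        enum OffsetResetStrategy { LATEST, EARLIEST, NONE }

        public static void main(String[] args) {
            Locale.setDefault(new Locale("tr", "TR")); // simulate a Turkish JVM

            // Default-locale conversion: "earliest" upper-cases to "EARLİEST"
            // (dotted İ), which matches no enum constant.
            try {
                OffsetResetStrategy.valueOf("earliest".toUpperCase());
            } catch (IllegalArgumentException e) {
                System.out.println("default-locale lookup failed: " + e.getMessage());
            }

            // Root-locale conversion is stable on every JVM.
            System.out.println(OffsetResetStrategy.valueOf("earliest".toUpperCase(Locale.ROOT)));

            // The lower-casing direction breaks too: 'I' maps to dotless 'ı'.
            System.out.println("EARLIEST".toLowerCase());            // earlıest
            System.out.println("EARLIEST".toLowerCase(Locale.ROOT)); // earliest
        }
    }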
commit 9d71489ff0
parent f213625fc5
@@ -84,5 +84,12 @@
     <module name="MethodParamPad"/>
     <module name="ParenPad"/>
     <module name="TypecastParenPad"/>
+
+    <!-- locale-sensitive methods should specify locale -->
+    <module name="Regexp">
+      <property name="format" value="\.to(Lower|Upper)Case\(\)"/>
+      <property name="illegalPattern" value="true"/>
+      <property name="ignoreComments" value="true"/>
+    </module>
   </module>
 </module>
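The Regexp module above makes checkstyle fail the build on any remaining bare .toLowerCase()/.toUpperCase() call outside comments, so regressions cannot slip back in. Roughly, it draws this line (an illustrative fragment, not part of the commit):

    // Rejected: matches \.to(Lower|Upper)Case\(\)
    String bad = name.toLowerCase();

    // Accepted: the no-argument form is gone, so the pattern no longer matches
    String good = name.toLowerCase(java.util.Locale.ROOT);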
@@ -51,6 +51,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -586,7 +587,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
-            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
+            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
@@ -61,6 +61,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -291,7 +292,7 @@ public class Fetcher<K, V> {
         else
             throw new NoOffsetForPartitionException(partition);
 
-        log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
+        log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
         long offset = listOffset(partition, timestamp);
 
         // we might lose the assignment while fetching the offset, so check it is still active
@@ -338,7 +338,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     private static int parseAcks(String acksString) {
         try {
-            return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim());
+            return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
         } catch (NumberFormatException e) {
             throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
         }
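equalsIgnoreCase removes the locale question rather than answering it: it compares the two strings character by character via Character.toUpperCase/Character.toLowerCase, which never consult the default locale. "all" happens to contain no locale-sensitive letters, but the old toLowerCase().equals(...) pattern is fragile in general, as a quick check shows (illustrative):

    import java.util.Locale;

    public class EqualsIgnoreCaseDemo {
        public static void main(String[] args) {
            Locale.setDefault(new Locale("tr", "TR"));

            // toLowerCase() consults the default locale: "TITLE" becomes "tıtle".
            System.out.println("TITLE".toLowerCase().equals("title")); // false

            // Per-character comparison, independent of the default locale.
            System.out.println("TITLE".equalsIgnoreCase("title"));     // true
        }
    }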
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -891,7 +892,7 @@ public class ConfigDef {
             b.append(def.documentation);
             b.append("</td>");
             b.append("<td>");
-            b.append(def.type.toString().toLowerCase());
+            b.append(def.type.toString().toLowerCase(Locale.ROOT));
             b.append("</td>");
             b.append("<td>");
             if (def.hasDefault()) {
@@ -908,7 +909,7 @@ public class ConfigDef {
             b.append(def.validator != null ? def.validator.toString() : "");
             b.append("</td>");
             b.append("<td>");
-            b.append(def.importance.toString().toLowerCase());
+            b.append(def.importance.toString().toLowerCase(Locale.ROOT));
             b.append("</td>");
             b.append("</tr>\n");
         }
@@ -937,7 +938,7 @@ public class ConfigDef {
                 b.append("\n\n");
             }
             b.append(" * Type: ");
-            b.append(def.type.toString().toLowerCase());
+            b.append(def.type.toString().toLowerCase(Locale.ROOT));
             b.append("\n");
             if (def.defaultValue != null) {
                 b.append(" * Default: ");
@@ -951,7 +952,7 @@ public class ConfigDef {
                 b.append("\n");
             }
             b.append(" * Importance: ");
-            b.append(def.importance.toString().toLowerCase());
+            b.append(def.importance.toString().toLowerCase(Locale.ROOT));
             b.append("\n\n");
         }
         return b.toString();
@@ -13,6 +13,7 @@
 package org.apache.kafka.common.metrics.stats;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.metrics.MeasurableStat;
@@ -48,7 +49,7 @@ public class Rate implements MeasurableStat {
     }
 
     public String unitName() {
-        return unit.name().substring(0, unit.name().length() - 2).toLowerCase();
+        return unit.name().substring(0, unit.name().length() - 2).toLowerCase(Locale.ROOT);
     }
 
     @Override
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -85,7 +86,7 @@ public enum SecurityProtocol {
 
     /** Case insensitive lookup by protocol name */
     public static SecurityProtocol forName(String name) {
-        return SecurityProtocol.valueOf(name.toUpperCase());
+        return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
     }
 
     /**
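forName is now the template for case-insensitive enum lookup: normalize with Locale.ROOT, then valueOf. Callers are unchanged, and the result no longer depends on the default locale (under tr_TR, the old "plaintext".toUpperCase() would have produced "PLAİNTEXT" and thrown). A hedged usage sketch:

    SecurityProtocol plaintext = SecurityProtocol.forName("plaintext"); // PLAINTEXT
    SecurityProtocol ssl = SecurityProtocol.forName("Ssl");             // SSL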
@@ -29,6 +29,7 @@ import org.apache.kafka.common.config.ConfigException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -59,7 +60,7 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> {
                     record.timestamp(), record.timestampType(),
                     record.checksum(), record.serializedKeySize(),
                     record.serializedValueSize(),
-                    record.key(), record.value().toUpperCase()));
+                    record.key(), record.value().toUpperCase(Locale.ROOT)));
         }
         recordMap.put(tp, lst);
     }
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.data;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
@@ -48,7 +49,7 @@ public interface Schema {
         private String name;
 
         Type() {
-            this.name = this.name().toLowerCase();
+            this.name = this.name().toLowerCase(Locale.ROOT);
         }
 
         public String getName() {
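The Schema.Type case is subtler than the others: the conversion runs once, in the enum constructor, so with the old code the cached name would silently depend on whatever the default locale happened to be when the class was initialized ("STRING" and "INT32" both contain the troublesome I). A reduced sketch of the pattern (illustrative, not the full Connect interface):

    import java.util.Locale;

    public interface SchemaSketch {
        enum Type {
            INT32, STRING, ARRAY, MAP;

            private final String name;

            Type() {
                // Computed once at class-initialization time; Locale.ROOT keeps
                // the cached value independent of the JVM's startup locale.
                this.name = this.name().toLowerCase(Locale.ROOT);
            }

            public String getName() {
                return name;
            }
        }
    }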
@@ -21,6 +21,7 @@ import java.util.Properties
 import kafka.api.OffsetRequest
 import kafka.utils._
 import kafka.common.{InvalidConfigException, Config}
+import java.util.Locale
 
 object ConsumerConfig extends Config {
   val RefreshMetadataBackoffMs = 200
@@ -163,7 +164,7 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
   val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)
 
   /** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
-  val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase
+  val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase(Locale.ROOT)
 
   /** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
    * is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
@@ -27,6 +27,7 @@ import kafka.server.KafkaConfig
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
+import java.util.Locale
 
 object Defaults {
   val SegmentSize = kafka.server.Defaults.LogSegmentBytes
@@ -70,10 +71,10 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
   val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
   val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
   val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
-  val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase != LogConfig.Delete
+  val compact = getString(LogConfig.CleanupPolicyProp).toLowerCase(Locale.ROOT) != LogConfig.Delete
   val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
-  val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
+  val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase(Locale.ROOT)
   val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
   val messageFormatVersion = ApiVersion(getString(LogConfig.MessageFormatVersionProp))
   val messageTimestampType = TimestampType.forName(getString(LogConfig.MessageTimestampTypeProp))
@@ -17,6 +17,8 @@
 
 package kafka.message
 
+import java.util.Locale
+
 object CompressionCodec {
   def getCompressionCodec(codec: Int): CompressionCodec = {
     codec match {
@@ -28,7 +30,7 @@ object CompressionCodec {
     }
   }
   def getCompressionCodec(name: String): CompressionCodec = {
-    name.toLowerCase match {
+    name.toLowerCase(Locale.ROOT) match {
       case NoCompressionCodec.name => NoCompressionCodec
       case GZIPCompressionCodec.name => GZIPCompressionCodec
       case SnappyCompressionCodec.name => SnappyCompressionCodec
@@ -43,10 +45,10 @@ object BrokerCompressionCodec {
   val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
   val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
 
-  def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase())
+  def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))
 
   def getCompressionCodec(compressionType: String): CompressionCodec = {
-    compressionType.toLowerCase match {
+    compressionType.toLowerCase(Locale.ROOT) match {
       case UncompressedCodec.name => NoCompressionCodec
       case _ => CompressionCodec.getCompressionCodec(compressionType)
     }
@@ -349,9 +349,9 @@ class DefaultMessageFormatter extends MessageFormatter {
 
   override def init(props: Properties) {
     if (props.containsKey("print.timestamp"))
-      printTimestamp = props.getProperty("print.timestamp").trim.toLowerCase.equals("true")
+      printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
     if (props.containsKey("print.key"))
-      printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
+      printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
     if (props.containsKey("key.separator"))
       keySeparator = props.getProperty("key.separator").getBytes
     if (props.containsKey("line.separator"))
@@ -295,11 +295,11 @@ object ConsoleProducer {
     override def init(inputStream: InputStream, props: Properties) {
       topic = props.getProperty("topic")
       if (props.containsKey("parse.key"))
-        parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
+        parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
       if (props.containsKey("key.separator"))
         keySeparator = props.getProperty("key.separator")
       if (props.containsKey("ignore.error"))
-        ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
+        ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
       reader = new BufferedReader(new InputStreamReader(inputStream))
     }
 
@@ -20,6 +20,7 @@ package kafka.utils
 
 import org.apache.log4j.{Logger, Level, LogManager}
 import java.util
+import java.util.Locale
 
 
 object Log4jController {
@@ -81,7 +82,7 @@ private class Log4jController extends Log4jControllerMBean {
   def setLogLevel(loggerName: String, level: String) = {
     val log = newLogger(loggerName)
     if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
-      log.setLevel(Level.toLevel(level.toUpperCase))
+      log.setLevel(Level.toLevel(level.toUpperCase(Locale.ROOT)))
       true
     }
     else false
@@ -17,7 +17,9 @@
 
 package kafka.utils
 
+import java.util.Locale
+
 object Os {
-  val name = System.getProperty("os.name").toLowerCase
+  val name = System.getProperty("os.name").toLowerCase(Locale.ROOT)
   val isWindows = name.startsWith("windows")
 }
@@ -35,6 +35,7 @@ import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
+import java.util.Locale
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
@@ -606,7 +607,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       assertEquals(s"key $i", new String(record.key()))
-      assertEquals(s"value $i$appendStr".toUpperCase, new String(record.value()))
+      assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new String(record.value()))
     }
 
     // commit sync and verify onCommit is called
@@ -18,7 +18,7 @@ package kafka.examples;
 
 public class KafkaConsumerProducerDemo {
     public static void main(String[] args) {
-        boolean isAsync = args.length == 0 || !args[0].trim().toLowerCase().equals("sync");
+        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
         Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
         producerThread.start();
 
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.util.Arrays;
+import java.util.Locale;
 import java.util.Properties;
 
 /**
@@ -63,7 +64,7 @@ public class WordCountDemo {
                 .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                     @Override
                     public Iterable<String> apply(String value) {
-                        return Arrays.asList(value.toLowerCase().split(" "));
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
                     }
                 }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                     @Override
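Note the deliberate exception in the streams demos: they fold human text for word counting, not protocol constants, so the patch passes Locale.getDefault() instead of Locale.ROOT. That keeps the case folding correct for the text's actual language while still satisfying the new checkstyle rule, which only bans the no-argument form. Both choices side by side (a sketch with made-up method names):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Locale;

    public class CaseFoldingChoice {
        // Protocol constants: Locale.ROOT, so parsing is identical on every JVM.
        static boolean isDeletePolicy(String cleanupPolicy) {
            return cleanupPolicy.toLowerCase(Locale.ROOT).equals("delete");
        }

        // User-visible text: folding by the user's locale is the desired behavior.
        static List<String> words(String line) {
            return Arrays.asList(line.toLowerCase(Locale.getDefault()).split(" "));
        }

        public static void main(String[] args) {
            System.out.println(isDeletePolicy("DELETE"));     // true everywhere
            System.out.println(words("Hello Kafka Streams")); // [hello, kafka, streams]
        }
    }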
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 
+import java.util.Locale;
 import java.util.Properties;
 
 /**
@@ -63,7 +64,7 @@ public class WordCountProcessorDemo {
 
         @Override
         public void process(String dummy, String line) {
-            String[] words = line.toLowerCase().split(" ");
+            String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
 
             for (String word : words) {
                 Integer oldValue = this.kvStore.get(word);
@@ -26,6 +26,7 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Locale;
 
 import static org.junit.Assert.assertEquals;
 
@@ -42,7 +43,7 @@ public class KStreamFlatMapValuesTest {
             @Override
             public Iterable<String> apply(String value) {
                 ArrayList<String> result = new ArrayList<String>();
-                result.add(value.toLowerCase());
+                result.add(value.toLowerCase(Locale.ROOT));
                 result.add(value);
                 return result;
             }
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.junit.Test;
 import java.util.List;
+import java.util.Locale;
 import java.util.ArrayList;
 import java.util.Arrays;
 
@@ -60,7 +61,7 @@ public class KStreamForeachTest {
             new ForeachAction<Integer, String>() {
                 @Override
                 public void apply(Integer key, String value) {
-                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
                }
             };
 
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.junit.Test;
 import java.util.List;
+import java.util.Locale;
 import java.util.ArrayList;
 import java.util.Arrays;
 
@@ -60,7 +61,7 @@ public class KTableForeachTest {
             new ForeachAction<Integer, String>() {
                 @Override
                 public void apply(Integer key, String value) {
-                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase(Locale.ROOT)));
                }
             };
 