mirror of https://github.com/apache/kafka.git
MINOR: Remove usages of JavaConversions and fix some typos (#5115)
Reviewers: Ismael Juma <ismael@juma.me.uk>
parent f6d7377f95
commit 0cacbcf30e
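The substantive change in this commit is the move from scala.collection.JavaConversions, whose implicit conversions are deprecated since Scala 2.12, to scala.collection.JavaConverters, which makes every Java/Scala collection conversion explicit through .asScala and .asJava. A minimal sketch of the two styles (the collection contents are invented for illustration):

    import java.util.{ArrayList => JArrayList}
    import scala.collection.JavaConverters._

    // Old style (deprecated): `import scala.collection.JavaConversions._` would let the
    // java.util.ArrayList below be used as a Scala collection through invisible implicits.
    // New style: the conversion is spelled out at the call site.
    val names = new JArrayList[String]()
    names.add("broker-0")
    names.asScala.foreach(println)                                 // Java -> Scala, explicit
    val javaCopy: java.util.List[String] = Seq("a", "b").asJava    // Scala -> Java, explicit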
@@ -181,7 +181,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   }

   /**
-   * On controlled shutdown shutdown, the controller first determines the partitions that the
+   * On controlled shutdown, the controller first determines the partitions that the
    * shutting down broker leads, and moves leadership of those partitions to another broker
    * that is in that partition's ISR.
    *
@@ -1212,7 +1212,7 @@ object GroupMetadataManager {
   }

   /**
-   * Decodes the group metadata messages' payload and retrieves its member metadatafrom it
+   * Decodes the group metadata messages' payload and retrieves its member metadata from it
    *
    * @param buffer input byte-buffer
    * @return a group metadata object from the message
@@ -1275,7 +1275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           Collections.emptyMap())
         )
       } else {
-        // let the coordinator to handle join-group
+        // let the coordinator handle join-group
         val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
           (protocol.name, Utils.toArray(protocol.metadata))).toList
         groupCoordinator.handleJoinGroup(
@@ -37,7 +37,6 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
 import org.apache.kafka.common.utils.Utils

-import scala.collection.JavaConversions
 import scala.collection.JavaConverters._

 /**
@@ -170,7 +169,7 @@ object ConsoleConsumer extends Logging {
   def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
     val gotError = output.checkError()
     if (gotError) {
-      // This means no one is listening to our output stream any more, time to shutdown
+      // This means no one is listening to our output stream anymore, time to shutdown
       System.err.println("Unable to write to standard out, closing consumer.")
     }
     gotError
@@ -535,21 +534,21 @@ class DefaultMessageFormatter extends MessageFormatter {
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("key.deserializer")) {
       keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(propertiesWithKeyPrefixStripped("key.deserializer.", props)).asJava, true)
+      keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true)
     }
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("value.deserializer")) {
       valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(propertiesWithKeyPrefixStripped("value.deserializer.", props)).asJava, false)
+      valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false)
     }
   }

   private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = {
     val newProps = new Properties()
-    import scala.collection.JavaConversions._
-    for ((key, value) <- props if key.startsWith(prefix) && key.length > prefix.length)
-      newProps.put(key.substring(prefix.length), value)
+    props.asScala.foreach { case (key, value) =>
+      if (key.startsWith(prefix) && key.length > prefix.length)
+        newProps.put(key.substring(prefix.length), value)
+    }
     newProps
   }
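For reference, the rewritten propertiesWithKeyPrefixStripped boils down to the following self-contained sketch (property keys are invented for the example); props.asScala exposes the java.util.Properties as a mutable Scala Map, so the iteration no longer relies on an implicit conversion:

    import java.util.Properties
    import scala.collection.JavaConverters._

    def stripPrefix(prefix: String, props: Properties): Properties = {
      val newProps = new Properties()
      // Iterate the Properties as a scala.collection.mutable.Map[String, String]
      props.asScala.foreach { case (key, value) =>
        if (key.startsWith(prefix) && key.length > prefix.length)
          newProps.put(key.substring(prefix.length), value)
      }
      newProps
    }

    val p = new Properties()
    p.setProperty("key.deserializer.encoding", "UTF-8")
    println(stripPrefix("key.deserializer.", p))   // prints {encoding=UTF-8}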
@@ -2515,7 +2515,7 @@ class LogTest {
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()

-    //The the first entry should have gone from (0,0) => (0,5)
+    //The first entry should have gone from (0,0) => (0,5)
     assertEquals(ListBuffer(EpochEntry(0, 5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
   }

@@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartition
 import org.junit.{Before, Test}
 import org.junit.Assert._

-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._

 class AddPartitionsToTxnRequestTest extends BaseRequestTest {
   private val topic1 = "foobartopic"
@@ -69,7 +69,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest {
     val transactionalId = "foobar"
     val producerId = 1000L
     val producerEpoch: Short = 0
-    val builder = new AddPartitionsToTxnRequest.Builder(transactionalId, producerId, producerEpoch, partitions)
+    val builder = new AddPartitionsToTxnRequest.Builder(transactionalId, producerId, producerEpoch, partitions.asJava)
     builder.build()
   }
 }
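The test change above is needed because, without the JavaConversions implicits, the Scala List of partitions no longer converts silently to the java.util.List the request builder expects. A hedged sketch of the explicit call, assuming the Builder signature shown in the hunk and illustrative topic and transaction names:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.AddPartitionsToTxnRequest
    import scala.collection.JavaConverters._

    val partitions = List(new TopicPartition("foobartopic", 0), new TopicPartition("foobartopic", 1))
    // The Builder takes a java.util.List[TopicPartition], so the conversion is explicit.
    val request = new AddPartitionsToTxnRequest.Builder("foobar", 1000L, 0.toShort, partitions.asJava).build()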
@@ -139,7 +139,7 @@
         <p>
             We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that
             take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> typed topic name, such that for each received record from the upstream processor, the library will dynamically determine which Kafka topic to write to
-            based on the record's key and value, as well as record context. Note that all the Kafka topics that that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the
+            based on the record's key and value, as well as record context. Note that all the Kafka topics that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the
             <code>StreamPartitioner</code> interface to add the topic name parameter since the topic name now may not be known beforehand; users who have customized implementations of this interface would need to update their code while upgrading their application
             to use Kafka Streams 2.0.0.
         </p>
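To make the documented feature concrete, here is a hedged Scala sketch against the Java Streams API; the topic names and the routing rule are invented, and the TopicNameExtractor and KStream#to signatures are assumed to match the 2.0 API described above:

    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.processor.{RecordContext, TopicNameExtractor}

    val builder = new StreamsBuilder()
    val orders = builder.stream[String, String]("orders-input")

    // Route each record to a topic derived from its key; every topic that can be
    // produced this way (e.g. "orders-eu", "orders-us") must already exist.
    orders.to(new TopicNameExtractor[String, String] {
      override def extract(key: String, value: String, recordContext: RecordContext): String =
        s"orders-$key"
    })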
@@ -59,7 +59,7 @@
         with the new protocol by default.</li>
     <li>Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
         Similarly for the message format version.</li>
-    <li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties.
+    <li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
         Hot-swapping the jar-file only might not work.</li>
 </ol>

@@ -1028,7 +1028,7 @@ public class KafkaStreams {
      * @param <T> return type
      * @return A facade wrapping the local {@link StateStore} instances
      * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
-     * {@code queryableStoreType} doesnt' exist
+     * {@code queryableStoreType} doesn't exist
      */
     public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
         validateIsRunning();
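For context on the Javadoc being fixed, a minimal Scala sketch of querying a local store through this method (the store name and value type are illustrative); it throws InvalidStateStoreException exactly in the cases the corrected comment describes:

    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

    def lookupCount(streams: KafkaStreams, key: String): java.lang.Long = {
      // Throws InvalidStateStoreException if "counts-store" doesn't exist or Streams is (re-)initializing
      val store: ReadOnlyKeyValueStore[String, java.lang.Long] =
        streams.store("counts-store", QueryableStoreTypes.keyValueStore[String, java.lang.Long]())
      store.get(key)
    }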
@@ -46,6 +46,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import scala.collection.JavaConverters;

 import java.io.IOException;
 import java.util.Arrays;
@@ -116,7 +117,8 @@ public class InternalTopicIntegrationTest {
             DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE,
             Time.SYSTEM, "testMetricGroup", "testMetricType")) {
             final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-            final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
+            final Map<String, Properties> topicConfigs =
+                JavaConverters.mapAsJavaMapConverter(adminZkClient.getAllTopicConfigs()).asJava();

             for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
                 if (topicConfig.getKey().equals(changelog)) {
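The Java test files cannot use the Scala implicit decorators, so they call the converter methods on scala.collection.JavaConverters explicitly, as above and in the following hunks. A short Scala sketch of the two equivalent call styles (values invented for illustration):

    import scala.collection.JavaConverters._

    val topicConfigs = Map("changelog-topic" -> new java.util.Properties)
    val topics = Seq("topic-a", "topic-b")

    // Idiomatic from Scala: the implicit decorators.
    val javaMap: java.util.Map[String, java.util.Properties] = topicConfigs.asJava
    val javaList: java.util.List[String] = topics.asJava

    // What the Java tests call, also valid from Scala:
    val javaMap2 = scala.collection.JavaConverters.mapAsJavaMapConverter(topicConfigs).asJava
    val javaList2 = scala.collection.JavaConverters.seqAsJavaListConverter(topics).asJava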
@@ -29,6 +29,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -296,8 +297,8 @@ public class EmbeddedKafkaCluster extends ExternalResource {

             @Override
             public boolean conditionMet() {
-                final Set<String> allTopics = new HashSet<>();
-                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+                final Set<String> allTopics = new HashSet<>(
+                    JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
                 return !allTopics.removeAll(deletedTopics);
             }
         }
@@ -311,8 +312,8 @@ public class EmbeddedKafkaCluster extends ExternalResource {

             @Override
             public boolean conditionMet() {
-                final Set<String> allTopics = new HashSet<>();
-                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+                final Set<String> allTopics = new HashSet<>(
+                    JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
                 return allTopics.equals(remainingTopics);
             }
         }
@@ -20,21 +20,19 @@ import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
-import kafka.utils.CoreUtils;
 import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 import java.util.Properties;

 /**
@@ -128,9 +126,12 @@ public class KafkaEmbedded {
             brokerList(), zookeeperConnect());
         kafka.shutdown();
         kafka.awaitShutdown();
-        log.debug("Removing logs.dir at {} ...", logDir);
-        final List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
-        CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
+        log.debug("Removing log dir at {} ...", logDir);
+        try {
+            Utils.delete(logDir);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         tmpFolder.delete();
         log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
             brokerList(), zookeeperConnect());