mirror of https://github.com/apache/kafka.git
MINOR: replace .stream().forEach() with .forEach() (#19626)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
replace all applicable `.stream().forEach()` in codebase with just `.forEach()`. Reviewers: TengYao Chi <kitingiao@gmail.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
c85e09f7a5
commit
bff5ba4ad9
|
@ -381,7 +381,7 @@ public class Metadata implements Closeable {
|
||||||
public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
|
public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
|
||||||
Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
|
Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
|
||||||
// Insert non-overlapping nodes from existing-nodes into new-nodes.
|
// Insert non-overlapping nodes from existing-nodes into new-nodes.
|
||||||
this.metadataSnapshot.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
|
this.metadataSnapshot.cluster().nodes().forEach(node -> newNodes.putIfAbsent(node.id(), node));
|
||||||
|
|
||||||
// Create partition-metadata for all updated partitions. Exclude updates for partitions -
|
// Create partition-metadata for all updated partitions. Exclude updates for partitions -
|
||||||
// 1. for which the corresponding partition has newer leader in existing metadata.
|
// 1. for which the corresponding partition has newer leader in existing metadata.
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class DescribeUserScramCredentialsResult {
|
||||||
retval.completeExceptionally(Errors.forCode(optionalFirstFailedDescribe.get().errorCode()).exception(optionalFirstFailedDescribe.get().errorMessage()));
|
retval.completeExceptionally(Errors.forCode(optionalFirstFailedDescribe.get().errorCode()).exception(optionalFirstFailedDescribe.get().errorMessage()));
|
||||||
} else {
|
} else {
|
||||||
Map<String, UserScramCredentialsDescription> retvalMap = new HashMap<>();
|
Map<String, UserScramCredentialsDescription> retvalMap = new HashMap<>();
|
||||||
data.results().stream().forEach(userResult ->
|
data.results().forEach(userResult ->
|
||||||
retvalMap.put(userResult.user(), new UserScramCredentialsDescription(userResult.user(),
|
retvalMap.put(userResult.user(), new UserScramCredentialsDescription(userResult.user(),
|
||||||
getScramCredentialInfosFor(userResult))));
|
getScramCredentialInfosFor(userResult))));
|
||||||
retval.complete(retvalMap);
|
retval.complete(retvalMap);
|
||||||
|
|
|
@ -4487,8 +4487,8 @@ public class KafkaAdminClient extends AdminClient {
|
||||||
* Be sure to do this after the NOT_CONTROLLER error check above
|
* Be sure to do this after the NOT_CONTROLLER error check above
|
||||||
* so that all errors are consistent in that case.
|
* so that all errors are consistent in that case.
|
||||||
*/
|
*/
|
||||||
userIllegalAlterationExceptions.entrySet().stream().forEach(entry ->
|
userIllegalAlterationExceptions.forEach((key, value) ->
|
||||||
futures.get(entry.getKey()).completeExceptionally(entry.getValue())
|
futures.get(key).completeExceptionally(value)
|
||||||
);
|
);
|
||||||
response.data().results().forEach(result -> {
|
response.data().results().forEach(result -> {
|
||||||
KafkaFutureImpl<Void> future = futures.get(result.user());
|
KafkaFutureImpl<Void> future = futures.get(result.user());
|
||||||
|
|
|
@ -2413,7 +2413,7 @@ public class SenderTest {
|
||||||
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
|
// Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
|
||||||
MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWithIds(2, Collections.singletonMap(topic, 2), TOPIC_IDS);
|
MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWithIds(2, Collections.singletonMap(topic, 2), TOPIC_IDS);
|
||||||
client.prepareMetadataUpdate(metadataUpdate1);
|
client.prepareMetadataUpdate(metadataUpdate1);
|
||||||
metadataUpdate1.brokers().stream().forEach(node ->
|
metadataUpdate1.brokers().forEach(node ->
|
||||||
apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, ApiKeys.PRODUCE.oldestVersion(), ApiKeys.PRODUCE.latestVersion()))
|
apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, ApiKeys.PRODUCE.oldestVersion(), ApiKeys.PRODUCE.latestVersion()))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -66,7 +66,7 @@ public class FakeLocalMetadataStore {
|
||||||
*/
|
*/
|
||||||
public static void updateTopicConfig(String topic, Config newConfig) {
|
public static void updateTopicConfig(String topic, Config newConfig) {
|
||||||
ConcurrentHashMap<String, String> topicConfigs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
|
ConcurrentHashMap<String, String> topicConfigs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
|
||||||
newConfig.entries().stream().forEach(configEntry -> {
|
newConfig.entries().forEach(configEntry -> {
|
||||||
if (configEntry.name() != null) {
|
if (configEntry.name() != null) {
|
||||||
if (configEntry.value() != null) {
|
if (configEntry.value() != null) {
|
||||||
log.debug("Topic '{}' update config '{}' to '{}'", topic, configEntry.name(), configEntry.value());
|
log.debug("Topic '{}' update config '{}' to '{}'", topic, configEntry.name(), configEntry.value());
|
||||||
|
|
|
@ -88,10 +88,10 @@ public class QuorumControllerIntegrationTestUtils {
|
||||||
.setName(MetadataVersion.FEATURE_NAME)
|
.setName(MetadataVersion.FEATURE_NAME)
|
||||||
.setMinSupportedVersion(minVersion.featureLevel())
|
.setMinSupportedVersion(minVersion.featureLevel())
|
||||||
.setMaxSupportedVersion(maxVersion.featureLevel()));
|
.setMaxSupportedVersion(maxVersion.featureLevel()));
|
||||||
featureMaxVersions.entrySet().forEach(entry -> {
|
featureMaxVersions.forEach((key, value) -> {
|
||||||
features.add(new BrokerRegistrationRequestData.Feature()
|
features.add(new BrokerRegistrationRequestData.Feature()
|
||||||
.setName(entry.getKey())
|
.setName(key)
|
||||||
.setMaxSupportedVersion(entry.getValue())
|
.setMaxSupportedVersion(value)
|
||||||
.setMinSupportedVersion((short) 0));
|
.setMinSupportedVersion((short) 0));
|
||||||
});
|
});
|
||||||
return features;
|
return features;
|
||||||
|
|
|
@ -368,7 +368,7 @@ public final class AssignmentsManager {
|
||||||
previousGlobalFailures++;
|
previousGlobalFailures++;
|
||||||
log.error("handleResponse: {} assignments failed; global error: {}. Retrying.",
|
log.error("handleResponse: {} assignments failed; global error: {}. Retrying.",
|
||||||
sent.size(), globalResponseError.get());
|
sent.size(), globalResponseError.get());
|
||||||
sent.entrySet().forEach(e -> ready.putIfAbsent(e.getKey(), e.getValue()));
|
sent.forEach(ready::putIfAbsent);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
previousGlobalFailures = 0;
|
previousGlobalFailures = 0;
|
||||||
|
|
|
@ -167,7 +167,7 @@ public class ClusterTool {
|
||||||
if (listControllerEndpoints) {
|
if (listControllerEndpoints) {
|
||||||
String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";
|
String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";
|
||||||
stream.printf(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE");
|
stream.printf(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE");
|
||||||
nodes.stream().forEach(node -> stream.printf(format,
|
nodes.forEach(node -> stream.printf(format,
|
||||||
node.idString(),
|
node.idString(),
|
||||||
node.host(),
|
node.host(),
|
||||||
node.port(),
|
node.port(),
|
||||||
|
@ -177,7 +177,7 @@ public class ClusterTool {
|
||||||
} else {
|
} else {
|
||||||
String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-10s %-15s%n";
|
String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-10s %-15s%n";
|
||||||
stream.printf(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE");
|
stream.printf(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE");
|
||||||
nodes.stream().forEach(node -> stream.printf(format,
|
nodes.forEach(node -> stream.printf(format,
|
||||||
node.idString(),
|
node.idString(),
|
||||||
node.host(),
|
node.host(),
|
||||||
node.port(),
|
node.port(),
|
||||||
|
|
Loading…
Reference in New Issue