From bff5ba4ad93cd40df501987baad3cb4d1452d5c2 Mon Sep 17 00:00:00 2001
From: yunchi
Date: Sun, 4 May 2025 05:39:55 -0700
Subject: [PATCH] MINOR: replace .stream().forEach() with .forEach() (#19626)

Replace all applicable `.stream().forEach()` calls in the codebase with
plain `.forEach()`.

Reviewers: TengYao Chi, Ken Huang, Chia-Ping Tsai
---
 .../src/main/java/org/apache/kafka/clients/Metadata.java     | 2 +-
 .../clients/admin/DescribeUserScramCredentialsResult.java    | 2 +-
 .../org/apache/kafka/clients/admin/KafkaAdminClient.java     | 4 ++--
 .../apache/kafka/clients/producer/internals/SenderTest.java  | 2 +-
 .../mirror/clients/admin/FakeLocalMetadataStore.java         | 2 +-
 .../controller/QuorumControllerIntegrationTestUtils.java     | 6 +++---
 .../java/org/apache/kafka/server/AssignmentsManager.java     | 2 +-
 tools/src/main/java/org/apache/kafka/tools/ClusterTool.java  | 4 ++--
 8 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b60156aae00..4c567b7d466 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -381,7 +381,7 @@ public class Metadata implements Closeable {
     public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicPartition, LeaderIdAndEpoch> partitionLeaders, List<Node> leaderNodes) {
         Map<Integer, Node> newNodes = leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
         // Insert non-overlapping nodes from existing-nodes into new-nodes.
-        this.metadataSnapshot.cluster().nodes().stream().forEach(node -> newNodes.putIfAbsent(node.id(), node));
+        this.metadataSnapshot.cluster().nodes().forEach(node -> newNodes.putIfAbsent(node.id(), node));

         // Create partition-metadata for all updated partitions. Exclude updates for partitions -
         // 1. for which the corresponding partition has newer leader in existing metadata.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
index 5a2f55c544f..fad56892f45 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
@@ -69,7 +69,7 @@ public class DescribeUserScramCredentialsResult {
                 retval.completeExceptionally(Errors.forCode(optionalFirstFailedDescribe.get().errorCode()).exception(optionalFirstFailedDescribe.get().errorMessage()));
             } else {
                 Map<String, UserScramCredentialsDescription> retvalMap = new HashMap<>();
-                data.results().stream().forEach(userResult ->
+                data.results().forEach(userResult ->
                     retvalMap.put(userResult.user(),
                         new UserScramCredentialsDescription(userResult.user(), getScramCredentialInfosFor(userResult))));
                 retval.complete(retvalMap);
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5cb5cc292ea..3206d6f19ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4487,8 +4487,8 @@ public class KafkaAdminClient extends AdminClient {
                      * Be sure to do this after the NOT_CONTROLLER error check above
                      * so that all errors are consistent in that case.
                     */
-                    userIllegalAlterationExceptions.entrySet().stream().forEach(entry ->
-                        futures.get(entry.getKey()).completeExceptionally(entry.getValue())
+                    userIllegalAlterationExceptions.forEach((key, value) ->
+                        futures.get(key).completeExceptionally(value)
                     );
                     response.data().results().forEach(result -> {
                         KafkaFutureImpl<Void> future = futures.get(result.user());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 44d9bec5dde..34392d58690 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2413,7 +2413,7 @@ public class SenderTest {
         // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
         MetadataResponse metadataUpdate1 = RequestTestUtils.metadataUpdateWithIds(2, Collections.singletonMap(topic, 2), TOPIC_IDS);
         client.prepareMetadataUpdate(metadataUpdate1);
-        metadataUpdate1.brokers().stream().forEach(node ->
+        metadataUpdate1.brokers().forEach(node ->
             apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.PRODUCE.id, ApiKeys.PRODUCE.oldestVersion(), ApiKeys.PRODUCE.latestVersion()))
         );
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java
index 1c08cbaf72e..b55673810a4 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeLocalMetadataStore.java
@@ -66,7 +66,7 @@ public class FakeLocalMetadataStore {
      */
     public static void updateTopicConfig(String topic, Config newConfig) {
         ConcurrentHashMap<String, String> topicConfigs = FakeLocalMetadataStore.ALL_TOPICS.getOrDefault(topic, new ConcurrentHashMap<>());
-        newConfig.entries().stream().forEach(configEntry -> {
+        newConfig.entries().forEach(configEntry -> {
             if (configEntry.name() != null) {
                 if (configEntry.value() != null) {
                     log.debug("Topic '{}' update config '{}' to '{}'", topic, configEntry.name(), configEntry.value());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index e37a9634a8d..0c094f19ca1 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -88,10 +88,10 @@ public class QuorumControllerIntegrationTestUtils {
                 .setName(MetadataVersion.FEATURE_NAME)
                 .setMinSupportedVersion(minVersion.featureLevel())
                 .setMaxSupportedVersion(maxVersion.featureLevel()));
-        featureMaxVersions.entrySet().forEach(entry -> {
+        featureMaxVersions.forEach((key, value) -> {
             features.add(new BrokerRegistrationRequestData.Feature()
-                .setName(entry.getKey())
-                .setMaxSupportedVersion(entry.getValue())
+                .setName(key)
+                .setMaxSupportedVersion(value)
                 .setMinSupportedVersion((short) 0));
         });
         return features;
diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index b6223418ab5..6d493d96ec1 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -368,7 +368,7 @@ public final class AssignmentsManager {
             previousGlobalFailures++;
             log.error("handleResponse: {} assignments failed; global error: {}. Retrying.",
                 sent.size(), globalResponseError.get());
-            sent.entrySet().forEach(e -> ready.putIfAbsent(e.getKey(), e.getValue()));
+            sent.forEach(ready::putIfAbsent);
             return;
         }
         previousGlobalFailures = 0;
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index 370d756d493..ccffaeae0f1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -167,7 +167,7 @@ public class ClusterTool {
         if (listControllerEndpoints) {
             String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";
             stream.printf(format, "ID", "HOST", "PORT", "RACK", "ENDPOINT_TYPE");
-            nodes.stream().forEach(node -> stream.printf(format,
+            nodes.forEach(node -> stream.printf(format,
                 node.idString(),
                 node.host(),
                 node.port(),
@@ -177,7 +177,7 @@ public class ClusterTool {
         } else {
             String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-10s %-15s%n";
             stream.printf(format, "ID", "HOST", "PORT", "RACK", "STATE", "ENDPOINT_TYPE");
-            nodes.stream().forEach(node -> stream.printf(format,
+            nodes.forEach(node -> stream.printf(format,
                 node.idString(),
                 node.host(),
                 node.port(),
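
For reviewers unfamiliar with the pattern, here is a minimal standalone sketch of the rewrite this patch applies, in both the collection and map forms. The class name ForEachSketch and the topics/replicas variables are illustrative only, not identifiers from the Kafka codebase:

import java.util.List;
import java.util.Map;

public class ForEachSketch {
    public static void main(String[] args) {
        List<String> topics = List.of("alpha", "beta");
        Map<String, Integer> replicas = Map.of("alpha", 3, "beta", 2);

        // Before: the intermediate Stream adds nothing for a plain traversal.
        topics.stream().forEach(t -> System.out.println(t));
        replicas.entrySet().stream().forEach(e ->
                System.out.println(e.getKey() + "=" + e.getValue()));

        // After: Iterable.forEach and Map.forEach, as this patch applies.
        // Map.forEach hands the lambda the key and value directly, so the
        // entry.getKey()/entry.getValue() unpacking disappears.
        topics.forEach(t -> System.out.println(t));
        replicas.forEach((topic, count) ->
                System.out.println(topic + "=" + count));
    }
}

For a sequential source, both forms visit the same elements in the same order, so the rewrite is behavior-preserving; it simply skips allocating an intermediate Stream (and, in the map case, the per-entry accessor calls).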