From c2aeec46a2899c6f8ad27bab65baeeaa87f1dacf Mon Sep 17 00:00:00 2001 From: Lan Ding Date: Mon, 29 Sep 2025 01:37:58 +0800 Subject: [PATCH] MINOR: Remove logContext arrtibute from StreamsGroup and CoordinatorRuntime (#20572) The `logContext` attribute in `StreamsGroup` and `CoordinatorRuntime` is not used anymore. This patch removes it. Reviewers: Ken Huang , TengYao Chi , Chia-Ping Tsai --- .../kafka/common/network/ChannelBuilders.java | 2 +- .../kafka/common/network/SslChannelBuilder.java | 4 +--- .../kafka/common/network/SslSelectorTest.java | 9 ++++----- .../common/network/SslTransportLayerTest.java | 14 ++++++-------- .../common/network/SslTransportTls12Tls13Test.java | 5 ++--- .../common/runtime/CoordinatorRuntime.java | 6 ------ .../coordinator/group/streams/StreamsGroup.java | 2 -- 7 files changed, 14 insertions(+), 28 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index aea38c72cac..847f887a324 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -125,7 +125,7 @@ public class ChannelBuilders { switch (securityProtocol) { case SSL: requireNonNullMode(connectionMode, securityProtocol); - channelBuilder = new SslChannelBuilder(connectionMode, listenerName, isInterBrokerListener, logContext); + channelBuilder = new SslChannelBuilder(connectionMode, listenerName, isInterBrokerListener); break; case SASL_SSL: case SASL_PLAINTEXT: diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index a35a0b8b209..249fcad163a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalSerde; import org.apache.kafka.common.security.auth.SslAuthenticationContext; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.security.ssl.SslPrincipalMapper; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import java.io.Closeable; @@ -53,8 +52,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable */ public SslChannelBuilder(ConnectionMode connectionMode, ListenerName listenerName, - boolean isInterBrokerListener, - LogContext logContext) { + boolean isInterBrokerListener) { this.connectionMode = connectionMode; this.listenerName = listenerName; this.isInterBrokerListener = isInterBrokerListener; diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 8a9704c1621..572ec443e08 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -74,11 +74,10 @@ public abstract class SslSelectorTest extends SelectorTest { this.server.start(); this.time = new MockTime(); sslClientConfigs = createSslClientConfigs(trustStoreFile); - LogContext logContext = new LogContext(); - this.channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext); + this.channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false); this.channelBuilder.configure(sslClientConfigs); this.metrics = new Metrics(); - this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, logContext); + this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext()); } protected abstract Map createSslClientConfigs(File trustStoreFile) throws GeneralSecurityException, IOException; @@ -255,7 +254,7 @@ public abstract class SslSelectorTest extends SelectorTest { .tlsProtocol(tlsProtocol) .createNewTrustStore(trustStoreFile) .build(); - channelBuilder = new SslChannelBuilder(ConnectionMode.SERVER, null, false, new LogContext()); + channelBuilder = new SslChannelBuilder(ConnectionMode.SERVER, null, false); channelBuilder.configure(sslServerConfigs); selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup", new HashMap<>(), true, false, channelBuilder, pool, new LogContext()); @@ -342,7 +341,7 @@ public abstract class SslSelectorTest extends SelectorTest { private static class TestSslChannelBuilder extends SslChannelBuilder { public TestSslChannelBuilder(ConnectionMode connectionMode) { - super(connectionMode, null, false, new LogContext()); + super(connectionMode, null, false); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 72f130ca4e3..9525ccfbc82 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -767,11 +767,10 @@ public class SslTransportLayerTest { @ParameterizedTest @ArgumentsSource(SslTransportLayerArgumentsProvider.class) public void testNetworkThreadTimeRecorded(Args args) throws Exception { - LogContext logContext = new LogContext(); - ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext); + ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false); channelBuilder.configure(args.sslClientConfigs); try (Selector selector = new Selector(NetworkReceive.UNLIMITED, Selector.NO_IDLE_TIMEOUT_MS, new Metrics(), Time.SYSTEM, - "MetricGroup", new HashMap<>(), false, true, channelBuilder, MemoryPool.NONE, logContext)) { + "MetricGroup", new HashMap<>(), false, true, channelBuilder, MemoryPool.NONE, new LogContext())) { String node = "0"; server = createEchoServer(args, SecurityProtocol.SSL); @@ -967,7 +966,7 @@ public class SslTransportLayerTest { } private SslChannelBuilder newClientChannelBuilder() { - return new SslChannelBuilder(ConnectionMode.CLIENT, null, false, new LogContext()); + return new SslChannelBuilder(ConnectionMode.CLIENT, null, false); } private void testClose(Args args, SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception { @@ -1311,10 +1310,9 @@ public class SslTransportLayerTest { } private Selector createSelector(Args args) { - LogContext logContext = new LogContext(); - ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext); + ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false); channelBuilder.configure(args.sslClientConfigs); - selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, logContext); + selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext()); return selector; } @@ -1371,7 +1369,7 @@ public class SslTransportLayerTest { int flushDelayCount = 0; public TestSslChannelBuilder(ConnectionMode connectionMode) { - super(connectionMode, null, false, new LogContext()); + super(connectionMode, null, false); } public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java index 0d67c8aee63..0ad81d17b5b 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java @@ -50,10 +50,9 @@ public class SslTransportTls12Tls13Test { sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); - LogContext logContext = new LogContext(); - ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext); + ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false); channelBuilder.configure(sslClientConfigs); - this.selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, logContext); + this.selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext()); } @AfterEach diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 27dd6518f1a..52d3f27f3cf 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -1923,11 +1923,6 @@ public class CoordinatorRuntime, U> implements Aut */ private final String logPrefix; - /** - * The log context. - */ - private final LogContext logContext; - /** * The logger. */ @@ -2054,7 +2049,6 @@ public class CoordinatorRuntime, U> implements Aut ExecutorService executorService ) { this.logPrefix = logPrefix; - this.logContext = logContext; this.log = logContext.logger(CoordinatorRuntime.class); this.time = time; this.timer = timer; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java index 61f61d101f1..7ec3596628e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java @@ -113,7 +113,6 @@ public class StreamsGroup implements Group { } } - private final LogContext logContext; private final Logger log; /** @@ -217,7 +216,6 @@ public class StreamsGroup implements Group { String groupId ) { this.log = logContext.logger(StreamsGroup.class); - this.logContext = logContext; this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry); this.groupId = Objects.requireNonNull(groupId); this.state = new TimelineObject<>(snapshotRegistry, EMPTY);