mirror of https://github.com/apache/kafka.git
MINOR: Remove logContext arrtibute from StreamsGroup and CoordinatorRuntime (#20572)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
The `logContext` attribute in `StreamsGroup` and `CoordinatorRuntime` is not used anymore. This patch removes it. Reviewers: Ken Huang <s7133700@gmail.com>, TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
e27ea8d4db
commit
c2aeec46a2
|
@ -125,7 +125,7 @@ public class ChannelBuilders {
|
||||||
switch (securityProtocol) {
|
switch (securityProtocol) {
|
||||||
case SSL:
|
case SSL:
|
||||||
requireNonNullMode(connectionMode, securityProtocol);
|
requireNonNullMode(connectionMode, securityProtocol);
|
||||||
channelBuilder = new SslChannelBuilder(connectionMode, listenerName, isInterBrokerListener, logContext);
|
channelBuilder = new SslChannelBuilder(connectionMode, listenerName, isInterBrokerListener);
|
||||||
break;
|
break;
|
||||||
case SASL_SSL:
|
case SASL_SSL:
|
||||||
case SASL_PLAINTEXT:
|
case SASL_PLAINTEXT:
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
|
||||||
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
|
import org.apache.kafka.common.security.auth.SslAuthenticationContext;
|
||||||
import org.apache.kafka.common.security.ssl.SslFactory;
|
import org.apache.kafka.common.security.ssl.SslFactory;
|
||||||
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
|
import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -53,8 +52,7 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
|
||||||
*/
|
*/
|
||||||
public SslChannelBuilder(ConnectionMode connectionMode,
|
public SslChannelBuilder(ConnectionMode connectionMode,
|
||||||
ListenerName listenerName,
|
ListenerName listenerName,
|
||||||
boolean isInterBrokerListener,
|
boolean isInterBrokerListener) {
|
||||||
LogContext logContext) {
|
|
||||||
this.connectionMode = connectionMode;
|
this.connectionMode = connectionMode;
|
||||||
this.listenerName = listenerName;
|
this.listenerName = listenerName;
|
||||||
this.isInterBrokerListener = isInterBrokerListener;
|
this.isInterBrokerListener = isInterBrokerListener;
|
||||||
|
|
|
@ -74,11 +74,10 @@ public abstract class SslSelectorTest extends SelectorTest {
|
||||||
this.server.start();
|
this.server.start();
|
||||||
this.time = new MockTime();
|
this.time = new MockTime();
|
||||||
sslClientConfigs = createSslClientConfigs(trustStoreFile);
|
sslClientConfigs = createSslClientConfigs(trustStoreFile);
|
||||||
LogContext logContext = new LogContext();
|
this.channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false);
|
||||||
this.channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext);
|
|
||||||
this.channelBuilder.configure(sslClientConfigs);
|
this.channelBuilder.configure(sslClientConfigs);
|
||||||
this.metrics = new Metrics();
|
this.metrics = new Metrics();
|
||||||
this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, logContext);
|
this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract Map<String, Object> createSslClientConfigs(File trustStoreFile) throws GeneralSecurityException, IOException;
|
protected abstract Map<String, Object> createSslClientConfigs(File trustStoreFile) throws GeneralSecurityException, IOException;
|
||||||
|
@ -255,7 +254,7 @@ public abstract class SslSelectorTest extends SelectorTest {
|
||||||
.tlsProtocol(tlsProtocol)
|
.tlsProtocol(tlsProtocol)
|
||||||
.createNewTrustStore(trustStoreFile)
|
.createNewTrustStore(trustStoreFile)
|
||||||
.build();
|
.build();
|
||||||
channelBuilder = new SslChannelBuilder(ConnectionMode.SERVER, null, false, new LogContext());
|
channelBuilder = new SslChannelBuilder(ConnectionMode.SERVER, null, false);
|
||||||
channelBuilder.configure(sslServerConfigs);
|
channelBuilder.configure(sslServerConfigs);
|
||||||
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
|
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
|
||||||
new HashMap<>(), true, false, channelBuilder, pool, new LogContext());
|
new HashMap<>(), true, false, channelBuilder, pool, new LogContext());
|
||||||
|
@ -342,7 +341,7 @@ public abstract class SslSelectorTest extends SelectorTest {
|
||||||
private static class TestSslChannelBuilder extends SslChannelBuilder {
|
private static class TestSslChannelBuilder extends SslChannelBuilder {
|
||||||
|
|
||||||
public TestSslChannelBuilder(ConnectionMode connectionMode) {
|
public TestSslChannelBuilder(ConnectionMode connectionMode) {
|
||||||
super(connectionMode, null, false, new LogContext());
|
super(connectionMode, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -767,11 +767,10 @@ public class SslTransportLayerTest {
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ArgumentsSource(SslTransportLayerArgumentsProvider.class)
|
@ArgumentsSource(SslTransportLayerArgumentsProvider.class)
|
||||||
public void testNetworkThreadTimeRecorded(Args args) throws Exception {
|
public void testNetworkThreadTimeRecorded(Args args) throws Exception {
|
||||||
LogContext logContext = new LogContext();
|
ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false);
|
||||||
ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext);
|
|
||||||
channelBuilder.configure(args.sslClientConfigs);
|
channelBuilder.configure(args.sslClientConfigs);
|
||||||
try (Selector selector = new Selector(NetworkReceive.UNLIMITED, Selector.NO_IDLE_TIMEOUT_MS, new Metrics(), Time.SYSTEM,
|
try (Selector selector = new Selector(NetworkReceive.UNLIMITED, Selector.NO_IDLE_TIMEOUT_MS, new Metrics(), Time.SYSTEM,
|
||||||
"MetricGroup", new HashMap<>(), false, true, channelBuilder, MemoryPool.NONE, logContext)) {
|
"MetricGroup", new HashMap<>(), false, true, channelBuilder, MemoryPool.NONE, new LogContext())) {
|
||||||
|
|
||||||
String node = "0";
|
String node = "0";
|
||||||
server = createEchoServer(args, SecurityProtocol.SSL);
|
server = createEchoServer(args, SecurityProtocol.SSL);
|
||||||
|
@ -967,7 +966,7 @@ public class SslTransportLayerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private SslChannelBuilder newClientChannelBuilder() {
|
private SslChannelBuilder newClientChannelBuilder() {
|
||||||
return new SslChannelBuilder(ConnectionMode.CLIENT, null, false, new LogContext());
|
return new SslChannelBuilder(ConnectionMode.CLIENT, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testClose(Args args, SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
|
private void testClose(Args args, SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception {
|
||||||
|
@ -1311,10 +1310,9 @@ public class SslTransportLayerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Selector createSelector(Args args) {
|
private Selector createSelector(Args args) {
|
||||||
LogContext logContext = new LogContext();
|
ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false);
|
||||||
ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext);
|
|
||||||
channelBuilder.configure(args.sslClientConfigs);
|
channelBuilder.configure(args.sslClientConfigs);
|
||||||
selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, logContext);
|
selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext());
|
||||||
return selector;
|
return selector;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1371,7 +1369,7 @@ public class SslTransportLayerTest {
|
||||||
int flushDelayCount = 0;
|
int flushDelayCount = 0;
|
||||||
|
|
||||||
public TestSslChannelBuilder(ConnectionMode connectionMode) {
|
public TestSslChannelBuilder(ConnectionMode connectionMode) {
|
||||||
super(connectionMode, null, false, new LogContext());
|
super(connectionMode, null, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) {
|
public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) {
|
||||||
|
|
|
@ -50,10 +50,9 @@ public class SslTransportTls12Tls13Test {
|
||||||
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
|
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
|
||||||
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
|
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
|
||||||
|
|
||||||
LogContext logContext = new LogContext();
|
ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false);
|
||||||
ChannelBuilder channelBuilder = new SslChannelBuilder(ConnectionMode.CLIENT, null, false, logContext);
|
|
||||||
channelBuilder.configure(sslClientConfigs);
|
channelBuilder.configure(sslClientConfigs);
|
||||||
this.selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, logContext);
|
this.selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
|
|
|
@ -1923,11 +1923,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
*/
|
*/
|
||||||
private final String logPrefix;
|
private final String logPrefix;
|
||||||
|
|
||||||
/**
|
|
||||||
* The log context.
|
|
||||||
*/
|
|
||||||
private final LogContext logContext;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The logger.
|
* The logger.
|
||||||
*/
|
*/
|
||||||
|
@ -2054,7 +2049,6 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
|
||||||
ExecutorService executorService
|
ExecutorService executorService
|
||||||
) {
|
) {
|
||||||
this.logPrefix = logPrefix;
|
this.logPrefix = logPrefix;
|
||||||
this.logContext = logContext;
|
|
||||||
this.log = logContext.logger(CoordinatorRuntime.class);
|
this.log = logContext.logger(CoordinatorRuntime.class);
|
||||||
this.time = time;
|
this.time = time;
|
||||||
this.timer = timer;
|
this.timer = timer;
|
||||||
|
|
|
@ -113,7 +113,6 @@ public class StreamsGroup implements Group {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final LogContext logContext;
|
|
||||||
private final Logger log;
|
private final Logger log;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -217,7 +216,6 @@ public class StreamsGroup implements Group {
|
||||||
String groupId
|
String groupId
|
||||||
) {
|
) {
|
||||||
this.log = logContext.logger(StreamsGroup.class);
|
this.log = logContext.logger(StreamsGroup.class);
|
||||||
this.logContext = logContext;
|
|
||||||
this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
|
this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
|
||||||
this.groupId = Objects.requireNonNull(groupId);
|
this.groupId = Objects.requireNonNull(groupId);
|
||||||
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
|
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
|
||||||
|
|
Loading…
Reference in New Issue