mirror of https://github.com/apache/kafka.git
KAFKA-17778: Add listener to remove client instances on connection disconnect (#17474)
The ClientsMetricManager keeps the client instance cache at a limit of 16384. The active connections can be low, but connections can be created and destroyed in short span. Which hits the cache limit and results in cache miss for new connections. The client instance cache keeps the instances for 3 * push interval ms. Hence when the cache gets full it creates new instances for every request while evicting others from cache. This gives some bumps to the GC for the broker and eventually CPU. Though with evicting early it will still be garbage collected but the long running active connections will not be removed from the cache. This PR adds capability to evict client instances from cache when the connection from client is dropped. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
This commit is contained in:
parent
f7498a40f2
commit
5da3b9410d
|
@ -261,10 +261,11 @@ class BrokerServer(
|
|||
Some(clientMetricsManager)
|
||||
)
|
||||
|
||||
val connectionDisconnectListeners = Seq(clientMetricsManager.connectionDisconnectListener())
|
||||
// Create and start the socket server acceptor threads so that the bound port is known.
|
||||
// Delay starting processors until the end of the initialization sequence to ensure
|
||||
// that credentials have been loaded before processing authentications.
|
||||
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
|
||||
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager, connectionDisconnectListeners)
|
||||
|
||||
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.kafka.server.metrics.ClientMetricsConfigs;
|
|||
import org.apache.kafka.server.metrics.ClientMetricsInstance;
|
||||
import org.apache.kafka.server.metrics.ClientMetricsInstanceMetadata;
|
||||
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin;
|
||||
import org.apache.kafka.server.network.ConnectionDisconnectListener;
|
||||
import org.apache.kafka.server.util.timer.SystemTimer;
|
||||
import org.apache.kafka.server.util.timer.SystemTimerReaper;
|
||||
import org.apache.kafka.server.util.timer.Timer;
|
||||
|
@ -91,6 +92,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
|
||||
private final ClientMetricsReceiverPlugin receiverPlugin;
|
||||
private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
|
||||
private final Map<String, Uuid> clientConnectionIdMap;
|
||||
private final Timer expirationTimer;
|
||||
private final Map<String, SubscriptionInfo> subscriptionMap;
|
||||
private final int clientTelemetryMaxBytes;
|
||||
|
@ -99,6 +101,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
private final AtomicLong lastCacheErrorLogMs;
|
||||
private final Metrics metrics;
|
||||
private final ClientMetricsStats clientMetricsStats;
|
||||
private final ConnectionDisconnectListener connectionDisconnectListener;
|
||||
|
||||
// The latest subscription version is used to determine if subscription has changed and needs
|
||||
// to re-evaluate the client instance subscription id as per changed subscriptions.
|
||||
|
@ -114,6 +117,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
this.subscriptionMap = new ConcurrentHashMap<>();
|
||||
this.subscriptionUpdateVersion = new AtomicInteger(0);
|
||||
this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
|
||||
this.clientConnectionIdMap = new ConcurrentHashMap<>();
|
||||
this.expirationTimer = new SystemTimerReaper(CLIENT_METRICS_REAPER_THREAD_NAME, new SystemTimer("client-metrics"));
|
||||
this.clientTelemetryMaxBytes = clientTelemetryMaxBytes;
|
||||
this.time = time;
|
||||
|
@ -121,6 +125,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
this.lastCacheErrorLogMs = new AtomicLong(0);
|
||||
this.metrics = metrics;
|
||||
this.clientMetricsStats = new ClientMetricsStats();
|
||||
this.connectionDisconnectListener = new ClientConnectionDisconnectListener();
|
||||
}
|
||||
|
||||
public Set<String> listClientMetricsResources() {
|
||||
|
@ -227,6 +232,10 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
return !receiverPlugin.isEmpty();
|
||||
}
|
||||
|
||||
public ConnectionDisconnectListener connectionDisconnectListener() {
|
||||
return connectionDisconnectListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
subscriptionMap.clear();
|
||||
|
@ -274,7 +283,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
|
||||
ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(
|
||||
clientInstanceId, requestContext);
|
||||
clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata);
|
||||
clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, instanceMetadata, requestContext.connectionId());
|
||||
}
|
||||
} else if (clientInstance.subscriptionVersion() < subscriptionUpdateVersion.get()) {
|
||||
/*
|
||||
|
@ -293,13 +302,13 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
}
|
||||
// Cancel the existing expiration timer task for the old client instance.
|
||||
clientInstance.cancelExpirationTimerTask();
|
||||
clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, clientInstance.instanceMetadata());
|
||||
clientInstance = createClientInstanceAndUpdateCache(clientInstanceId, clientInstance.instanceMetadata(), requestContext.connectionId());
|
||||
}
|
||||
}
|
||||
|
||||
// Update the expiration timer task for the client instance.
|
||||
long expirationTimeMs = Math.max(cacheExpiryMs, clientInstance.pushIntervalMs() * 3);
|
||||
TimerTask timerTask = new ExpirationTimerTask(clientInstanceId, expirationTimeMs);
|
||||
TimerTask timerTask = new ExpirationTimerTask(clientInstanceId, requestContext.connectionId(), expirationTimeMs);
|
||||
clientInstance.updateExpirationTimerTask(timerTask);
|
||||
expirationTimer.add(timerTask);
|
||||
|
||||
|
@ -307,13 +316,14 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
}
|
||||
|
||||
private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInstanceId,
|
||||
ClientMetricsInstanceMetadata instanceMetadata) {
|
||||
ClientMetricsInstanceMetadata instanceMetadata, String connectionId) {
|
||||
|
||||
ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata);
|
||||
// Maybe add client metrics, if metrics not already added. Metrics might be already added
|
||||
// if the client instance was evicted from the cache because of size limit.
|
||||
clientMetricsStats.maybeAddClientInstanceMetrics(clientInstanceId);
|
||||
clientInstanceCache.put(clientInstanceId, clientInstance);
|
||||
clientConnectionIdMap.put(connectionId, clientInstanceId);
|
||||
return clientInstance;
|
||||
}
|
||||
|
||||
|
@ -459,6 +469,34 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
return expirationTimer;
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
Map<String, Uuid> clientConnectionIdMap() {
|
||||
return clientConnectionIdMap;
|
||||
}
|
||||
|
||||
private final class ClientConnectionDisconnectListener implements ConnectionDisconnectListener {
|
||||
|
||||
@Override
|
||||
public void onDisconnect(String connectionId) {
|
||||
log.trace("Removing client connection id [{}] from the client instance cache", connectionId);
|
||||
|
||||
Uuid clientInstanceId = clientConnectionIdMap.remove(connectionId);
|
||||
if (clientInstanceId == null) {
|
||||
log.trace("Client connection id [{}] is not found in the client instance cache", connectionId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Unregister the client instance metrics from the broker metrics.
|
||||
clientMetricsStats.unregisterClientInstanceMetrics(clientInstanceId);
|
||||
|
||||
ClientMetricsInstance clientInstance = clientInstanceCache.get(clientInstanceId);
|
||||
if (clientInstance != null) {
|
||||
clientInstance.cancelExpirationTimerTask();
|
||||
clientInstanceCache.remove(clientInstanceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class SubscriptionInfo {
|
||||
|
||||
private final String name;
|
||||
|
@ -496,16 +534,19 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
private static final long CACHE_ERROR_LOG_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
private final Uuid clientInstanceId;
|
||||
private final String connectionId;
|
||||
|
||||
private ExpirationTimerTask(Uuid clientInstanceId, long delayMs) {
|
||||
private ExpirationTimerTask(Uuid clientInstanceId, String connectionId, long delayMs) {
|
||||
super(delayMs);
|
||||
this.clientInstanceId = clientInstanceId;
|
||||
this.connectionId = connectionId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
log.trace("Expiration timer task run for client instance id: {}, after delay ms: {}", clientInstanceId, delayMs);
|
||||
clientMetricsStats.unregisterClientInstanceMetrics(clientInstanceId);
|
||||
clientConnectionIdMap.remove(connectionId);
|
||||
if (!clientInstanceCache.remove(clientInstanceId)) {
|
||||
/*
|
||||
This can only happen if the client instance is removed from the cache by the LRU
|
||||
|
@ -517,7 +558,8 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
if (time.milliseconds() - lastErrorMs > CACHE_ERROR_LOG_INTERVAL_MS &&
|
||||
lastCacheErrorLogMs.compareAndSet(lastErrorMs, time.milliseconds())) {
|
||||
log.warn("Client metrics instance cache cannot find the client instance id: {}. The cache"
|
||||
+ " must be at capacity, size: {} ", clientInstanceId, clientInstanceCache.size());
|
||||
+ " must be at capacity, size: {}. Connection map size: {}",
|
||||
clientInstanceId, clientInstanceCache.size(), clientConnectionIdMap.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1071,6 +1071,9 @@ public class ClientMetricsManagerTest {
|
|||
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR + "-count").metricValue());
|
||||
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-avg").metricValue());
|
||||
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-max").metricValue());
|
||||
// Validate client connection id map should contain 2 entries. 1 for GET request and 1 for PUSH request as we did
|
||||
// generate random connection id. The throttled request should not be added to the map.
|
||||
assertEquals(2, clientMetricsManager.clientConnectionIdMap().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1139,6 +1142,7 @@ public class ClientMetricsManagerTest {
|
|||
assertEquals((double) 1, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
|
||||
assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId()));
|
||||
assertEquals(1, clientMetricsManager.clientConnectionIdMap().size());
|
||||
assertEquals(1, clientMetricsManager.expirationTimer().size());
|
||||
// Cache expiry should occur after 100 * 3 = 300 ms, wait for the eviction to happen.
|
||||
// Force clocks to advance by 300 ms.
|
||||
|
@ -1155,6 +1159,8 @@ public class ClientMetricsManagerTest {
|
|||
// subscription count metrics and kafka metrics count registered i.e. 4 metrics.
|
||||
assertEquals(4, kafkaMetrics.metrics().size());
|
||||
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
// Validate client connection id map should be empty.
|
||||
assertTrue(clientMetricsManager.clientConnectionIdMap().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1181,6 +1187,7 @@ public class ClientMetricsManagerTest {
|
|||
|
||||
assertNotNull(clientMetricsManager.clientInstance(response1.data().clientInstanceId()));
|
||||
assertNotNull(clientMetricsManager.clientInstance(response2.data().clientInstanceId()));
|
||||
assertEquals(2, clientMetricsManager.clientConnectionIdMap().size());
|
||||
assertEquals(2, clientMetricsManager.expirationTimer().size());
|
||||
// Cache expiry should occur after 100 * 3 = 300 ms, wait for the eviction to happen.
|
||||
// Force clocks to advance by 300 ms.
|
||||
|
@ -1198,6 +1205,8 @@ public class ClientMetricsManagerTest {
|
|||
// subscription count metrics and kafka metrics count registered i.e. 4 metrics.
|
||||
assertEquals(4, kafkaMetrics.metrics().size());
|
||||
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
// Validate client connection id map should be empty.
|
||||
assertTrue(clientMetricsManager.clientConnectionIdMap().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1206,7 +1215,7 @@ public class ClientMetricsManagerTest {
|
|||
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||
|
||||
GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
request, ClientMetricsTestUtils.requestContext());
|
||||
request, ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
|
||||
assertEquals(Errors.NONE, response.error());
|
||||
Uuid clientInstanceId = response.data().clientInstanceId();
|
||||
int subscriptionId = response.data().subscriptionId();
|
||||
|
@ -1224,7 +1233,7 @@ public class ClientMetricsManagerTest {
|
|||
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(clientInstanceId), true).build();
|
||||
|
||||
response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
request, ClientMetricsTestUtils.requestContext());
|
||||
request, ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
|
||||
assertEquals(Errors.NONE, response.error());
|
||||
assertTrue(subscriptionId != response.data().subscriptionId());
|
||||
|
||||
|
@ -1239,6 +1248,60 @@ public class ClientMetricsManagerTest {
|
|||
// Metrics size should remain same on instance update.
|
||||
assertEquals(12, kafkaMetrics.metrics().size());
|
||||
assertEquals((double) 1, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
// Validate client connection id map contains new connection id.
|
||||
assertEquals(1, clientMetricsManager.clientConnectionIdMap().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveConnection() throws Exception {
|
||||
GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||
|
||||
GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
request, ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
|
||||
assertEquals(Errors.NONE, response.error());
|
||||
Uuid clientInstanceId = response.data().clientInstanceId();
|
||||
|
||||
// Validate instance and metrics exists.
|
||||
ClientMetricsInstance instance = clientMetricsManager.clientInstance(clientInstanceId);
|
||||
assertNotNull(instance);
|
||||
assertEquals(1, clientMetricsManager.clientConnectionIdMap().size());
|
||||
assertEquals(clientInstanceId, clientMetricsManager.clientConnectionIdMap().get("conn-1"));
|
||||
assertEquals(12, kafkaMetrics.metrics().size());
|
||||
assertEquals((double) 1, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
|
||||
clientMetricsManager.connectionDisconnectListener().onDisconnect("conn-1");
|
||||
assertNull(clientMetricsManager.clientInstance(clientInstanceId));
|
||||
assertTrue(clientMetricsManager.clientConnectionIdMap().isEmpty());
|
||||
// Instance metrics should get removed.
|
||||
assertEquals(4, kafkaMetrics.metrics().size());
|
||||
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveConnectionUnknownConnectionId() throws Exception {
|
||||
GetTelemetrySubscriptionsRequest request = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||
|
||||
GetTelemetrySubscriptionsResponse response = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
request, ClientMetricsTestUtils.requestContextWithConnectionId("conn-1"));
|
||||
assertEquals(Errors.NONE, response.error());
|
||||
Uuid clientInstanceId = response.data().clientInstanceId();
|
||||
|
||||
// Validate instance and metrics exists.
|
||||
ClientMetricsInstance instance = clientMetricsManager.clientInstance(clientInstanceId);
|
||||
assertNotNull(instance);
|
||||
assertEquals(1, clientMetricsManager.clientConnectionIdMap().size());
|
||||
assertEquals(clientInstanceId, clientMetricsManager.clientConnectionIdMap().get("conn-1"));
|
||||
assertEquals(12, kafkaMetrics.metrics().size());
|
||||
assertEquals((double) 1, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
|
||||
clientMetricsManager.connectionDisconnectListener().onDisconnect("conn-2");
|
||||
assertNotNull(clientMetricsManager.clientInstance(clientInstanceId));
|
||||
assertEquals(1, clientMetricsManager.clientConnectionIdMap().size());
|
||||
// Metrics size should remain same.
|
||||
assertEquals(12, kafkaMetrics.metrics().size());
|
||||
assertEquals((double) 1, getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
|
||||
}
|
||||
|
||||
private KafkaMetric getMetric(String name) throws Exception {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
|
|||
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
|
||||
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
|
||||
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
@ -59,7 +60,7 @@ public class ClientMetricsTestUtils {
|
|||
public static RequestContext requestContext() throws UnknownHostException {
|
||||
return new RequestContext(
|
||||
new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0),
|
||||
"1",
|
||||
TestUtils.randomString(5),
|
||||
InetAddress.getLocalHost(),
|
||||
Optional.of(CLIENT_PORT),
|
||||
KafkaPrincipal.ANONYMOUS,
|
||||
|
@ -82,6 +83,19 @@ public class ClientMetricsTestUtils {
|
|||
false);
|
||||
}
|
||||
|
||||
public static RequestContext requestContextWithConnectionId(String connectionId) throws UnknownHostException {
|
||||
return new RequestContext(
|
||||
new RequestHeader(ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS, (short) 0, "producer-1", 0),
|
||||
connectionId,
|
||||
InetAddress.getLocalHost(),
|
||||
Optional.of(CLIENT_PORT),
|
||||
KafkaPrincipal.ANONYMOUS,
|
||||
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
|
||||
SecurityProtocol.PLAINTEXT,
|
||||
new ClientInformation("apache-kafka-java", "3.5.2"),
|
||||
false);
|
||||
}
|
||||
|
||||
public static class TestClientMetricsReceiver implements ClientTelemetryReceiver {
|
||||
public int exportMetricsInvokedCount = 0;
|
||||
public List<ByteBuffer> metricsData = new ArrayList<>();
|
||||
|
|
Loading…
Reference in New Issue