mirror of https://github.com/apache/kafka.git
MINOR: Fixing client telemetry validate request (#19959)
CI / build (push) Has been cancelled
Details
CI / build (push) Has been cancelled
Details
Minor fix to correct the validate condition for GetTelemetryRequests. Added respective tests as well. Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
00a1b1e8ce
commit
254c1fa519
|
@ -392,7 +392,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
ClientMetricsInstance clientInstance, long timestamp) {
|
||||
|
||||
if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) && (clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID
|
||||
|| clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
|
||||
&& clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
|
||||
clientMetricsStats.recordThrottleCount(clientInstance.clientInstanceId());
|
||||
String msg = String.format("Request from the client [%s] arrived before the next push interval time",
|
||||
request.data().clientInstanceId());
|
||||
|
|
|
@ -826,7 +826,6 @@ public class ClientMetricsManagerTest {
|
|||
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR + "-count").metricValue());
|
||||
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-avg").metricValue());
|
||||
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-max").metricValue());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1124,6 +1123,115 @@ public class ClientMetricsManagerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownSubscriptionSucceeds() throws Exception {
|
||||
clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties());
|
||||
assertEquals(1, clientMetricsManager.subscriptions().size());
|
||||
|
||||
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||
|
||||
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.put("interval.ms", "100");
|
||||
clientMetricsManager.updateSubscription("sub-2", properties);
|
||||
assertEquals(2, clientMetricsManager.subscriptions().size());
|
||||
|
||||
PushTelemetryRequest request = new Builder(
|
||||
new PushTelemetryRequestData()
|
||||
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
|
||||
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
|
||||
.setCompressionType(CompressionType.NONE.id)
|
||||
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
|
||||
|
||||
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
|
||||
request, ClientMetricsTestUtils.requestContext());
|
||||
|
||||
assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error());
|
||||
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
|
||||
assertNotNull(instance);
|
||||
assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, instance.lastKnownError());
|
||||
|
||||
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
|
||||
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||
assertEquals(Errors.NONE, subscriptionsResponse.error());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownCompressionSucceeds() throws Exception {
|
||||
clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties());
|
||||
assertEquals(1, clientMetricsManager.subscriptions().size());
|
||||
|
||||
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||
|
||||
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||
|
||||
PushTelemetryRequest request = new Builder(
|
||||
new PushTelemetryRequestData()
|
||||
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
|
||||
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
|
||||
.setCompressionType((byte) 10) // // Invalid compression type
|
||||
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
|
||||
|
||||
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
|
||||
request, ClientMetricsTestUtils.requestContext());
|
||||
|
||||
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, response.error());
|
||||
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
|
||||
assertNotNull(instance);
|
||||
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, instance.lastKnownError());
|
||||
|
||||
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
|
||||
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||
assertEquals(Errors.NONE, subscriptionsResponse.error());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTelemetrySubscriptionAfterPushTelemetryBytesExceptionFails() throws Exception {
|
||||
try (
|
||||
Metrics kafkaMetrics = new Metrics();
|
||||
ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics)
|
||||
) {
|
||||
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||
|
||||
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||
|
||||
byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8);
|
||||
assertEquals(2, metrics.length);
|
||||
|
||||
PushTelemetryRequest request = new PushTelemetryRequest.Builder(
|
||||
new PushTelemetryRequestData()
|
||||
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
|
||||
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
|
||||
.setMetrics(ByteBuffer.wrap(metrics)), true).build();
|
||||
|
||||
// Set the max bytes 1 to force the error.
|
||||
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
|
||||
request, ClientMetricsTestUtils.requestContext());
|
||||
|
||||
assertEquals(Errors.TELEMETRY_TOO_LARGE, response.error());
|
||||
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
|
||||
assertNotNull(instance);
|
||||
assertEquals(Errors.TELEMETRY_TOO_LARGE, instance.lastKnownError());
|
||||
|
||||
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
|
||||
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||
assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, subscriptionsResponse.error());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCacheEviction() throws Exception {
|
||||
Properties properties = new Properties();
|
||||
|
|
Loading…
Reference in New Issue