mirror of https://github.com/apache/kafka.git
Minor fix to correct the validate condition for GetTelemetryRequests. Added respective tests as well. Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
7e07659b7a
commit
2572c7ff9d
|
@ -391,7 +391,7 @@ public class ClientMetricsManager implements AutoCloseable {
|
||||||
ClientMetricsInstance clientInstance, long timestamp) {
|
ClientMetricsInstance clientInstance, long timestamp) {
|
||||||
|
|
||||||
if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) && (clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID
|
if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) && (clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID
|
||||||
|| clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
|
&& clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
|
||||||
clientMetricsStats.recordThrottleCount(clientInstance.clientInstanceId());
|
clientMetricsStats.recordThrottleCount(clientInstance.clientInstanceId());
|
||||||
String msg = String.format("Request from the client [%s] arrived before the next push interval time",
|
String msg = String.format("Request from the client [%s] arrived before the next push interval time",
|
||||||
request.data().clientInstanceId());
|
request.data().clientInstanceId());
|
||||||
|
|
|
@ -826,7 +826,6 @@ public class ClientMetricsManagerTest {
|
||||||
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR + "-count").metricValue());
|
assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR + "-count").metricValue());
|
||||||
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-avg").metricValue());
|
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-avg").metricValue());
|
||||||
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-max").metricValue());
|
assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-max").metricValue());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -1124,6 +1123,115 @@ public class ClientMetricsManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownSubscriptionSucceeds() throws Exception {
|
||||||
|
clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties());
|
||||||
|
assertEquals(1, clientMetricsManager.subscriptions().size());
|
||||||
|
|
||||||
|
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||||
|
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||||
|
|
||||||
|
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||||
|
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||||
|
|
||||||
|
Properties properties = new Properties();
|
||||||
|
properties.put("interval.ms", "100");
|
||||||
|
clientMetricsManager.updateSubscription("sub-2", properties);
|
||||||
|
assertEquals(2, clientMetricsManager.subscriptions().size());
|
||||||
|
|
||||||
|
PushTelemetryRequest request = new Builder(
|
||||||
|
new PushTelemetryRequestData()
|
||||||
|
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
|
||||||
|
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
|
||||||
|
.setCompressionType(CompressionType.NONE.id)
|
||||||
|
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
|
||||||
|
|
||||||
|
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
|
||||||
|
request, ClientMetricsTestUtils.requestContext());
|
||||||
|
|
||||||
|
assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error());
|
||||||
|
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
|
||||||
|
assertNotNull(instance);
|
||||||
|
assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, instance.lastKnownError());
|
||||||
|
|
||||||
|
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||||
|
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
|
||||||
|
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||||
|
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||||
|
assertEquals(Errors.NONE, subscriptionsResponse.error());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownCompressionSucceeds() throws Exception {
|
||||||
|
clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties());
|
||||||
|
assertEquals(1, clientMetricsManager.subscriptions().size());
|
||||||
|
|
||||||
|
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||||
|
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||||
|
|
||||||
|
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||||
|
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||||
|
|
||||||
|
PushTelemetryRequest request = new Builder(
|
||||||
|
new PushTelemetryRequestData()
|
||||||
|
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
|
||||||
|
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
|
||||||
|
.setCompressionType((byte) 10) // // Invalid compression type
|
||||||
|
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build();
|
||||||
|
|
||||||
|
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
|
||||||
|
request, ClientMetricsTestUtils.requestContext());
|
||||||
|
|
||||||
|
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, response.error());
|
||||||
|
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
|
||||||
|
assertNotNull(instance);
|
||||||
|
assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, instance.lastKnownError());
|
||||||
|
|
||||||
|
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||||
|
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
|
||||||
|
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||||
|
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||||
|
assertEquals(Errors.NONE, subscriptionsResponse.error());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetTelemetrySubscriptionAfterPushTelemetryBytesExceptionFails() throws Exception {
|
||||||
|
try (
|
||||||
|
Metrics kafkaMetrics = new Metrics();
|
||||||
|
ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics)
|
||||||
|
) {
|
||||||
|
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||||
|
new GetTelemetrySubscriptionsRequestData(), true).build();
|
||||||
|
|
||||||
|
GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||||
|
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||||
|
|
||||||
|
byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8);
|
||||||
|
assertEquals(2, metrics.length);
|
||||||
|
|
||||||
|
PushTelemetryRequest request = new PushTelemetryRequest.Builder(
|
||||||
|
new PushTelemetryRequestData()
|
||||||
|
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
|
||||||
|
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
|
||||||
|
.setMetrics(ByteBuffer.wrap(metrics)), true).build();
|
||||||
|
|
||||||
|
// Set the max bytes 1 to force the error.
|
||||||
|
PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest(
|
||||||
|
request, ClientMetricsTestUtils.requestContext());
|
||||||
|
|
||||||
|
assertEquals(Errors.TELEMETRY_TOO_LARGE, response.error());
|
||||||
|
ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
|
||||||
|
assertNotNull(instance);
|
||||||
|
assertEquals(Errors.TELEMETRY_TOO_LARGE, instance.lastKnownError());
|
||||||
|
|
||||||
|
subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
|
||||||
|
new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build();
|
||||||
|
subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest(
|
||||||
|
subscriptionsRequest, ClientMetricsTestUtils.requestContext());
|
||||||
|
assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, subscriptionsResponse.error());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCacheEviction() throws Exception {
|
public void testCacheEviction() throws Exception {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
|
|
Loading…
Reference in New Issue