KAFKA-15807: Added support for compression of metrics (KIP-714) (#15148)

Part of KIP-714.

Adds support for compression/decompression of metrics.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Philip Nee <pnee@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Apoorv Mittal 2024-01-18 03:19:57 +05:30 committed by GitHub
parent 2dc3fff14a
commit 2df8c1ca3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 226 additions and 17 deletions

View File

@ -180,6 +180,10 @@
<allow pkg="org.apache.kafka.clients.admin" /> <allow pkg="org.apache.kafka.clients.admin" />
<!-- for testing --> <!-- for testing -->
<allow pkg="org.apache.kafka.common.errors" /> <allow pkg="org.apache.kafka.common.errors" />
<!-- for testing -->
<allow pkg="io.opentelemetry.proto"/>
<!-- for testing -->
<allow pkg="org.apache.kafka.common.telemetry" />
</subpackage> </subpackage>
<subpackage name="serialization"> <subpackage name="serialization">

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -87,12 +88,7 @@ public class PushTelemetryRequest extends AbstractRequest {
public ByteBuffer metricsData() { public ByteBuffer metricsData() {
CompressionType cType = CompressionType.forId(this.data.compressionType()); CompressionType cType = CompressionType.forId(this.data.compressionType());
return (cType == CompressionType.NONE) ? return (cType == CompressionType.NONE) ?
ByteBuffer.wrap(this.data.metrics()) : decompressMetricsData(cType, this.data.metrics()); ByteBuffer.wrap(this.data.metrics()) : ClientTelemetryUtils.decompress(this.data.metrics(), cType);
}
private static ByteBuffer decompressMetricsData(CompressionType compressionType, byte[] metrics) {
// TODO: Add support for decompression of metrics data
return ByteBuffer.wrap(metrics);
} }
public static PushTelemetryRequest parse(ByteBuffer buffer, short version) { public static PushTelemetryRequest parse(ByteBuffer buffer, short version) {

View File

@ -42,11 +42,10 @@ import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse; import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.telemetry.ClientTelemetryState; import org.apache.kafka.common.telemetry.ClientTelemetryState;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -702,6 +701,10 @@ public class ClientTelemetryReporter implements MetricsReporter {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
return createPushRequest(localSubscription, terminating);
}
private Optional<Builder<?>> createPushRequest(ClientTelemetrySubscription localSubscription, boolean terminating) {
byte[] payload; byte[] payload;
try (MetricsEmitter emitter = new ClientTelemetryEmitter(localSubscription.selector(), localSubscription.deltaTemporality())) { try (MetricsEmitter emitter = new ClientTelemetryEmitter(localSubscription.selector(), localSubscription.deltaTemporality())) {
emitter.init(); emitter.init();
@ -715,7 +718,14 @@ public class ClientTelemetryReporter implements MetricsReporter {
} }
CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); byte[] compressedPayload;
try {
compressedPayload = ClientTelemetryUtils.compress(payload, compressionType);
} catch (IOException e) {
log.info("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType);
compressedPayload = payload;
compressionType = CompressionType.NONE;
}
AbstractRequest.Builder<?> requestBuilder = new PushTelemetryRequest.Builder( AbstractRequest.Builder<?> requestBuilder = new PushTelemetryRequest.Builder(
new PushTelemetryRequestData() new PushTelemetryRequestData()
@ -723,7 +733,7 @@ public class ClientTelemetryReporter implements MetricsReporter {
.setSubscriptionId(localSubscription.subscriptionId()) .setSubscriptionId(localSubscription.subscriptionId())
.setTerminating(terminating) .setTerminating(terminating)
.setCompressionType(compressionType.id) .setCompressionType(compressionType.id)
.setMetrics(Utils.readBytes(buffer)), true); .setMetrics(compressedPayload), true);
return Optional.of(requestBuilder); return Optional.of(requestBuilder);
} }

View File

@ -23,10 +23,17 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.MetricsContext; import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
@ -175,16 +182,40 @@ public class ClientTelemetryUtils {
} }
public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) { public static CompressionType preferredCompressionType(List<CompressionType> acceptedCompressionTypes) {
// TODO: Support compression in client telemetry. if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) {
// Broker is providing the compression types in order of preference. Grab the
// first one.
return acceptedCompressionTypes.get(0);
}
return CompressionType.NONE; return CompressionType.NONE;
} }
public static ByteBuffer compress(byte[] raw, CompressionType compressionType) { public static byte[] compress(byte[] raw, CompressionType compressionType) throws IOException {
// TODO: Support compression in client telemetry. try (ByteBufferOutputStream compressedOut = new ByteBufferOutputStream(512)) {
if (compressionType == CompressionType.NONE) { try (OutputStream out = compressionType.wrapForOutput(compressedOut, RecordBatch.CURRENT_MAGIC_VALUE)) {
return ByteBuffer.wrap(raw); out.write(raw);
} else { out.flush();
throw new UnsupportedOperationException("Compression is not supported"); }
compressedOut.buffer().flip();
return Utils.toArray(compressedOut.buffer());
}
}
public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
ByteBuffer data = ByteBuffer.wrap(metrics);
try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
ByteArrayOutputStream out = new ByteArrayOutputStream()) {
byte[] bytes = new byte[data.capacity() * 2];
int nRead;
while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
out.write(bytes, 0, nRead);
}
out.flush();
return ByteBuffer.wrap(out.toByteArray());
} catch (IOException e) {
throw new KafkaException("Failed to decompress metrics data", e);
} }
} }

View File

@ -17,13 +17,33 @@
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import org.apache.kafka.common.message.PushTelemetryRequestData; import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.telemetry.internals.MetricKey;
import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class PushTelemetryRequestTest { public class PushTelemetryRequestTest {
@ -34,4 +54,65 @@ public class PushTelemetryRequestTest {
assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts()); assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1), response.errorCounts());
} }
@ParameterizedTest
@EnumSource(CompressionType.class)
public void testMetricsDataCompression(CompressionType compressionType) throws IOException {
MetricsData metricsData = getMetricsData();
PushTelemetryRequest req = getPushTelemetryRequest(metricsData, compressionType);
ByteBuffer receivedMetricsBuffer = req.metricsData();
assertNotNull(receivedMetricsBuffer);
assertTrue(receivedMetricsBuffer.capacity() > 0);
MetricsData receivedData = ClientTelemetryUtils.deserializeMetricsData(receivedMetricsBuffer);
assertEquals(metricsData, receivedData);
}
private PushTelemetryRequest getPushTelemetryRequest(MetricsData metricsData, CompressionType compressionType) throws IOException {
byte[] data = metricsData.toByteArray();
byte[] compressedData = ClientTelemetryUtils.compress(data, compressionType);
if (compressionType != CompressionType.NONE) {
assertTrue(compressedData.length < data.length);
} else {
assertArrayEquals(compressedData, data);
}
return new PushTelemetryRequest.Builder(
new PushTelemetryRequestData()
.setMetrics(compressedData)
.setCompressionType(compressionType.id)).build();
}
private MetricsData getMetricsData() {
List<Metric> metricsList = new ArrayList<>();
metricsList.add(SinglePointMetric.sum(
new MetricKey("metricName"), 1.0, true, Instant.now(), null, Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.sum(
new MetricKey("metricName1"), 100.0, false, Instant.now(), Instant.now(), Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.deltaSum(
new MetricKey("metricName2"), 1.0, true, Instant.now(), Instant.now(), Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.gauge(
new MetricKey("metricName3"), 1.0, Instant.now(), Collections.emptySet())
.builder().build());
metricsList.add(SinglePointMetric.gauge(
new MetricKey("metricName4"), Long.valueOf(100), Instant.now(), Collections.emptySet())
.builder().build());
MetricsData.Builder builder = MetricsData.newBuilder();
for (Metric metric : metricsList) {
ResourceMetrics rm = ResourceMetrics.newBuilder()
.setResource(Resource.newBuilder().build())
.addScopeMetrics(ScopeMetrics.newBuilder()
.addMetrics(metric)
.build()
).build();
builder.addResourceMetrics(rm);
}
return builder.build();
}
} }

View File

@ -40,8 +40,13 @@ import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -58,6 +63,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
public class ClientTelemetryReporterTest { public class ClientTelemetryReporterTest {
@ -350,6 +356,62 @@ public class ClientTelemetryReporterTest {
assertEquals(now + 1000, telemetrySender.lastRequestMs()); assertEquals(now + 1000, telemetrySender.lastRequestMs());
} }
@ParameterizedTest
@EnumSource(CompressionType.class)
public void testCreateRequestPushCompression(CompressionType compressionType) {
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(metricsContext);
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
uuid, 1234, 20000, Collections.singletonList(compressionType), true, null);
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertTrue(requestOptional.get().build() instanceof PushTelemetryRequest);
PushTelemetryRequest request = (PushTelemetryRequest) requestOptional.get().build();
assertEquals(subscription.clientInstanceId(), request.data().clientInstanceId());
assertEquals(subscription.subscriptionId(), request.data().subscriptionId());
assertEquals(compressionType.id, request.data().compressionType());
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
}
@Test
public void testCreateRequestPushCompressionException() {
clientTelemetryReporter.configure(configs);
clientTelemetryReporter.contextChange(metricsContext);
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
ClientTelemetryReporter.ClientTelemetrySubscription subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(
uuid, 1234, 20000, Collections.singletonList(CompressionType.GZIP), true, null);
telemetrySender.updateSubscriptionResult(subscription, time.milliseconds());
try (MockedStatic<ClientTelemetryUtils> mockedCompress = Mockito.mockStatic(ClientTelemetryUtils.class, new CallsRealMethods())) {
mockedCompress.when(() -> ClientTelemetryUtils.compress(any(), any())).thenThrow(new IOException());
Optional<AbstractRequest.Builder<?>> requestOptional = telemetrySender.createRequest();
assertNotNull(requestOptional);
assertTrue(requestOptional.isPresent());
assertTrue(requestOptional.get().build() instanceof PushTelemetryRequest);
PushTelemetryRequest request = (PushTelemetryRequest) requestOptional.get().build();
assertEquals(subscription.clientInstanceId(), request.data().clientInstanceId());
assertEquals(subscription.subscriptionId(), request.data().subscriptionId());
// CompressionType.NONE is used when compression fails.
assertEquals(CompressionType.NONE.id, request.data().compressionType());
assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
}
}
@Test @Test
public void testHandleResponseGetSubscriptions() { public void testHandleResponseGetSubscriptions() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender(); ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = (ClientTelemetryReporter.DefaultClientTelemetrySender) clientTelemetryReporter.telemetrySender();

View File

@ -21,8 +21,12 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -30,9 +34,11 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.function.Predicate; import java.util.function.Predicate;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -111,5 +117,24 @@ public class ClientTelemetryUtilsTest {
public void testPreferredCompressionType() { public void testPreferredCompressionType() {
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList())); assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Collections.emptyList()));
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null)); assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(null));
assertEquals(CompressionType.NONE, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.NONE, CompressionType.GZIP)));
assertEquals(CompressionType.GZIP, ClientTelemetryUtils.preferredCompressionType(Arrays.asList(CompressionType.GZIP, CompressionType.NONE)));
}
@ParameterizedTest
@EnumSource(CompressionType.class)
public void testCompressDecompress(CompressionType compressionType) throws IOException {
byte[] testString = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".getBytes(StandardCharsets.UTF_8);
byte[] compressed = ClientTelemetryUtils.compress(testString, compressionType);
assertNotNull(compressed);
if (compressionType != CompressionType.NONE) {
assertTrue(compressed.length < testString.length);
} else {
assertArrayEquals(testString, compressed);
}
ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
assertNotNull(decompressed);
assertArrayEquals(testString, decompressed.array());
} }
} }