KAFKA-17248: Override admin client to push metrics true, test for case where streams metrics [4/N] (#17422)

This PR disables metrics push for the AdminClient as the default. KafkaStreams enables metrics push for its internal AdminClient.

Tests are included that assert an error if a user disables either the main consumer or admin client metrics push but Kafka Streams metrics push config is enabled.

Reviewers: Matthias Sax <mjsax@apache.org>
This commit is contained in:
Bill Bejeck 2024-10-16 15:01:50 -04:00 committed by GitHub
parent 0d6d3effd0
commit 493e4c1cc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 170 additions and 19 deletions

View File

@ -188,7 +188,7 @@ public class AdminClientConfig extends AbstractConfig {
RETRY_BACKOFF_MAX_MS_DOC)
.define(ENABLE_METRICS_PUSH_CONFIG,
Type.BOOLEAN,
true,
false,
Importance.LOW,
ENABLE_METRICS_PUSH_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,

View File

@ -8073,7 +8073,7 @@ public class KafkaAdminClientTest {
@Test
public void testClientInstanceId() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true")) {
Uuid expected = Uuid.randomUuid();
GetTelemetrySubscriptionsResponseData responseData =

View File

@ -80,6 +80,7 @@ public class ClientTelemetryTest {
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
configs.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
try (Admin admin = Admin.create(configs)) {
String testTopicName = "test_topic";
admin.createTopics(Collections.singletonList(new NewTopic(testTopicName, 1, (short) 1)));

View File

@ -1227,6 +1227,13 @@ public class StreamsConfig extends AbstractConfig {
CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}
private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES;
static {
final Map<String, Object> tempAdminClientDefaultOverrides = new HashMap<>();
tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, "true");
ADMIN_CLIENT_OVERRIDES = Collections.unmodifiableMap(tempAdminClientDefaultOverrides);
}
public static class InternalConfig {
// This is settable in the main Streams config, but it's a private API for now
public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
@ -1434,6 +1441,7 @@ public class StreamsConfig extends AbstractConfig {
verifyEOSTransactionTimeoutCompatibility();
}
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
verifyClientTelemetryConfigs();
}
private void verifyEOSTransactionTimeoutCompatibility() {
@ -1459,6 +1467,52 @@ public class StreamsConfig extends AbstractConfig {
}
}
private void verifyClientTelemetryConfigs() {
final boolean streamTelemetryEnabled = getBoolean(ENABLE_METRICS_PUSH_CONFIG);
final Boolean mainConsumerMetricsConfig = maybeMetricsPushEnabled(MAIN_CONSUMER_PREFIX);
final Boolean consumerMetricsConfig = maybeMetricsPushEnabled(CONSUMER_PREFIX);
final Boolean adminMetricsConfig = maybeMetricsPushEnabled(ADMIN_CLIENT_PREFIX);
if (streamTelemetryEnabled) {
checkConsumerAndMainConsumerAndAdminMetricsConfig(adminMetricsConfig, consumerMetricsConfig, mainConsumerMetricsConfig);
checkMainConsumerAndAdminMetricsConfig(adminMetricsConfig, mainConsumerMetricsConfig, "enabled");
}
}
private void checkConsumerAndMainConsumerAndAdminMetricsConfig(final Boolean adminMetricsConfig,
final Boolean consumerMetricsConfig,
final Boolean mainConsumerMetricsConfig) {
if (consumerMetricsConfig != null) {
if (!consumerMetricsConfig
&& mainConsumerMetricsConfig == null
&& adminMetricsConfig == null) {
throw new ConfigException("Kafka Streams metrics push enabled but consumer.enable.metrics is false, the setting needs to be consistent between the two");
} else if (consumerMetricsConfig) {
checkMainConsumerAndAdminMetricsConfig(adminMetricsConfig, mainConsumerMetricsConfig, "and consumer.enable.metrics are enabled,");
}
}
}
private void checkMainConsumerAndAdminMetricsConfig(final Boolean adminMetricsConfig, final Boolean mainConsumerMetricsConfig, final String message) {
if (mainConsumerMetricsConfig != null && !mainConsumerMetricsConfig
&& adminMetricsConfig != null && !adminMetricsConfig) {
throw new ConfigException("Kafka Streams metrics push " + message + " but main.consumer and admin.client metrics push are disabled, the setting needs to be consistent between the two");
} else if (mainConsumerMetricsConfig != null && !mainConsumerMetricsConfig) {
throw new ConfigException("Kafka Streams metrics push " + message + " but main.consumer metrics push is disabled, the setting needs to be consistent between the two");
} else if (adminMetricsConfig != null && !adminMetricsConfig) {
throw new ConfigException("Kafka Streams metrics push " + message + " but admin.client metrics push is disabled, the setting needs to be consistent between the two");
}
}
private Boolean maybeMetricsPushEnabled(final String prefix) {
Boolean configSetValue = null;
if (originalsWithPrefix(prefix).containsKey(ENABLE_METRICS_PUSH_CONFIG)) {
configSetValue = (Boolean) originalsWithPrefix(prefix).get(ENABLE_METRICS_PUSH_CONFIG);
}
return configSetValue;
}
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
final Map<String, Object> configUpdates =
@ -1797,7 +1851,7 @@ public class StreamsConfig extends AbstractConfig {
public Map<String, Object> getAdminConfigs(final String clientId) {
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
final Map<String, Object> props = new HashMap<>();
final Map<String, Object> props = new HashMap<>(ADMIN_CLIENT_OVERRIDES);
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);

View File

@ -65,6 +65,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ENABLE_METRICS_PUSH_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
@ -379,6 +380,12 @@ public class StreamsConfigTest {
assertEquals("host3", adminConfigs.get("custom.property.host"));
}
@Test
public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() {
final Map<String, Object> returnedProps = streamsConfig.getAdminConfigs(clientId);
assertThat(returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG), equalTo("true"));
}
@Test
public void shouldSupportNonPrefixedAdminConfigs() {
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);
@ -1242,15 +1249,11 @@ public class StreamsConfigTest {
streamsConfig.getProducerConfigs("clientId")
.get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
);
assertNull(
streamsConfig.getAdminConfigs("clientId")
.get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
);
}
@Test
public void shouldEnableMetricCollectionForAllInternalClientsByDefault() {
props.put(StreamsConfig.ENABLE_METRICS_PUSH_CONFIG, true);
props.put(ENABLE_METRICS_PUSH_CONFIG, true);
final StreamsConfig streamsConfig = new StreamsConfig(props);
assertTrue(
@ -1277,7 +1280,7 @@ public class StreamsConfigTest {
@Test
public void shouldDisableMetricCollectionForAllInternalClients() {
props.put(StreamsConfig.ENABLE_METRICS_PUSH_CONFIG, false);
props.put(ENABLE_METRICS_PUSH_CONFIG, false);
final StreamsConfig streamsConfig = new StreamsConfig(props);
assertFalse(
@ -1303,22 +1306,115 @@ public class StreamsConfigTest {
}
@Test
public void shouldDisableMetricCollectionOnMainConsumerOnly() {
public void shouldThrowConfigExceptionWhenMainConsumerMetricsDisabledStreamsMetricsPushEnabled() {
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertThat(
exception.getMessage(),
containsString("main.consumer metrics push is disabled")
);
}
@Test
public void shouldThrowConfigExceptionConsumerMetricsDisabledStreamsMetricsPushEnabled() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertThat(
exception.getMessage(),
containsString("Kafka Streams metrics push enabled but consumer.enable.metrics is false, the setting needs to be consistent between the two")
);
}
@Test
public void shouldThrowConfigExceptionConsumerMetricsEnabledButMainConsumerAndAdminMetricsDisabled() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), true);
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
props.put(StreamsConfig.adminClientPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertThat(
exception.getMessage(),
containsString("Kafka Streams metrics push and consumer.enable.metrics are enabled, but main.consumer and admin.client metrics push are disabled")
);
}
@Test
public void shouldThrowConfigExceptionConsumerMetricsEnabledButMainConsumerMetricsDisabled() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), true);
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertThat(
exception.getMessage(),
containsString("main.consumer metrics push is disabled")
);
}
@Test
public void shouldThrowConfigExceptionConsumerMetricsEnabledButAdminClientMetricsDisabled() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), true);
props.put(StreamsConfig.adminClientPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertThat(
exception.getMessage(),
containsString("admin.client metrics push is disabled")
);
}
@Test
public void shouldEnableMetricsForMainConsumerWhenConsumerPrefixDisabledMetricsPushEnabled() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), true);
final StreamsConfig streamsConfig = new StreamsConfig(props);
assertFalse(
(Boolean) streamsConfig.getMainConsumerConfigs("groupId", "clientId", 0)
.get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
assertTrue(
(Boolean) streamsConfig.getMainConsumerConfigs("groupId", "clientId", 0)
.get(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG))
);
assertNull(
streamsConfig.getRestoreConsumerConfigs("clientId")
.get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
}
@Test
public void shouldEnableMetricsForMainConsumerWhenStreamMetricsPushDisabledButMainConsumerEnabled() {
props.put(StreamsConfig.ENABLE_METRICS_PUSH_CONFIG, false);
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), true);
final StreamsConfig streamsConfig = new StreamsConfig(props);
assertTrue(
(Boolean) streamsConfig.getMainConsumerConfigs("groupId", "clientId", 0)
.get(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG))
);
assertNull(
streamsConfig.getGlobalConsumerConfigs("clientId")
.get(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG)
}
@Test
public void shouldThrowConfigExceptionWhenAdminClientMetricsDisabledStreamsMetricsPushEnabled() {
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG), false);
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertThat(
exception.getMessage(),
containsString("admin.client metrics push is disabled")
);
}
@Test
public void shouldThrowConfigExceptionWhenAdminClientAndMainConsumerMetricsDisabledStreamsMetricsPushEnabled() {
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG), false);
props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG), false);
final Exception exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertThat(
exception.getMessage(),
containsString("main.consumer and admin.client metrics push are disabled")
);
}