MINOR: Fix toString method of IsolationLevel (#14782)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Ashwin Pankaj <apankaj@confluent.io>
Minha, Jeong authored 2024-02-16 12:07:18 +09:00, committed by GitHub
parent ff90f78c70
commit 553f45bca8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 39 additions and 45 deletions
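The change in a nutshell: IsolationLevel previously inherited Enum.toString(), which returns the upper-case constant name (e.g. READ_COMMITTED), so every call site lowercased it by hand before using it as a config value. This commit moves that lowercasing into a toString() override. A minimal, self-contained sketch of the resulting behavior (illustrative only; the enclosing class is not part of this patch):

import java.util.Locale;

public class IsolationLevelToStringSketch {
    enum IsolationLevel {
        READ_UNCOMMITTED, READ_COMMITTED;

        @Override
        public String toString() {
            // Centralizes the lowercasing that call sites used to repeat.
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }

    public static void main(String[] args) {
        // name() is final on Enum and is unaffected, so
        // IsolationLevel.valueOf(level.name()) still round-trips.
        System.out.println(IsolationLevel.READ_COMMITTED.name());      // READ_COMMITTED
        // toString() now matches the config-value format directly.
        System.out.println(IsolationLevel.READ_COMMITTED.toString());  // read_committed
    }
}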

ConsumerConfig.java

@@ -363,7 +363,7 @@ public class ConsumerConfig extends AbstractConfig {
" consumers will not be able to read up to the high watermark when there are in flight transactions.</p><p> Further, when in <code>read_committed</code> the seekToEnd method will" +
" return the LSO</p>";
- public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
+ public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString();
/** <code>allow.auto.create.topics</code> */
public static final String ALLOW_AUTO_CREATE_TOPICS_CONFIG = "allow.auto.create.topics";
@@ -620,7 +620,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(ISOLATION_LEVEL_CONFIG,
Type.STRING,
DEFAULT_ISOLATION_LEVEL,
- in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
+ in(IsolationLevel.READ_COMMITTED.toString(), IsolationLevel.READ_UNCOMMITTED.toString()),
Importance.MEDIUM,
ISOLATION_LEVEL_DOC)
.define(ALLOW_AUTO_CREATE_TOPICS_CONFIG,

IsolationLevel.java

@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common;
+ import java.util.Locale;
public enum IsolationLevel {
READ_UNCOMMITTED((byte) 0), READ_COMMITTED((byte) 1);
@@ -39,4 +41,9 @@ public enum IsolationLevel {
throw new IllegalArgumentException("Unknown isolation level " + id);
}
}
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
}
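With the override in place, call sites can drop the repeated .toLowerCase(Locale.ROOT), which is exactly what the remaining files in this commit do. A hedged before/after sketch (hypothetical snippet, not taken from this patch):

import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.common.IsolationLevel;

public class CallSiteSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Before this commit: callers spelled out the lowercasing themselves.
        props.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        // After: toString() already returns the lower-case config value.
        props.put("isolation.level", IsolationLevel.READ_COMMITTED.toString());
        // Both puts store the identical string, so existing configs are unaffected.
        System.out.println(props.getProperty("isolation.level"));  // read_committed
    }
}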

KafkaConsumerTest.java

@@ -116,7 +116,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
@@ -2959,7 +2958,7 @@ public class KafkaConsumerTest {
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs);
- configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize);
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, rebalanceTimeoutMs);

MirrorSourceConnector.java

@@ -17,7 +17,6 @@
package org.apache.kafka.connect.mirror;
import java.util.HashMap;
- import java.util.Locale;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
@@ -88,7 +87,7 @@ public class MirrorSourceConnector extends SourceConnector {
private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC,
null, PatternType.ANY);
private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY);
- private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT);
+ private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString();
private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
private final AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);

Worker.java

@@ -107,7 +107,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -855,7 +854,7 @@ public class Worker {
connName, defaultClientId, config, connConfig, connectorClass,
connectorClientConfigOverridePolicy, clusterId, ConnectorType.SOURCE);
ConnectUtils.ensureProperty(
- result, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ result, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString(),
"for source connectors' offset consumers when exactly-once source support is enabled",
false
);
@@ -875,7 +874,7 @@
// Users can disable this if they want to since the task isn't exactly-once anyways
result.putIfAbsent(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
- IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+ IsolationLevel.READ_COMMITTED.toString());
return result;
}

KafkaConfigBackingStore.java

@@ -67,7 +67,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -781,7 +780,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
if (config.exactlyOnceSourceEnabled()) {
ConnectUtils.ensureProperty(
- consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString(),
"for the worker's config topic consumer when exactly-once source support is enabled",
true
);

KafkaOffsetBackingStore.java

@@ -47,7 +47,6 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
- import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -203,7 +202,7 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme
ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId);
if (config.exactlyOnceSourceEnabled()) {
ConnectUtils.ensureProperty(
- consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString(),
"for the worker offsets topic consumer when exactly-once source support is enabled",
false
);
@@ -250,7 +249,7 @@ public class KafkaOffsetBackingStore extends KafkaTopicBasedBackingStore impleme
+ "support for source connectors, or upgrade to a newer Kafka broker version.";
} else {
message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + "is set to "
- + IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+ + IsolationLevel.READ_COMMITTED.toString()
+ ", a Kafka broker version that allows admin clients to read consumer offsets is required. "
+ "Please either reconfigure the worker or connector, or upgrade to a newer Kafka broker version.";
}

KafkaBasedLog.java

@@ -46,7 +46,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -179,7 +178,7 @@ public class KafkaBasedLog<K, V> {
// as it will not take records from currently-open transactions into account. We want to err on the side of caution in that
// case: when users request a read to the end of the log, we will read up to the point where the latest offsets visible to the
// consumer are at least as high as the (possibly-part-of-a-transaction) end offsets of the topic.
- this.requireAdminForOffsets = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+ this.requireAdminForOffsets = IsolationLevel.READ_COMMITTED.toString()
.equals(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
}
@@ -252,7 +251,7 @@
throw new ConnectException(
"Must provide a TopicAdmin to KafkaBasedLog when consumer is configured with "
+ ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to "
- + IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+ + IsolationLevel.READ_COMMITTED.toString()
);
}
initializer.accept(admin);
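For context, the comment and comparison above are how KafkaBasedLog decides whether a TopicAdmin is required for read-to-end semantics under read_committed. A simplified sketch of that check (method shape assumed for illustration, not the actual class):

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;

public class RequireAdminSketch {
    // True when the consumer is configured with the lower-case value
    // "read_committed"; with the new toString() the comparison needs no
    // Locale handling at the call site.
    static boolean requireAdminForOffsets(Map<String, Object> consumerConfigs) {
        return IsolationLevel.READ_COMMITTED.toString()
                .equals(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
    }

    public static void main(String[] args) {
        Map<String, Object> configs = Map.of(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        System.out.println(requireAdminForOffsets(configs));  // true
    }
}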

KafkaConfigBackingStoreTest.java

@@ -65,7 +65,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -1643,7 +1642,7 @@ public class KafkaConfigBackingStoreTest {
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
assertEquals(
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
@@ -1653,7 +1652,7 @@
@Test
public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() throws Exception {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
- props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
createStore();
expectConfigure();
@@ -1662,7 +1661,7 @@
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
assertEquals(
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
@@ -1688,7 +1687,7 @@
@Test
public void testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled() throws Exception {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
- props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
createStore();
expectConfigure();
@@ -1697,7 +1696,7 @@
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
assertEquals(
- IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT),
+ IsolationLevel.READ_UNCOMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);

KafkaOffsetBackingStoreTest.java

@@ -46,7 +46,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
- import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -420,7 +419,7 @@ public class KafkaOffsetBackingStoreTest {
store.configure(mockConfig(props));
assertEquals(
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
}
@@ -428,12 +427,12 @@
@Test
public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
- props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
store.configure(mockConfig(props));
assertEquals(
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+ IsolationLevel.READ_COMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
}
@@ -451,12 +450,12 @@
@Test
public void testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled() {
props.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
- props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ props.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString());
store.configure(mockConfig(props));
assertEquals(
- IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT),
+ IsolationLevel.READ_UNCOMMITTED.toString(),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);

StreamsConfig.java

@@ -57,7 +57,6 @@ import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -1208,7 +1207,7 @@ public class StreamsConfig extends AbstractConfig {
private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
static {
final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>(CONSUMER_DEFAULT_OVERRIDES);
- tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ tempConsumerDefaultOverrides.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED.toString());
CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

StreamsConfigTest.java

@@ -674,18 +674,18 @@ public class StreamsConfigTest {
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
assertThat(
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
- equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
+ equalTo(READ_COMMITTED.toString())
);
}
@Test
public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
- props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.toString());
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
assertThat(
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
- equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT))
+ equalTo(READ_UNCOMMITTED.toString())
);
}
@@ -752,7 +752,7 @@
assertThat(
consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG),
- equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))
+ equalTo(READ_COMMITTED.toString())
);
assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
assertThat(producerConfigs.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), equalTo(Integer.MAX_VALUE));

EosIntegrationTest.java

@@ -88,7 +88,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -1171,7 +1170,7 @@ public class EosIntegrationTest {
valueDeserializer,
Utils.mkProperties(Collections.singletonMap(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))),
+ IsolationLevel.READ_COMMITTED.toString()))),
topic,
numberOfRecords
);
@@ -1203,7 +1202,7 @@
valueDeserializer,
Utils.mkProperties(Collections.singletonMap(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT))
+ IsolationLevel.READ_COMMITTED.toString())
)
),
topic,

EosV2UpgradeIntegrationTest.java

@@ -75,7 +75,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -1067,7 +1066,7 @@ public class EosV2UpgradeIntegrationTest {
LongDeserializer.class,
Utils.mkProperties(Collections.singletonMap(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT))
+ IsolationLevel.READ_COMMITTED.toString())
)
),
MULTI_PARTITION_OUTPUT_TOPIC,

BrokerCompatibilityTest.java

@@ -40,7 +40,6 @@ import org.apache.kafka.streams.kstream.Grouped;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
- import java.util.Locale;
import java.util.Properties;
public class BrokerCompatibilityTest {
@@ -146,7 +145,7 @@
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
if (eosEnabled) {
- consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
}
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {

EosTestDriver.java

@@ -47,7 +47,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
- import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
@@ -152,7 +151,7 @@ public class EosTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final List<TopicPartition> partitions = getAllPartitions(consumer, "data");
@@ -178,7 +177,7 @@ public class EosTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
verifyAllTransactionFinished(consumer, kafka, withRepartitioning);