mirror of https://github.com/apache/kafka.git
MINOR: Collection/Option usage simplification via methods introduced in Java 9 & 11 (#18305)
Relevant methods: 1. `List.of`, `Set.of`, `Map.of` and similar (introduced in Java 9) 2. Optional: `isEmpty` (introduced in Java 11), `stream` (introduced in Java 9). Reviewers: Mickael Maison <mimaison@users.noreply.github.com>
This commit is contained in:
parent
ca511cd1cf
commit
409a43eff7
|
@ -1219,7 +1219,7 @@ public class NetworkClient implements KafkaClient {
|
|||
return metadataTimeout;
|
||||
}
|
||||
|
||||
if (!metadataAttemptStartMs.isPresent())
|
||||
if (metadataAttemptStartMs.isEmpty())
|
||||
metadataAttemptStartMs = Optional.of(now);
|
||||
|
||||
// Beware that the behavior of this method and the computation of timeouts for poll() are
|
||||
|
@ -1412,7 +1412,7 @@ public class NetworkClient implements KafkaClient {
|
|||
if (canSendRequest(nodeConnectionId, now)) {
|
||||
Optional<AbstractRequest.Builder<?>> requestOpt = clientTelemetrySender.createRequest();
|
||||
|
||||
if (!requestOpt.isPresent())
|
||||
if (requestOpt.isEmpty())
|
||||
return Long.MAX_VALUE;
|
||||
|
||||
AbstractRequest.Builder<?> request = requestOpt.get();
|
||||
|
|
|
@ -20,8 +20,6 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -51,11 +49,7 @@ public class Uuid implements Comparable<Uuid> {
|
|||
/**
|
||||
* The set of reserved UUIDs that will never be returned by the randomUuid method.
|
||||
*/
|
||||
public static final Set<Uuid> RESERVED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
METADATA_TOPIC_ID,
|
||||
ZERO_UUID,
|
||||
ONE_UUID
|
||||
)));
|
||||
public static final Set<Uuid> RESERVED = Set.of(ZERO_UUID, ONE_UUID);
|
||||
|
||||
private final long mostSignificantBits;
|
||||
private final long leastSignificantBits;
|
||||
|
|
|
@ -367,7 +367,7 @@ public final class Sensor {
|
|||
}
|
||||
|
||||
synchronized List<KafkaMetric> metrics() {
|
||||
return unmodifiableList(new ArrayList<>(this.metrics.values()));
|
||||
return List.copyOf(this.metrics.values());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.kafka.common.security.oauthbearer.internals.secured;
|
|||
|
||||
import org.jose4j.keys.resolvers.VerificationKeyResolver;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -57,7 +55,7 @@ public class AccessTokenValidatorFactory {
|
|||
List<String> l = cu.get(SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
|
||||
|
||||
if (l != null)
|
||||
expectedAudiences = Collections.unmodifiableSet(new HashSet<>(l));
|
||||
expectedAudiences = Set.copyOf(l);
|
||||
|
||||
Integer clockSkew = cu.validateInteger(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, false);
|
||||
String expectedIssuer = cu.validateString(SASL_OAUTHBEARER_EXPECTED_ISSUER, false);
|
||||
|
|
|
@ -223,7 +223,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
groupInstanceId,
|
||||
retryBackoffMs,
|
||||
retryBackoffMaxMs,
|
||||
!groupInstanceId.isPresent());
|
||||
groupInstanceId.isEmpty());
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
|
@ -4135,7 +4135,7 @@ public abstract class ConsumerCoordinatorTest {
|
|||
@Override
|
||||
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
|
||||
subscriptions.forEach((consumer, subscription) -> {
|
||||
if (!subscription.rackId().isPresent())
|
||||
if (subscription.rackId().isEmpty())
|
||||
throw new IllegalStateException("Rack id not provided in subscription for " + consumer);
|
||||
rackIds.add(subscription.rackId().get());
|
||||
});
|
||||
|
|
|
@ -891,7 +891,7 @@ public class SelectorTest {
|
|||
.filter(entry ->
|
||||
entry.getKey().name().equals(name) && entry.getKey().tags().equals(tags))
|
||||
.findFirst();
|
||||
if (!metric.isPresent())
|
||||
if (metric.isEmpty())
|
||||
throw new Exception(String.format("Could not find metric called %s with tags %s", name, tags.toString()));
|
||||
|
||||
return metric.get().getValue();
|
||||
|
@ -1112,7 +1112,7 @@ public class SelectorTest {
|
|||
Optional<Map.Entry<MetricName, KafkaMetric>> metric = metrics.metrics().entrySet().stream()
|
||||
.filter(entry -> entry.getKey().name().equals(name))
|
||||
.findFirst();
|
||||
if (!metric.isPresent())
|
||||
if (metric.isEmpty())
|
||||
throw new Exception(String.format("Could not find metric called %s", name));
|
||||
|
||||
return metric.get().getValue();
|
||||
|
|
|
@ -420,7 +420,7 @@ public class SchemaBuilder implements Schema {
|
|||
public Schema build() {
|
||||
return new ConnectSchema(type, isOptional(), defaultValue, name, version, doc,
|
||||
parameters == null ? null : Collections.unmodifiableMap(parameters),
|
||||
fields == null ? null : Collections.unmodifiableList(new ArrayList<>(fields.values())), keySchema, valueSchema);
|
||||
fields == null ? null : List.copyOf(fields.values()), keySchema, valueSchema);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -441,4 +441,4 @@ public class SchemaBuilder implements Schema {
|
|||
if (val == null)
|
||||
throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;
|
||||
|
||||
|
@ -196,7 +195,7 @@ public class MirrorCheckpointTask extends SourceTask {
|
|||
return upstreamGroupOffsets.entrySet().stream()
|
||||
.filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
|
||||
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
|
||||
.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
|
||||
.flatMap(o -> o.stream()) // do not emit checkpoints for partitions that don't have offset-syncs
|
||||
.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately
|
||||
.filter(this::checkpointIsMoreRecent) // do not emit checkpoints for partitions that have a later checkpoint
|
||||
.collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
|
||||
|
|
|
@ -102,11 +102,7 @@ public class MirrorMaker {
|
|||
|
||||
private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;
|
||||
|
||||
public static final List<Class<?>> CONNECTOR_CLASSES = Collections.unmodifiableList(
|
||||
Arrays.asList(
|
||||
MirrorSourceConnector.class,
|
||||
MirrorHeartbeatConnector.class,
|
||||
MirrorCheckpointConnector.class));
|
||||
public static final List<Class<?>> CONNECTOR_CLASSES = List.of(MirrorSourceConnector.class, MirrorHeartbeatConnector.class, MirrorCheckpointConnector.class);
|
||||
|
||||
private final Map<SourceAndTarget, Herder> herders = new HashMap<>();
|
||||
private CountDownLatch startLatch;
|
||||
|
|
|
@ -420,7 +420,7 @@ public class MirrorSourceConnector extends SourceConnector {
|
|||
void syncTopicAcls()
|
||||
throws InterruptedException, ExecutionException {
|
||||
Optional<Collection<AclBinding>> rawBindings = listTopicAclBindings();
|
||||
if (!rawBindings.isPresent())
|
||||
if (rawBindings.isEmpty())
|
||||
return;
|
||||
List<AclBinding> filteredBindings = rawBindings.get().stream()
|
||||
.filter(x -> x.pattern().resourceType() == ResourceType.TOPIC)
|
||||
|
|
|
@ -102,9 +102,7 @@ public abstract class RestServerConfig extends AbstractConfig {
|
|||
static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
|
||||
// Visible for testing
|
||||
static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
|
||||
private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
|
||||
Arrays.asList("set", "add", "setDate", "addDate")
|
||||
);
|
||||
private static final Collection<String> HEADER_ACTIONS = List.of("set", "add", "setDate", "addDate");
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException
|
|||
import org.apache.maven.artifact.versioning.VersionRange;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
|
@ -149,7 +148,7 @@ public class ConnectorPluginsResource {
|
|||
.filter(p -> PluginType.SINK.toString().equals(p.type()) || PluginType.SOURCE.toString().equals(p.type()))
|
||||
.collect(Collectors.toList()));
|
||||
} else {
|
||||
return Collections.unmodifiableList(new ArrayList<>(connectorPlugins));
|
||||
return List.copyOf(connectorPlugins);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -378,7 +378,7 @@ public final class StandaloneHerder extends AbstractHerder {
|
|||
}
|
||||
|
||||
Optional<RestartPlan> maybePlan = buildRestartPlan(request);
|
||||
if (!maybePlan.isPresent()) {
|
||||
if (maybePlan.isEmpty()) {
|
||||
cb.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -504,7 +504,7 @@ public interface FieldType {
|
|||
}
|
||||
|
||||
default boolean isVariableLength() {
|
||||
return !fixedLength().isPresent();
|
||||
return fixedLength().isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -485,7 +485,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
|
|||
for (FieldSpec field : struct.fields()) {
|
||||
Versions validTaggedVersions = field.versions().intersect(field.taggedVersions());
|
||||
if (!validTaggedVersions.empty()) {
|
||||
if (!field.tag().isPresent()) {
|
||||
if (field.tag().isEmpty()) {
|
||||
throw new RuntimeException("Field " + field.name() + " has tagged versions, but no tag.");
|
||||
}
|
||||
buffer.printf("case %d: {%n", field.tag().get());
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.kafka.message;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
@ -58,7 +57,7 @@ public final class MessageSpec {
|
|||
this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
|
||||
this.type = Objects.requireNonNull(type);
|
||||
this.commonStructs = commonStructs == null ? Collections.emptyList() :
|
||||
Collections.unmodifiableList(new ArrayList<>(commonStructs));
|
||||
List.copyOf(commonStructs);
|
||||
if (flexibleVersions == null) {
|
||||
throw new RuntimeException("You must specify a value for flexibleVersions. " +
|
||||
"Please use 0+ for all new messages.");
|
||||
|
|
|
@ -79,7 +79,7 @@ class CheckerUtils {
|
|||
FieldSpec field,
|
||||
Versions topLevelFlexibleVersions
|
||||
) {
|
||||
if (!field.flexibleVersions().isPresent()) {
|
||||
if (field.flexibleVersions().isEmpty()) {
|
||||
if (!topLevelFlexibleVersions.contains(field.taggedVersions())) {
|
||||
throw new RuntimeException("Tagged versions for " + what + " " +
|
||||
field.name() + " are " + field.taggedVersions() + ", but top " +
|
||||
|
|
|
@ -151,7 +151,7 @@ public class ConfigurationControlManager {
|
|||
this.alterConfigPolicy = alterConfigPolicy;
|
||||
this.validator = validator;
|
||||
this.configData = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||
this.staticConfig = Collections.unmodifiableMap(new HashMap<>(staticConfig));
|
||||
this.staticConfig = Map.copyOf(staticConfig);
|
||||
this.currentController = new ConfigResource(Type.BROKER, Integer.toString(nodeId));
|
||||
}
|
||||
|
||||
|
@ -439,7 +439,7 @@ public class ConfigurationControlManager {
|
|||
if (map == null) {
|
||||
return Collections.emptyMap();
|
||||
} else {
|
||||
return Collections.unmodifiableMap(new HashMap<>(map));
|
||||
return Map.copyOf(map);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1568,7 +1568,7 @@ public class ReplicationControlManager {
|
|||
.setEligibleLeaderReplicasEnabled(isElrEnabled())
|
||||
.setDefaultDirProvider(clusterDescriber)
|
||||
.build();
|
||||
if (!record.isPresent()) {
|
||||
if (record.isEmpty()) {
|
||||
if (electionType == ElectionType.PREFERRED) {
|
||||
return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
|
||||
} else {
|
||||
|
@ -1649,7 +1649,7 @@ public class ReplicationControlManager {
|
|||
ControllerResult<Boolean> maybeFenceOneStaleBroker() {
|
||||
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
|
||||
Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired();
|
||||
if (!idAndEpoch.isPresent()) {
|
||||
if (idAndEpoch.isEmpty()) {
|
||||
log.debug("No stale brokers found.");
|
||||
return ControllerResult.of(Collections.emptyList(), false);
|
||||
}
|
||||
|
|
|
@ -20,8 +20,6 @@ package org.apache.kafka.image.node;
|
|||
import org.apache.kafka.image.MetadataImage;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
@ -37,22 +35,18 @@ public class MetadataImageNode implements MetadataNode {
|
|||
*/
|
||||
private final MetadataImage image;
|
||||
|
||||
private static final Map<String, Function<MetadataImage, MetadataNode>> CHILDREN;
|
||||
|
||||
static {
|
||||
Map<String, Function<MetadataImage, MetadataNode>> children = new HashMap<>();
|
||||
children.put(ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()));
|
||||
children.put(FeaturesImageNode.NAME, image -> new FeaturesImageNode(image.features()));
|
||||
children.put(ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()));
|
||||
children.put(TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()));
|
||||
children.put(ConfigurationsImageNode.NAME, image -> new ConfigurationsImageNode(image.configs()));
|
||||
children.put(ClientQuotasImageNode.NAME, image -> new ClientQuotasImageNode(image.clientQuotas()));
|
||||
children.put(ProducerIdsImageNode.NAME, image -> new ProducerIdsImageNode(image.producerIds()));
|
||||
children.put(AclsImageNode.NAME, image -> new AclsImageByIdNode(image.acls()));
|
||||
children.put(ScramImageNode.NAME, image -> new ScramImageNode(image.scram()));
|
||||
children.put(DelegationTokenImageNode.NAME, image -> new DelegationTokenImageNode(image.delegationTokens()));
|
||||
CHILDREN = Collections.unmodifiableMap(children);
|
||||
}
|
||||
private static final Map<String, Function<MetadataImage, MetadataNode>> CHILDREN = Map.of(
|
||||
ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()),
|
||||
FeaturesImageNode.NAME, image -> new FeaturesImageNode(image.features()),
|
||||
ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()),
|
||||
TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()),
|
||||
ConfigurationsImageNode.NAME, image -> new ConfigurationsImageNode(image.configs()),
|
||||
ClientQuotasImageNode.NAME, image -> new ClientQuotasImageNode(image.clientQuotas()),
|
||||
ProducerIdsImageNode.NAME, image -> new ProducerIdsImageNode(image.producerIds()),
|
||||
AclsImageNode.NAME, image -> new AclsImageByIdNode(image.acls()),
|
||||
ScramImageNode.NAME, image -> new ScramImageNode(image.scram()),
|
||||
DelegationTokenImageNode.NAME, image -> new DelegationTokenImageNode(image.delegationTokens())
|
||||
);
|
||||
|
||||
public MetadataImageNode(MetadataImage image) {
|
||||
this.image = image;
|
||||
|
|
|
@ -19,9 +19,6 @@ package org.apache.kafka.metadata.placement;
|
|||
|
||||
import org.apache.kafka.common.Uuid;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -39,12 +36,8 @@ public class PartitionAssignment {
|
|||
private final List<Uuid> directories;
|
||||
|
||||
public PartitionAssignment(List<Integer> replicas, DefaultDirProvider defaultDirProvider) {
|
||||
this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas));
|
||||
Uuid[] directories = new Uuid[replicas.size()];
|
||||
for (int i = 0; i < directories.length; i++) {
|
||||
directories[i] = defaultDirProvider.defaultDir(replicas.get(i));
|
||||
}
|
||||
this.directories = Collections.unmodifiableList(Arrays.asList(directories));
|
||||
this.replicas = List.copyOf(replicas);
|
||||
this.directories = replicas.stream().map(replica -> defaultDirProvider.defaultDir(replica)).toList();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
package org.apache.kafka.metadata.placement;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -31,7 +29,7 @@ public class TopicAssignment {
|
|||
private final List<PartitionAssignment> assignments;
|
||||
|
||||
public TopicAssignment(List<PartitionAssignment> assignments) {
|
||||
this.assignments = Collections.unmodifiableList(new ArrayList<>(assignments));
|
||||
this.assignments = List.copyOf(assignments);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1338,7 +1338,7 @@ public class QuorumControllerTest {
|
|||
}
|
||||
|
||||
private static final List<ApiMessageAndVersion> PRE_PRODUCTION_RECORDS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
List.of(
|
||||
new ApiMessageAndVersion(new RegisterBrokerRecord().
|
||||
setBrokerEpoch(42).
|
||||
setBrokerId(123).
|
||||
|
@ -1352,7 +1352,7 @@ public class QuorumControllerTest {
|
|||
new ApiMessageAndVersion(new TopicRecord().
|
||||
setName("bar").
|
||||
setTopicId(Uuid.fromString("cxBT72dK4si8Ied1iP4wBA")),
|
||||
(short) 0)));
|
||||
(short) 0));
|
||||
|
||||
private static final BootstrapMetadata COMPLEX_BOOTSTRAP = BootstrapMetadata.fromRecords(
|
||||
Arrays.asList(
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.junit.jupiter.params.ParameterizedTest;
|
|||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
@ -37,18 +36,14 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
|||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class QuorumFeaturesTest {
|
||||
private static final Map<String, VersionRange> LOCAL;
|
||||
private static final Map<String, VersionRange> LOCAL = Map.of(
|
||||
"foo", VersionRange.of(0, 3),
|
||||
"bar", VersionRange.of(0, 4),
|
||||
"baz", VersionRange.of(2, 2)
|
||||
);
|
||||
|
||||
private static final QuorumFeatures QUORUM_FEATURES;
|
||||
|
||||
static {
|
||||
Map<String, VersionRange> local = new HashMap<>();
|
||||
local.put("foo", VersionRange.of(0, 3));
|
||||
local.put("bar", VersionRange.of(0, 4));
|
||||
local.put("baz", VersionRange.of(2, 2));
|
||||
LOCAL = Collections.unmodifiableMap(local);
|
||||
QUORUM_FEATURES = new QuorumFeatures(0, LOCAL, Arrays.asList(0, 1, 2));
|
||||
}
|
||||
private static final QuorumFeatures QUORUM_FEATURES = new QuorumFeatures(0, LOCAL,
|
||||
Arrays.asList(0, 1, 2));
|
||||
|
||||
@Test
|
||||
public void testDefaultFeatureMap() {
|
||||
|
|
|
@ -31,20 +31,18 @@ import java.io.File;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
|
||||
@Timeout(40)
|
||||
public class BootstrapDirectoryTest {
|
||||
static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = unmodifiableList(asList(
|
||||
static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
|
||||
new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(MetadataVersion.FEATURE_NAME).
|
||||
setFeatureLevel((short) 7), (short) 0),
|
||||
new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new NoOpRecord(), (short) 0)));
|
||||
new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
|
||||
|
||||
static class BootstrapTestDirectory implements AutoCloseable {
|
||||
File directory = null;
|
||||
|
|
|
@ -28,9 +28,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
|
||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
|
||||
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
|
||||
|
@ -40,14 +38,14 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
|
||||
@Timeout(60)
|
||||
public class BootstrapMetadataTest {
|
||||
static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = unmodifiableList(asList(
|
||||
static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
|
||||
new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(FEATURE_NAME).
|
||||
setFeatureLevel((short) 7), (short) 0),
|
||||
new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
|
||||
new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(FEATURE_NAME).
|
||||
setFeatureLevel((short) 6), (short) 0)));
|
||||
setFeatureLevel((short) 6), (short) 0));
|
||||
|
||||
@Test
|
||||
public void testFromVersion() {
|
||||
|
|
|
@ -97,7 +97,7 @@ public interface EventQueue extends AutoCloseable {
|
|||
|
||||
@Override
|
||||
public OptionalLong apply(OptionalLong prevDeadlineNs) {
|
||||
if (!prevDeadlineNs.isPresent()) {
|
||||
if (prevDeadlineNs.isEmpty()) {
|
||||
return OptionalLong.of(newDeadlineNs);
|
||||
} else if (prevDeadlineNs.getAsLong() < newDeadlineNs) {
|
||||
return prevDeadlineNs;
|
||||
|
@ -116,7 +116,7 @@ public interface EventQueue extends AutoCloseable {
|
|||
|
||||
@Override
|
||||
public OptionalLong apply(OptionalLong prevDeadlineNs) {
|
||||
if (!prevDeadlineNs.isPresent()) {
|
||||
if (prevDeadlineNs.isEmpty()) {
|
||||
return OptionalLong.of(newDeadlineNs);
|
||||
} else if (prevDeadlineNs.getAsLong() > newDeadlineNs) {
|
||||
return prevDeadlineNs;
|
||||
|
|
|
@ -337,7 +337,7 @@ public final class KafkaEventQueue implements EventQueue {
|
|||
}
|
||||
break;
|
||||
case DEFERRED:
|
||||
if (!deadlineNs.isPresent()) {
|
||||
if (deadlineNs.isEmpty()) {
|
||||
return new RuntimeException(
|
||||
"You must specify a deadline for deferred events.");
|
||||
}
|
||||
|
|
|
@ -163,7 +163,7 @@ public class CheckpointFile<T> {
|
|||
line = reader.readLine();
|
||||
while (line != null) {
|
||||
Optional<T> maybeEntry = formatter.fromString(line);
|
||||
if (!maybeEntry.isPresent()) {
|
||||
if (maybeEntry.isEmpty()) {
|
||||
throw buildMalformedLineException(line);
|
||||
}
|
||||
entries.add(maybeEntry.get());
|
||||
|
|
|
@ -20,9 +20,7 @@ import org.apache.kafka.common.config.AbstractConfig;
|
|||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -115,12 +113,12 @@ public class QuotaConfig {
|
|||
.define(QuotaConfig.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfig.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC)
|
||||
.define(QuotaConfig.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG, INT, QuotaConfig.QUOTA_WINDOW_SIZE_SECONDS_DEFAULT, atLeast(1), LOW, QuotaConfig.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC)
|
||||
.define(QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, CLASS, null, LOW, QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_DOC);
|
||||
private static final Set<String> USER_AND_CLIENT_QUOTA_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
|
||||
private static final Set<String> USER_AND_CLIENT_QUOTA_NAMES = Set.of(
|
||||
PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
|
||||
CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
|
||||
REQUEST_PERCENTAGE_OVERRIDE_CONFIG,
|
||||
CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG
|
||||
)));
|
||||
);
|
||||
|
||||
private static void buildUserClientQuotaConfigDef(ConfigDef configDef) {
|
||||
configDef.define(PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, ConfigDef.Type.LONG, Long.MAX_VALUE,
|
||||
|
|
|
@ -81,7 +81,7 @@ public class FetchParams {
|
|||
}
|
||||
|
||||
public boolean fetchOnlyLeader() {
|
||||
return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent()) || shareFetchRequest;
|
||||
return isFromFollower() || (isFromConsumer() && clientMetadata.isEmpty()) || shareFetchRequest;
|
||||
}
|
||||
|
||||
public boolean hardMaxBytesLimit() {
|
||||
|
|
|
@ -115,7 +115,7 @@ public class AclEntry extends AccessControlEntry {
|
|||
return Collections.emptySet();
|
||||
|
||||
Optional<JsonValue> jsonValue = Json.parseBytes(bytes);
|
||||
if (!jsonValue.isPresent())
|
||||
if (jsonValue.isEmpty())
|
||||
return Collections.emptySet();
|
||||
|
||||
JsonObject js = jsonValue.get().asJsonObject();
|
||||
|
|
|
@ -429,7 +429,7 @@ public final class AssignmentsManager {
|
|||
}
|
||||
|
||||
static Optional<String> globalResponseError(Optional<ClientResponse> response) {
|
||||
if (!response.isPresent()) {
|
||||
if (response.isEmpty()) {
|
||||
return Optional.of("Timeout");
|
||||
}
|
||||
if (response.get().authenticationException() != null) {
|
||||
|
|
|
@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
|
@ -85,8 +84,8 @@ public class ClientMetricsManager implements AutoCloseable {
|
|||
public static final String CLIENT_METRICS_REAPER_THREAD_NAME = "client-metrics-reaper";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
|
||||
private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
|
||||
Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
|
||||
private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = List.of(CompressionType.ZSTD.id, CompressionType.LZ4.id,
|
||||
CompressionType.GZIP.id, CompressionType.SNAPPY.id);
|
||||
// Max cache size (16k active client connections per broker)
|
||||
private static final int CACHE_MAX_SIZE = 16384;
|
||||
private static final int DEFAULT_CACHE_EXPIRY_MS = 60 * 1000;
|
||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.kafka.server.config;
|
|||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -98,24 +96,23 @@ public final class ZkConfigs {
|
|||
private static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
|
||||
|
||||
static {
|
||||
Map<String, String> zkSslConfigToSystemPropertyMap = new HashMap<>();
|
||||
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_CLIENT_ENABLE_CONFIG, SECURE_CLIENT);
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_CLIENT_CNXN_SOCKET_CONFIG, ZOOKEEPER_CLIENT_CNXN_SOCKET);
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_LOCATION_CONFIG, "zookeeper.ssl.keyStore.location");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_PASSWORD_CONFIG, "zookeeper.ssl.keyStore.password");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_KEY_STORE_TYPE_CONFIG, "zookeeper.ssl.keyStore.type");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_LOCATION_CONFIG, "zookeeper.ssl.trustStore.location");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_PASSWORD_CONFIG, "zookeeper.ssl.trustStore.password");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_TRUST_STORE_TYPE_CONFIG, "zookeeper.ssl.trustStore.type");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_PROTOCOL_CONFIG, "zookeeper.ssl.protocol");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENABLED_PROTOCOLS_CONFIG, "zookeeper.ssl.enabledProtocols");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_CIPHER_SUITES_CONFIG, "zookeeper.ssl.ciphersuites");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "zookeeper.ssl.hostnameVerification");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_CRL_ENABLE_CONFIG, "zookeeper.ssl.crl");
|
||||
zkSslConfigToSystemPropertyMap.put(ZK_SSL_OCSP_ENABLE_CONFIG, "zookeeper.ssl.ocsp");
|
||||
|
||||
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = Collections.unmodifiableMap(zkSslConfigToSystemPropertyMap);
|
||||
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP = Map.ofEntries(
|
||||
Map.entry(ZK_SSL_CLIENT_ENABLE_CONFIG, SECURE_CLIENT),
|
||||
Map.entry(ZK_CLIENT_CNXN_SOCKET_CONFIG, ZOOKEEPER_CLIENT_CNXN_SOCKET),
|
||||
Map.entry(ZK_SSL_KEY_STORE_LOCATION_CONFIG, "zookeeper.ssl.keyStore.location"),
|
||||
Map.entry(ZK_SSL_KEY_STORE_PASSWORD_CONFIG, "zookeeper.ssl.keyStore.password"),
|
||||
Map.entry(ZK_SSL_KEY_STORE_TYPE_CONFIG, "zookeeper.ssl.keyStore.type"),
|
||||
Map.entry(ZK_SSL_TRUST_STORE_LOCATION_CONFIG, "zookeeper.ssl.trustStore.location"),
|
||||
Map.entry(ZK_SSL_TRUST_STORE_PASSWORD_CONFIG, "zookeeper.ssl.trustStore.password"),
|
||||
Map.entry(ZK_SSL_TRUST_STORE_TYPE_CONFIG, "zookeeper.ssl.trustStore.type"),
|
||||
Map.entry(ZK_SSL_PROTOCOL_CONFIG, "zookeeper.ssl.protocol"),
|
||||
Map.entry(ZK_SSL_ENABLED_PROTOCOLS_CONFIG, "zookeeper.ssl.enabledProtocols"),
|
||||
Map.entry(ZK_SSL_CIPHER_SUITES_CONFIG, "zookeeper.ssl.ciphersuites"),
|
||||
Map.entry(ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "zookeeper.ssl.hostnameVerification"),
|
||||
Map.entry(ZK_SSL_CRL_ENABLE_CONFIG, "zookeeper.ssl.crl"),
|
||||
Map.entry(ZK_SSL_OCSP_ENABLE_CONFIG, "zookeeper.ssl.ocsp")
|
||||
);
|
||||
|
||||
ZK_SSL_CLIENT_ENABLE_DOC = "Set client to use TLS when connecting to ZooKeeper." +
|
||||
" An explicit value overrides any value set via the <code>zookeeper.client.secure</code> system property (note the different name)." +
|
||||
|
|
|
@ -1313,7 +1313,7 @@ public class ClientMetricsManagerTest {
|
|||
Optional<Entry<MetricName, KafkaMetric>> metric = kafkaMetrics.metrics().entrySet().stream()
|
||||
.filter(entry -> entry.getKey().name().equals(name))
|
||||
.findFirst();
|
||||
if (!metric.isPresent())
|
||||
if (metric.isEmpty())
|
||||
throw new Exception(String.format("Could not find metric called %s", name));
|
||||
|
||||
return metric.get().getValue();
|
||||
|
|
|
@ -32,8 +32,6 @@ import java.net.InetAddress;
|
|||
import java.net.UnknownHostException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
|
@ -43,10 +41,10 @@ public class ClientMetricsTestUtils {
|
|||
public static final String DEFAULT_METRICS =
|
||||
"org.apache.kafka.client.producer.partition.queue.,org.apache.kafka.client.producer.partition.latency";
|
||||
public static final int DEFAULT_PUSH_INTERVAL_MS = 30 * 1000; // 30 seconds
|
||||
public static final List<String> DEFAULT_CLIENT_MATCH_PATTERNS = Collections.unmodifiableList(Arrays.asList(
|
||||
public static final List<String> DEFAULT_CLIENT_MATCH_PATTERNS = List.of(
|
||||
ClientMetricsConfigs.CLIENT_SOFTWARE_NAME + "=apache-kafka-java",
|
||||
ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION + "=3.5.*"
|
||||
));
|
||||
);
|
||||
public static final int CLIENT_PORT = 56078;
|
||||
|
||||
public static Properties defaultProperties() {
|
||||
|
|
|
@ -93,7 +93,7 @@ public final class HistoryCommandHandler implements Commands.Handler {
|
|||
PrintWriter writer,
|
||||
MetadataShellState state
|
||||
) throws Exception {
|
||||
if (!shell.isPresent()) {
|
||||
if (shell.isEmpty()) {
|
||||
throw new RuntimeException("The history command requires a shell.");
|
||||
}
|
||||
Iterator<Map.Entry<Integer, String>> iter = shell.get().history(numEntriesToShow);
|
||||
|
|
|
@ -191,7 +191,7 @@ public final class LsCommandHandler implements Commands.Handler {
|
|||
|
||||
static ColumnSchema calculateColumnSchema(OptionalInt screenWidth,
|
||||
List<String> entries) {
|
||||
if (!screenWidth.isPresent()) {
|
||||
if (screenWidth.isEmpty()) {
|
||||
return new ColumnSchema(1, entries.size());
|
||||
}
|
||||
int maxColumns = screenWidth.getAsInt() / 4;
|
||||
|
|
|
@ -105,7 +105,7 @@ public class GlobVisitorTest {
|
|||
|
||||
@Override
|
||||
public void accept(Optional<MetadataNodeInfo> info) {
|
||||
if (!infos.isPresent()) {
|
||||
if (infos.isEmpty()) {
|
||||
if (info.isPresent()) {
|
||||
infos = Optional.of(new ArrayList<>());
|
||||
infos.get().add(info.get());
|
||||
|
|
|
@ -20,8 +20,6 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics;
|
|||
|
||||
import com.yammer.metrics.core.MetricName;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -54,8 +52,7 @@ public class RemoteStorageMetrics {
|
|||
private static final String REMOTE_LOG_READER_TASK_QUEUE_SIZE = REMOTE_LOG_READER_METRICS_NAME_PREFIX + TASK_QUEUE_SIZE;
|
||||
private static final String REMOTE_LOG_READER_AVG_IDLE_PERCENT = REMOTE_LOG_READER_METRICS_NAME_PREFIX + AVG_IDLE_PERCENT;
|
||||
private static final String REMOTE_LOG_READER_FETCH_RATE_AND_TIME_MS = REMOTE_LOG_READER_METRICS_NAME_PREFIX + "FetchRateAndTimeMs";
|
||||
public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Collections.unmodifiableSet(
|
||||
new HashSet<>(Arrays.asList(REMOTE_LOG_READER_TASK_QUEUE_SIZE, REMOTE_LOG_READER_AVG_IDLE_PERCENT)));
|
||||
public static final Set<String> REMOTE_STORAGE_THREAD_POOL_METRICS = Set.of(REMOTE_LOG_READER_TASK_QUEUE_SIZE, REMOTE_LOG_READER_AVG_IDLE_PERCENT);
|
||||
|
||||
public static final MetricName REMOTE_COPY_BYTES_PER_SEC_METRIC = getMetricName(
|
||||
"kafka.server", "BrokerTopicMetrics", REMOTE_COPY_BYTES_PER_SEC);
|
||||
|
|
|
@ -442,7 +442,7 @@ public class LogSegment implements Closeable {
|
|||
// return empty records in the fetch-data-info when:
|
||||
// 1. adjustedMaxSize is 0 (or)
|
||||
// 2. maxPosition to read is unavailable
|
||||
if (adjustedMaxSize == 0 || !maxPositionOpt.isPresent())
|
||||
if (adjustedMaxSize == 0 || maxPositionOpt.isEmpty())
|
||||
return new FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY);
|
||||
|
||||
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
|
||||
|
@ -497,7 +497,7 @@ public class LogSegment implements Closeable {
|
|||
if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
|
||||
leaderEpochCache.ifPresent(cache -> {
|
||||
if (batch.partitionLeaderEpoch() >= 0 &&
|
||||
(!cache.latestEpoch().isPresent() || batch.partitionLeaderEpoch() > cache.latestEpoch().getAsInt()))
|
||||
(cache.latestEpoch().isEmpty() || batch.partitionLeaderEpoch() > cache.latestEpoch().getAsInt()))
|
||||
cache.assign(batch.partitionLeaderEpoch(), batch.baseOffset());
|
||||
});
|
||||
updateProducerState(producerStateManager, batch);
|
||||
|
@ -686,7 +686,7 @@ public class LogSegment implements Closeable {
|
|||
* load the timestamp of the first message into memory.
|
||||
*/
|
||||
private void loadFirstBatchTimestamp() {
|
||||
if (!rollingBasedTimestamp.isPresent()) {
|
||||
if (rollingBasedTimestamp.isEmpty()) {
|
||||
Iterator<FileChannelRecordBatch> iter = log.batches().iterator();
|
||||
if (iter.hasNext())
|
||||
rollingBasedTimestamp = OptionalLong.of(iter.next().maxTimestamp());
|
||||
|
|
|
@ -383,12 +383,12 @@ public class LogValidator {
|
|||
|
||||
Optional<ApiRecordError> recordError = validateRecordCompression(sourceCompressionType,
|
||||
recordIndex, record);
|
||||
if (!recordError.isPresent()) {
|
||||
if (recordError.isEmpty()) {
|
||||
recordError = validateRecord(batch, topicPartition, record, recordIndex, now,
|
||||
timestampType, timestampBeforeMaxMs, timestampAfterMaxMs, compactedTopic, metricsRecorder);
|
||||
}
|
||||
|
||||
if (!recordError.isPresent()
|
||||
if (recordError.isEmpty()
|
||||
&& batch.magic() > RecordBatch.MAGIC_VALUE_V0
|
||||
&& toMagic > RecordBatch.MAGIC_VALUE_V0) {
|
||||
|
||||
|
|
|
@ -181,7 +181,7 @@ public class ProducerAppendInfo {
|
|||
// Received a non-transactional message while a transaction is active
|
||||
throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " +
|
||||
"offset " + firstOffsetMetadata + " in partition " + topicPartition);
|
||||
} else if (!currentTxnFirstOffset.isPresent() && isTransactional) {
|
||||
} else if (currentTxnFirstOffset.isEmpty() && isTransactional) {
|
||||
// Began a new transaction
|
||||
updatedEntry.setCurrentTxnFirstOffset(firstOffset);
|
||||
transactions.add(new TxnMetadata(producerId, firstOffsetMetadata));
|
||||
|
|
|
@ -248,9 +248,9 @@ public class ProducerStateManager {
|
|||
Optional<LogOffsetMetadata> unreplicatedFirstOffset = Optional.ofNullable(unreplicatedTxns.firstEntry()).map(e -> e.getValue().firstOffset);
|
||||
Optional<LogOffsetMetadata> undecidedFirstOffset = Optional.ofNullable(ongoingTxns.firstEntry()).map(e -> e.getValue().firstOffset);
|
||||
|
||||
if (!unreplicatedFirstOffset.isPresent())
|
||||
if (unreplicatedFirstOffset.isEmpty())
|
||||
return undecidedFirstOffset;
|
||||
else if (!undecidedFirstOffset.isPresent())
|
||||
else if (undecidedFirstOffset.isEmpty())
|
||||
return unreplicatedFirstOffset;
|
||||
else if (undecidedFirstOffset.get().messageOffset < unreplicatedFirstOffset.get().messageOffset)
|
||||
return undecidedFirstOffset;
|
||||
|
@ -328,7 +328,7 @@ public class ProducerStateManager {
|
|||
}
|
||||
|
||||
private boolean isProducerExpired(long currentTimeMs, ProducerStateEntry producerState) {
|
||||
return !producerState.currentTxnFirstOffset().isPresent() && currentTimeMs - producerState.lastTimestamp() >= producerStateManagerConfig.producerIdExpirationMs();
|
||||
return producerState.currentTxnFirstOffset().isEmpty() && currentTimeMs - producerState.lastTimestamp() >= producerStateManagerConfig.producerIdExpirationMs();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -83,7 +83,7 @@ public class UnifiedLog {
|
|||
// offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
|
||||
// (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
|
||||
// from the first segment.
|
||||
if (!producerStateManager.latestSnapshotOffset().isPresent() && reloadFromCleanShutdown) {
|
||||
if (producerStateManager.latestSnapshotOffset().isEmpty() && reloadFromCleanShutdown) {
|
||||
// To avoid an expensive scan through all the segments, we take empty snapshots from the start of the
|
||||
// last two segments and the last offset. This should avoid the full scan in the case that the log needs
|
||||
// truncation.
|
||||
|
|
|
@ -84,7 +84,7 @@ public final class BrokerTopicMetrics {
|
|||
metricTypeMap.put(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, new MeterWrapper(INVALID_MESSAGE_CRC_RECORDS_PER_SEC, "requests"));
|
||||
metricTypeMap.put(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, new MeterWrapper(INVALID_OFFSET_OR_SEQUENCE_RECORDS_PER_SEC, "requests"));
|
||||
|
||||
if (!name.isPresent()) {
|
||||
if (name.isEmpty()) {
|
||||
metricTypeMap.put(REPLICATION_BYTES_IN_PER_SEC, new MeterWrapper(REPLICATION_BYTES_IN_PER_SEC, "bytes"));
|
||||
metricTypeMap.put(REPLICATION_BYTES_OUT_PER_SEC, new MeterWrapper(REPLICATION_BYTES_OUT_PER_SEC, "bytes"));
|
||||
metricTypeMap.put(REASSIGNMENT_BYTES_IN_PER_SEC, new MeterWrapper(REASSIGNMENT_BYTES_IN_PER_SEC, "bytes"));
|
||||
|
|
|
@ -73,7 +73,7 @@ public final class LocalTieredStorageEvent implements Comparable<LocalTieredStor
|
|||
if (!exception.map(e -> condition.failed).orElseGet(() -> !condition.failed)) {
|
||||
return false;
|
||||
}
|
||||
if (condition.baseOffset != null && !metadata.isPresent()) {
|
||||
if (condition.baseOffset != null && metadata.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
return condition.baseOffset == null || metadata.get().startOffset() == condition.baseOffset;
|
||||
|
|
|
@ -47,16 +47,16 @@ public final class AlterLogDirAction implements TieredStorageTestAction {
|
|||
@Override
|
||||
public void doExecute(TieredStorageTestContext context) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
Optional<BrokerLocalStorage> localStorage = context.localStorages().stream().filter(storage -> storage.getBrokerId() == brokerId).findFirst();
|
||||
if (!localStorage.isPresent()) {
|
||||
if (localStorage.isEmpty()) {
|
||||
throw new IllegalArgumentException("cannot find local storage for this topic partition:" + topicPartition + " in this broker id:" + brokerId);
|
||||
}
|
||||
|
||||
Optional<File> sourceDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
|
||||
if (!sourceDir.isPresent()) {
|
||||
if (sourceDir.isEmpty()) {
|
||||
throw new IllegalArgumentException("No log dir with topic partition:" + topicPartition + " in this broker id:" + brokerId);
|
||||
}
|
||||
Optional<File> targetDir = localStorage.get().getBrokerStorageDirectories().stream().filter(dir -> !localStorage.get().dirContainsTopicPartition(topicPartition, dir)).findFirst();
|
||||
if (!targetDir.isPresent()) {
|
||||
if (targetDir.isEmpty()) {
|
||||
throw new IllegalArgumentException("No log dir without topic partition:" + topicPartition + " in this broker id:" + brokerId);
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ public final class ConsumeAction implements TieredStorageTestAction {
|
|||
.filter(record -> record.offset() >= fetchOffset)
|
||||
.findFirst();
|
||||
|
||||
if (!firstExpectedRecordOpt.isPresent()) {
|
||||
if (firstExpectedRecordOpt.isEmpty()) {
|
||||
// If no records could be found in the second-tier storage, no record would be consumed from that storage.
|
||||
if (expectedFromSecondTierCount > 0) {
|
||||
fail("Could not find any record with offset >= " + fetchOffset + " from tier storage.");
|
||||
|
|
|
@ -181,7 +181,7 @@ public final class BrokerLocalStorage {
|
|||
.filter(filename -> filename.endsWith(LogFileUtils.LOG_FILE_SUFFIX))
|
||||
.sorted()
|
||||
.findFirst();
|
||||
if (!firstLogFile.isPresent()) {
|
||||
if (firstLogFile.isEmpty()) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
"[BrokerId=%d] No log file found for the topic-partition %s", brokerId, topicPartition));
|
||||
}
|
||||
|
|
|
@ -379,10 +379,10 @@ public class AdjustStreamThreadCountTest {
|
|||
executor.execute(() -> {
|
||||
try {
|
||||
for (int i = 0; i < loop + 1; i++) {
|
||||
if (!kafkaStreams.addStreamThread().isPresent())
|
||||
if (kafkaStreams.addStreamThread().isEmpty())
|
||||
throw new RuntimeException("failed to create stream thread");
|
||||
kafkaStreams.metadataForLocalThreads();
|
||||
if (!kafkaStreams.removeStreamThread().isPresent())
|
||||
if (kafkaStreams.removeStreamThread().isEmpty())
|
||||
throw new RuntimeException("failed to delete a stream thread");
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
|
|
|
@ -19,10 +19,8 @@ package org.apache.kafka.streams.integration.utils;
|
|||
import org.apache.kafka.streams.KafkaStreams.State;
|
||||
import org.apache.kafka.streams.KafkaStreams.StateListener;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -39,7 +37,7 @@ public class CompositeStateListener implements StateListener {
|
|||
}
|
||||
|
||||
public CompositeStateListener(final Collection<StateListener> stateListeners) {
|
||||
this.listeners = Collections.unmodifiableList(new ArrayList<>(stateListeners));
|
||||
this.listeners = List.copyOf(stateListeners);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1222,34 +1222,27 @@ public class StreamsConfig extends AbstractConfig {
|
|||
|
||||
// this is the list of configs for underlying clients
|
||||
// that streams prefer different default values
|
||||
private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
|
||||
static {
|
||||
final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
|
||||
tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
|
||||
PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
|
||||
}
|
||||
private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES = Map.of(ProducerConfig.LINGER_MS_CONFIG, "100");
|
||||
|
||||
private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
|
||||
static {
|
||||
final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
|
||||
tempProducerDefaultOverrides.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
|
||||
tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
|
||||
// Reduce the transaction timeout for quicker pending offset expiration on broker side.
|
||||
tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT);
|
||||
|
||||
tempProducerDefaultOverrides.putAll(Map.of(
|
||||
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE,
|
||||
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
|
||||
// Reduce the transaction timeout for quicker pending offset expiration on broker side.
|
||||
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT
|
||||
));
|
||||
PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
|
||||
}
|
||||
|
||||
private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
|
||||
static {
|
||||
final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
|
||||
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
|
||||
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
||||
tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
|
||||
tempConsumerDefaultOverrides.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic");
|
||||
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
|
||||
}
|
||||
private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES = Map.of(
|
||||
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000",
|
||||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
|
||||
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
|
||||
"internal.leave.group.on.close", false,
|
||||
ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"
|
||||
);
|
||||
|
||||
private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
|
||||
static {
|
||||
|
@ -1258,12 +1251,8 @@ public class StreamsConfig extends AbstractConfig {
|
|||
CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
|
||||
}
|
||||
|
||||
private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES;
|
||||
static {
|
||||
final Map<String, Object> tempAdminClientDefaultOverrides = new HashMap<>();
|
||||
tempAdminClientDefaultOverrides.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
|
||||
ADMIN_CLIENT_OVERRIDES = Collections.unmodifiableMap(tempAdminClientDefaultOverrides);
|
||||
}
|
||||
private static final Map<String, Object> ADMIN_CLIENT_OVERRIDES =
|
||||
Map.of(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
|
||||
|
||||
public static class InternalConfig {
|
||||
// This is settable in the main Streams config, but it's a private API for now
|
||||
|
|
|
@ -171,7 +171,7 @@ abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther> impleme
|
|||
//
|
||||
// This condition below allows us to process the out-of-order records without the need
|
||||
// to hold it in the temporary outer store
|
||||
if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) {
|
||||
if (outerJoinStore.isEmpty() || timeTo < sharedTimeTracker.streamTime) {
|
||||
context().forward(record.withValue(joiner.apply(record.key(), record.value(), null)));
|
||||
} else {
|
||||
sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
|
||||
|
|
|
@ -1120,7 +1120,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
|
|||
}
|
||||
|
||||
private final Function<Optional<Set<Integer>>, Optional<Set<Integer>>> getPartition = maybeMulticastPartitions -> {
|
||||
if (!maybeMulticastPartitions.isPresent()) {
|
||||
if (maybeMulticastPartitions.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
if (maybeMulticastPartitions.get().size() != 1) {
|
||||
|
|
|
@ -657,12 +657,12 @@ public final class TaskAssignmentUtils {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
|
||||
if (assignmentConfigs.rackAwareTrafficCost().isEmpty()) {
|
||||
LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) {
|
||||
if (assignmentConfigs.rackAwareNonOverlapCost().isEmpty()) {
|
||||
LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
|
||||
return false;
|
||||
}
|
||||
|
@ -695,7 +695,7 @@ public final class TaskAssignmentUtils {
|
|||
}
|
||||
|
||||
private static boolean hasValidRackInformation(final KafkaStreamsState state) {
|
||||
if (!state.rackId().isPresent()) {
|
||||
if (state.rackId().isEmpty()) {
|
||||
LOG.error("KafkaStreams client {} doesn't have a rack id configured.", state.processId().id());
|
||||
return false;
|
||||
}
|
||||
|
@ -710,7 +710,7 @@ public final class TaskAssignmentUtils {
|
|||
|
||||
for (final TaskTopicPartition topicPartition : topicPartitions) {
|
||||
final Optional<Set<String>> racks = topicPartition.rackIds();
|
||||
if (!racks.isPresent() || racks.get().isEmpty()) {
|
||||
if (racks.isEmpty() || racks.get().isEmpty()) {
|
||||
LOG.error("Topic partition {} for task {} does not have racks configured.", topicPartition, task.id());
|
||||
return false;
|
||||
}
|
||||
|
@ -1043,4 +1043,4 @@ public final class TaskAssignmentUtils {
|
|||
this.tagEntryToClients = tagEntryToClients;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -926,21 +926,21 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
|
||||
public Set<StandbyTask> updatingStandbyTasks() {
|
||||
return stateUpdaterThread != null
|
||||
? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.updatingStandbyTasks()))
|
||||
? Set.copyOf(stateUpdaterThread.updatingStandbyTasks())
|
||||
: Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Task> updatingTasks() {
|
||||
return stateUpdaterThread != null
|
||||
? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.updatingTasks()))
|
||||
? Set.copyOf(stateUpdaterThread.updatingTasks())
|
||||
: Collections.emptySet();
|
||||
}
|
||||
|
||||
public Set<StreamTask> restoredActiveTasks() {
|
||||
restoredActiveTasksLock.lock();
|
||||
try {
|
||||
return Collections.unmodifiableSet(new HashSet<>(restoredActiveTasks));
|
||||
return Set.copyOf(restoredActiveTasks);
|
||||
} finally {
|
||||
restoredActiveTasksLock.unlock();
|
||||
}
|
||||
|
@ -949,19 +949,19 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
public List<ExceptionAndTask> exceptionsAndFailedTasks() {
|
||||
exceptionsAndFailedTasksLock.lock();
|
||||
try {
|
||||
return Collections.unmodifiableList(new ArrayList<>(exceptionsAndFailedTasks));
|
||||
return List.copyOf(exceptionsAndFailedTasks);
|
||||
} finally {
|
||||
exceptionsAndFailedTasksLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Set<Task> removedTasks() {
|
||||
return Collections.unmodifiableSet(new HashSet<>(removedTasks));
|
||||
return Set.copyOf(removedTasks);
|
||||
}
|
||||
|
||||
public Set<Task> pausedTasks() {
|
||||
return stateUpdaterThread != null
|
||||
? Collections.unmodifiableSet(new HashSet<>(stateUpdaterThread.pausedTasks()))
|
||||
? Set.copyOf(stateUpdaterThread.pausedTasks())
|
||||
: Collections.emptySet();
|
||||
}
|
||||
|
||||
|
|
|
@ -659,7 +659,7 @@ public class InternalTopicManager {
|
|||
final Set<String> topicsToCreate = new HashSet<>();
|
||||
for (final String topicName : topicsToValidate) {
|
||||
final Optional<Integer> numberOfPartitions = topicsMap.get(topicName).numberOfPartitions();
|
||||
if (!numberOfPartitions.isPresent()) {
|
||||
if (numberOfPartitions.isEmpty()) {
|
||||
log.error("Found undefined number of partitions for topic {}", topicName);
|
||||
throw new StreamsException("Topic " + topicName + " number of partitions not defined");
|
||||
}
|
||||
|
|
|
@ -1459,7 +1459,7 @@ public class InternalTopologyBuilder {
|
|||
}
|
||||
}
|
||||
final Set<Set<String>> uniqueCopartitionGroups = new HashSet<>(topicsToCopartitionGroup.values());
|
||||
return Collections.unmodifiableList(new ArrayList<>(uniqueCopartitionGroups));
|
||||
return List.copyOf(uniqueCopartitionGroups);
|
||||
}
|
||||
|
||||
private List<String> maybeDecorateInternalSourceTopics(final Collection<String> sourceTopics) {
|
||||
|
|
|
@ -160,7 +160,7 @@ public class RecordCollectorImpl implements RecordCollector {
|
|||
}
|
||||
if (!partitions.isEmpty()) {
|
||||
final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
|
||||
if (!maybeMulticastPartitions.isPresent()) {
|
||||
if (maybeMulticastPartitions.isEmpty()) {
|
||||
// A null//empty partition indicates we should use the default partitioner
|
||||
send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
|
||||
} else {
|
||||
|
@ -570,7 +570,7 @@ public class RecordCollectorImpl implements RecordCollector {
|
|||
|
||||
@Override
|
||||
public Map<TopicPartition, Long> offsets() {
|
||||
return Collections.unmodifiableMap(new HashMap<>(offsets));
|
||||
return Map.copyOf(offsets);
|
||||
}
|
||||
|
||||
private void checkForException() {
|
||||
|
|
|
@ -217,7 +217,7 @@ public class RepartitionTopics {
|
|||
final Optional<Integer> repartitionSourceTopicPartitionCount =
|
||||
repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions();
|
||||
|
||||
if (!repartitionSourceTopicPartitionCount.isPresent()) {
|
||||
if (repartitionSourceTopicPartitionCount.isEmpty()) {
|
||||
final Integer numPartitions = computePartitionCount(
|
||||
repartitionTopicMetadata,
|
||||
topicGroups,
|
||||
|
|
|
@ -742,7 +742,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
|
|||
}
|
||||
final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
|
||||
if (!readyToProcess) {
|
||||
if (!timeCurrentIdlingStarted.isPresent()) {
|
||||
if (timeCurrentIdlingStarted.isEmpty()) {
|
||||
timeCurrentIdlingStarted = Optional.of(wallClockTime);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -463,7 +463,7 @@ public class StreamsMetadataState {
|
|||
}
|
||||
|
||||
private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
|
||||
if (!maybeMulticastPartitions.isPresent()) {
|
||||
if (maybeMulticastPartitions.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
if (maybeMulticastPartitions.get().size() != 1) {
|
||||
|
|
|
@ -99,7 +99,7 @@ public class DefaultKafkaStreamsState implements KafkaStreamsState {
|
|||
|
||||
@Override
|
||||
public long lagFor(final TaskId task) {
|
||||
if (!taskLagTotals.isPresent()) {
|
||||
if (taskLagTotals.isEmpty()) {
|
||||
LOG.error("lagFor was called on a KafkaStreamsState {} that does not support lag computations.", processId);
|
||||
throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId);
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ public class DefaultKafkaStreamsState implements KafkaStreamsState {
|
|||
|
||||
@Override
|
||||
public SortedSet<TaskId> prevTasksByLag(final String consumerClientId) {
|
||||
if (!taskLagTotals.isPresent()) {
|
||||
if (taskLagTotals.isEmpty()) {
|
||||
LOG.error("prevTasksByLag was called on a KafkaStreamsState {} that does not support lag computations.", processId);
|
||||
throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId);
|
||||
}
|
||||
|
@ -139,7 +139,7 @@ public class DefaultKafkaStreamsState implements KafkaStreamsState {
|
|||
|
||||
@Override
|
||||
public Map<TaskId, Long> statefulTasksToLagSums() {
|
||||
if (!taskLagTotals.isPresent()) {
|
||||
if (taskLagTotals.isEmpty()) {
|
||||
LOG.error("statefulTasksToLagSums was called on a KafkaStreamsState {} that does not support lag computations.", processId);
|
||||
throw new UnsupportedOperationException("Lag computation was not requested for KafkaStreamsState with process " + processId);
|
||||
}
|
||||
|
|
|
@ -241,7 +241,7 @@ public class RackAwareTaskAssignor {
|
|||
final ProcessId processId = entry.getKey();
|
||||
KeyValue<String, String> previousRackInfo = null;
|
||||
for (final Map.Entry<String, Optional<String>> rackEntry : entry.getValue().entrySet()) {
|
||||
if (!rackEntry.getValue().isPresent()) {
|
||||
if (rackEntry.getValue().isEmpty()) {
|
||||
if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) {
|
||||
log.error(
|
||||
String.format("RackId doesn't exist for process %s and consumer %s",
|
||||
|
|
|
@ -215,7 +215,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
|
|||
removeTopologyFuture.completeExceptionally(
|
||||
new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state)
|
||||
);
|
||||
} else if (!getTopologyByName(topologyToRemove).isPresent()) {
|
||||
} else if (getTopologyByName(topologyToRemove).isEmpty()) {
|
||||
log.error("Attempted to remove unknown topology {}. This application currently contains the"
|
||||
+ "following topologies: {}.", topologyToRemove, topologyMetadata.namedTopologiesView()
|
||||
);
|
||||
|
@ -431,7 +431,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
|
|||
* See {@link KafkaStreams#allLocalStorePartitionLags()}
|
||||
*/
|
||||
public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLagsForTopology(final String topologyName) {
|
||||
if (!getTopologyByName(topologyName).isPresent()) {
|
||||
if (getTopologyByName(topologyName).isEmpty()) {
|
||||
log.error("Can't get local store partition lags since topology {} does not exist in this application",
|
||||
topologyName);
|
||||
throw new UnknownTopologyException("Can't get local store partition lags", topologyName);
|
||||
|
|
|
@ -18,8 +18,6 @@ package org.apache.kafka.streams.query;
|
|||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -102,7 +100,7 @@ public class StateQueryRequest<R> {
|
|||
return new StateQueryRequest<>(
|
||||
storeName,
|
||||
position,
|
||||
Optional.of(Collections.unmodifiableSet(new HashSet<>(partitions))),
|
||||
Optional.of(Set.copyOf(partitions)),
|
||||
query,
|
||||
executionInfoEnabled,
|
||||
requireActive
|
||||
|
@ -166,7 +164,7 @@ public class StateQueryRequest<R> {
|
|||
* Whether this request should fetch from all locally available partitions.
|
||||
*/
|
||||
public boolean isAllPartitions() {
|
||||
return !partitions.isPresent();
|
||||
return partitions.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -175,7 +173,7 @@ public class StateQueryRequest<R> {
|
|||
* @throws IllegalStateException if this is a request for all partitions
|
||||
*/
|
||||
public Set<Integer> getPartitions() {
|
||||
if (!partitions.isPresent()) {
|
||||
if (partitions.isEmpty()) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot list partitions of an 'all partitions' request");
|
||||
} else {
|
||||
|
|
|
@ -205,11 +205,11 @@ public final class StoreQueryUtils {
|
|||
final ResultOrder order = rangeQuery.resultOrder();
|
||||
final KeyValueIterator<Bytes, byte[]> iterator;
|
||||
try {
|
||||
if (!lowerRange.isPresent() && !upperRange.isPresent() && !order.equals(ResultOrder.DESCENDING)) {
|
||||
if (lowerRange.isEmpty() && upperRange.isEmpty() && !order.equals(ResultOrder.DESCENDING)) {
|
||||
iterator = kvStore.all();
|
||||
} else if (!order.equals(ResultOrder.DESCENDING)) {
|
||||
iterator = kvStore.range(lowerRange.orElse(null), upperRange.orElse(null));
|
||||
} else if (!lowerRange.isPresent() && !upperRange.isPresent()) {
|
||||
} else if (lowerRange.isEmpty() && upperRange.isEmpty()) {
|
||||
iterator = kvStore.reverseAll();
|
||||
} else {
|
||||
iterator = kvStore.reverseRange(lowerRange.orElse(null), upperRange.orElse(null));
|
||||
|
@ -500,4 +500,4 @@ public final class StoreQueryUtils {
|
|||
printWriter.flush();
|
||||
return stringWriter.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1552,7 +1552,7 @@ public class StreamTaskTest {
|
|||
task.initializeIfNeeded();
|
||||
task.completeRestoration(noOpResetter -> { });
|
||||
|
||||
assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent());
|
||||
assertThat("task is not idling", task.timeCurrentIdlingStarted().isEmpty());
|
||||
|
||||
assertFalse(task.process(0L));
|
||||
|
||||
|
@ -1564,7 +1564,7 @@ public class StreamTaskTest {
|
|||
task.addRecords(partition2, singleton(getConsumerRecordWithOffsetAsTimestamp(partition2, 0)));
|
||||
|
||||
assertTrue(task.process(0L));
|
||||
assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent());
|
||||
assertThat("task is not idling", task.timeCurrentIdlingStarted().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1580,7 +1580,7 @@ public class StreamTaskTest {
|
|||
|
||||
task.resume();
|
||||
|
||||
assertThat("task is not idling", !task.timeCurrentIdlingStarted().isPresent());
|
||||
assertThat("task is not idling", task.timeCurrentIdlingStarted().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -419,7 +419,7 @@ public class MockProcessorContext<KForward, VForward> implements ProcessorContex
|
|||
public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
|
||||
final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
|
||||
for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
|
||||
if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
|
||||
if (capture.childName().isEmpty() || capture.childName().equals(Optional.of(childName))) {
|
||||
result.add(capture);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.io.File;
|
|||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -119,7 +118,7 @@ public class TestKitNodes {
|
|||
public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServerProperties) {
|
||||
this.perServerProperties = Collections.unmodifiableMap(
|
||||
perServerProperties.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> Map.copyOf(e.getValue()))));
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -378,4 +377,4 @@ public class TestKitNodes {
|
|||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,10 +23,7 @@ import org.apache.kafka.server.common.Feature;
|
|||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -260,7 +257,7 @@ public class ClusterConfig {
|
|||
private Builder() {}
|
||||
|
||||
public Builder setTypes(Set<Type> types) {
|
||||
this.types = Collections.unmodifiableSet(new HashSet<>(types));
|
||||
this.types = Set.copyOf(types);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -315,44 +312,44 @@ public class ClusterConfig {
|
|||
}
|
||||
|
||||
public Builder setServerProperties(Map<String, String> serverProperties) {
|
||||
this.serverProperties = Collections.unmodifiableMap(new HashMap<>(serverProperties));
|
||||
this.serverProperties = Map.copyOf(serverProperties);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setConsumerProperties(Map<String, String> consumerProperties) {
|
||||
this.consumerProperties = Collections.unmodifiableMap(new HashMap<>(consumerProperties));
|
||||
this.consumerProperties = Map.copyOf(consumerProperties);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setProducerProperties(Map<String, String> producerProperties) {
|
||||
this.producerProperties = Collections.unmodifiableMap(new HashMap<>(producerProperties));
|
||||
this.producerProperties = Map.copyOf(producerProperties);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setAdminClientProperties(Map<String, String> adminClientProperties) {
|
||||
this.adminClientProperties = Collections.unmodifiableMap(new HashMap<>(adminClientProperties));
|
||||
this.adminClientProperties = Map.copyOf(adminClientProperties);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setSaslServerProperties(Map<String, String> saslServerProperties) {
|
||||
this.saslServerProperties = Collections.unmodifiableMap(new HashMap<>(saslServerProperties));
|
||||
this.saslServerProperties = Map.copyOf(saslServerProperties);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setSaslClientProperties(Map<String, String> saslClientProperties) {
|
||||
this.saslClientProperties = Collections.unmodifiableMap(new HashMap<>(saslClientProperties));
|
||||
this.saslClientProperties = Map.copyOf(saslClientProperties);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServerProperties) {
|
||||
this.perServerProperties = Collections.unmodifiableMap(
|
||||
perServerProperties.entrySet().stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> Map.copyOf(e.getValue()))));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setTags(List<String> tags) {
|
||||
this.tags = Collections.unmodifiableList(new ArrayList<>(tags));
|
||||
this.tags = List.copyOf(tags);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ public class ClusterInstanceParameterResolver implements ParameterResolver {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (!extensionContext.getTestMethod().isPresent()) {
|
||||
if (extensionContext.getTestMethod().isEmpty()) {
|
||||
// Allow this to be injected into the class
|
||||
extensionContext.getRequiredTestClass();
|
||||
return true;
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.junit.platform.commons.util.ReflectionUtils;
|
|||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -102,10 +101,9 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
|
|||
private static final String PROCESS_REAPER_THREAD_PREFIX = "process reaper";
|
||||
private static final String RMI_THREAD_PREFIX = "RMI";
|
||||
private static final String DETECT_THREAD_LEAK_KEY = "detectThreadLeak";
|
||||
private static final Set<String> SKIPPED_THREAD_PREFIX = Collections.unmodifiableSet(Stream.of(
|
||||
METRICS_METER_TICK_THREAD_PREFIX, SCALA_THREAD_PREFIX, FORK_JOIN_POOL_THREAD_PREFIX, JUNIT_THREAD_PREFIX,
|
||||
ATTACH_LISTENER_THREAD_PREFIX, PROCESS_REAPER_THREAD_PREFIX, RMI_THREAD_PREFIX, SystemTimer.SYSTEM_TIMER_THREAD_PREFIX)
|
||||
.collect(Collectors.toSet()));
|
||||
private static final Set<String> SKIPPED_THREAD_PREFIX = Set.of(METRICS_METER_TICK_THREAD_PREFIX, SCALA_THREAD_PREFIX,
|
||||
FORK_JOIN_POOL_THREAD_PREFIX, JUNIT_THREAD_PREFIX, ATTACH_LISTENER_THREAD_PREFIX, PROCESS_REAPER_THREAD_PREFIX,
|
||||
RMI_THREAD_PREFIX, SystemTimer.SYSTEM_TIMER_THREAD_PREFIX);
|
||||
|
||||
@Override
|
||||
public boolean supportsTestTemplate(ExtensionContext context) {
|
||||
|
|
|
@ -208,7 +208,7 @@ public class JmxTool {
|
|||
List<ObjectName> queries,
|
||||
Set<ObjectName> found) throws Exception {
|
||||
Map<ObjectName, Integer> result = new HashMap<>();
|
||||
if (!attributesInclude.isPresent()) {
|
||||
if (attributesInclude.isEmpty()) {
|
||||
found.forEach(objectName -> {
|
||||
try {
|
||||
MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName);
|
||||
|
|
|
@ -102,7 +102,7 @@ public class LeaderElectionCommand {
|
|||
* The validate function should be checking that this option is required if the --topic and --path-to-json-file
|
||||
* are not specified.
|
||||
*/
|
||||
Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.map(Optional::of).orElse(singleTopicPartition);
|
||||
Optional<Set<TopicPartition>> topicPartitions = jsonFileTopicPartitions.or(() -> singleTopicPartition);
|
||||
|
||||
Properties props = new Properties();
|
||||
if (commandOptions.hasAdminClientConfig()) {
|
||||
|
|
|
@ -216,10 +216,10 @@ public class MetadataQuorumCommand {
|
|||
String status,
|
||||
boolean humanReadable) {
|
||||
return infos.map(info -> {
|
||||
String lastFetchTimestamp = !info.lastFetchTimestamp().isPresent() ? "-1" :
|
||||
String lastFetchTimestamp = info.lastFetchTimestamp().isEmpty() ? "-1" :
|
||||
humanReadable ? format("%d ms ago", relativeTimeMs(info.lastFetchTimestamp().getAsLong(), "last fetch")) :
|
||||
valueOf(info.lastFetchTimestamp().getAsLong());
|
||||
String lastCaughtUpTimestamp = !info.lastCaughtUpTimestamp().isPresent() ? "-1" :
|
||||
String lastCaughtUpTimestamp = info.lastCaughtUpTimestamp().isEmpty() ? "-1" :
|
||||
humanReadable ? format("%d ms ago", relativeTimeMs(info.lastCaughtUpTimestamp().getAsLong(), "last caught up")) :
|
||||
valueOf(info.lastCaughtUpTimestamp().getAsLong());
|
||||
return Stream.of(
|
||||
|
@ -382,7 +382,7 @@ public class MetadataQuorumCommand {
|
|||
if (metaProperties == null) {
|
||||
throw new TerseException("Unable to read meta.properties from " + metadataDirectory);
|
||||
}
|
||||
if (!metaProperties.directoryId().isPresent()) {
|
||||
if (metaProperties.directoryId().isEmpty()) {
|
||||
throw new TerseException("No directory id found in " + metadataDirectory);
|
||||
}
|
||||
return metaProperties.directoryId().get();
|
||||
|
|
|
@ -472,7 +472,7 @@ public class ReplicaVerificationTool {
|
|||
if (batch.lastOffset() >= fetchResponsePerReplica.get(replicaId).highWatermark()) {
|
||||
isMessageInAllReplicas = false;
|
||||
} else {
|
||||
if (!messageInfoFromFirstReplicaOpt.isPresent()) {
|
||||
if (messageInfoFromFirstReplicaOpt.isEmpty()) {
|
||||
messageInfoFromFirstReplicaOpt = Optional.of(
|
||||
new MessageInfo(replicaId, batch.lastOffset(), batch.nextOffset(), batch.checksum())
|
||||
);
|
||||
|
|
|
@ -175,7 +175,7 @@ public abstract class TransactionsCommand {
|
|||
})
|
||||
.findFirst();
|
||||
|
||||
if (!foundProducerState.isPresent()) {
|
||||
if (foundProducerState.isEmpty()) {
|
||||
printErrorAndExit("Could not find any open transactions starting at offset " +
|
||||
startOffset + " on partition " + topicPartition);
|
||||
return null;
|
||||
|
@ -543,14 +543,14 @@ public abstract class TransactionsCommand {
|
|||
Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
|
||||
Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
|
||||
|
||||
if (!topic.isPresent() && !brokerId.isPresent()) {
|
||||
if (topic.isEmpty() && brokerId.isEmpty()) {
|
||||
printErrorAndExit("The `find-hanging` command requires either --topic " +
|
||||
"or --broker-id to limit the scope of the search");
|
||||
return;
|
||||
}
|
||||
|
||||
Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
|
||||
if (partition.isPresent() && !topic.isPresent()) {
|
||||
if (partition.isPresent() && topic.isEmpty()) {
|
||||
printErrorAndExit("The --partition argument requires --topic to be provided");
|
||||
return;
|
||||
}
|
||||
|
@ -767,7 +767,7 @@ public abstract class TransactionsCommand {
|
|||
Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).allTopicNames().get();
|
||||
topicDescriptions.forEach((topic, description) -> {
|
||||
description.partitions().forEach(partitionInfo -> {
|
||||
if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
|
||||
if (brokerId.isEmpty() || hasReplica(brokerId.get(), partitionInfo)) {
|
||||
topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
|
||||
}
|
||||
});
|
||||
|
@ -1017,7 +1017,7 @@ public abstract class TransactionsCommand {
|
|||
.filter(cmd -> cmd.name().equals(commandName))
|
||||
.findFirst();
|
||||
|
||||
if (!commandOpt.isPresent()) {
|
||||
if (commandOpt.isEmpty()) {
|
||||
printErrorAndExit("Unexpected command " + commandName);
|
||||
}
|
||||
|
||||
|
|
|
@ -187,7 +187,7 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
|
|||
|
||||
private void checkRequiredArgs() {
|
||||
List<Optional<String>> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
|
||||
topicOrFilterArgs.removeIf(arg -> !arg.isPresent());
|
||||
topicOrFilterArgs.removeIf(arg -> arg.isEmpty());
|
||||
// user need to specify value for either --topic or --include options)
|
||||
if (topicOrFilterArgs.size() != 1) {
|
||||
CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. ");
|
||||
|
|
|
@ -679,7 +679,7 @@ public class ReassignPartitionsCommand {
|
|||
: new BrokerMetadata(node.id(), Optional.empty())
|
||||
).collect(Collectors.toList());
|
||||
|
||||
long numRackless = results.stream().filter(m -> !m.rack.isPresent()).count();
|
||||
long numRackless = results.stream().filter(m -> m.rack.isEmpty()).count();
|
||||
if (enableRackAwareness && numRackless != 0 && numRackless != results.size()) {
|
||||
throw new AdminOperationException("Not all brokers have rack information. Add " +
|
||||
"--disable-rack-aware in command line to make replica assignment without rack " +
|
||||
|
|
|
@ -463,7 +463,7 @@ public class DescribeConsumerGroupTest {
|
|||
return false;
|
||||
|
||||
Optional<PartitionAssignmentState> maybePartitionState = assignments.get().stream().filter(isGrp).findFirst();
|
||||
if (!maybePartitionState.isPresent())
|
||||
if (maybePartitionState.isEmpty())
|
||||
return false;
|
||||
|
||||
PartitionAssignmentState partitionState = maybePartitionState.get();
|
||||
|
@ -837,7 +837,7 @@ public class DescribeConsumerGroupTest {
|
|||
res.getValue().isPresent() &&
|
||||
res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 &&
|
||||
res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 &&
|
||||
res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && !x.partition.isPresent());
|
||||
res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && x.partition.isEmpty());
|
||||
}, "Expected two rows (one row per consumer) in describe group results.");
|
||||
}
|
||||
}
|
||||
|
@ -1031,7 +1031,7 @@ public class DescribeConsumerGroupTest {
|
|||
return false;
|
||||
|
||||
Optional<PartitionAssignmentState> maybeAssignmentState = groupOffsets.getValue().get().stream().filter(isGrp).findFirst();
|
||||
if (!maybeAssignmentState.isPresent())
|
||||
if (maybeAssignmentState.isEmpty())
|
||||
return false;
|
||||
|
||||
PartitionAssignmentState assignmentState = maybeAssignmentState.get();
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
*/
|
||||
package org.apache.kafka.trogdor.rest;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
|
@ -36,7 +34,6 @@ public enum TaskStateType {
|
|||
static final String RUNNING_VALUE = "RUNNING";
|
||||
static final String STOPPING_VALUE = "STOPPING";
|
||||
static final String DONE_VALUE = "DONE";
|
||||
public static final List<String> VALUES = Collections.unmodifiableList(
|
||||
Arrays.asList(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE));
|
||||
public static final List<String> VALUES = List.of(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.kafka.trogdor.task.TaskWorker;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -58,7 +57,7 @@ public final class ConnectionStressSpec extends TaskSpec {
|
|||
@JsonProperty("action") ConnectionStressAction action) {
|
||||
super(startMs, durationMs);
|
||||
this.clientNodes = (clientNodes == null) ? Collections.emptyList() :
|
||||
Collections.unmodifiableList(new ArrayList<>(clientNodes));
|
||||
List.copyOf(clientNodes);
|
||||
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
|
||||
this.commonClientConf = configOrEmptyMap(commonClientConf);
|
||||
this.targetConnectionsPerSec = targetConnectionsPerSec;
|
||||
|
|
|
@ -272,7 +272,7 @@ public class ExternalCommandWorker implements TaskWorker {
|
|||
while (true) {
|
||||
log.info("{}: stdin writer ready.", id);
|
||||
Optional<JsonNode> node = stdinQueue.take();
|
||||
if (!node.isPresent()) {
|
||||
if (node.isEmpty()) {
|
||||
log.trace("{}: StdinWriter terminating.", id);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ public class PartitionsSpec extends Message {
|
|||
if (configs == null) {
|
||||
this.configs = Collections.emptyMap();
|
||||
} else {
|
||||
this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
|
||||
this.configs = Map.copyOf(configs);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue