MINOR: New year code cleanup - include final keyword (#15072)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Sagar Rao <sagarmeansocean@gmail.com>
Author: Divij Vaidya, 2024-01-11 17:53:35 +01:00 (committed by GitHub)
parent 6ff21ee1e0
commit 65424ab484
61 changed files with 197 additions and 206 deletions
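
All 61 files follow the same pattern: a field that is assigned exactly once, either at its declaration or in the constructor, gains the final modifier. The compiler then rejects any accidental reassignment, and the Java Memory Model guarantees safe publication of final fields once the constructor completes. A minimal sketch of the shape of the change, using an illustrative class that is not part of the commit:

import java.util.Set;

// Illustrative only: the single-assignment pattern this commit enforces.
public class ConnectionTracker {
    // Before: private Set<String> connectingNodes;
    private final Set<String> connectingNodes;

    public ConnectionTracker(Set<String> connectingNodes) {
        // The one permitted write; any later reassignment is a compile error.
        this.connectingNodes = connectingNodes;
    }
}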


@@ -44,9 +44,9 @@ final class ClusterConnectionStates {
     private final Map<String, NodeConnectionState> nodeState;
     private final Logger log;
     private final HostResolver hostResolver;
-    private Set<String> connectingNodes;
-    private ExponentialBackoff reconnectBackoff;
-    private ExponentialBackoff connectionSetupTimeout;
+    private final Set<String> connectingNodes;
+    private final ExponentialBackoff reconnectBackoff;
+    private final ExponentialBackoff connectionSetupTimeout;

     public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs,
                                    long connectionSetupTimeoutMs, long connectionSetupTimeoutMaxMs,
@@ -464,6 +464,9 @@ final class ClusterConnectionStates {
      * The state of our connection to a node.
      */
     private static class NodeConnectionState {
+        private final String host;
+        private final HostResolver hostResolver;
+        private final Logger log;

         ConnectionState state;
         AuthenticationException authenticationException;
@@ -476,10 +479,7 @@ final class ClusterConnectionStates {
         long throttleUntilTimeMs;
         private List<InetAddress> addresses;
         private int addressIndex;
-        private final String host;
-        private final HostResolver hostResolver;
         private InetAddress lastAttemptedAddress;
-        private Logger log;

         private NodeConnectionState(ConnectionState state, long lastConnectAttemptMs, long reconnectBackoffMs,
                                     long connectionSetupTimeoutMs, String host, HostResolver hostResolver, Logger log) {


@@ -228,6 +228,8 @@ public class FetchSessionHandler {
     }

     public class Builder {
+        private final Map<Uuid, String> topicNames;
+        private final boolean copySessionPartitions;

         /**
          * The next partitions which we want to fetch.
          *
@@ -242,8 +244,6 @@ public class FetchSessionHandler {
          * incremental fetch requests (see below).
          */
         private LinkedHashMap<TopicPartition, PartitionData> next;
-        private Map<Uuid, String> topicNames;
-        private final boolean copySessionPartitions;
         private int partitionsWithoutTopicIds = 0;

         Builder() {


@@ -30,9 +30,9 @@ import java.util.Map;
 @InterfaceStability.Evolving
 public class NewPartitions {
-    private int totalCount;
-    private List<List<Integer>> newAssignments;
+    private final int totalCount;
+    private final List<List<Integer>> newAssignments;

     private NewPartitions(int totalCount, List<List<Integer>> newAssignments) {
         this.totalCount = totalCount;


@@ -29,7 +29,7 @@ import java.util.Map;
 @InterfaceStability.Evolving
 public class RecordsToDelete {
-    private long offset;
+    private final long offset;

     private RecordsToDelete(long offset) {
         this.offset = offset;


@@ -32,7 +32,7 @@ import java.util.Set;
 @InterfaceStability.Evolving
 public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions<RemoveMembersFromConsumerGroupOptions> {
-    private Set<MemberToRemove> members;
+    private final Set<MemberToRemove> members;
     private String reason;

     public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) {


@@ -172,8 +172,8 @@ public interface ConsumerPartitionAssignor {
     }

     final class Assignment {
-        private List<TopicPartition> partitions;
-        private ByteBuffer userData;
+        private final List<TopicPartition> partitions;
+        private final ByteBuffer userData;

         public Assignment(List<TopicPartition> partitions, ByteBuffer userData) {
             this.partitions = partitions;


@@ -101,7 +101,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
     @Override
     public Iterator<ConsumerRecord<K, V>> iterator() {
         return new AbstractIterator<ConsumerRecord<K, V>>() {
-            Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
+            final Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
             Iterator<ConsumerRecord<K, V>> current;

             protected ConsumerRecord<K, V> makeNext() {


@@ -64,11 +64,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Map<TopicPartition, OffsetAndMetadata> committed;
     private final Queue<Runnable> pollTasks;
     private final Set<TopicPartition> paused;
+    private final AtomicBoolean wakeup;
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;

-    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private KafkaException pollException;
     private KafkaException offsetsException;
-    private AtomicBoolean wakeup;
     private Duration lastPollTimeout;
     private boolean closed;
     private boolean shouldRebalance;


@@ -111,17 +111,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     // this collection must be thread-safe because it is modified from the response handler
     // of offset commit requests, which may be invoked from the heartbeat thread
     private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
+    private final AtomicBoolean asyncCommitFenced;
+    private final boolean throwOnFetchStableOffsetsUnsupported;
+    private final Optional<String> rackId;

     private boolean isLeader = false;
     private Set<String> joinedSubscription;
     private MetadataSnapshot metadataSnapshot;
     private MetadataSnapshot assignmentSnapshot;
     private Timer nextAutoCommitTimer;
-    private AtomicBoolean asyncCommitFenced;
     private ConsumerGroupMetadata groupMetadata;
-    private final boolean throwOnFetchStableOffsetsUnsupported;
-    private final Optional<String> rackId;

     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;


@@ -36,11 +36,11 @@ public final class Heartbeat {
     private final Timer sessionTimer;
     private final Timer pollTimer;
     private final Logger log;
+    private final ExponentialBackoff retryBackoff;

     private volatile long lastHeartbeatSend = 0L;
     private volatile boolean heartbeatInFlight = false;
     private volatile long heartbeatAttempts = 0L;
-    private ExponentialBackoff retryBackoff;

     public Heartbeat(GroupRebalanceConfig config,
                      Time time) {


@@ -58,10 +58,11 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private final Deque<Completion> completions;
     private final Map<TopicPartition, Long> offsets;
     private final List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerGroupOffsets;
-    private Map<String, Map<TopicPartition, OffsetAndMetadata>> uncommittedConsumerGroupOffsets;
+    private final Map<MetricName, Metric> mockMetrics;
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
-    private boolean autoComplete;
+    private final boolean autoComplete;
+    private Map<String, Map<TopicPartition, OffsetAndMetadata>> uncommittedConsumerGroupOffsets;
     private boolean closed;
     private boolean transactionInitialized;
     private boolean transactionInFlight;
@@ -70,7 +71,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
     private boolean producerFenced;
     private boolean sentOffsets;
     private long commitCount = 0L;
-    private final Map<MetricName, Metric> mockMetrics;

     public RuntimeException initTransactionException = null;
     public RuntimeException beginTransactionException = null;


@@ -25,12 +25,11 @@ import java.nio.charset.StandardCharsets;
 public class ErrorLoggingCallback implements Callback {
     private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
-    private String topic;
-    private byte[] key;
+    private final String topic;
+    private final byte[] key;
+    private final int valueLength;
+    private final boolean logAsString;
     private byte[] value;
-    private int valueLength;
-    private boolean logAsString;

     public ErrorLoggingCallback(String topic, byte[] key, byte[] value, boolean logAsString) {
         this.topic = topic;
         this.key = key;


@@ -65,7 +65,7 @@ public final class MetricName {
     private final String name;
     private final String group;
     private final String description;
-    private Map<String, String> tags;
+    private final Map<String, String> tags;
     private int hash = 0;

     /**


@@ -32,7 +32,7 @@ public class MetricNameTemplate {
     private final String name;
     private final String group;
     private final String description;
-    private LinkedHashSet<String> tags;
+    private final LinkedHashSet<String> tags;

     /**
      * Create a new template. Note that the order of the tags will be preserved if the supplied


@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Time;
 public final class KafkaMetric implements Metric {
-    private MetricName metricName;
+    private final MetricName metricName;
     private final Object lock;
     private final Time time;
     private final MetricValueProvider<?> metricValueProvider;


@@ -34,7 +34,7 @@ import org.apache.kafka.common.metrics.MetricConfig;
 */
 public abstract class SampledStat implements MeasurableStat {
-    private double initialValue;
+    private final double initialValue;
     private int current = 0;
     protected List<Sample> samples;


@@ -87,18 +87,17 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
     private final Map<String, LoginManager> loginManagers;
     private final Map<String, Subject> subjects;
     private final Supplier<ApiVersionsResponse> apiVersionSupplier;
-    private SslFactory sslFactory;
-    private Map<String, ?> configs;
     private final String sslClientAuthOverride;
-    private KerberosShortNamer kerberosShortNamer;
-    private Map<String, AuthenticateCallbackHandler> saslCallbackHandlers;
-    private Map<String, Long> connectionsMaxReauthMsByMechanism;
+    private final Map<String, AuthenticateCallbackHandler> saslCallbackHandlers;
+    private final Map<String, Long> connectionsMaxReauthMsByMechanism;
     private final Time time;
     private final LogContext logContext;
     private final Logger log;
+    private SslFactory sslFactory;
+    private Map<String, ?> configs;
+    private KerberosShortNamer kerberosShortNamer;

     public SaslChannelBuilder(Mode mode,
                               Map<String, JaasContext> jaasContexts,
                               SecurityProtocol securityProtocol,


@@ -94,7 +94,7 @@ public class Selector implements Selectable, AutoCloseable {
         NOTIFY_ONLY(true),          // discard any outstanding receives, notify disconnect
         DISCARD_NO_NOTIFY(false);   // discard any outstanding receives, no disconnect notification

-        boolean notifyDisconnect;
+        final boolean notifyDisconnect;

         CloseMode(boolean notifyDisconnect) {
             this.notifyDisconnect = notifyDisconnect;
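
Enum fields such as notifyDisconnect above are the safest possible target for this cleanup: enum constants are constructed exactly once, during class initialization, so a field assigned only in the enum constructor can always be final. A self-contained sketch of the same shape, with illustrative constants:

// Illustrative enum: constructor-assigned fields can always be final.
enum CloseMode {
    NOTIFY_ONLY(true),
    DISCARD_NO_NOTIFY(false);

    final boolean notifyDisconnect;

    CloseMode(boolean notifyDisconnect) {
        this.notifyDisconnect = notifyDisconnect;
    }
}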


@@ -43,11 +43,11 @@ import java.util.function.Supplier;
 public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
     private final ListenerName listenerName;
     private final boolean isInterBrokerListener;
+    private final Mode mode;
+    private final Logger log;
     private SslFactory sslFactory;
-    private Mode mode;
     private Map<String, ?> configs;
     private SslPrincipalMapper sslPrincipalMapper;
-    private final Logger log;

     /**
      * Constructs an SSL channel builder. ListenerName is provided only


@@ -396,17 +396,17 @@ public enum Errors {
     private static final Logger log = LoggerFactory.getLogger(Errors.class);

-    private static Map<Class<?>, Errors> classToError = new HashMap<>();
-    private static Map<Short, Errors> codeToError = new HashMap<>();
+    private static final Map<Class<?>, Errors> CLASS_TO_ERROR = new HashMap<>();
+    private static final Map<Short, Errors> CODE_TO_ERROR = new HashMap<>();

     static {
         for (Errors error : Errors.values()) {
-            if (codeToError.put(error.code(), error) != null)
+            if (CODE_TO_ERROR.put(error.code(), error) != null)
                 throw new ExceptionInInitializerError("Code " + error.code() + " for error " +
                         error + " has already been used");

             if (error.exception != null)
-                classToError.put(error.exception.getClass(), error);
+                CLASS_TO_ERROR.put(error.exception.getClass(), error);
         }
     }
@@ -479,7 +479,7 @@ public enum Errors {
      * Throw the exception if there is one
      */
     public static Errors forCode(short code) {
-        Errors error = codeToError.get(code);
+        Errors error = CODE_TO_ERROR.get(code);
         if (error != null) {
             return error;
         } else {
@@ -496,7 +496,7 @@ public enum Errors {
         Throwable cause = maybeUnwrapException(t);
         Class<?> clazz = cause.getClass();
         while (clazz != null) {
-            Errors error = classToError.get(clazz);
+            Errors error = CLASS_TO_ERROR.get(clazz);
             if (error != null)
                 return error;
             clazz = clazz.getSuperclass();
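
The Errors change is more than adding final: once the two lookup maps are assigned only at their declaration, Java convention treats them as constants, hence the rename to UPPER_SNAKE_CASE. Note that final freezes the reference, not the contents, which is why the static initializer can still populate the maps. A condensed, self-contained sketch of the pattern with an illustrative enum:

import java.util.HashMap;
import java.util.Map;

// Illustrative enum: a final static lookup map populated in a static block.
enum Fruit {
    APPLE((short) 1), BANANA((short) 2);

    // The reference is final and constant-cased; the HashMap itself stays mutable.
    private static final Map<Short, Fruit> CODE_TO_FRUIT = new HashMap<>();

    static {
        for (Fruit f : values())
            CODE_TO_FRUIT.put(f.code, f);
    }

    private final short code;

    Fruit(short code) {
        this.code = code;
    }

    public static Fruit forCode(short code) {
        return CODE_TO_FRUIT.get(code);
    }
}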


@@ -36,9 +36,10 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
     private static final int MAX_READ_SIZE = 128 * 1024;
     static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;

-    private RecordValidationStats recordValidationStats;
+    private final RecordValidationStats recordValidationStats;
+    private final Iterator<ConvertedRecords<?>> convertedRecordsIterator;
     private RecordsSend convertedRecordsWriter;
-    private Iterator<ConvertedRecords<?>> convertedRecordsIterator;

     public LazyDownConversionRecordsSend(LazyDownConversionRecords records) {
         super(records, records.sizeInBytes());


@@ -65,6 +65,7 @@ public class MemoryRecordsBuilder implements AutoCloseable {
     private final int partitionLeaderEpoch;
     private final int writeLimit;
     private final int batchHeaderSizeInBytes;
+    private final long deleteHorizonMs;

     // Use a conservative estimate of the compression ratio. The producer overrides this using statistics
     // from previous batches before appending any records.
@@ -80,7 +81,6 @@ public class MemoryRecordsBuilder implements AutoCloseable {
     private int numRecords = 0;
     private float actualCompressionRatio = 1;
     private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-    private long deleteHorizonMs;
     private long offsetOfMaxTimestamp = -1;
     private Long lastOffset = null;
     private Long baseTimestamp = null;


@@ -34,7 +34,7 @@ public class DeleteRecordsRequest extends AbstractRequest {
     private final DeleteRecordsRequestData data;

     public static class Builder extends AbstractRequest.Builder<DeleteRecordsRequest> {
-        private DeleteRecordsRequestData data;
+        private final DeleteRecordsRequestData data;

         public Builder(DeleteRecordsRequestData data) {
             super(ApiKeys.DELETE_RECORDS);


@@ -33,7 +33,7 @@ import java.util.stream.Collectors;
 public class DeleteTopicsRequest extends AbstractRequest {

     public static class Builder extends AbstractRequest.Builder<DeleteTopicsRequest> {
-        private DeleteTopicsRequestData data;
+        private final DeleteTopicsRequestData data;

         public Builder(DeleteTopicsRequestData data) {
             super(ApiKeys.DELETE_TOPICS);
@@ -62,7 +62,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
         }
     }

-    private DeleteTopicsRequestData data;
+    private final DeleteTopicsRequestData data;

     private DeleteTopicsRequest(DeleteTopicsRequestData data, short version) {
         super(ApiKeys.DELETE_TOPICS, version);


@@ -51,7 +51,7 @@ public class ListPartitionReassignmentsRequest extends AbstractRequest {
         }
     }

-    private ListPartitionReassignmentsRequestData data;
+    private final ListPartitionReassignmentsRequestData data;

     private ListPartitionReassignmentsRequest(ListPartitionReassignmentsRequestData data, short version) {
         super(ApiKeys.LIST_PARTITION_REASSIGNMENTS, version);


@@ -41,7 +41,7 @@ public class OAuthBearerClientInitialResponse {
     private final String tokenValue;
     private final String authorizationId;
-    private SaslExtensions saslExtensions;
+    private final SaslExtensions saslExtensions;

     public static final Pattern EXTENSION_KEY_PATTERN = Pattern.compile(KEY);
     public static final Pattern EXTENSION_VALUE_PATTERN = Pattern.compile(VALUE);


@@ -56,7 +56,7 @@ class CommonNameLoggingTrustManagerFactoryWrapper {
     private static final Logger log = LoggerFactory.getLogger(CommonNameLoggingTrustManagerFactoryWrapper.class);

-    private TrustManagerFactory origTmf;
+    private final TrustManagerFactory origTmf;

     /**
      * Create a wrapped trust manager factory
@@ -270,7 +270,7 @@ class CommonNameLoggingTrustManagerFactoryWrapper {
     static class NeverExpiringX509Certificate extends X509Certificate {

-        private X509Certificate origCertificate;
+        private final X509Certificate origCertificate;

         public NeverExpiringX509Certificate(X509Certificate origCertificate) {
             this.origCertificate = origCertificate;


@@ -29,8 +29,8 @@ import java.util.Objects;
 */
 @InterfaceStability.Evolving
 public class DelegationToken {
-    private TokenInformation tokenInformation;
-    private byte[] hmac;
+    private final TokenInformation tokenInformation;
+    private final byte[] hmac;

     public DelegationToken(TokenInformation tokenInformation, byte[] hmac) {
         this.tokenInformation = tokenInformation;


@@ -31,16 +31,16 @@ import java.util.concurrent.ConcurrentHashMap;
 public class DelegationTokenCache {

-    private CredentialCache credentialCache = new CredentialCache();
+    private final CredentialCache credentialCache = new CredentialCache();

     //Cache to hold all the tokens
-    private Map<String, TokenInformation> tokenCache = new ConcurrentHashMap<>();
+    private final Map<String, TokenInformation> tokenCache = new ConcurrentHashMap<>();

     //Cache to hold hmac->tokenId mapping. This is required for renew, expire requests
-    private Map<String, String> hmacTokenIdCache = new ConcurrentHashMap<>();
+    private final Map<String, String> hmacTokenIdCache = new ConcurrentHashMap<>();

     //Cache to hold tokenId->hmac mapping. This is required for removing entry from hmacTokenIdCache using tokenId.
-    private Map<String, String> tokenIdHmacCache = new ConcurrentHashMap<>();
+    private final Map<String, String> tokenIdHmacCache = new ConcurrentHashMap<>();

     public DelegationTokenCache(Collection<String> scramMechanisms) {
         //Create caches for scramMechanisms


@@ -37,13 +37,13 @@ import java.util.Set;
 public class SchemaProjector {

-    private static Set<AbstractMap.SimpleImmutableEntry<Type, Type>> promotable = new HashSet<>();
+    private static final Set<AbstractMap.SimpleImmutableEntry<Type, Type>> PROMOTABLE = new HashSet<>();

     static {
         Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, Type.INT64, Type.FLOAT32, Type.FLOAT64};
         for (int i = 0; i < promotableTypes.length; ++i) {
             for (int j = i; j < promotableTypes.length; ++j) {
-                promotable.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j]));
+                PROMOTABLE.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j]));
             }
         }
     }
@@ -191,6 +191,6 @@ public class SchemaProjector {
     }

     private static boolean isPromotable(Type sourceType, Type targetType) {
-        return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
+        return PROMOTABLE.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
     }
 }


@@ -48,9 +48,9 @@ import javax.security.auth.spi.LoginModule;
 */
 public class PropertyFileLoginModule implements LoginModule {
     private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class);
-    private CallbackHandler callbackHandler;
+    private static final Map<String, Properties> CREDENTIAL_PROPERTIES_MAP = new ConcurrentHashMap<>();

     private static final String FILE_OPTIONS = "file";
+    private CallbackHandler callbackHandler;
     private String fileName;
     private boolean authenticated;


@@ -91,6 +91,8 @@ public class MirrorSourceConnector extends SourceConnector {
     private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT);
     private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";

+    private final AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);
     private Scheduler scheduler;
     private MirrorSourceConfig config;
     private SourceAndTarget sourceAndTarget;
@@ -105,7 +107,6 @@ public class MirrorSourceConnector extends SourceConnector {
     private Admin targetAdminClient;
     private Admin offsetSyncsAdminClient;
     private volatile boolean useIncrementalAlterConfigs;
-    private AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);

     public MirrorSourceConnector() {
         // nop


@@ -210,7 +210,7 @@ public class DistributedConfig extends WorkerConfig {
             + "which must include the algorithm used for the " + INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG + " property. "
             + "The algorithm(s) '" + INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT + "' will be used as a default on JVMs that provide them; "
             + "on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";

-    private Crypto crypto;
+    private final Crypto crypto;

     public enum ExactlyOnceSourceSupport {
         DISABLED(false),


@@ -72,6 +72,7 @@ public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transfor
             "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
             "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");

+    private final Cache<Schema, Schema> moveSchemaCache = new SynchronizedCache<>(new LRUCache<>(16));

     enum Operation {
         MOVE(MOVE_OPERATION),
         COPY(COPY_OPERATION);
@@ -104,8 +105,6 @@ public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transfor
     private Operation operation;

-    private Cache<Schema, Schema> moveSchemaCache = new SynchronizedCache<>(new LRUCache<>(16));

     @Override
     public R apply(R record) {
         Object operatingValue = operatingValue(record);


@@ -35,6 +35,7 @@ import java.util.List;
 public class LogManagerBuilder {
+    private static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS = 600000;
     private List<File> logDirs = null;
     private List<File> initialOfflineDirs = Collections.emptyList();
     private ConfigRepository configRepository = null;
@@ -47,7 +48,6 @@ public class LogManagerBuilder {
     private long retentionCheckMs = 1000L;
     private int maxTransactionTimeoutMs = 15 * 60 * 1000;
     private ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(60000, false);
-    private int producerIdExpirationCheckIntervalMs = 600000;
     private MetadataVersion interBrokerProtocolVersion = MetadataVersion.latest();
     private Scheduler scheduler = null;
     private BrokerTopicStats brokerTopicStats = null;
@@ -172,7 +172,7 @@ public class LogManagerBuilder {
             retentionCheckMs,
             maxTransactionTimeoutMs,
             producerStateManagerConfig,
-            producerIdExpirationCheckIntervalMs,
+            PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS,
             interBrokerProtocolVersion,
             scheduler,
             brokerTopicStats,
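
LogManagerBuilder gets a related but distinct treatment: producerIdExpirationCheckIntervalMs held a hard-coded default that, judging by the diff, nothing ever overrode, so rather than becoming a final instance field it is promoted to a private static final constant and the build call reads the constant directly. A tiny sketch of that refactoring with a hypothetical builder:

// Hypothetical builder: a never-overridden default becomes a named constant.
public class RetryPolicyBuilder {
    private static final int MAX_ATTEMPTS = 5; // was: private int maxAttempts = 5;

    private long backoffMs = 100L; // still configurable, so it stays a field

    public RetryPolicyBuilder backoffMs(long backoffMs) {
        this.backoffMs = backoffMs;
        return this;
    }

    public RetryPolicy build() {
        return new RetryPolicy(MAX_ATTEMPTS, backoffMs);
    }

    // Minimal value type so the sketch stands alone.
    public static final class RetryPolicy {
        final int maxAttempts;
        final long backoffMs;

        RetryPolicy(int maxAttempts, long backoffMs) {
            this.maxAttempts = maxAttempts;
            this.backoffMs = backoffMs;
        }
    }
}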


@@ -72,12 +72,12 @@ public class GroupCoordinatorRuntimeMetrics implements CoordinatorRuntimeMetrics
     /**
      * The partition load sensor.
      */
-    private Sensor partitionLoadSensor;
+    private final Sensor partitionLoadSensor;

     /**
      * The thread idle sensor.
      */
-    private Sensor threadIdleRatioSensor;
+    private final Sensor threadIdleRatioSensor;

     public GroupCoordinatorRuntimeMetrics(Metrics metrics) {
         this.metrics = Objects.requireNonNull(metrics);


@@ -81,7 +81,7 @@ public class AuthorizerBenchmark {
         ACL(AclAuthorizer::new),
         KRAFT(StandardAuthorizer::new);

-        private Supplier<Authorizer> supplier;
+        private final Supplier<Authorizer> supplier;

         AuthorizerType(Supplier<Authorizer> supplier) {
             this.supplier = supplier;
@@ -107,13 +107,12 @@ public class AuthorizerBenchmark {
     private final int hostPreCount = 1000;
     private final String resourceNamePrefix = "foo-bar35_resource-";
     private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
+    private final String authorizeByResourceTypeHostName = "127.0.0.2";
+    private final HashMap<ResourcePattern, AclAuthorizer.VersionedAcls> aclToUpdate = new HashMap<>();
     private Authorizer authorizer;
     private List<Action> actions = new ArrayList<>();
     private RequestContext authorizeContext;
     private RequestContext authorizeByResourceTypeContext;
-    private String authorizeByResourceTypeHostName = "127.0.0.2";
-    private HashMap<ResourcePattern, AclAuthorizer.VersionedAcls> aclToUpdate = new HashMap<>();
     Random rand = new Random(System.currentTimeMillis());
     double eps = 1e-9;


@@ -59,17 +59,17 @@ import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 public class StandardAuthorizerUpdateBenchmark {
+    private static final Random RANDOM = new Random(System.currentTimeMillis());
+    private static final KafkaPrincipal PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
+    private final String resourceNamePrefix = "foo-bar35_resource-";
+    private final Set<Uuid> ids = new HashSet<>();
+    private final List<StandardAclWithId> aclsToAdd = prepareAcls();
+    private StandardAuthorizer authorizer;
     @Param({"25000", "50000", "75000", "100000"})
     private int aclCount;
-    private final String resourceNamePrefix = "foo-bar35_resource-";
-    private static final KafkaPrincipal PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
-    private StandardAuthorizer authorizer;
-    private final Set<Uuid> ids = new HashSet<>();
-    private List<StandardAclWithId> aclsToAdd = prepareAcls();
     int index = 0;
-    private static final Random RANDOM = new Random(System.currentTimeMillis());

     @Setup(Level.Trial)
     public void setup() throws Exception {
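
In the JMH benchmark classes, note which fields stay non-final: anything annotated @Param (aclCount here, topicCount and partitionCount in the benchmarks below) must remain a plain mutable field because JMH writes the parameter value into it after construction, and fields assigned inside a @Setup method likewise cannot be final. A minimal hypothetical benchmark showing the split:

import java.util.Random;
import org.openjdk.jmh.annotations.*;

// Hypothetical benchmark: which fields can and cannot take final.
@State(Scope.Benchmark)
public class LookupBenchmark {
    @Param({"1000", "100000"})
    private int size;                             // written by JMH: must not be final

    private final Random random = new Random(42); // initialized once: safe to finalize

    private int[] data;                           // assigned in setup(): cannot be final

    @Setup(Level.Trial)
    public void setup() {
        data = random.ints(size).toArray();
    }

    @Benchmark
    public long sum() {
        long total = 0;
        for (int v : data) total += v;
        return total;
    }
}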


@@ -110,18 +110,17 @@ import scala.collection.Map;
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.NANOSECONDS)
 public class ReplicaFetcherThreadBenchmark {
+    private final File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+    private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
+    private final Pool<TopicPartition, Partition> pool = new Pool<TopicPartition, Partition>(Option.empty());
+    private final Metrics metrics = new Metrics();
+    private final Option<Uuid> topicId = Option.apply(Uuid.randomUuid());
     @Param({"100", "500", "1000", "5000"})
     private int partitionCount;
     private ReplicaFetcherBenchThread fetcher;
     private LogManager logManager;
-    private File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-    private KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
-    private Pool<TopicPartition, Partition> pool = new Pool<TopicPartition, Partition>(Option.empty());
-    private Metrics metrics = new Metrics();
     private ReplicaManager replicaManager;
     private ReplicaQuota replicaQuota;
-    private Option<Uuid> topicId = Option.apply(Uuid.randomUuid());

     @Setup(Level.Trial)
     public void setup() throws IOException {


@@ -94,31 +94,30 @@ import java.util.stream.IntStream;
 @OutputTimeUnit(TimeUnit.NANOSECONDS)
 public class KRaftMetadataRequestBenchmark {
+    private final RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
+    private final RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
+    private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+    private final GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
+    private final TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
+    private final AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
+    private final Metrics metrics = new Metrics();
+    private final int brokerId = 1;
+    private final ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
+    private final KRaftMetadataCache metadataCache = MetadataCache.kRaftMetadataCache(brokerId);
+    private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
+    private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
+    private final ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
+    private final ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
+    private final QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
+        clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
+        replicaQuotaManager, replicaQuotaManager, Option.empty());
+    private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
+    private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
+    private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     @Param({"500", "1000", "5000"})
     private int topicCount;
     @Param({"10", "20", "50"})
     private int partitionCount;
-    private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
-    private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
-    private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
-    private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
-    private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
-    private AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
-    private Metrics metrics = new Metrics();
-    private int brokerId = 1;
-    private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
-    private KRaftMetadataCache metadataCache = MetadataCache.kRaftMetadataCache(brokerId);
-    private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
-    private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
-    private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
-    private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
-    private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
-        clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
-        replicaQuotaManager, replicaQuotaManager, Option.empty());
-    private FetchManager fetchManager = Mockito.mock(FetchManager.class);
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
-    private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     private KafkaApis kafkaApis;
     private RequestChannel.Request allTopicMetadataRequest;


@@ -96,34 +96,33 @@ import java.util.stream.IntStream;
 @OutputTimeUnit(TimeUnit.NANOSECONDS)
 public class MetadataRequestBenchmark {
+    private final RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
+    private final RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
+    private final ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+    private final GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
+    private final ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class);
+    private final TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
+    private final KafkaController kafkaController = Mockito.mock(KafkaController.class);
+    private final AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
+    private final KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
+    private final Metrics metrics = new Metrics();
+    private final int brokerId = 1;
+    private final ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId,
+        MetadataVersion.latest(), BrokerFeatures.createEmpty(), null, false);
+    private final ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
+    private final ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
+    private final ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
+    private final ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
+    private final QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
+        clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
+        replicaQuotaManager, replicaQuotaManager, Option.empty());
+    private final FetchManager fetchManager = Mockito.mock(FetchManager.class);
+    private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
+    private final KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     @Param({"500", "1000", "5000"})
     private int topicCount;
     @Param({"10", "20", "50"})
     private int partitionCount;
-    private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
-    private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
-    private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
-    private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
-    private ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class);
-    private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
-    private KafkaController kafkaController = Mockito.mock(KafkaController.class);
-    private AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
-    private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
-    private Metrics metrics = new Metrics();
-    private int brokerId = 1;
-    private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId,
-        MetadataVersion.latest(), BrokerFeatures.createEmpty(), null, false);
-    private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
-    private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
-    private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
-    private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
-    private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
-        clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
-        replicaQuotaManager, replicaQuotaManager, Option.empty());
-    private FetchManager fetchManager = Mockito.mock(FetchManager.class);
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
-    private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
     private KafkaApis kafkaApis;
     private RequestChannel.Request allTopicMetadataRequest;


@@ -76,15 +76,15 @@ import scala.compat.java8.OptionConverters;
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.NANOSECONDS)
 public class PartitionMakeFollowerBenchmark {
-    private LogManager logManager;
-    private File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-    private KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
-    private Partition partition;
-    private List<Integer> replicas = Arrays.asList(0, 1, 2);
-    private OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
-    private DelayedOperations delayedOperations = Mockito.mock(DelayedOperations.class);
-    private ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private final File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+    private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
+    private final List<Integer> replicas = Arrays.asList(0, 1, 2);
+    private final OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
+    private final DelayedOperations delayedOperations = Mockito.mock(DelayedOperations.class);
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
     private Option<Uuid> topicId;
+    private Partition partition;
+    private LogManager logManager;

     @Setup(Level.Trial)
     public void setup() throws IOException {


@@ -70,12 +70,12 @@ import scala.compat.java8.OptionConverters;
 @BenchmarkMode(Mode.AverageTime)
 @OutputTimeUnit(TimeUnit.NANOSECONDS)
 public class UpdateFollowerFetchStateBenchmark {
-    private TopicPartition topicPartition = new TopicPartition(UUID.randomUUID().toString(), 0);
-    private Option<Uuid> topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
-    private File logDir = new File(System.getProperty("java.io.tmpdir"), topicPartition.toString());
-    private KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
-    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
-    private LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
+    private final TopicPartition topicPartition = new TopicPartition(UUID.randomUUID().toString(), 0);
+    private final Option<Uuid> topicId = OptionConverters.toScala(Optional.of(Uuid.randomUuid()));
+    private final File logDir = new File(System.getProperty("java.io.tmpdir"), topicPartition.toString());
+    private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
+    private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
+    private final LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
     private long nextOffset = 0;
     private LogManager logManager;
     private Partition partition;


@@ -59,7 +59,7 @@ import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
 * Manages DelegationTokens.
 */
 public class DelegationTokenControlManager {
-    private Time time = Time.SYSTEM;
+    private final Time time = Time.SYSTEM;

     static class Builder {
         private LogContext logContext = null;


@@ -85,6 +85,8 @@ public class PartitionChangeBuilder {
     private final int partitionId;
    private final IntPredicate isAcceptableLeader;
     private final MetadataVersion metadataVersion;
+    private final int minISR;
+    private final Map<Integer, Uuid> targetDirectories;
     private List<Integer> targetIsr;
     private List<Integer> targetReplicas;
     private List<Integer> targetRemoving;
@@ -96,8 +98,6 @@ public class PartitionChangeBuilder {
     private LeaderRecoveryState targetLeaderRecoveryState;
     private boolean zkMigrationEnabled;
     private boolean eligibleLeaderReplicasEnabled;
-    private int minISR;
-    private Map<Integer, Uuid> targetDirectories;
     private DefaultDirProvider defaultDirProvider;

     // Whether allow electing last known leader in a Balanced recovery. Note, the last known leader will be stored in the


@@ -1715,7 +1715,7 @@ public final class QuorumController implements Controller {
     * from this callbacks need to compare against this value to verify that the event
     * was not from a previous registration.
     */
-    private QuorumMetaLogListener metaLogListener;
+    private final QuorumMetaLogListener metaLogListener;

    /**
     * If this controller is active, this is the non-negative controller epoch.


@@ -173,11 +173,11 @@ public final class MetaPropertiesEnsemble {
     };

     private final MetaPropertiesEnsemble prev;
-    private Random random = new Random();
-    private Set<String> emptyLogDirs;
-    private Set<String> errorLogDirs;
-    private Map<String, MetaProperties> logDirProps;
+    private final Set<String> emptyLogDirs;
+    private final Set<String> errorLogDirs;
+    private final Map<String, MetaProperties> logDirProps;
     private Optional<String> metaLogDir;
+    private Random random = new Random();

     private PreWriteHandler preWriteHandler = (logDir, isNew, metaProperties) -> {
         LOG.info("Writing out {} {}{}meta.properties file containing {}",
@@ -432,8 +432,8 @@ public final class MetaPropertiesEnsemble {
      */
     public Iterator<Entry<String, Optional<MetaProperties>>> nonFailedDirectoryProps() {
         return new Iterator<Entry<String, Optional<MetaProperties>>>() {
-            private Iterator<String> emptyLogDirsIterator = emptyLogDirs.iterator();
-            private Iterator<Entry<String, MetaProperties>> logDirsIterator =
+            private final Iterator<String> emptyLogDirsIterator = emptyLogDirs.iterator();
+            private final Iterator<Entry<String, MetaProperties>> logDirsIterator =
                     logDirProps.entrySet().iterator();

             @Override


@@ -145,8 +145,8 @@ public final class InteractiveShell implements AutoCloseable {
     }

     public class HistoryIterator implements Iterator<Entry<Integer, String>> {
+        private final int last;
         private int index;
-        private int last;

         HistoryIterator(int index, int last) {
             this.index = index;


@@ -114,7 +114,7 @@ public class RemoteIndexCache implements Closeable {
     *
     * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
     */
-    private Cache<Uuid, Entry> internalCache;
+    private final Cache<Uuid, Entry> internalCache;

    /**
     * Creates RemoteIndexCache with the given configs.


@@ -165,11 +165,9 @@ public class ProcessorStateManager implements StateManager {
     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

-    private Logger log;
-    private String logPrefix;
+    private final String logPrefix;

     private final TaskId taskId;
-    private Task.State taskState;
     private final boolean eosEnabled;
     private final ChangelogRegister changelogReader;
     private final Collection<TopicPartition> sourcePartitions;
@@ -181,9 +179,11 @@ public class ProcessorStateManager implements StateManager {
     private final File baseDir;
     private final OffsetCheckpoint checkpointFile;
+    private final boolean stateUpdaterEnabled;

     private TaskType taskType;
-    private final boolean stateUpdaterEnabled;
+    private Logger log;
+    private Task.State taskState;

     public static String storeChangelogTopic(final String prefix, final String storeName, final String namedTopology) {
         if (namedTopology == null) {
View File
@@ -57,9 +57,8 @@ public class ClientState {
     private final ClientStateTask previousActiveTasks = new ClientStateTask(null, new TreeMap<>());
     private final ClientStateTask previousStandbyTasks = new ClientStateTask(null, null);
     private final ClientStateTask revokingActiveTasks = new ClientStateTask(null, new TreeMap<>());
+    private final UUID processId;
     private int capacity;
-    private UUID processId;
     public ClientState() {
         this(null, 0);
View File
@@ -209,7 +209,7 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
     private static class KeyValueToTimestampedKeyValueAdapterIterator implements ManagedKeyValueIterator<Bytes, byte[]> {
-        private RocksDbIterator rocksDbIterator;
+        private final RocksDbIterator rocksDbIterator;
         public KeyValueToTimestampedKeyValueAdapterIterator(final RocksDbIterator rocksDbIterator) {
             this.rocksDbIterator = rocksDbIterator;
View File
@@ -41,9 +41,7 @@ import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTEN
  * An in-memory LRU cache store based on HashSet and HashMap.
  */
 public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
-    protected StateStoreContext context;
-    private Position position = Position.emptyPosition();
+    private final Position position = Position.emptyPosition();
     public interface EldestEntryRemovalListener {
         void apply(Bytes key, byte[] value);
@@ -55,6 +53,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
     private boolean restoring = false; // TODO: this is a sub-optimal solution to avoid logging during restoration.
                                        // in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this.
     private volatile boolean open = true;
+    protected StateStoreContext context;
     private EldestEntryRemovalListener listener;
View File
@@ -104,7 +104,7 @@ public class ClientMetricsCommand {
     }
     public static class ClientMetricsService implements AutoCloseable {
-        private Admin adminClient;
+        private final Admin adminClient;
         public ClientMetricsService(Properties config) {
             this.adminClient = Admin.create(config);
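The tools-module service classes share one shape: the Admin client is created at a single point in the constructor and released in close(), so the field has exactly one assignment and qualifies for final. A reduced sketch of the pattern (class name is hypothetical):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;

    // Hypothetical, reduced service wrapper; not the full command class.
    class AdminServiceSketch implements AutoCloseable {
        private final Admin adminClient;

        AdminServiceSketch(Properties config) {
            this.adminClient = Admin.create(config);  // the single assignment point
        }

        @Override
        public void close() {
            adminClient.close();  // final still permits calling methods, including close()
        }
    }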
View File
@@ -336,10 +336,10 @@ public class ProducerPerformance {
     // Visible for testing
     static class Stats {
-        private long start;
-        private long windowStart;
-        private int[] latencies;
-        private long sampling;
+        private final long start;
+        private final int[] latencies;
+        private final long sampling;
+        private final long reportingInterval;
         private long iteration;
         private int index;
         private long count;
@@ -350,7 +350,7 @@ public class ProducerPerformance {
         private int windowMaxLatency;
         private long windowTotalLatency;
         private long windowBytes;
-        private long reportingInterval;
+        private long windowStart;
         public Stats(long numRecords, int reportingInterval) {
             this.start = System.currentTimeMillis();
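These two hunks also regroup the Stats fields by mutability: values fixed for the whole run (start, latencies, sampling, reportingInterval) become final, while per-window counters such as windowStart stay mutable because they are reset on every reporting interval. A heavily reduced sketch of that split (hypothetical class):

    // Hypothetical, reduced stats class: run-wide configuration is final,
    // rolling-window state is not.
    class StatsSketch {
        private final long start = System.currentTimeMillis();  // fixed for the run
        private final long reportingInterval;                   // fixed for the run
        private long windowStart;                                // reset every window
        private long count;                                      // grows per record

        StatsSketch(long reportingInterval) {
            this.reportingInterval = reportingInterval;
            this.windowStart = start;
        }

        void record(long now) {
            count++;
            if (now - windowStart >= reportingInterval) {
                windowStart = now;  // this reassignment is why the field cannot be final
            }
        }
    }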
View File
@@ -420,7 +420,7 @@ public abstract class TopicCommand {
     }
     public static class TopicService implements AutoCloseable {
-        private Admin adminClient;
+        private final Admin adminClient;
         public TopicService(Properties commandConfig, Optional<String> bootstrapServer) {
             this.adminClient = createAdminClient(commandConfig, bootstrapServer);
View File
@@ -95,10 +95,9 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
     private final boolean useAsyncCommit;
     private final boolean verbose;
     private final int maxMessages;
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
     private int consumedMessages = 0;
-    private CountDownLatch shutdownLatch = new CountDownLatch(1);
     public VerifiableConsumer(KafkaConsumer<String, String> consumer,
                               PrintStream out,
                               String topic,
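A CountDownLatch initialized at its declaration is a natural final field: the countable state lives inside the latch object, so signaling shutdown never reassigns the field. A minimal sketch (hypothetical class):

    import java.util.concurrent.CountDownLatch;

    // Hypothetical sketch: final fixes the latch reference; counting down
    // and awaiting operate on the object and remain legal.
    class ShutdownSketch {
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);

        void requestShutdown() {
            shutdownLatch.countDown();  // mutates latch state, not the field
        }

        void awaitShutdown() throws InterruptedException {
            shutdownLatch.await();
        }
    }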
View File
@@ -346,9 +346,9 @@ public class VerifiableProducer implements AutoCloseable {
     private static class SuccessfulSend extends ProducerEvent {
-        private String key;
-        private String value;
-        private RecordMetadata recordMetadata;
+        private final String key;
+        private final String value;
+        private final RecordMetadata recordMetadata;
         public SuccessfulSend(String key, String value, RecordMetadata recordMetadata) {
             assert recordMetadata != null : "Expected non-null recordMetadata object.";
@@ -390,10 +390,10 @@ public class VerifiableProducer implements AutoCloseable {
     private static class FailedSend extends ProducerEvent {
-        private String topic;
-        private String key;
-        private String value;
-        private Exception exception;
+        private final String topic;
+        private final String key;
+        private final String value;
+        private final Exception exception;
         public FailedSend(String key, String value, String topic, Exception exception) {
             assert exception != null : "Expected non-null exception.";
@@ -436,10 +436,10 @@ public class VerifiableProducer implements AutoCloseable {
     private static class ToolData extends ProducerEvent {
-        private long sent;
-        private long acked;
-        private long targetThroughput;
-        private double avgThroughput;
+        private final long sent;
+        private final long acked;
+        private final long targetThroughput;
+        private final double avgThroughput;
         public ToolData(long sent, long acked, long targetThroughput, double avgThroughput) {
             this.sent = sent;
@@ -485,8 +485,8 @@ public class VerifiableProducer implements AutoCloseable {
     /** Callback which prints errors to stdout when the producer fails to send. */
     private class PrintInfoCallback implements Callback {
-        private String key;
-        private String value;
+        private final String key;
+        private final String value;
         PrintInfoCallback(String key, String value) {
             this.key = key;
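The producer event classes are write-once records: every field is set in the constructor and only read afterwards. With all fields final they become shallowly immutable, and the Java memory model additionally guarantees safe publication of final fields to other threads after construction. A reduced sketch of the shape (hypothetical class):

    // Hypothetical write-once event: all fields final, assigned only in the
    // constructor, so an instance never changes after it is built.
    final class FailedSendSketch {
        private final String topic;
        private final String key;
        private final String value;
        private final Exception exception;

        FailedSendSketch(String key, String value, String topic, Exception exception) {
            assert exception != null : "Expected non-null exception.";
            this.key = key;
            this.value = value;
            this.topic = topic;
            this.exception = exception;
        }

        @Override
        public String toString() {
            return topic + "/" + key + "=" + value + " failed: " + exception;
        }
    }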
View File
@@ -120,7 +120,7 @@ public final class TaskManager {
     /**
      * True if the TaskManager is shut down.
      */
-    private AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
     /**
      * The ID to use for the next worker. Only accessed by the state change thread.
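Same distinction as with the latch earlier: final freezes which AtomicBoolean the field references, while the flag inside it is still flipped with compareAndSet. A minimal sketch (hypothetical class):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical sketch of the final-AtomicBoolean pattern: the field can
    // never point at a different AtomicBoolean, but its boolean still toggles.
    class ShutdownFlagSketch {
        private final AtomicBoolean shutdown = new AtomicBoolean(false);

        boolean beginShutdown() {
            return shutdown.compareAndSet(false, true);  // first caller wins
        }

        boolean isShutdown() {
            return shutdown.get();
        }
    }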
View File
@@ -25,8 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  */
 public class UptimeResponse extends Message {
-    private long serverStartMs;
-    private long nowMs;
+    private final long serverStartMs;
+    private final long nowMs;
     @JsonCreator
     public UptimeResponse(@JsonProperty("serverStartMs") long serverStartMs,
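Final fields are compatible with Jackson as long as deserialization binds through the @JsonCreator constructor rather than setters, which is exactly how this message class is written. A reduced sketch (hypothetical class, without the Message superclass):

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;

    // Hypothetical sketch: @JsonCreator binds JSON properties to constructor
    // parameters, so the fields can be final with no setters required.
    class UptimeSketch {
        private final long serverStartMs;
        private final long nowMs;

        @JsonCreator
        UptimeSketch(@JsonProperty("serverStartMs") long serverStartMs,
                     @JsonProperty("nowMs") long nowMs) {
            this.serverStartMs = serverStartMs;
            this.nowMs = nowMs;
        }

        long uptimeMs() {
            return nowMs - serverStartMs;
        }
    }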
View File
@@ -186,11 +186,11 @@ public class ProduceBenchWorker implements TaskWorker {
         private final Optional<TransactionGenerator> transactionGenerator;
         private final Throttle throttle;
+        private final AtomicLong transactionsCommitted;
+        private final boolean enableTransactions;
         private Iterator<TopicPartition> partitionsIterator;
         private Future<RecordMetadata> sendFuture;
-        private AtomicLong transactionsCommitted;
-        private boolean enableTransactions;
         SendRecords(HashSet<TopicPartition> activePartitions) {
             this.activePartitions = activePartitions;