MINOR: apply Utils.isBlank to code base (#10124)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
CHUN-HAO TANG 2021-02-20 11:44:29 +08:00 committed by GitHub
parent 236ddda360
commit 954c090ffc
23 changed files with 53 additions and 45 deletions
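For context, org.apache.kafka.common.utils.Utils exposes a null-safe isBlank(String) helper, and this commit swaps the repeated inline null-and-whitespace checks for it across the code base. A minimal sketch of the check it stands in for (illustrative only; the actual Utils implementation may differ in detail):

// Illustrative sketch, not the verbatim Kafka source; class name is hypothetical.
public final class IsBlankSketch {
    // Treats null, empty, and whitespace-only strings as "blank".
    public static boolean isBlank(String str) {
        return str == null || str.trim().isEmpty();
    }
}

// Typical call-site change made throughout this commit (example drawn from KafkaConsumer):
// before: if (topic == null || topic.trim().isEmpty())
// after:  if (Utils.isBlank(topic))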

View File

@@ -4540,7 +4540,7 @@ public class KafkaAdminClient extends AdminClient {
final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
final String feature = entry.getKey();
if (feature.trim().isEmpty()) {
if (Utils.isBlank(feature)) {
throw new IllegalArgumentException("Provided feature can not be empty.");
}
updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());

View File

@@ -957,7 +957,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.unsubscribe();
} else {
for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
if (Utils.isBlank(topic))
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
@@ -1108,7 +1108,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
} else {
for (TopicPartition tp : partitions) {
String topic = (tp != null) ? tp.topic() : null;
if (topic == null || topic.trim().isEmpty())
if (Utils.isBlank(topic))
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
}
fetcher.clearBufferedDataForUnassignedPartitions(partitions);

View File

@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.utils.Utils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -103,7 +104,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
OAuthBearerValidationResult.newFailure("No expiration time in JWT"));
lifetime = convertClaimTimeInSecondsToMs(expirationTimeSeconds);
String principalName = claim(this.principalClaimName, String.class);
if (principalName == null || principalName.trim().isEmpty())
if (Utils.isBlank(principalName))
throw new OAuthBearerIllegalTokenException(OAuthBearerValidationResult
.newFailure("No principal name in JWT claim: " + this.principalClaimName));
this.principalName = principalName;
@@ -345,7 +346,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
String scopeClaimName = scopeClaimName();
if (isClaimType(scopeClaimName, String.class)) {
String scopeClaimValue = claim(scopeClaimName, String.class);
if (scopeClaimValue.trim().isEmpty())
if (Utils.isBlank(scopeClaimValue))
return Collections.emptySet();
else {
Set<String> retval = new HashSet<>();
@@ -360,7 +361,7 @@ public class OAuthBearerUnsecuredJws implements OAuthBearerToken {
List<String> stringList = (List<String>) scopeClaimValue;
Set<String> retval = new HashSet<>();
for (String scope : stringList) {
if (scope != null && !scope.trim().isEmpty()) {
if (!Utils.isBlank(scope)) {
retval.add(scope.trim());
}
}

View File

@@ -45,6 +45,7 @@ import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -191,13 +192,9 @@ public class OAuthBearerUnsecuredLoginCallbackHandler implements AuthenticateCal
throw new OAuthBearerConfigException("Extensions provided in login context without a token");
}
String principalClaimNameValue = optionValue(PRINCIPAL_CLAIM_NAME_OPTION);
String principalClaimName = principalClaimNameValue != null && !principalClaimNameValue.trim().isEmpty()
? principalClaimNameValue.trim()
: DEFAULT_PRINCIPAL_CLAIM_NAME;
String principalClaimName = Utils.isBlank(principalClaimNameValue) ? DEFAULT_PRINCIPAL_CLAIM_NAME : principalClaimNameValue.trim();
String scopeClaimNameValue = optionValue(SCOPE_CLAIM_NAME_OPTION);
String scopeClaimName = scopeClaimNameValue != null && !scopeClaimNameValue.trim().isEmpty()
? scopeClaimNameValue.trim()
: DEFAULT_SCOPE_CLAIM_NAME;
String scopeClaimName = Utils.isBlank(scopeClaimNameValue) ? DEFAULT_SCOPE_CLAIM_NAME : scopeClaimNameValue.trim();
String headerJson = "{" + claimOrHeaderJsonText("alg", "none") + "}";
String lifetimeSecondsValueToUse = optionValue(LIFETIME_SECONDS_OPTION, DEFAULT_LIFETIME_SECONDS_ONE_HOUR);
String claimsJson;

View File

@@ -195,8 +195,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler implements Authenticat
String allowableClockSkewMsValue = option(ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION);
int allowableClockSkewMs = 0;
try {
allowableClockSkewMs = allowableClockSkewMsValue == null || allowableClockSkewMsValue.trim().isEmpty() ? 0
: Integer.parseInt(allowableClockSkewMsValue.trim());
allowableClockSkewMs = Utils.isBlank(allowableClockSkewMsValue) ? 0 : Integer.parseInt(allowableClockSkewMsValue.trim());
} catch (NumberFormatException e) {
throw new OAuthBearerConfigException(e.getMessage(), e);
}

View File

@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.util.List;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
public class OAuthBearerScopeUtilsTest {
@@ -28,7 +29,7 @@ public class OAuthBearerScopeUtilsTest {
public void validScope() {
for (String validScope : new String[] {"", " ", "scope1", " scope1 ", "scope1 Scope2", "scope1 Scope2"}) {
List<String> parsedScope = OAuthBearerScopeUtils.parseScope(validScope);
if (validScope.trim().isEmpty()) {
if (Utils.isBlank(validScope)) {
assertTrue(parsedScope.isEmpty());
} else if (validScope.contains("Scope2")) {
assertTrue(parsedScope.size() == 2 && parsedScope.get(0).equals("scope1")

View File

@@ -891,7 +891,7 @@ public class Values {
}
String token = parser.next();
if (token.trim().isEmpty()) {
if (Utils.isBlank(token)) {
return new SchemaAndValue(Schema.STRING_SCHEMA, token);
}
token = token.trim();
@@ -1253,7 +1253,7 @@ public class Values {
nextToken = consumeNextToken();
}
if (ignoreLeadingAndTrailingWhitespace) {
while (nextToken.trim().isEmpty() && canConsumeNextToken()) {
while (Utils.isBlank(nextToken) && canConsumeNextToken()) {
nextToken = consumeNextToken();
}
}

View File

@@ -19,6 +19,8 @@ package org.apache.kafka.connect.health;
import java.util.Objects;
import org.apache.kafka.common.utils.Utils;
/**
* Provides the current status along with identifier for Connect worker and tasks.
*/
@@ -36,10 +38,10 @@ public abstract class AbstractState {
* @param traceMessage any error trace message associated with the connector or the task; may be null or empty
*/
public AbstractState(String state, String workerId, String traceMessage) {
if (state == null || state.trim().isEmpty()) {
if (Utils.isBlank(state)) {
throw new IllegalArgumentException("State must not be null or empty");
}
if (workerId == null || workerId.trim().isEmpty()) {
if (Utils.isBlank(workerId)) {
throw new IllegalArgumentException("Worker ID must not be null or empty");
}
this.state = state;

View File

@@ -16,10 +16,11 @@
*/
package org.apache.kafka.connect.health;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.utils.Utils;
/**
* Provides basic health information about the connector and its tasks.
*/
@@ -35,7 +36,7 @@ public class ConnectorHealth {
ConnectorState connectorState,
Map<Integer, TaskState> tasks,
ConnectorType type) {
if (name == null || name.trim().isEmpty()) {
if (Utils.isBlank(name)) {
throw new IllegalArgumentException("Connector name is required");
}
Objects.requireNonNull(connectorState, "connectorState can't be null");

View File

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.data;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Values.Parser;
import org.apache.kafka.connect.errors.DataException;
@@ -911,7 +912,7 @@ public class ValuesTest {
protected void assertConsumable(Parser parser, String... expectedTokens) {
for (String expectedToken : expectedTokens) {
if (!expectedToken.trim().isEmpty()) {
if (!Utils.isBlank(expectedToken)) {
int position = parser.mark();
assertTrue(parser.canConsume(expectedToken.trim()));
parser.rewindTo(position);

View File

@@ -18,6 +18,7 @@
package org.apache.kafka.connect.rest.basic.auth.extension;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +60,7 @@ public class PropertyFileLoginModule implements LoginModule {
public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
this.callbackHandler = callbackHandler;
fileName = (String) options.get(FILE_OPTIONS);
if (fileName == null || fileName.trim().isEmpty()) {
if (Utils.isBlank(fileName)) {
throw new ConfigException("Property Credentials file must be specified");
}

View File

@@ -494,7 +494,7 @@ public class ConnectorConfig extends AbstractConfig {
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = childClassNames.trim().isEmpty() ?
String message = Utils.isBlank(childClassNames) ?
aliasKind + " is abstract and cannot be created." :
aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
throw new ConfigException(key, String.valueOf(cls), message);

View File

@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
@@ -125,17 +126,17 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static boolean hasTopicsConfig(Map<String, String> props) {
String topicsStr = props.get(TOPICS_CONFIG);
return topicsStr != null && !topicsStr.trim().isEmpty();
return !Utils.isBlank(topicsStr);
}
public static boolean hasTopicsRegexConfig(Map<String, String> props) {
String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
return !Utils.isBlank(topicsRegexStr);
}
public static boolean hasDlqTopicConfig(Map<String, String> props) {
String dqlTopicStr = props.get(DLQ_TOPIC_NAME_CONFIG);
return dqlTopicStr != null && !dqlTopicStr.trim().isEmpty();
return !Utils.isBlank(dqlTopicStr);
}
@SuppressWarnings("unchecked")

View File

@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.Converter;
@@ -516,7 +517,7 @@ public class WorkerConfig extends AbstractConfig {
if (!(item instanceof String)) {
throw new ConfigException("Invalid type for admin listener (expected String).");
}
if (((String) item).trim().isEmpty()) {
if (Utils.isBlank((String) item)) {
throw new ConfigException("Empty listener found when parsing list.");
}
}
@@ -527,7 +528,7 @@
@Override
public void ensureValid(String name, Object value) {
String strValue = (String) value;
if (strValue == null || strValue.trim().isEmpty()) {
if (Utils.isBlank(strValue)) {
return;
}

View File

@@ -1447,7 +1447,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
forwardRequestExecutor.submit(() -> {
try {
String leaderUrl = leaderUrl();
if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
if (Utils.isBlank(leaderUrl)) {
cb.onCompletion(new ConnectException("Request to leader to " +
"reconfigure connector tasks failed " +
"because the URL of the leader's REST interface is empty!"), null);

View File

@@ -445,8 +445,7 @@ public class Plugins {
plugin = newPlugin(klass);
if (plugin instanceof Versioned) {
Versioned versionedPlugin = (Versioned) plugin;
if (versionedPlugin.version() == null || versionedPlugin.version().trim()
.isEmpty()) {
if (Utils.isBlank(versionedPlugin.version())) {
throw new ConnectException("Version not defined for '" + klassName + "'");
}
}

View File

@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectClusterDetails;
import org.apache.kafka.connect.rest.ConnectRestExtension;
@@ -275,19 +276,19 @@ public class RestServer {
}
String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
if (!Utils.isBlank(allowedOrigins)) {
FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
filterHolder.setName("cross-origin");
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
String allowedMethods = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG);
if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) {
if (!Utils.isBlank(allowedMethods)) {
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
}
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
}
String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
if (headerConfig != null && !headerConfig.trim().isEmpty()) {
if (!Utils.isBlank(headerConfig)) {
configureHttpResponsHeaderFilter(context);
}

View File

@@ -255,11 +255,11 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
throw new ConfigException("Unknown timestamp type in TimestampConverter: " + type + ". Valid values are "
+ Utils.join(VALID_TYPES, ", ") + ".");
}
if (type.equals(TYPE_STRING) && formatPattern.trim().isEmpty()) {
if (type.equals(TYPE_STRING) && Utils.isBlank(formatPattern)) {
throw new ConfigException("TimestampConverter requires format option to be specified when using string timestamps");
}
SimpleDateFormat format = null;
if (formatPattern != null && !formatPattern.trim().isEmpty()) {
if (!Utils.isBlank(formatPattern)) {
try {
format = new SimpleDateFormat(formatPattern);
format.setTimeZone(UTC);

View File

@@ -47,7 +47,7 @@ import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientI
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.slf4j.event.Level
@@ -650,7 +650,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
*/
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val socketAddress =
if (host == null || host.trim.isEmpty)
if (Utils.isBlank(host))
new InetSocketAddress(port)
else
new InetSocketAddress(host, port)

View File

@@ -46,7 +46,7 @@ import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledSh
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, Node}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
@@ -477,7 +477,7 @@ class KafkaServer(
}
val updatedEndpoints = listeners.map(endpoint =>
if (endpoint.host == null || endpoint.host.trim.isEmpty)
if (Utils.isBlank(endpoint.host))
endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
else
endpoint

View File

@@ -20,6 +20,7 @@ package kafka.utils
import java.util
import java.util.Locale
import org.apache.kafka.common.utils.Utils
import org.apache.log4j.{Level, LogManager, Logger}
import scala.collection.mutable
@@ -71,7 +72,7 @@ object Log4jController {
*/
def logLevel(loggerName: String, logLevel: String): Boolean = {
val log = existingLogger(loggerName)
if (!loggerName.trim.isEmpty && !logLevel.trim.isEmpty && log != null) {
if (!Utils.isBlank(loggerName) && !Utils.isBlank(logLevel) && log != null) {
log.setLevel(Level.toLevel(logLevel.toUpperCase(Locale.ROOT)))
true
}
@@ -80,7 +81,7 @@
def unsetLogLevel(loggerName: String): Boolean = {
val log = existingLogger(loggerName)
if (!loggerName.trim.isEmpty && log != null) {
if (!Utils.isBlank(loggerName) && log != null) {
log.setLevel(null)
true
}

View File

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyException;
import java.io.IOException;
@@ -63,7 +64,7 @@ public class Printed<K, V> implements NamedOperation<Printed<K, V>> {
*/
public static <K, V> Printed<K, V> toFile(final String filePath) {
Objects.requireNonNull(filePath, "filePath can't be null");
if (filePath.trim().isEmpty()) {
if (Utils.isBlank(filePath)) {
throw new TopologyException("filePath can't be an empty string");
}
try {

View File

@@ -21,6 +21,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
@@ -55,7 +56,7 @@ public class HostInfo {
* @return a new HostInfo or null if endPoint is null or has no characters
*/
public static HostInfo buildFromEndpoint(final String endPoint) {
if (endPoint == null || endPoint.trim().isEmpty()) {
if (Utils.isBlank(endPoint)) {
return null;
}