mirror of https://github.com/apache/kafka.git
MINOR: Small cleanups in clients (#20530)
- Fix non-constant calls to logging - Fix assertEquals order - Fix javadoc Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
caeca090b8
commit
3cbb2a0aaf
|
@ -22,7 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
|
|||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Options for the {@link Admin#deleteShareGroups(Collection <String>, DeleteShareGroupsOptions)} call.
|
||||
* Options for the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The result of the {@link Admin#deleteShareGroups(Collection <String>, DeleteShareGroupsOptions)} call.
|
||||
* The result of the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
|
||||
* <p></p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
|
|
|
@ -21,7 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
|
|||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Options for the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
|
||||
* Options for the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
|
||||
* <p>
|
||||
* The API of this class is evolving, see {@link Admin} for details.
|
||||
*/
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Specification of streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
|
||||
|
|
|
@ -122,8 +122,8 @@ public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coor
|
|||
if (topic.errorCode() != Errors.NONE.code()) {
|
||||
final Errors topicError = Errors.forCode(topic.errorCode());
|
||||
final String topicErrorMessage = topic.errorMessage();
|
||||
log.debug("DeleteShareGroupOffsets request for group id {} and topic {} failed and returned error {}." + topicErrorMessage,
|
||||
groupId.idValue, topic.topicName(), topicError);
|
||||
log.debug("DeleteShareGroupOffsets request for group id {} and topic {} failed and returned error {}. {}",
|
||||
groupId.idValue, topic.topicName(), topicError, topicErrorMessage);
|
||||
}
|
||||
topicResults.put(
|
||||
topic.topicName(),
|
||||
|
@ -147,14 +147,14 @@ public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coor
|
|||
case REBALANCE_IN_PROGRESS:
|
||||
// If the coordinator is in the middle of loading, then we just need to retry
|
||||
log.debug("DeleteShareGroupOffsets request for group id {} failed because the coordinator" +
|
||||
" is still in the process of loading state. Will retry. " + errorMessage, groupId.idValue);
|
||||
" is still in the process of loading state. Will retry. {}", groupId.idValue, errorMessage);
|
||||
break;
|
||||
case COORDINATOR_NOT_AVAILABLE:
|
||||
case NOT_COORDINATOR:
|
||||
// If the coordinator is unavailable or there was a coordinator change, then we unmap
|
||||
// the key so that we retry the `FindCoordinator` request
|
||||
log.debug("DeleteShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry. " + errorMessage,
|
||||
groupId.idValue, error);
|
||||
log.debug("DeleteShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry. {}",
|
||||
groupId.idValue, error, errorMessage);
|
||||
groupsToUnmap.add(groupId);
|
||||
break;
|
||||
case INVALID_GROUP_ID:
|
||||
|
@ -164,11 +164,11 @@ public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coor
|
|||
case UNKNOWN_SERVER_ERROR:
|
||||
case KAFKA_STORAGE_ERROR:
|
||||
case GROUP_AUTHORIZATION_FAILED:
|
||||
log.debug("DeleteShareGroupOffsets request for group id {} failed due to error {}. " + errorMessage, groupId.idValue, error);
|
||||
log.debug("DeleteShareGroupOffsets request for group id {} failed due to error {}. {}", groupId.idValue, error, errorMessage);
|
||||
failed.put(groupId, error.exception(errorMessage));
|
||||
break;
|
||||
default:
|
||||
log.error("DeleteShareGroupOffsets request for group id {} failed due to unexpected error {}. " + errorMessage, groupId.idValue, error);
|
||||
log.error("DeleteShareGroupOffsets request for group id {} failed due to unexpected error {}. {}", groupId.idValue, error, errorMessage);
|
||||
failed.put(groupId, error.exception(errorMessage));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,7 +202,7 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset
|
|||
public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
|
||||
int brokerId, UnsupportedVersionException exception, Set<TopicPartition> keys
|
||||
) {
|
||||
log.warn("Broker " + brokerId + " does not support MAX_TIMESTAMP offset specs");
|
||||
log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", brokerId);
|
||||
Map<TopicPartition, Throwable> maxTimestampPartitions = new HashMap<>();
|
||||
for (TopicPartition topicPartition : keys) {
|
||||
Long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
|
||||
|
|
|
@ -988,7 +988,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
|
|||
String reason = rejoinedWhileReconciliationInProgress ?
|
||||
"the member has re-joined the group" :
|
||||
"the member already transitioned out of the reconciling state into " + state;
|
||||
log.info("Interrupting reconciliation that is not relevant anymore because " + reason);
|
||||
log.info("Interrupting reconciliation that is not relevant anymore because {}", reason);
|
||||
markReconciliationCompleted();
|
||||
}
|
||||
return shouldAbort;
|
||||
|
|
|
@ -965,8 +965,8 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
|
|||
super(partitionsPerTopic, rackInfo, currentAssignment);
|
||||
this.subscriptions = subscriptions;
|
||||
|
||||
topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size());
|
||||
consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size());
|
||||
topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.size());
|
||||
consumer2AllPotentialTopics = new HashMap<>(subscriptions.size());
|
||||
|
||||
// initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics
|
||||
partitionsPerTopic.keySet().forEach(
|
||||
|
|
|
@ -541,7 +541,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
|
|||
boolean inflightRemoved = pendingRequests.inflightOffsetFetches.remove(fetchRequest);
|
||||
if (!inflightRemoved) {
|
||||
log.warn("A duplicated, inflight, request was identified, but unable to find it in the " +
|
||||
"outbound buffer:" + fetchRequest);
|
||||
"outbound buffer: {}", fetchRequest);
|
||||
}
|
||||
if (error == null) {
|
||||
maybeUpdateLastSeenEpochIfNewer(res);
|
||||
|
|
|
@ -287,7 +287,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
|
|||
|
||||
private final HeartbeatMetricsManager metricsManager;
|
||||
|
||||
private StreamsRebalanceData streamsRebalanceData;
|
||||
private final StreamsRebalanceData streamsRebalanceData;
|
||||
|
||||
/**
|
||||
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
|
||||
|
|
|
@ -1010,8 +1010,8 @@ public class StreamsMembershipManager implements RequestManager {
|
|||
return;
|
||||
}
|
||||
if (reconciliationInProgress) {
|
||||
log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
|
||||
targetAssignment + " will be handled in the next reconciliation loop.");
|
||||
log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment {}" +
|
||||
" will be handled in the next reconciliation loop.", targetAssignment);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1131,12 +1131,12 @@ public class StreamsMembershipManager implements RequestManager {
|
|||
);
|
||||
|
||||
final SortedSet<TopicPartition> partitionsToAssign = topicPartitionsForActiveTasks(activeTasksToAssign);
|
||||
final SortedSet<TopicPartition> partitionsToAssigneNotPreviouslyOwned =
|
||||
final SortedSet<TopicPartition> partitionsToAssignNotPreviouslyOwned =
|
||||
partitionsToAssignNotPreviouslyOwned(partitionsToAssign, topicPartitionsForActiveTasks(ownedActiveTasks));
|
||||
|
||||
subscriptionState.assignFromSubscribedAwaitingCallback(
|
||||
partitionsToAssign,
|
||||
partitionsToAssigneNotPreviouslyOwned
|
||||
partitionsToAssignNotPreviouslyOwned
|
||||
);
|
||||
notifyAssignmentChange(partitionsToAssign);
|
||||
|
||||
|
@ -1152,10 +1152,10 @@ public class StreamsMembershipManager implements RequestManager {
|
|||
if (callbackError == null) {
|
||||
subscriptionState.enablePartitionsAwaitingCallback(partitionsToAssign);
|
||||
} else {
|
||||
if (!partitionsToAssigneNotPreviouslyOwned.isEmpty()) {
|
||||
if (!partitionsToAssignNotPreviouslyOwned.isEmpty()) {
|
||||
log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not " +
|
||||
"requiring initializing positions after onTasksAssigned callback failed.",
|
||||
partitionsToAssigneNotPreviouslyOwned, callbackError);
|
||||
partitionsToAssignNotPreviouslyOwned, callbackError);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -1205,9 +1205,9 @@ public class StreamsMembershipManager implements RequestManager {
|
|||
Stream.concat(
|
||||
streamsRebalanceData.subtopologies().get(task.subtopologyId()).sourceTopics().stream(),
|
||||
streamsRebalanceData.subtopologies().get(task.subtopologyId()).repartitionSourceTopics().keySet().stream()
|
||||
).forEach(topic -> {
|
||||
topicPartitions.add(new TopicPartition(topic, task.partitionId()));
|
||||
})
|
||||
).forEach(topic ->
|
||||
topicPartitions.add(new TopicPartition(topic, task.partitionId()))
|
||||
)
|
||||
);
|
||||
return topicPartitions;
|
||||
}
|
||||
|
@ -1223,7 +1223,7 @@ public class StreamsMembershipManager implements RequestManager {
|
|||
String reason = rejoinedWhileReconciliationInProgress ?
|
||||
"the member has re-joined the group" :
|
||||
"the member already transitioned out of the reconciling state into " + state;
|
||||
log.info("Interrupting reconciliation that is not relevant anymore because " + reason);
|
||||
log.info("Interrupting reconciliation that is not relevant anymore because {}", reason);
|
||||
markReconciliationCompleted();
|
||||
}
|
||||
return shouldAbort;
|
||||
|
|
|
@ -553,7 +553,7 @@ public final class Metrics implements Closeable {
|
|||
try {
|
||||
reporter.metricRemoval(metric);
|
||||
} catch (Exception e) {
|
||||
log.error("Error when removing metric from " + reporter.getClass().getName(), e);
|
||||
log.error("Error when removing metric from {}", reporter.getClass().getName(), e);
|
||||
}
|
||||
}
|
||||
log.trace("Removed metric named {}", metricName);
|
||||
|
@ -596,7 +596,7 @@ public final class Metrics implements Closeable {
|
|||
try {
|
||||
reporter.metricChange(metric);
|
||||
} catch (Exception e) {
|
||||
log.error("Error when registering metric on " + reporter.getClass().getName(), e);
|
||||
log.error("Error when registering metric on {}", reporter.getClass().getName(), e);
|
||||
}
|
||||
}
|
||||
log.trace("Registered metric named {}", metricName);
|
||||
|
@ -688,7 +688,7 @@ public final class Metrics implements Closeable {
|
|||
log.info("Closing reporter {}", reporter.getClass().getName());
|
||||
reporter.close();
|
||||
} catch (Exception e) {
|
||||
log.error("Error when closing " + reporter.getClass().getName(), e);
|
||||
log.error("Error when closing {}", reporter.getClass().getName(), e);
|
||||
}
|
||||
}
|
||||
log.info("Metrics reporters closed");
|
||||
|
|
|
@ -40,7 +40,7 @@ public final class ByteBufferUnmapper {
|
|||
private static final RuntimeException UNMAP_NOT_SUPPORTED_EXCEPTION;
|
||||
|
||||
static {
|
||||
Object unmap = null;
|
||||
MethodHandle unmap = null;
|
||||
RuntimeException exception = null;
|
||||
try {
|
||||
unmap = lookupUnmapMethodHandle();
|
||||
|
@ -48,7 +48,7 @@ public final class ByteBufferUnmapper {
|
|||
exception = e;
|
||||
}
|
||||
if (unmap != null) {
|
||||
UNMAP = (MethodHandle) unmap;
|
||||
UNMAP = unmap;
|
||||
UNMAP_NOT_SUPPORTED_EXCEPTION = null;
|
||||
} else {
|
||||
UNMAP = null;
|
||||
|
|
|
@ -75,7 +75,8 @@ public class ConfigUtils {
|
|||
} else if (value instanceof String) {
|
||||
return Boolean.parseBoolean((String) value);
|
||||
} else {
|
||||
log.error("Invalid value (" + value + ") on configuration '" + key + "'. The default value '" + defaultValue + "' will be used instead. Please specify a true/false value.");
|
||||
log.error("Invalid value ({}) on configuration '{}'. The default value '{}' will be used instead. Please specify a true/false value.",
|
||||
value, key, defaultValue);
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ public class LoggingSignalHandler {
|
|||
for (String signal : SIGNALS) {
|
||||
register(signal, jvmSignalHandlers);
|
||||
}
|
||||
log.info("Registered signal handlers for " + String.join(", ", SIGNALS));
|
||||
log.info("Registered signal handlers for {}", String.join(", ", SIGNALS));
|
||||
}
|
||||
|
||||
private Object createSignalHandler(final Map<String, Object> jvmSignalHandlers) {
|
||||
|
|
|
@ -33,7 +33,7 @@ import java.util.Map;
|
|||
|
||||
public class SecurityUtils {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SecurityConfig.class);
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SecurityUtils.class);
|
||||
|
||||
private static final Map<String, ResourceType> NAME_TO_RESOURCE_TYPES;
|
||||
private static final Map<String, AclOperation> NAME_TO_OPERATIONS;
|
||||
|
|
|
@ -857,7 +857,7 @@ public final class Utils {
|
|||
public static void delete(final File rootFile) throws IOException {
|
||||
if (rootFile == null)
|
||||
return;
|
||||
Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor<Path>() {
|
||||
Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor<>() {
|
||||
@Override
|
||||
public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException {
|
||||
if (exc instanceof NoSuchFileException) {
|
||||
|
@ -1403,7 +1403,7 @@ public final class Utils {
|
|||
* @return new Collector<Map.Entry<K, V>, M, M>
|
||||
*/
|
||||
public static <K, V, M extends Map<K, V>> Collector<Map.Entry<K, V>, M, M> entriesToMap(final Supplier<M> mapSupplier) {
|
||||
return new Collector<Map.Entry<K, V>, M, M>() {
|
||||
return new Collector<>() {
|
||||
@Override
|
||||
public Supplier<M> supplier() {
|
||||
return mapSupplier;
|
||||
|
|
|
@ -318,7 +318,7 @@ public class KafkaFutureTest {
|
|||
awaitAndAssertResult(future, 21, null);
|
||||
Throwable cause = awaitAndAssertFailure(dependantFuture, CompletionException.class, "java.lang.RuntimeException: We require more vespene gas");
|
||||
assertInstanceOf(RuntimeException.class, cause.getCause());
|
||||
assertEquals(cause.getCause().getMessage(), "We require more vespene gas");
|
||||
assertEquals("We require more vespene gas", cause.getCause().getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -35,8 +35,8 @@ public class UuidTest {
|
|||
public void testSignificantBits() {
|
||||
Uuid id = new Uuid(34L, 98L);
|
||||
|
||||
assertEquals(id.getMostSignificantBits(), 34L);
|
||||
assertEquals(id.getLeastSignificantBits(), 98L);
|
||||
assertEquals(34L, id.getMostSignificantBits());
|
||||
assertEquals(98L, id.getLeastSignificantBits());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -74,15 +74,15 @@ public class UuidTest {
|
|||
|
||||
String zeroIdString = Uuid.ZERO_UUID.toString();
|
||||
|
||||
assertEquals(Uuid.fromString(zeroIdString), Uuid.ZERO_UUID);
|
||||
assertEquals(Uuid.ZERO_UUID, Uuid.fromString(zeroIdString));
|
||||
}
|
||||
|
||||
@RepeatedTest(value = 100, name = RepeatedTest.LONG_DISPLAY_NAME)
|
||||
public void testRandomUuid() {
|
||||
Uuid randomID = Uuid.randomUuid();
|
||||
|
||||
assertNotEquals(randomID, Uuid.ZERO_UUID);
|
||||
assertNotEquals(randomID, Uuid.METADATA_TOPIC_ID);
|
||||
assertNotEquals(Uuid.ZERO_UUID, randomID);
|
||||
assertNotEquals(Uuid.METADATA_TOPIC_ID, randomID);
|
||||
assertFalse(randomID.toString().startsWith("-"));
|
||||
}
|
||||
|
||||
|
|
|
@ -379,8 +379,8 @@ public class AbstractConfigTest {
|
|||
Properties props = new Properties();
|
||||
props.put("config.providers", "file");
|
||||
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
|
||||
assertEquals(config.originals().get("config.providers"), "file");
|
||||
assertEquals(config.originals(Collections.singletonMap("config.providers", "file2")).get("config.providers"), "file2");
|
||||
assertEquals("file", config.originals().get("config.providers"));
|
||||
assertEquals("file2", config.originals(Collections.singletonMap("config.providers", "file2")).get("config.providers"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -62,10 +62,10 @@ public class ChannelBuildersTest {
|
|||
assertNull(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
|
||||
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
|
||||
|
||||
assertEquals(configs.get("gssapi.sasl.kerberos.service.name"), "testkafka");
|
||||
assertEquals("testkafka", configs.get("gssapi.sasl.kerberos.service.name"));
|
||||
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
|
||||
|
||||
assertEquals(configs.get("sasl.kerberos.service.name"), "testkafkaglobal");
|
||||
assertEquals("testkafkaglobal", configs.get("sasl.kerberos.service.name"));
|
||||
assertFalse(securityConfig.unused().contains("sasl.kerberos.service.name"));
|
||||
|
||||
assertNull(configs.get("listener.name.listener1.sasl.kerberos.service.name"));
|
||||
|
@ -74,35 +74,35 @@ public class ChannelBuildersTest {
|
|||
assertNull(configs.get("plain.sasl.server.callback.handler.class"));
|
||||
assertFalse(securityConfig.unused().contains("plain.sasl.server.callback.handler.class"));
|
||||
|
||||
assertEquals(configs.get("listener.name.listener1.gssapi.config1.key"), "custom.config1");
|
||||
assertEquals("custom.config1", configs.get("listener.name.listener1.gssapi.config1.key"));
|
||||
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.config1.key"));
|
||||
|
||||
assertEquals(configs.get("custom.config2.key"), "custom.config2");
|
||||
assertEquals("custom.config2", configs.get("custom.config2.key"));
|
||||
assertFalse(securityConfig.unused().contains("custom.config2.key"));
|
||||
|
||||
// test configs without listener prefix
|
||||
securityConfig = new TestSecurityConfig(props);
|
||||
configs = ChannelBuilders.channelBuilderConfigs(securityConfig, null);
|
||||
|
||||
assertEquals(configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"), "testkafka");
|
||||
assertEquals("testkafka", configs.get("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
|
||||
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
|
||||
|
||||
assertNull(configs.get("gssapi.sasl.kerberos.service.name"));
|
||||
assertFalse(securityConfig.unused().contains("gssapi.sasl.kerberos.service.name"));
|
||||
|
||||
assertEquals(configs.get("listener.name.listener1.sasl.kerberos.service.name"), "testkafkaglobal");
|
||||
assertEquals("testkafkaglobal", configs.get("listener.name.listener1.sasl.kerberos.service.name"));
|
||||
assertFalse(securityConfig.unused().contains("listener.name.listener1.sasl.kerberos.service.name"));
|
||||
|
||||
assertNull(configs.get("sasl.kerberos.service.name"));
|
||||
assertFalse(securityConfig.unused().contains("sasl.kerberos.service.name"));
|
||||
|
||||
assertEquals(configs.get("plain.sasl.server.callback.handler.class"), "callback");
|
||||
assertEquals("callback", configs.get("plain.sasl.server.callback.handler.class"));
|
||||
assertFalse(securityConfig.unused().contains("plain.sasl.server.callback.handler.class"));
|
||||
|
||||
assertEquals(configs.get("listener.name.listener1.gssapi.config1.key"), "custom.config1");
|
||||
assertEquals("custom.config1", configs.get("listener.name.listener1.gssapi.config1.key"));
|
||||
assertFalse(securityConfig.unused().contains("listener.name.listener1.gssapi.config1.key"));
|
||||
|
||||
assertEquals(configs.get("custom.config2.key"), "custom.config2");
|
||||
assertEquals("custom.config2", configs.get("custom.config2.key"));
|
||||
assertFalse(securityConfig.unused().contains("custom.config2.key"));
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ public class ErrorsTest {
|
|||
public void testExceptionsAreNotGeneric() {
|
||||
for (Errors error : Errors.values()) {
|
||||
if (error != Errors.NONE)
|
||||
assertNotEquals(error.exception().getClass(), ApiException.class, "Generic ApiException should not be used");
|
||||
assertNotEquals(ApiException.class, error.exception().getClass(), "Generic ApiException should not be used");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -197,7 +197,7 @@ public class FileRecordsTest {
|
|||
* Test a simple append and read.
|
||||
*/
|
||||
@Test
|
||||
public void testRead() throws IOException {
|
||||
public void testRead() {
|
||||
FileRecords read = fileRecords.slice(0, fileRecords.sizeInBytes());
|
||||
assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes());
|
||||
TestUtils.checkEquals(fileRecords.batches(), read.batches());
|
||||
|
@ -279,7 +279,7 @@ public class FileRecordsTest {
|
|||
* Test that the message set iterator obeys start and end slicing
|
||||
*/
|
||||
@Test
|
||||
public void testIteratorWithLimits() throws IOException {
|
||||
public void testIteratorWithLimits() {
|
||||
RecordBatch batch = batches(fileRecords).get(1);
|
||||
int start = fileRecords.searchForOffsetFromPosition(1, 0).position;
|
||||
int size = batch.sizeInBytes();
|
||||
|
|
|
@ -46,8 +46,8 @@ public class ReplicaSelectorTest {
|
|||
ReplicaSelector selector = new RackAwareReplicaSelector();
|
||||
Optional<ReplicaView> selected = selector.select(tp, metadata("rack-b"), partitionView);
|
||||
assertOptional(selected, replicaInfo -> {
|
||||
assertEquals(replicaInfo.endpoint().rack(), "rack-b", "Expect replica to be in rack-b");
|
||||
assertEquals(replicaInfo.endpoint().id(), 3, "Expected replica 3 since it is more caught-up");
|
||||
assertEquals("rack-b", replicaInfo.endpoint().rack(), "Expect replica to be in rack-b");
|
||||
assertEquals(3, replicaInfo.endpoint().id(), "Expected replica 3 since it is more caught-up");
|
||||
});
|
||||
|
||||
selected = selector.select(tp, metadata("not-a-rack"), partitionView);
|
||||
|
@ -57,7 +57,7 @@ public class ReplicaSelectorTest {
|
|||
|
||||
selected = selector.select(tp, metadata("rack-a"), partitionView);
|
||||
assertOptional(selected, replicaInfo -> {
|
||||
assertEquals(replicaInfo.endpoint().rack(), "rack-a", "Expect replica to be in rack-a");
|
||||
assertEquals("rack-a", replicaInfo.endpoint().rack(), "Expect replica to be in rack-a");
|
||||
assertEquals(replicaInfo, leader, "Expect the leader since it's in rack-a");
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in New Issue