mirror of https://github.com/apache/kafka.git
KAFKA-7613: Enable -Xlint:rawtypes for connect, fixing warnings (#8571)
Reviewers: Konstantine Karantasis <konstantine@confluent.io>
This commit is contained in:
parent
e00c0f3719
commit
862f814cc3
|
@ -222,7 +222,8 @@ subprojects {
|
|||
options.encoding = 'UTF-8'
|
||||
options.compilerArgs << "-Xlint:all"
|
||||
// temporary exclusions until all the warnings are fixed
|
||||
options.compilerArgs << "-Xlint:-rawtypes"
|
||||
if (!project.path.startsWith(":connect"))
|
||||
options.compilerArgs << "-Xlint:-rawtypes"
|
||||
options.compilerArgs << "-Xlint:-serial"
|
||||
options.compilerArgs << "-Xlint:-try"
|
||||
options.compilerArgs << "-Werror"
|
||||
|
|
|
@ -155,7 +155,7 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
|
|||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
|
||||
ConnectRecord that = (ConnectRecord) o;
|
||||
ConnectRecord<?> that = (ConnectRecord<?>) o;
|
||||
|
||||
return Objects.equals(kafkaPartition, that.kafkaPartition)
|
||||
&& Objects.equals(topic, that.topic)
|
||||
|
|
|
@ -32,11 +32,11 @@ public class ConnectSchema implements Schema {
|
|||
/**
|
||||
* Maps Schema.Types to a list of Java classes that can be used to represent them.
|
||||
*/
|
||||
private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new EnumMap<>(Type.class);
|
||||
private static final Map<Type, List<Class<?>>> SCHEMA_TYPE_CLASSES = new EnumMap<>(Type.class);
|
||||
/**
|
||||
* Maps known logical types to a list of Java classes that can be used to represent them.
|
||||
*/
|
||||
private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new HashMap<>();
|
||||
private static final Map<String, List<Class<?>>> LOGICAL_TYPE_CLASSES = new HashMap<>();
|
||||
|
||||
/**
|
||||
* Maps the Java classes to the corresponding Schema.Type.
|
||||
|
@ -60,7 +60,7 @@ public class ConnectSchema implements Schema {
|
|||
SCHEMA_TYPE_CLASSES.put(Type.MAP, Collections.singletonList(Map.class));
|
||||
SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Collections.singletonList(Struct.class));
|
||||
|
||||
for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
|
||||
for (Map.Entry<Type, List<Class<?>>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
|
||||
for (Class<?> schemaClass : schemaClasses.getValue())
|
||||
JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
|
||||
}
|
||||
|
@ -221,7 +221,7 @@ public class ConnectSchema implements Schema {
|
|||
return;
|
||||
}
|
||||
|
||||
List<Class> expectedClasses = expectedClassesFor(schema);
|
||||
List<Class<?>> expectedClasses = expectedClassesFor(schema);
|
||||
|
||||
if (expectedClasses == null)
|
||||
throw new DataException("Invalid Java object for schema type " + schema.type()
|
||||
|
@ -263,8 +263,8 @@ public class ConnectSchema implements Schema {
|
|||
}
|
||||
}
|
||||
|
||||
private static List<Class> expectedClassesFor(Schema schema) {
|
||||
List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
|
||||
private static List<Class<?>> expectedClassesFor(Schema schema) {
|
||||
List<Class<?>> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
|
||||
if (expectedClasses == null)
|
||||
expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
|
||||
return expectedClasses;
|
||||
|
|
|
@ -33,7 +33,7 @@ public interface ConnectRestExtensionContext {
|
|||
*
|
||||
* @return @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null}
|
||||
*/
|
||||
Configurable<? extends Configurable> configurable();
|
||||
Configurable<? extends Configurable<?>> configurable();
|
||||
|
||||
/**
|
||||
* Provides the cluster state and health information about the connectors and tasks.
|
||||
|
|
|
@ -610,7 +610,7 @@ public class JsonConverter implements Converter, HeaderConverter {
|
|||
else
|
||||
throw new DataException("Invalid type for bytes type: " + value.getClass());
|
||||
case ARRAY: {
|
||||
Collection collection = (Collection) value;
|
||||
Collection<?> collection = (Collection<?>) value;
|
||||
ArrayNode list = JSON_NODE_FACTORY.arrayNode();
|
||||
for (Object elem : collection) {
|
||||
Schema valueSchema = schema == null ? null : schema.valueSchema();
|
||||
|
|
|
@ -66,7 +66,7 @@ public class TransformationChain<R extends ConnectRecord<R>> implements AutoClos
|
|||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
TransformationChain that = (TransformationChain) o;
|
||||
TransformationChain<?> that = (TransformationChain<?>) o;
|
||||
return Objects.equals(transformations, that.transformations);
|
||||
}
|
||||
|
||||
|
|
|
@ -72,15 +72,15 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
private final SortedSet<PluginDesc<Connector>> connectors;
|
||||
private final SortedSet<PluginDesc<Converter>> converters;
|
||||
private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
|
||||
private final SortedSet<PluginDesc<Transformation>> transformations;
|
||||
private final SortedSet<PluginDesc<Predicate>> predicates;
|
||||
private final SortedSet<PluginDesc<Transformation<?>>> transformations;
|
||||
private final SortedSet<PluginDesc<Predicate<?>>> predicates;
|
||||
private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
|
||||
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
|
||||
private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
|
||||
private final List<String> pluginPaths;
|
||||
|
||||
private static final String MANIFEST_PREFIX = "META-INF/services/";
|
||||
private static final Class[] SERVICE_LOADER_PLUGINS = new Class[] {ConnectRestExtension.class, ConfigProvider.class};
|
||||
private static final Class<?>[] SERVICE_LOADER_PLUGINS = new Class<?>[] {ConnectRestExtension.class, ConfigProvider.class};
|
||||
private static final Set<String> PLUGIN_MANIFEST_FILES =
|
||||
Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> MANIFEST_PREFIX + serviceLoaderPlugin.getName())
|
||||
.collect(Collectors.toSet());
|
||||
|
@ -120,11 +120,11 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
return headerConverters;
|
||||
}
|
||||
|
||||
public Set<PluginDesc<Transformation>> transformations() {
|
||||
public Set<PluginDesc<Transformation<?>>> transformations() {
|
||||
return transformations;
|
||||
}
|
||||
|
||||
public Set<PluginDesc<Predicate>> predicates() {
|
||||
public Set<PluginDesc<Predicate<?>>> predicates() {
|
||||
return predicates;
|
||||
}
|
||||
|
||||
|
@ -334,14 +334,24 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
getPluginDesc(reflections, Connector.class, loader),
|
||||
getPluginDesc(reflections, Converter.class, loader),
|
||||
getPluginDesc(reflections, HeaderConverter.class, loader),
|
||||
getPluginDesc(reflections, Transformation.class, loader),
|
||||
getPluginDesc(reflections, Predicate.class, loader),
|
||||
getTransformationPluginDesc(loader, reflections),
|
||||
getPredicatePluginDesc(loader, reflections),
|
||||
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
|
||||
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
|
||||
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
|
||||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private Collection<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) throws ReflectiveOperationException {
|
||||
return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>) getPluginDesc(reflections, Predicate.class, loader);
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private Collection<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) throws ReflectiveOperationException {
|
||||
return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>) getPluginDesc(reflections, Transformation.class, loader);
|
||||
}
|
||||
|
||||
private <T> Collection<PluginDesc<T>> getPluginDesc(
|
||||
Reflections reflections,
|
||||
Class<T> klass,
|
||||
|
@ -359,7 +369,7 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
Collection<PluginDesc<T>> result = new ArrayList<>();
|
||||
for (Class<? extends T> plugin : plugins) {
|
||||
if (PluginUtils.isConcrete(plugin)) {
|
||||
result.add(new PluginDesc<>(plugin, versionFor(plugin), loader));
|
||||
result.add(pluginDesc(plugin, versionFor(plugin), loader));
|
||||
} else {
|
||||
log.debug("Skipping {} as it is not concrete implementation", plugin);
|
||||
}
|
||||
|
@ -367,6 +377,11 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
return result;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
|
||||
return new PluginDesc(plugin, version, loader);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
|
||||
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
|
||||
|
@ -374,7 +389,7 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
try {
|
||||
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
|
||||
for (T pluginImpl : serviceLoader) {
|
||||
result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(),
|
||||
result.add(pluginDesc((Class<? extends T>) pluginImpl.getClass(),
|
||||
versionFor(pluginImpl), loader));
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -103,7 +103,7 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(PluginDesc other) {
|
||||
public int compareTo(PluginDesc<T> other) {
|
||||
int nameComp = name.compareTo(other.name);
|
||||
return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion);
|
||||
}
|
||||
|
|
|
@ -33,20 +33,20 @@ public class PluginScanResult {
|
|||
private final Collection<PluginDesc<Connector>> connectors;
|
||||
private final Collection<PluginDesc<Converter>> converters;
|
||||
private final Collection<PluginDesc<HeaderConverter>> headerConverters;
|
||||
private final Collection<PluginDesc<Transformation>> transformations;
|
||||
private final Collection<PluginDesc<Predicate>> predicates;
|
||||
private final Collection<PluginDesc<Transformation<?>>> transformations;
|
||||
private final Collection<PluginDesc<Predicate<?>>> predicates;
|
||||
private final Collection<PluginDesc<ConfigProvider>> configProviders;
|
||||
private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
|
||||
private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
|
||||
|
||||
private final List<Collection> allPlugins;
|
||||
private final List<Collection<?>> allPlugins;
|
||||
|
||||
public PluginScanResult(
|
||||
Collection<PluginDesc<Connector>> connectors,
|
||||
Collection<PluginDesc<Converter>> converters,
|
||||
Collection<PluginDesc<HeaderConverter>> headerConverters,
|
||||
Collection<PluginDesc<Transformation>> transformations,
|
||||
Collection<PluginDesc<Predicate>> predicates,
|
||||
Collection<PluginDesc<Transformation<?>>> transformations,
|
||||
Collection<PluginDesc<Predicate<?>>> predicates,
|
||||
Collection<PluginDesc<ConfigProvider>> configProviders,
|
||||
Collection<PluginDesc<ConnectRestExtension>> restExtensions,
|
||||
Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies
|
||||
|
@ -76,11 +76,11 @@ public class PluginScanResult {
|
|||
return headerConverters;
|
||||
}
|
||||
|
||||
public Collection<PluginDesc<Transformation>> transformations() {
|
||||
public Collection<PluginDesc<Transformation<?>>> transformations() {
|
||||
return transformations;
|
||||
}
|
||||
|
||||
public Collection<PluginDesc<Predicate>> predicates() {
|
||||
public Collection<PluginDesc<Predicate<?>>> predicates() {
|
||||
return predicates;
|
||||
}
|
||||
|
||||
|
@ -98,7 +98,7 @@ public class PluginScanResult {
|
|||
|
||||
public boolean isEmpty() {
|
||||
boolean isEmpty = true;
|
||||
for (Collection plugins : allPlugins) {
|
||||
for (Collection<?> plugins : allPlugins) {
|
||||
isEmpty = isEmpty && plugins.isEmpty();
|
||||
}
|
||||
return isEmpty;
|
||||
|
|
|
@ -156,11 +156,11 @@ public class Plugins {
|
|||
return delegatingLoader.converters();
|
||||
}
|
||||
|
||||
public Set<PluginDesc<Transformation>> transformations() {
|
||||
public Set<PluginDesc<Transformation<?>>> transformations() {
|
||||
return delegatingLoader.transformations();
|
||||
}
|
||||
|
||||
public Set<PluginDesc<Predicate>> predicates() {
|
||||
public Set<PluginDesc<Predicate<?>>> predicates() {
|
||||
return delegatingLoader.predicates();
|
||||
}
|
||||
|
||||
|
|
|
@ -81,7 +81,7 @@ public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ResourceConfig register(Object component, Class... contracts) {
|
||||
public ResourceConfig register(Object component, Class<?>... contracts) {
|
||||
if (allowedToRegister(component)) {
|
||||
resourceConfig.register(component, contracts);
|
||||
}
|
||||
|
|
|
@ -24,11 +24,11 @@ import javax.ws.rs.core.Configurable;
|
|||
|
||||
public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext {
|
||||
|
||||
private Configurable<? extends Configurable> configurable;
|
||||
private Configurable<? extends Configurable<?>> configurable;
|
||||
private ConnectClusterState clusterState;
|
||||
|
||||
public ConnectRestExtensionContextImpl(
|
||||
Configurable<? extends Configurable> configurable,
|
||||
Configurable<? extends Configurable<?>> configurable,
|
||||
ConnectClusterState clusterState
|
||||
) {
|
||||
this.configurable = configurable;
|
||||
|
@ -36,7 +36,7 @@ public class ConnectRestExtensionContextImpl implements ConnectRestExtensionCont
|
|||
}
|
||||
|
||||
@Override
|
||||
public Configurable<? extends Configurable> configurable() {
|
||||
public Configurable<? extends Configurable<?>> configurable() {
|
||||
return configurable;
|
||||
}
|
||||
|
||||
|
|
|
@ -139,10 +139,10 @@ public class LoggingResource {
|
|||
} else {
|
||||
childLoggers = new ArrayList<>();
|
||||
Logger ancestorLogger = lookupLogger(namedLogger);
|
||||
Enumeration en = currentLoggers();
|
||||
Enumeration<Logger> en = currentLoggers();
|
||||
boolean present = false;
|
||||
while (en.hasMoreElements()) {
|
||||
Logger current = (Logger) en.nextElement();
|
||||
Logger current = en.nextElement();
|
||||
if (current.getName().startsWith(namedLogger)) {
|
||||
childLoggers.add(current);
|
||||
}
|
||||
|
|
|
@ -302,7 +302,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
|
|||
});
|
||||
}
|
||||
|
||||
private <V extends AbstractStatus> void send(final String key,
|
||||
private <V extends AbstractStatus<?>> void send(final String key,
|
||||
final V status,
|
||||
final CacheEntry<V> entry,
|
||||
final boolean safeWrite) {
|
||||
|
@ -492,7 +492,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
|
|||
}
|
||||
}
|
||||
|
||||
private byte[] serialize(AbstractStatus status) {
|
||||
private byte[] serialize(AbstractStatus<?> status) {
|
||||
Struct struct = new Struct(STATUS_SCHEMA_V0);
|
||||
struct.put(STATE_KEY_NAME, status.state().name());
|
||||
if (status.trace() != null)
|
||||
|
@ -645,7 +645,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
|
|||
}
|
||||
}
|
||||
|
||||
private static class CacheEntry<T extends AbstractStatus> {
|
||||
private static class CacheEntry<T extends AbstractStatus<?>> {
|
||||
private T value = null;
|
||||
private int sequence = 0;
|
||||
private boolean deleted = false;
|
||||
|
|
|
@ -455,13 +455,14 @@ public class AbstractHerderTest {
|
|||
verifyAll();
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
@Test()
|
||||
public void testConfigValidationTransformsExtendResults() throws Throwable {
|
||||
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
|
||||
|
||||
// 2 transform aliases defined -> 2 plugin lookups
|
||||
Set<PluginDesc<Transformation>> transformations = new HashSet<>();
|
||||
transformations.add(new PluginDesc<>(SampleTransformation.class, "1.0", classLoader));
|
||||
Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
|
||||
transformations.add(transformationPluginDesc());
|
||||
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
|
||||
|
||||
replayAll();
|
||||
|
@ -512,12 +513,12 @@ public class AbstractHerderTest {
|
|||
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
|
||||
|
||||
// 2 transform aliases defined -> 2 plugin lookups
|
||||
Set<PluginDesc<Transformation>> transformations = new HashSet<>();
|
||||
transformations.add(new PluginDesc<>(SampleTransformation.class, "1.0", classLoader));
|
||||
Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
|
||||
transformations.add(transformationPluginDesc());
|
||||
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(1);
|
||||
|
||||
Set<PluginDesc<Predicate>> predicates = new HashSet<>();
|
||||
predicates.add(new PluginDesc<>(SamplePredicate.class, "1.0", classLoader));
|
||||
Set<PluginDesc<Predicate<?>>> predicates = new HashSet<>();
|
||||
predicates.add(predicatePluginDesc());
|
||||
EasyMock.expect(plugins.predicates()).andReturn(predicates).times(2);
|
||||
|
||||
replayAll();
|
||||
|
@ -579,6 +580,16 @@ public class AbstractHerderTest {
|
|||
verifyAll();
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private PluginDesc<Predicate<?>> predicatePluginDesc() {
|
||||
return new PluginDesc(SamplePredicate.class, "1.0", classLoader);
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private PluginDesc<Transformation<?>> transformationPluginDesc() {
|
||||
return new PluginDesc(SampleTransformation.class, "1.0", classLoader);
|
||||
}
|
||||
|
||||
@Test()
|
||||
public void testConfigValidationPrincipalOnlyOverride() throws Throwable {
|
||||
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
|
||||
|
|
|
@ -43,7 +43,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
|
|||
|
||||
public static final Plugins MOCK_PLUGINS = new Plugins(new HashMap<>()) {
|
||||
@Override
|
||||
public Set<PluginDesc<Transformation>> transformations() {
|
||||
public Set<PluginDesc<Transformation<?>>> transformations() {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
};
|
||||
|
@ -149,7 +149,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
|
|||
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
|
||||
final List<Transformation<R>> transformations = config.transformations();
|
||||
assertEquals(1, transformations.size());
|
||||
final SimpleTransformation xform = (SimpleTransformation) transformations.get(0);
|
||||
final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0);
|
||||
assertEquals(42, xform.magicNumber);
|
||||
}
|
||||
|
||||
|
@ -177,8 +177,8 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
|
|||
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
|
||||
final List<Transformation<R>> transformations = config.transformations();
|
||||
assertEquals(2, transformations.size());
|
||||
assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber);
|
||||
assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber);
|
||||
assertEquals(42, ((SimpleTransformation<R>) transformations.get(0)).magicNumber);
|
||||
assertEquals(84, ((SimpleTransformation<R>) transformations.get(1)).magicNumber);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -427,11 +427,11 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
|
|||
}
|
||||
|
||||
|
||||
public static class Key extends AbstractKeyValueTransformation {
|
||||
public static class Key<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
|
||||
|
||||
|
||||
}
|
||||
public static class Value extends AbstractKeyValueTransformation {
|
||||
public static class Value<R extends ConnectRecord<R>> extends AbstractKeyValueTransformation<R> {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ public class PluginDescTest {
|
|||
pluginLoader = new PluginClassLoader(location, new URL[0], systemLoader);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void testRegularPluginDesc() {
|
||||
PluginDesc<Connector> connectorDesc = new PluginDesc<>(
|
||||
|
@ -75,6 +76,7 @@ public class PluginDescTest {
|
|||
assertPluginDesc(transformDesc, Transformation.class, noVersion, pluginLoader.location());
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void testPluginDescWithSystemClassLoader() {
|
||||
String location = "classpath";
|
||||
|
@ -129,6 +131,7 @@ public class PluginDescTest {
|
|||
assertPluginDesc(converterDesc, Converter.class, nullVersion, location);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void testPluginDescEquality() {
|
||||
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
|
||||
|
@ -176,6 +179,7 @@ public class PluginDescTest {
|
|||
assertNotEquals(transformDescPluginPath, transformDescClasspath);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void testPluginDescComparison() {
|
||||
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
|
||||
|
|
|
@ -67,7 +67,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
|
|||
|
||||
import javax.ws.rs.BadRequestException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -77,6 +76,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
|
@ -125,7 +125,7 @@ public class ConnectorPluginsResourceTest {
|
|||
partialConfigs.add(configInfo);
|
||||
|
||||
configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, null, "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", Collections.emptyList());
|
||||
configValueInfo = new ConfigValueInfo("test.int.config", "1", Arrays.asList("1", "2", "3"), Collections.emptyList(), true);
|
||||
configValueInfo = new ConfigValueInfo("test.int.config", "1", asList("1", "2", "3"), Collections.emptyList(), true);
|
||||
configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
|
||||
configs.add(configInfo);
|
||||
partialConfigs.add(configInfo);
|
||||
|
@ -137,7 +137,7 @@ public class ConnectorPluginsResourceTest {
|
|||
partialConfigs.add(configInfo);
|
||||
|
||||
configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, null, "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", Collections.emptyList());
|
||||
configValueInfo = new ConfigValueInfo("test.list.config", "a,b", Arrays.asList("a", "b", "c"), Collections.emptyList(), true);
|
||||
configValueInfo = new ConfigValueInfo("test.list.config", "a,b", asList("a", "b", "c"), Collections.emptyList(), true);
|
||||
configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
|
||||
configs.add(configInfo);
|
||||
partialConfigs.add(configInfo);
|
||||
|
@ -145,13 +145,13 @@ public class ConnectorPluginsResourceTest {
|
|||
CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs);
|
||||
PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
|
||||
|
||||
Class<?>[] abstractConnectorClasses = {
|
||||
List<Class<? extends Connector>> abstractConnectorClasses = asList(
|
||||
Connector.class,
|
||||
SourceConnector.class,
|
||||
SinkConnector.class
|
||||
};
|
||||
);
|
||||
|
||||
Class<?>[] connectorClasses = {
|
||||
List<Class<? extends Connector>> connectorClasses = asList(
|
||||
VerifiableSourceConnector.class,
|
||||
VerifiableSinkConnector.class,
|
||||
MockSourceConnector.class,
|
||||
|
@ -159,17 +159,17 @@ public class ConnectorPluginsResourceTest {
|
|||
MockConnector.class,
|
||||
SchemaSourceConnector.class,
|
||||
ConnectorPluginsResourceTestConnector.class
|
||||
};
|
||||
);
|
||||
|
||||
try {
|
||||
for (Class<?> klass : abstractConnectorClasses) {
|
||||
for (Class<? extends Connector> klass : abstractConnectorClasses) {
|
||||
@SuppressWarnings("unchecked")
|
||||
MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc((Class<? extends Connector>) klass, "0.0.0");
|
||||
MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass, "0.0.0");
|
||||
CONNECTOR_PLUGINS.add(pluginDesc);
|
||||
}
|
||||
for (Class<?> klass : connectorClasses) {
|
||||
for (Class<? extends Connector> klass : connectorClasses) {
|
||||
@SuppressWarnings("unchecked")
|
||||
MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc((Class<? extends Connector>) klass);
|
||||
MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass);
|
||||
CONNECTOR_PLUGINS.add(pluginDesc);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -490,7 +490,7 @@ public class ConnectorPluginsResourceTest {
|
|||
|
||||
@Override
|
||||
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
|
||||
return Arrays.asList(1, 2, 3);
|
||||
return asList(1, 2, 3);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -502,7 +502,7 @@ public class ConnectorPluginsResourceTest {
|
|||
private static class ListRecommender implements Recommender {
|
||||
@Override
|
||||
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
|
||||
return Arrays.asList("a", "b", "c");
|
||||
return asList("a", "b", "c");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -313,7 +313,7 @@ public class ConnectorsResourceTest {
|
|||
herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
|
||||
expectAndCallbackNotLeaderException(cb);
|
||||
// Should forward request
|
||||
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
.andReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
|
||||
ConnectorType.SOURCE)));
|
||||
|
||||
|
@ -798,7 +798,7 @@ public class ConnectorsResourceTest {
|
|||
expectAndCallbackNotLeaderException(cb);
|
||||
|
||||
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true&includeTasks=" + restartRequest.includeTasks() + "&onlyFailed=" + restartRequest.onlyFailed()),
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
@ -869,7 +869,7 @@ public class ConnectorsResourceTest {
|
|||
expectAndCallbackNotLeaderException(cb);
|
||||
|
||||
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"),
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
@ -887,7 +887,7 @@ public class ConnectorsResourceTest {
|
|||
expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
|
||||
|
||||
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"),
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
@ -920,7 +920,7 @@ public class ConnectorsResourceTest {
|
|||
expectAndCallbackNotLeaderException(cb);
|
||||
|
||||
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
@ -940,7 +940,7 @@ public class ConnectorsResourceTest {
|
|||
expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
|
||||
|
||||
EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(), EasyMock.anyObject(WorkerConfig.class)))
|
||||
.andReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
|
|
@ -978,7 +978,7 @@ public class KafkaConfigBackingStoreTest {
|
|||
|
||||
@Test
|
||||
public void testRecordToRestartRequest() throws Exception {
|
||||
ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
|
||||
Struct struct = RESTART_REQUEST_STRUCTS.get(0);
|
||||
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), structToMap(struct));
|
||||
|
@ -990,7 +990,7 @@ public class KafkaConfigBackingStoreTest {
|
|||
|
||||
@Test
|
||||
public void testRecordToRestartRequestOnlyFailedInconsistent() throws Exception {
|
||||
ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
|
||||
Struct struct = ONLY_FAILED_MISSING_STRUCT;
|
||||
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), structToMap(struct));
|
||||
|
@ -1002,7 +1002,7 @@ public class KafkaConfigBackingStoreTest {
|
|||
|
||||
@Test
|
||||
public void testRecordToRestartRequestIncludeTasksInconsistent() throws Exception {
|
||||
ConsumerRecord record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
|
||||
Struct struct = INLUDE_TASKS_MISSING_STRUCT;
|
||||
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), structToMap(struct));
|
||||
|
|
|
@ -74,6 +74,7 @@ public class ReplaceFieldTest {
|
|||
assertEquals(schema, transformedRecord.valueSchema());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void schemaless() {
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
|
@ -91,7 +92,7 @@ public class ReplaceFieldTest {
|
|||
final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
|
||||
final SinkRecord transformedRecord = xform.apply(record);
|
||||
|
||||
final Map updatedValue = (Map) transformedRecord.value();
|
||||
final Map<String, Object> updatedValue = (Map<String, Object>) transformedRecord.value();
|
||||
assertEquals(3, updatedValue.size());
|
||||
assertEquals(42, updatedValue.get("xyz"));
|
||||
assertEquals(true, updatedValue.get("bar"));
|
||||
|
@ -144,7 +145,7 @@ public class ReplaceFieldTest {
|
|||
assertNull(transformedRecord.valueSchema());
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testExcludeBackwardsCompatibility() {
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
|
@ -162,7 +163,7 @@ public class ReplaceFieldTest {
|
|||
final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
|
||||
final SinkRecord transformedRecord = xform.apply(record);
|
||||
|
||||
final Map updatedValue = (Map) transformedRecord.value();
|
||||
final Map<String, Object> updatedValue = (Map<String, Object>) transformedRecord.value();
|
||||
assertEquals(3, updatedValue.size());
|
||||
assertEquals(42, updatedValue.get("xyz"));
|
||||
assertEquals(true, updatedValue.get("bar"));
|
||||
|
|
|
@ -79,7 +79,7 @@ public class HasHeaderKeyTest {
|
|||
}
|
||||
|
||||
private SimpleConfig config(Map<String, String> props) {
|
||||
return new SimpleConfig(new HasHeaderKey().config(), props);
|
||||
return new SimpleConfig(new HasHeaderKey<>().config(), props);
|
||||
}
|
||||
|
||||
private SourceRecord recordWithHeaders(String... headers) {
|
||||
|
|
Loading…
Reference in New Issue