mirror of https://github.com/apache/kafka.git
MINOR: Fix typos for connect (#13885)
Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
parent
dfaae317b8
commit
d751c13950
|
@ -94,7 +94,7 @@ public class Checkpoint {
|
|||
@Override
|
||||
public String toString() {
|
||||
return String.format("Checkpoint{consumerGroupId=%s, topicPartition=%s, "
|
||||
+ "upstreamOffset=%d, downstreamOffset=%d, metatadata=%s}",
|
||||
+ "upstreamOffset=%d, downstreamOffset=%d, metadata=%s}",
|
||||
consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata);
|
||||
}
|
||||
|
||||
|
|
|
@ -105,7 +105,7 @@ public class MirrorMakerConfigTest {
|
|||
assertEquals("b__topic1", aClientConfig.replicationPolicy().formatRemoteTopic("b", "topic1"),
|
||||
"replication.policy.separator is honored");
|
||||
assertEquals(clusterABootstrap, aClientConfig.adminConfig().get("bootstrap.servers"),
|
||||
"client configs include boostrap.servers");
|
||||
"client configs include bootstrap.servers");
|
||||
assertEquals(ForwardingAdmin.class.getName(), aClientConfig.forwardingAdmin(aClientConfig.adminConfig()).getClass().getName(),
|
||||
"Cluster a uses the default ForwardingAdmin");
|
||||
assertEquals("PLAINTEXT", aClientConfig.adminConfig().get("security.protocol"),
|
||||
|
@ -115,7 +115,7 @@ public class MirrorMakerConfigTest {
|
|||
assertFalse(aClientConfig.adminConfig().containsKey("xxx"),
|
||||
"unknown properties aren't included in client configs");
|
||||
assertFalse(aClientConfig.adminConfig().containsKey("metric.reporters"),
|
||||
"top-leve metrics reporters aren't included in client configs");
|
||||
"top-level metrics reporters aren't included in client configs");
|
||||
assertEquals("secret2", aClientConfig.getPassword("ssl.key.password").value(),
|
||||
"security properties are translated from external sources");
|
||||
assertEquals("secret2", ((Password) aClientConfig.adminConfig().get("ssl.key.password")).value(),
|
||||
|
|
|
@ -98,7 +98,7 @@ public class TopicCreationConfig {
|
|||
}
|
||||
}
|
||||
|
||||
public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultParitionCount) {
|
||||
public static ConfigDef configDef(String group, short defaultReplicationFactor, int defaultPartitionCount) {
|
||||
int orderInGroup = 0;
|
||||
ConfigDef configDef = new ConfigDef();
|
||||
configDef
|
||||
|
@ -115,7 +115,7 @@ public class TopicCreationConfig {
|
|||
ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, group, ++orderInGroup,
|
||||
ConfigDef.Width.LONG, "Replication Factor for Topics in " + group)
|
||||
.define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
|
||||
defaultParitionCount, PARTITIONS_VALIDATOR,
|
||||
defaultPartitionCount, PARTITIONS_VALIDATOR,
|
||||
ConfigDef.Importance.LOW, PARTITIONS_DOC, group, ++orderInGroup,
|
||||
ConfigDef.Width.LONG, "Partition Count for Topics in " + group);
|
||||
return configDef;
|
||||
|
|
|
@ -34,7 +34,7 @@ public class WorkerTransactionContext implements TransactionContext {
|
|||
|
||||
private static final Logger log = LoggerFactory.getLogger(WorkerTransactionContext.class);
|
||||
|
||||
private final Set<SourceRecord> commitableRecords = new HashSet<>();
|
||||
private final Set<SourceRecord> committableRecords = new HashSet<>();
|
||||
private final Set<SourceRecord> abortableRecords = new HashSet<>();
|
||||
private boolean batchCommitRequested = false;
|
||||
private boolean batchAbortRequested = false;
|
||||
|
@ -47,7 +47,7 @@ public class WorkerTransactionContext implements TransactionContext {
|
|||
@Override
|
||||
public synchronized void commitTransaction(SourceRecord record) {
|
||||
Objects.requireNonNull(record, "Source record used to define transaction boundaries may not be null");
|
||||
commitableRecords.add(record);
|
||||
committableRecords.add(record);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,7 +82,7 @@ public class WorkerTransactionContext implements TransactionContext {
|
|||
// Essentially, instead of telling the task that it screwed up and trusting it to do the right thing, we rat on it to the
|
||||
// worker and let it get punished accordingly.
|
||||
checkRecordRequestConsistency(record);
|
||||
return commitableRecords.remove(record);
|
||||
return committableRecords.remove(record);
|
||||
}
|
||||
|
||||
public synchronized boolean shouldAbortOn(SourceRecord record) {
|
||||
|
@ -97,7 +97,7 @@ public class WorkerTransactionContext implements TransactionContext {
|
|||
}
|
||||
|
||||
private void checkRecordRequestConsistency(SourceRecord record) {
|
||||
if (commitableRecords.contains(record) && abortableRecords.contains(record)) {
|
||||
if (committableRecords.contains(record) && abortableRecords.contains(record)) {
|
||||
log.trace("Connector will fail as it has requested both commit and abort of transaction for same record: {}", record);
|
||||
throw new IllegalStateException(String.format(
|
||||
"Connector requested both commit and abort of same record against topic/partition %s/%s",
|
||||
|
|
|
@ -128,7 +128,7 @@ public class ErrorHandlingIntegrationTest {
|
|||
props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
|
||||
props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
|
||||
|
||||
// tolerate all erros
|
||||
// tolerate all errors
|
||||
props.put(ERRORS_TOLERANCE_CONFIG, "all");
|
||||
|
||||
// retry for up to one second
|
||||
|
@ -205,7 +205,7 @@ public class ErrorHandlingIntegrationTest {
|
|||
props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
|
||||
props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
|
||||
|
||||
// tolerate all erros
|
||||
// tolerate all errors
|
||||
props.put(ERRORS_TOLERANCE_CONFIG, "all");
|
||||
|
||||
// retry for up to one second
|
||||
|
|
|
@ -399,7 +399,7 @@ public class ErrorHandlingTaskTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception {
|
||||
public void testErrorHandlingInSourceTasksWithBadConverter() throws Exception {
|
||||
Map<String, String> reportProps = new HashMap<>();
|
||||
reportProps.put(ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG, "true");
|
||||
reportProps.put(ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
|
||||
|
|
|
@ -37,7 +37,7 @@ public class PluginDescTest {
|
|||
private final ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
|
||||
private final String regularVersion = "1.0.0";
|
||||
private final String newerVersion = "1.0.1";
|
||||
private final String snaphotVersion = "1.0.0-SNAPSHOT";
|
||||
private final String snapshotVersion = "1.0.0-SNAPSHOT";
|
||||
private final String noVersion = "undefined";
|
||||
private PluginClassLoader pluginLoader;
|
||||
|
||||
|
@ -62,11 +62,11 @@ public class PluginDescTest {
|
|||
|
||||
PluginDesc<Converter> converterDesc = new PluginDesc<>(
|
||||
Converter.class,
|
||||
snaphotVersion,
|
||||
snapshotVersion,
|
||||
pluginLoader
|
||||
);
|
||||
|
||||
assertPluginDesc(converterDesc, Converter.class, snaphotVersion, pluginLoader.location());
|
||||
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, pluginLoader.location());
|
||||
|
||||
PluginDesc<Transformation> transformDesc = new PluginDesc<>(
|
||||
Transformation.class,
|
||||
|
@ -91,11 +91,11 @@ public class PluginDescTest {
|
|||
|
||||
PluginDesc<Converter> converterDesc = new PluginDesc<>(
|
||||
Converter.class,
|
||||
snaphotVersion,
|
||||
snapshotVersion,
|
||||
systemLoader
|
||||
);
|
||||
|
||||
assertPluginDesc(converterDesc, Converter.class, snaphotVersion, location);
|
||||
assertPluginDesc(converterDesc, Converter.class, snapshotVersion, location);
|
||||
|
||||
PluginDesc<Transformation> transformDesc = new PluginDesc<>(
|
||||
Transformation.class,
|
||||
|
@ -137,13 +137,13 @@ public class PluginDescTest {
|
|||
public void testPluginDescEquality() {
|
||||
PluginDesc<Connector> connectorDescPluginPath = new PluginDesc<>(
|
||||
Connector.class,
|
||||
snaphotVersion,
|
||||
snapshotVersion,
|
||||
pluginLoader
|
||||
);
|
||||
|
||||
PluginDesc<Connector> connectorDescClasspath = new PluginDesc<>(
|
||||
Connector.class,
|
||||
snaphotVersion,
|
||||
snapshotVersion,
|
||||
systemLoader
|
||||
);
|
||||
|
||||
|
@ -205,7 +205,7 @@ public class PluginDescTest {
|
|||
|
||||
PluginDesc<Converter> converterDescClasspath = new PluginDesc<>(
|
||||
Converter.class,
|
||||
snaphotVersion,
|
||||
snapshotVersion,
|
||||
systemLoader
|
||||
);
|
||||
|
||||
|
|
|
@ -693,7 +693,7 @@ public class PluginUtilsTest {
|
|||
|
||||
private void assertUrls(List<Path> expected, List<Path> actual) {
|
||||
Collections.sort(expected);
|
||||
// not sorting 'actual' because it should be returned sorted from withing the PluginUtils.
|
||||
// not sorting 'actual' because it should be returned sorted from within the PluginUtils.
|
||||
assertEquals(expected, actual);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,7 +135,7 @@ public class ConnectRestServerTest {
|
|||
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
|
||||
server.stop();
|
||||
|
||||
// Listener is overriden by advertised values
|
||||
// Listener is overridden by advertised values
|
||||
configMap = new HashMap<>(baseServerProps());
|
||||
configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443");
|
||||
configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
|
||||
|
|
|
@ -164,11 +164,11 @@ public class KafkaConfigBackingStoreTest {
|
|||
= new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
|
||||
|
||||
private static final Struct ONLY_FAILED_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(INCLUDE_TASKS_FIELD_NAME, false);
|
||||
private static final Struct INLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true);
|
||||
private static final Struct INCLUDE_TASKS_MISSING_STRUCT = new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true);
|
||||
private static final List<Struct> RESTART_REQUEST_STRUCTS = Arrays.asList(
|
||||
new Struct(KafkaConfigBackingStore.RESTART_REQUEST_V0).put(ONLY_FAILED_FIELD_NAME, true).put(INCLUDE_TASKS_FIELD_NAME, false),
|
||||
ONLY_FAILED_MISSING_STRUCT,
|
||||
INLUDE_TASKS_MISSING_STRUCT);
|
||||
INCLUDE_TASKS_MISSING_STRUCT);
|
||||
|
||||
// The exact format doesn't matter here since both conversions are mocked
|
||||
private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
|
||||
|
@ -1414,7 +1414,7 @@ public class KafkaConfigBackingStoreTest {
|
|||
public void testRecordToRestartRequestIncludeTasksInconsistent() {
|
||||
ConsumerRecord<String, byte[]> record = new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, RESTART_CONNECTOR_KEYS.get(0),
|
||||
CONFIGS_SERIALIZED.get(0), new RecordHeaders(), Optional.empty());
|
||||
Struct struct = INLUDE_TASKS_MISSING_STRUCT;
|
||||
Struct struct = INCLUDE_TASKS_MISSING_STRUCT;
|
||||
SchemaAndValue schemaAndValue = new SchemaAndValue(struct.schema(), structToMap(struct));
|
||||
RestartRequest restartRequest = configStorage.recordToRestartRequest(record, schemaAndValue);
|
||||
assertEquals(CONNECTOR_1_NAME, restartRequest.connectorName());
|
||||
|
|
|
@ -343,7 +343,7 @@ public class EmbeddedKafkaCluster {
|
|||
Throwable cause = e.getCause();
|
||||
if (cause instanceof UnknownTopicOrPartitionException) {
|
||||
results.put(topicName, Optional.empty());
|
||||
log.info("Found non-existant topic {}", topicName);
|
||||
log.info("Found non-existent topic {}", topicName);
|
||||
continue;
|
||||
}
|
||||
throw new AssertionError("Could not describe topic(s)" + topicNames, e);
|
||||
|
|
Loading…
Reference in New Issue