mirror of https://github.com/apache/kafka.git
Remove duplicated TopicPartition implementation.
This commit is contained in:
parent
dec137998a
commit
e849e10c7e
|
@ -1,100 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
**/
|
||||
|
||||
package org.apache.kafka.copycat.connector;
|
||||
|
||||
/**
|
||||
* A topic name and partition number
|
||||
*/
|
||||
public class TopicPartition {
|
||||
|
||||
private int hash = 0;
|
||||
private final int partition;
|
||||
private final String topic;
|
||||
|
||||
public TopicPartition(String topic, int partition) {
|
||||
this.partition = partition;
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the TopicPartition from the string representation.
|
||||
* @param topicPartition string representation of a TopicPartition
|
||||
*/
|
||||
public TopicPartition(String topicPartition) {
|
||||
int lastDash = topicPartition.lastIndexOf('-');
|
||||
if (lastDash < 0) {
|
||||
throw new IllegalArgumentException("Invalid TopicPartition format");
|
||||
}
|
||||
this.topic = topicPartition.substring(0, lastDash);
|
||||
String partitionStr = topicPartition.substring(lastDash + 1);
|
||||
this.partition = Integer.parseInt(partitionStr);
|
||||
}
|
||||
|
||||
public int partition() {
|
||||
return partition;
|
||||
}
|
||||
|
||||
public String topic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
if (hash != 0) {
|
||||
return hash;
|
||||
}
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + partition;
|
||||
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
|
||||
this.hash = result;
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TopicPartition other = (TopicPartition) obj;
|
||||
if (partition != other.partition) {
|
||||
return false;
|
||||
}
|
||||
if (topic == null) {
|
||||
if (other.topic != null) {
|
||||
return false;
|
||||
}
|
||||
} else if (!topic.equals(other.topic)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return topic + "-" + partition;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -16,8 +16,8 @@
|
|||
**/
|
||||
package org.apache.kafka.copycat.sink;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
|
||||
import java.util.Collection;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.kafka.copycat.sink;
|
||||
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
|
||||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.connector.ConnectorContext;
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
import org.junit.Before;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.kafka.copycat.file;
|
||||
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||
import org.junit.Before;
|
||||
|
|
|
@ -17,11 +17,11 @@
|
|||
|
||||
package org.apache.kafka.copycat.runtime;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.clients.consumer.*;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||
|
@ -118,14 +118,9 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
* the write commit. This should only be invoked by the WorkerSinkTaskThread.
|
||||
**/
|
||||
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
|
||||
// Because of the different representations, we need to build two copies of the same map
|
||||
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
|
||||
HashMap<org.apache.kafka.common.TopicPartition, Long> offsetsKafka
|
||||
= new HashMap<org.apache.kafka.common.TopicPartition, Long>();
|
||||
for (org.apache.kafka.common.TopicPartition tp : consumer.subscriptions()) {
|
||||
long pos = consumer.position(tp);
|
||||
offsets.put(new TopicPartition(tp.topic(), tp.partition()), pos);
|
||||
offsetsKafka.put(tp, pos);
|
||||
for (TopicPartition tp : consumer.subscriptions()) {
|
||||
offsets.put(tp, consumer.position(tp));
|
||||
}
|
||||
// We only don't flush the task in one case: when shutting down, the task has already been
|
||||
// stopped and all data should have already been flushed
|
||||
|
@ -141,11 +136,11 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
|
||||
ConsumerCommitCallback cb = new ConsumerCommitCallback() {
|
||||
@Override
|
||||
public void onComplete(Map<org.apache.kafka.common.TopicPartition, Long> offsets, Exception error) {
|
||||
public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
|
||||
workThread.onCommitCompleted(error, seqno);
|
||||
}
|
||||
};
|
||||
consumer.commit(offsetsKafka, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
|
||||
consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
|
||||
}
|
||||
|
||||
public Time getTime() {
|
||||
|
@ -193,11 +188,10 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
// We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
|
||||
newConsumer.poll(0);
|
||||
Map<TopicPartition, Long> offsets = context.getOffsets();
|
||||
for (org.apache.kafka.common.TopicPartition kafkatp : newConsumer.subscriptions()) {
|
||||
TopicPartition tp = new TopicPartition(kafkatp.topic(), kafkatp.partition());
|
||||
for (TopicPartition tp : newConsumer.subscriptions()) {
|
||||
Long offset = offsets.get(tp);
|
||||
if (offset != null)
|
||||
newConsumer.seek(kafkatp, offset);
|
||||
newConsumer.seek(tp, offset);
|
||||
}
|
||||
return newConsumer;
|
||||
}
|
||||
|
|
|
@ -59,8 +59,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
private static final String TOPIC_PARTITION_STR = "test-12";
|
||||
|
||||
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
|
||||
private static final org.apache.kafka.copycat.connector.TopicPartition TOPIC_PARTITION_COPYCAT =
|
||||
new org.apache.kafka.copycat.connector.TopicPartition(TOPIC, PARTITION);
|
||||
|
||||
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
||||
private Time time;
|
||||
|
@ -348,7 +346,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
}
|
||||
);
|
||||
|
||||
sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION_COPYCAT, finalOffset));
|
||||
sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, finalOffset));
|
||||
IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
|
||||
if (flushError != null) {
|
||||
flushExpectation.andThrow(flushError).once();
|
||||
|
|
|
@ -17,9 +17,9 @@
|
|||
|
||||
package org.apache.kafka.copycat.runtime.standalone;
|
||||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.copycat.connector.Connector;
|
||||
import org.apache.kafka.copycat.connector.Task;
|
||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
|
|
Loading…
Reference in New Issue