Remove duplicated TopicPartition implementation.

Ewen Cheslack-Postava 2015-07-28 23:50:45 -07:00
parent dec137998a
commit e849e10c7e
9 changed files with 14 additions and 122 deletions
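The surviving class is the standard org.apache.kafka.common.TopicPartition from kafka-clients, which already provides the topic()/partition() accessors and equals/hashCode value semantics that the removed Copycat copy duplicated. Below is a minimal, illustrative sketch of the usage pattern the change converges on, assuming kafka-clients is on the classpath; the example class name, topic, and offset value are hypothetical, only the Kafka import and its API are real.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class TopicPartitionUsageSketch {
    public static void main(String[] args) {
        // One shared representation: no conversion between a Copycat-specific
        // TopicPartition and the Kafka clients' TopicPartition is needed.
        TopicPartition tp = new TopicPartition("test", 12);

        // Works directly as a map key, e.g. for tracking an offset per partition.
        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(tp, 42L);

        System.out.println(tp.topic() + "-" + tp.partition() + " -> " + offsets.get(tp));
    }
}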

View File

@@ -1,100 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
package org.apache.kafka.copycat.connector;

/**
 * A topic name and partition number
 */
public class TopicPartition {

    private int hash = 0;
    private final int partition;
    private final String topic;

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

    /**
     * Parse the TopicPartition from the string representation.
     * @param topicPartition string representation of a TopicPartition
     */
    public TopicPartition(String topicPartition) {
        int lastDash = topicPartition.lastIndexOf('-');
        if (lastDash < 0) {
            throw new IllegalArgumentException("Invalid TopicPartition format");
        }
        this.topic = topicPartition.substring(0, lastDash);
        String partitionStr = topicPartition.substring(lastDash + 1);
        this.partition = Integer.parseInt(partitionStr);
    }

    public int partition() {
        return partition;
    }

    public String topic() {
        return topic;
    }

    @Override
    public int hashCode() {
        if (hash != 0) {
            return hash;
        }
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
        this.hash = result;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        TopicPartition other = (TopicPartition) obj;
        if (partition != other.partition) {
            return false;
        }
        if (topic == null) {
            if (other.topic != null) {
                return false;
            }
        } else if (!topic.equals(other.topic)) {
            return false;
        }
        return true;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

View File

@@ -16,8 +16,8 @@
**/
package org.apache.kafka.copycat.sink;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.connector.Task;
-import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import java.util.Collection;

View File

@@ -17,7 +17,7 @@
package org.apache.kafka.copycat.sink;
-import org.apache.kafka.copycat.connector.TopicPartition;
+import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;

View File

@@ -17,7 +17,7 @@
package org.apache.kafka.copycat.file;
-import org.apache.kafka.copycat.connector.TopicPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.sink.SinkRecord;

View File

@@ -17,8 +17,8 @@
package org.apache.kafka.copycat.file;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.junit.Before;

View File

@@ -17,7 +17,7 @@
package org.apache.kafka.copycat.file;
-import org.apache.kafka.copycat.connector.TopicPartition;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.junit.Before;

View File

@@ -17,11 +17,11 @@
package org.apache.kafka.copycat.runtime;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.cli.WorkerConfig;
-import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.sink.SinkRecord;
@@ -118,14 +118,9 @@ public class WorkerSinkTask implements WorkerTask {
     * the write commit. This should only be invoked by the WorkerSinkTaskThread.
     **/
    public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
-        // Because of the different representations, we need to build two copies of the same map
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
-        HashMap<org.apache.kafka.common.TopicPartition, Long> offsetsKafka
-                = new HashMap<org.apache.kafka.common.TopicPartition, Long>();
-        for (org.apache.kafka.common.TopicPartition tp : consumer.subscriptions()) {
-            long pos = consumer.position(tp);
-            offsets.put(new TopicPartition(tp.topic(), tp.partition()), pos);
-            offsetsKafka.put(tp, pos);
+        for (TopicPartition tp : consumer.subscriptions()) {
+            offsets.put(tp, consumer.position(tp));
        }
        // We only don't flush the task in one case: when shutting down, the task has already been
        // stopped and all data should have already been flushed
@@ -141,11 +136,11 @@ public class WorkerSinkTask implements WorkerTask {
        ConsumerCommitCallback cb = new ConsumerCommitCallback() {
            @Override
-            public void onComplete(Map<org.apache.kafka.common.TopicPartition, Long> offsets, Exception error) {
+            public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
                workThread.onCommitCompleted(error, seqno);
            }
        };
-        consumer.commit(offsetsKafka, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
+        consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
    }

    public Time getTime() {
@@ -193,11 +188,10 @@ public class WorkerSinkTask implements WorkerTask {
        // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
        newConsumer.poll(0);
        Map<TopicPartition, Long> offsets = context.getOffsets();
-        for (org.apache.kafka.common.TopicPartition kafkatp : newConsumer.subscriptions()) {
-            TopicPartition tp = new TopicPartition(kafkatp.topic(), kafkatp.partition());
+        for (TopicPartition tp : newConsumer.subscriptions()) {
            Long offset = offsets.get(tp);
            if (offset != null)
-                newConsumer.seek(kafkatp, offset);
+                newConsumer.seek(tp, offset);
        }
        return newConsumer;
    }

View File

@@ -59,8 +59,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private static final String TOPIC_PARTITION_STR = "test-12";
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
-private static final org.apache.kafka.copycat.connector.TopicPartition TOPIC_PARTITION_COPYCAT =
-        new org.apache.kafka.copycat.connector.TopicPartition(TOPIC, PARTITION);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Time time;
@@ -348,7 +346,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
}
);
-sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION_COPYCAT, finalOffset));
+sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, finalOffset));
IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
if (flushError != null) {
flushExpectation.andThrow(flushError).once();

View File

@@ -17,9 +17,9 @@
package org.apache.kafka.copycat.runtime.standalone;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.Task;
-import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Worker;