mirror of https://github.com/apache/kafka.git
Remove duplicated TopicPartition implementation.
This commit is contained in:
parent
dec137998a
commit
e849e10c7e
|
@ -1,100 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
**/
|
|
||||||
|
|
||||||
package org.apache.kafka.copycat.connector;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A topic name and partition number
|
|
||||||
*/
|
|
||||||
public class TopicPartition {
|
|
||||||
|
|
||||||
private int hash = 0;
|
|
||||||
private final int partition;
|
|
||||||
private final String topic;
|
|
||||||
|
|
||||||
public TopicPartition(String topic, int partition) {
|
|
||||||
this.partition = partition;
|
|
||||||
this.topic = topic;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Parse the TopicPartition from the string representation.
|
|
||||||
* @param topicPartition string representation of a TopicPartition
|
|
||||||
*/
|
|
||||||
public TopicPartition(String topicPartition) {
|
|
||||||
int lastDash = topicPartition.lastIndexOf('-');
|
|
||||||
if (lastDash < 0) {
|
|
||||||
throw new IllegalArgumentException("Invalid TopicPartition format");
|
|
||||||
}
|
|
||||||
this.topic = topicPartition.substring(0, lastDash);
|
|
||||||
String partitionStr = topicPartition.substring(lastDash + 1);
|
|
||||||
this.partition = Integer.parseInt(partitionStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int partition() {
|
|
||||||
return partition;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String topic() {
|
|
||||||
return topic;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
if (hash != 0) {
|
|
||||||
return hash;
|
|
||||||
}
|
|
||||||
final int prime = 31;
|
|
||||||
int result = 1;
|
|
||||||
result = prime * result + partition;
|
|
||||||
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
|
|
||||||
this.hash = result;
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (obj == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (getClass() != obj.getClass()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
TopicPartition other = (TopicPartition) obj;
|
|
||||||
if (partition != other.partition) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (topic == null) {
|
|
||||||
if (other.topic != null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else if (!topic.equals(other.topic)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return topic + "-" + partition;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -16,8 +16,8 @@
|
||||||
**/
|
**/
|
||||||
package org.apache.kafka.copycat.sink;
|
package org.apache.kafka.copycat.sink;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.copycat.connector.Task;
|
import org.apache.kafka.copycat.connector.Task;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.sink;
|
package org.apache.kafka.copycat.sink;
|
||||||
|
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.file;
|
package org.apache.kafka.copycat.file;
|
||||||
|
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||||
|
|
|
@ -17,8 +17,8 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.file;
|
package org.apache.kafka.copycat.file;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.copycat.connector.ConnectorContext;
|
import org.apache.kafka.copycat.connector.ConnectorContext;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.file;
|
package org.apache.kafka.copycat.file;
|
||||||
|
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
|
@ -17,11 +17,11 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.runtime;
|
package org.apache.kafka.copycat.runtime;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.clients.consumer.*;
|
import org.apache.kafka.clients.consumer.*;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||||
|
@ -118,14 +118,9 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
* the write commit. This should only be invoked by the WorkerSinkTaskThread.
|
* the write commit. This should only be invoked by the WorkerSinkTaskThread.
|
||||||
**/
|
**/
|
||||||
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
|
public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
|
||||||
// Because of the different representations, we need to build two copies of the same map
|
|
||||||
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
|
HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
|
||||||
HashMap<org.apache.kafka.common.TopicPartition, Long> offsetsKafka
|
for (TopicPartition tp : consumer.subscriptions()) {
|
||||||
= new HashMap<org.apache.kafka.common.TopicPartition, Long>();
|
offsets.put(tp, consumer.position(tp));
|
||||||
for (org.apache.kafka.common.TopicPartition tp : consumer.subscriptions()) {
|
|
||||||
long pos = consumer.position(tp);
|
|
||||||
offsets.put(new TopicPartition(tp.topic(), tp.partition()), pos);
|
|
||||||
offsetsKafka.put(tp, pos);
|
|
||||||
}
|
}
|
||||||
// We only don't flush the task in one case: when shutting down, the task has already been
|
// We only don't flush the task in one case: when shutting down, the task has already been
|
||||||
// stopped and all data should have already been flushed
|
// stopped and all data should have already been flushed
|
||||||
|
@ -141,11 +136,11 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
|
|
||||||
ConsumerCommitCallback cb = new ConsumerCommitCallback() {
|
ConsumerCommitCallback cb = new ConsumerCommitCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void onComplete(Map<org.apache.kafka.common.TopicPartition, Long> offsets, Exception error) {
|
public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
|
||||||
workThread.onCommitCompleted(error, seqno);
|
workThread.onCommitCompleted(error, seqno);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
consumer.commit(offsetsKafka, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
|
consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Time getTime() {
|
public Time getTime() {
|
||||||
|
@ -193,11 +188,10 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
// We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
|
// We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
|
||||||
newConsumer.poll(0);
|
newConsumer.poll(0);
|
||||||
Map<TopicPartition, Long> offsets = context.getOffsets();
|
Map<TopicPartition, Long> offsets = context.getOffsets();
|
||||||
for (org.apache.kafka.common.TopicPartition kafkatp : newConsumer.subscriptions()) {
|
for (TopicPartition tp : newConsumer.subscriptions()) {
|
||||||
TopicPartition tp = new TopicPartition(kafkatp.topic(), kafkatp.partition());
|
|
||||||
Long offset = offsets.get(tp);
|
Long offset = offsets.get(tp);
|
||||||
if (offset != null)
|
if (offset != null)
|
||||||
newConsumer.seek(kafkatp, offset);
|
newConsumer.seek(tp, offset);
|
||||||
}
|
}
|
||||||
return newConsumer;
|
return newConsumer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,8 +59,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
private static final String TOPIC_PARTITION_STR = "test-12";
|
private static final String TOPIC_PARTITION_STR = "test-12";
|
||||||
|
|
||||||
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
|
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
|
||||||
private static final org.apache.kafka.copycat.connector.TopicPartition TOPIC_PARTITION_COPYCAT =
|
|
||||||
new org.apache.kafka.copycat.connector.TopicPartition(TOPIC, PARTITION);
|
|
||||||
|
|
||||||
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
||||||
private Time time;
|
private Time time;
|
||||||
|
@ -348,7 +346,7 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION_COPYCAT, finalOffset));
|
sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, finalOffset));
|
||||||
IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
|
IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
|
||||||
if (flushError != null) {
|
if (flushError != null) {
|
||||||
flushExpectation.andThrow(flushError).once();
|
flushExpectation.andThrow(flushError).once();
|
||||||
|
|
|
@ -17,9 +17,9 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.runtime.standalone;
|
package org.apache.kafka.copycat.runtime.standalone;
|
||||||
|
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.copycat.connector.Connector;
|
import org.apache.kafka.copycat.connector.Connector;
|
||||||
import org.apache.kafka.copycat.connector.Task;
|
import org.apache.kafka.copycat.connector.Task;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||||
import org.apache.kafka.copycat.runtime.Worker;
|
import org.apache.kafka.copycat.runtime.Worker;
|
||||||
|
|
Loading…
Reference in New Issue