mirror of https://github.com/apache/kafka.git
Rename Coordinator to Herder to avoid confusion with the consumer coordinator.
This commit is contained in:
parent
7bf807596a
commit
8c108b0cac
|
@ -19,9 +19,9 @@ package org.apache.kafka.copycat.cli;
|
|||
|
||||
import org.apache.kafka.common.annotation.InterfaceStability;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.copycat.runtime.Coordinator;
|
||||
import org.apache.kafka.copycat.runtime.Herder;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
import org.apache.kafka.copycat.runtime.standalone.StandaloneCoordinator;
|
||||
import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
|
||||
import org.apache.kafka.copycat.util.Callback;
|
||||
import org.apache.kafka.copycat.util.FutureCallback;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -59,8 +59,8 @@ public class CopycatStandalone {
|
|||
|
||||
WorkerConfig workerConfig = new WorkerConfig(workerProps);
|
||||
Worker worker = new Worker(workerConfig);
|
||||
Coordinator coordinator = new StandaloneCoordinator(worker);
|
||||
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator);
|
||||
Herder herder = new StandaloneHerder(worker);
|
||||
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, herder);
|
||||
copycat.start();
|
||||
|
||||
try {
|
||||
|
@ -73,7 +73,7 @@ public class CopycatStandalone {
|
|||
log.error("Failed to create job for {}", connectorPropsFile);
|
||||
}
|
||||
});
|
||||
coordinator.addConnector(connectorProps, cb);
|
||||
herder.addConnector(connectorProps, cb);
|
||||
cb.get();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* This class ties together all the components of a Copycat process (coordinator, worker,
|
||||
* This class ties together all the components of a Copycat process (herder, worker,
|
||||
* storage, command interface), managing their lifecycle.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
|
@ -33,16 +33,16 @@ public class Copycat {
|
|||
private static final Logger log = LoggerFactory.getLogger(Copycat.class);
|
||||
|
||||
private final Worker worker;
|
||||
private final Coordinator coordinator;
|
||||
private final Herder herder;
|
||||
private final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
private final CountDownLatch stopLatch = new CountDownLatch(1);
|
||||
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||
private final ShutdownHook shutdownHook;
|
||||
|
||||
public Copycat(Worker worker, Coordinator coordinator) {
|
||||
public Copycat(Worker worker, Herder herder) {
|
||||
log.debug("Copycat created");
|
||||
this.worker = worker;
|
||||
this.coordinator = coordinator;
|
||||
this.herder = herder;
|
||||
shutdownHook = new ShutdownHook();
|
||||
}
|
||||
|
||||
|
@ -51,7 +51,7 @@ public class Copycat {
|
|||
Runtime.getRuntime().addShutdownHook(shutdownHook);
|
||||
|
||||
worker.start();
|
||||
coordinator.start();
|
||||
herder.start();
|
||||
|
||||
log.info("Copycat started");
|
||||
|
||||
|
@ -63,7 +63,7 @@ public class Copycat {
|
|||
if (!wasShuttingDown) {
|
||||
log.info("Copycat stopping");
|
||||
|
||||
coordinator.stop();
|
||||
herder.stop();
|
||||
worker.stop();
|
||||
|
||||
log.info("Copycat stopped");
|
||||
|
|
|
@ -23,7 +23,7 @@ import java.util.Properties;
|
|||
|
||||
/**
|
||||
* <p>
|
||||
* The coordinator interface manages the interaction between workers and is the main interface for external components
|
||||
* The herder interface tracks and manages workers and connectors. It is the main interface for external components
|
||||
* to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
|
||||
* knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
|
||||
* the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
|
||||
|
@ -38,11 +38,11 @@ import java.util.Properties;
|
|||
* In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case,
|
||||
* the implementation will mainly be delegating tasks directly to other components. For example, when creating a new
|
||||
* connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the
|
||||
* same process, so the standalone coordinator implementation can immediately instantiate and start the connector and its
|
||||
* same process, so the standalone herder implementation can immediately instantiate and start the connector and its
|
||||
* tasks.
|
||||
* </p>
|
||||
*/
|
||||
public interface Coordinator {
|
||||
public interface Herder {
|
||||
|
||||
void start();
|
||||
|
||||
|
@ -50,7 +50,7 @@ public interface Coordinator {
|
|||
|
||||
/**
|
||||
* Submit a connector job to the cluster. This works from any node by forwarding the request to
|
||||
* the leader coordinator if necessary.
|
||||
* the leader herder if necessary.
|
||||
*
|
||||
* @param connectorProps user-specified properties for this job
|
||||
* @param callback callback to invoke when the request completes
|
|
@ -141,7 +141,7 @@ public class Worker<K, V> {
|
|||
|
||||
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
|
||||
WorkerTask task = entry.getValue();
|
||||
log.warn("Shutting down task {} uncleanly; coordinator should have shut down "
|
||||
log.warn("Shutting down task {} uncleanly; herder should have shut down "
|
||||
+ "tasks before the Worker is stopped.", task);
|
||||
try {
|
||||
task.stop();
|
||||
|
@ -178,7 +178,7 @@ public class Worker<K, V> {
|
|||
public void addTask(ConnectorTaskId id, String taskClassName, Properties props)
|
||||
throws CopycatException {
|
||||
if (tasks.containsKey(id)) {
|
||||
String msg = "Task already exists in this worker; the coordinator should not have requested "
|
||||
String msg = "Task already exists in this worker; the herder should not have requested "
|
||||
+ "that this : " + id;
|
||||
log.error(msg);
|
||||
throw new CopycatRuntimeException(msg);
|
||||
|
|
|
@ -20,23 +20,23 @@ package org.apache.kafka.copycat.runtime.standalone;
|
|||
import org.apache.kafka.copycat.connector.ConnectorContext;
|
||||
|
||||
/**
|
||||
* ConnectorContext for use with the StandaloneCoordinator, which maintains all connectors and tasks
|
||||
* ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
|
||||
* in a single process.
|
||||
*/
|
||||
class StandaloneConnectorContext implements ConnectorContext {
|
||||
|
||||
private StandaloneCoordinator coordinator;
|
||||
private StandaloneHerder herder;
|
||||
private String connectorName;
|
||||
|
||||
public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName) {
|
||||
this.coordinator = coordinator;
|
||||
public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) {
|
||||
this.herder = herder;
|
||||
this.connectorName = connectorName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void requestTaskReconfiguration() {
|
||||
// This is trivial to forward since there is only one coordinator and it's in memory in this
|
||||
// This is trivial to forward since there is only one herder and it's in memory in this
|
||||
// process
|
||||
coordinator.requestTaskReconfiguration(connectorName);
|
||||
herder.requestTaskReconfiguration(connectorName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ import org.apache.kafka.copycat.connector.Connector;
|
|||
import org.apache.kafka.copycat.errors.CopycatException;
|
||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.copycat.runtime.Coordinator;
|
||||
import org.apache.kafka.copycat.runtime.Herder;
|
||||
import org.apache.kafka.copycat.runtime.Worker;
|
||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||
import org.apache.kafka.copycat.sink.SinkTask;
|
||||
|
@ -33,25 +33,25 @@ import org.slf4j.LoggerFactory;
|
|||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Single process, in-memory "coordinator". Useful for a standalone copycat process.
|
||||
* Single process, in-memory "herder". Useful for a standalone copycat process.
|
||||
*/
|
||||
public class StandaloneCoordinator implements Coordinator {
|
||||
private static final Logger log = LoggerFactory.getLogger(StandaloneCoordinator.class);
|
||||
public class StandaloneHerder implements Herder {
|
||||
private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
|
||||
|
||||
private Worker worker;
|
||||
private HashMap<String, ConnectorState> connectors = new HashMap<>();
|
||||
|
||||
public StandaloneCoordinator(Worker worker) {
|
||||
public StandaloneHerder(Worker worker) {
|
||||
this.worker = worker;
|
||||
}
|
||||
|
||||
public synchronized void start() {
|
||||
log.info("Coordinator starting");
|
||||
log.info("Coordinator started");
|
||||
log.info("Herder starting");
|
||||
log.info("Herder started");
|
||||
}
|
||||
|
||||
public synchronized void stop() {
|
||||
log.info("Coordinator stopping");
|
||||
log.info("Herder stopping");
|
||||
|
||||
// There's no coordination/hand-off to do here since this is all standalone. Instead, we
|
||||
// should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
|
||||
|
@ -62,7 +62,7 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
}
|
||||
connectors.clear();
|
||||
|
||||
log.info("Coordinator stopped");
|
||||
log.info("Herder stopped");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,7 +173,7 @@ public class StandaloneCoordinator implements Coordinator {
|
|||
for (int i = 0; i < taskConfigs.size(); i++) {
|
||||
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
|
||||
Properties config = taskConfigs.get(i);
|
||||
// TODO: This probably shouldn't be in the Coordinator. It's nice to have Copycat ensure the list of topics
|
||||
// TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
|
||||
// is automatically provided to tasks since it is required by the framework, but this
|
||||
String subscriptionTopics = Utils.join(state.inputTopics, ",");
|
||||
if (state.connector instanceof SinkConnector) {
|
|
@ -44,13 +44,13 @@ import java.util.Properties;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PrepareForTest({StandaloneCoordinator.class})
|
||||
@PrepareForTest({StandaloneHerder.class})
|
||||
@PowerMockIgnore("javax.management.*")
|
||||
public class StandaloneCoordinatorTest {
|
||||
public class StandaloneHerderTest {
|
||||
private static final String CONNECTOR_NAME = "test";
|
||||
private static final String TOPICS_LIST_STR = "topic1,topic2";
|
||||
|
||||
private StandaloneCoordinator coordinator;
|
||||
private StandaloneHerder herder;
|
||||
@Mock protected Worker worker;
|
||||
private Connector connector;
|
||||
@Mock protected Callback<String> createCallback;
|
||||
|
@ -61,12 +61,12 @@ public class StandaloneCoordinatorTest {
|
|||
@Before
|
||||
public void setup() {
|
||||
worker = PowerMock.createMock(Worker.class);
|
||||
coordinator = new StandaloneCoordinator(worker);
|
||||
herder = new StandaloneHerder(worker);
|
||||
|
||||
connectorProps = new Properties();
|
||||
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
||||
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
|
||||
PowerMock.mockStatic(StandaloneCoordinator.class);
|
||||
PowerMock.mockStatic(StandaloneHerder.class);
|
||||
|
||||
// These can be anything since connectors can pass along whatever they want.
|
||||
taskProps = new Properties();
|
||||
|
@ -79,7 +79,7 @@ public class StandaloneCoordinatorTest {
|
|||
expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
|
||||
PowerMock.replayAll();
|
||||
|
||||
coordinator.addConnector(connectorProps, createCallback);
|
||||
herder.addConnector(connectorProps, createCallback);
|
||||
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ public class StandaloneCoordinatorTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
coordinator.addConnector(connectorProps, createCallback);
|
||||
herder.addConnector(connectorProps, createCallback);
|
||||
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
|
@ -103,14 +103,14 @@ public class StandaloneCoordinatorTest {
|
|||
expectDestroy();
|
||||
PowerMock.replayAll();
|
||||
|
||||
coordinator.addConnector(connectorProps, createCallback);
|
||||
herder.addConnector(connectorProps, createCallback);
|
||||
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
|
||||
@Override
|
||||
public void onCompletion(Throwable error, Void result) {
|
||||
|
||||
}
|
||||
});
|
||||
coordinator.deleteConnector(CONNECTOR_NAME, futureCb);
|
||||
herder.deleteConnector(CONNECTOR_NAME, futureCb);
|
||||
futureCb.get(1000L, TimeUnit.MILLISECONDS);
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
|
@ -133,7 +133,7 @@ public class StandaloneCoordinatorTest {
|
|||
boolean sink, boolean expectCallback) throws Exception {
|
||||
connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
|
||||
|
||||
PowerMock.expectPrivate(StandaloneCoordinator.class, "instantiateConnector", connClass.getName())
|
||||
PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
|
||||
.andReturn(connector);
|
||||
if (expectCallback) {
|
||||
createCallback.onCompletion(null, CONNECTOR_NAME);
|
Loading…
Reference in New Issue