Rename Coordinator to Herder to avoid confusion with the consumer coordinator.

This commit is contained in:
Ewen Cheslack-Postava 2015-08-14 14:14:44 -07:00
parent 7bf807596a
commit 8c108b0cac
7 changed files with 43 additions and 43 deletions

View File

@ -19,9 +19,9 @@ package org.apache.kafka.copycat.cli;
import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.runtime.Coordinator; import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.Worker; import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.runtime.standalone.StandaloneCoordinator; import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
import org.apache.kafka.copycat.util.Callback; import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.FutureCallback; import org.apache.kafka.copycat.util.FutureCallback;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -59,8 +59,8 @@ public class CopycatStandalone {
WorkerConfig workerConfig = new WorkerConfig(workerProps); WorkerConfig workerConfig = new WorkerConfig(workerProps);
Worker worker = new Worker(workerConfig); Worker worker = new Worker(workerConfig);
Coordinator coordinator = new StandaloneCoordinator(worker); Herder herder = new StandaloneHerder(worker);
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator); final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, herder);
copycat.start(); copycat.start();
try { try {
@ -73,7 +73,7 @@ public class CopycatStandalone {
log.error("Failed to create job for {}", connectorPropsFile); log.error("Failed to create job for {}", connectorPropsFile);
} }
}); });
coordinator.addConnector(connectorProps, cb); herder.addConnector(connectorProps, cb);
cb.get(); cb.get();
} }
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* This class ties together all the components of a Copycat process (coordinator, worker, * This class ties together all the components of a Copycat process (herder, worker,
* storage, command interface), managing their lifecycle. * storage, command interface), managing their lifecycle.
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ -33,16 +33,16 @@ public class Copycat {
private static final Logger log = LoggerFactory.getLogger(Copycat.class); private static final Logger log = LoggerFactory.getLogger(Copycat.class);
private final Worker worker; private final Worker worker;
private final Coordinator coordinator; private final Herder herder;
private final CountDownLatch startLatch = new CountDownLatch(1); private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1); private final CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook; private final ShutdownHook shutdownHook;
public Copycat(Worker worker, Coordinator coordinator) { public Copycat(Worker worker, Herder herder) {
log.debug("Copycat created"); log.debug("Copycat created");
this.worker = worker; this.worker = worker;
this.coordinator = coordinator; this.herder = herder;
shutdownHook = new ShutdownHook(); shutdownHook = new ShutdownHook();
} }
@ -51,7 +51,7 @@ public class Copycat {
Runtime.getRuntime().addShutdownHook(shutdownHook); Runtime.getRuntime().addShutdownHook(shutdownHook);
worker.start(); worker.start();
coordinator.start(); herder.start();
log.info("Copycat started"); log.info("Copycat started");
@ -63,7 +63,7 @@ public class Copycat {
if (!wasShuttingDown) { if (!wasShuttingDown) {
log.info("Copycat stopping"); log.info("Copycat stopping");
coordinator.stop(); herder.stop();
worker.stop(); worker.stop();
log.info("Copycat stopped"); log.info("Copycat stopped");

View File

@ -23,7 +23,7 @@ import java.util.Properties;
/** /**
* <p> * <p>
* The coordinator interface manages the interaction between workers and is the main interface for external components * The herder interface tracks and manages workers and connectors. It is the main interface for external components
* to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
* knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so * knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
* the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one * the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
@ -38,11 +38,11 @@ import java.util.Properties;
* In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case, * In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case,
* the implementation will mainly be delegating tasks directly to other components. For example, when creating a new * the implementation will mainly be delegating tasks directly to other components. For example, when creating a new
* connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the * connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the
* same process, so the standalone coordinator implementation can immediately instantiate and start the connector and its * same process, so the standalone herder implementation can immediately instantiate and start the connector and its
* tasks. * tasks.
* </p> * </p>
*/ */
public interface Coordinator { public interface Herder {
void start(); void start();
@ -50,7 +50,7 @@ public interface Coordinator {
/** /**
* Submit a connector job to the cluster. This works from any node by forwarding the request to * Submit a connector job to the cluster. This works from any node by forwarding the request to
* the leader coordinator if necessary. * the leader herder if necessary.
* *
* @param connectorProps user-specified properties for this job * @param connectorProps user-specified properties for this job
* @param callback callback to invoke when the request completes * @param callback callback to invoke when the request completes

View File

@ -141,7 +141,7 @@ public class Worker<K, V> {
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) { for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
WorkerTask task = entry.getValue(); WorkerTask task = entry.getValue();
log.warn("Shutting down task {} uncleanly; coordinator should have shut down " log.warn("Shutting down task {} uncleanly; herder should have shut down "
+ "tasks before the Worker is stopped.", task); + "tasks before the Worker is stopped.", task);
try { try {
task.stop(); task.stop();
@ -178,7 +178,7 @@ public class Worker<K, V> {
public void addTask(ConnectorTaskId id, String taskClassName, Properties props) public void addTask(ConnectorTaskId id, String taskClassName, Properties props)
throws CopycatException { throws CopycatException {
if (tasks.containsKey(id)) { if (tasks.containsKey(id)) {
String msg = "Task already exists in this worker; the coordinator should not have requested " String msg = "Task already exists in this worker; the herder should not have requested "
+ "that this : " + id; + "that this : " + id;
log.error(msg); log.error(msg);
throw new CopycatRuntimeException(msg); throw new CopycatRuntimeException(msg);

View File

@ -20,23 +20,23 @@ package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.connector.ConnectorContext;
/** /**
* ConnectorContext for use with the StandaloneCoordinator, which maintains all connectors and tasks * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
* in a single process. * in a single process.
*/ */
class StandaloneConnectorContext implements ConnectorContext { class StandaloneConnectorContext implements ConnectorContext {
private StandaloneCoordinator coordinator; private StandaloneHerder herder;
private String connectorName; private String connectorName;
public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName) { public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) {
this.coordinator = coordinator; this.herder = herder;
this.connectorName = connectorName; this.connectorName = connectorName;
} }
@Override @Override
public void requestTaskReconfiguration() { public void requestTaskReconfiguration() {
// This is trivial to forward since there is only one coordinator and it's in memory in this // This is trivial to forward since there is only one herder and it's in memory in this
// process // process
coordinator.requestTaskReconfiguration(connectorName); herder.requestTaskReconfiguration(connectorName);
} }
} }

View File

@ -22,7 +22,7 @@ import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.runtime.ConnectorConfig; import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Coordinator; import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.Worker; import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector; import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.sink.SinkTask;
@ -33,25 +33,25 @@ import org.slf4j.LoggerFactory;
import java.util.*; import java.util.*;
/** /**
* Single process, in-memory "coordinator". Useful for a standalone copycat process. * Single process, in-memory "herder". Useful for a standalone copycat process.
*/ */
public class StandaloneCoordinator implements Coordinator { public class StandaloneHerder implements Herder {
private static final Logger log = LoggerFactory.getLogger(StandaloneCoordinator.class); private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
private Worker worker; private Worker worker;
private HashMap<String, ConnectorState> connectors = new HashMap<>(); private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneCoordinator(Worker worker) { public StandaloneHerder(Worker worker) {
this.worker = worker; this.worker = worker;
} }
public synchronized void start() { public synchronized void start() {
log.info("Coordinator starting"); log.info("Herder starting");
log.info("Coordinator started"); log.info("Herder started");
} }
public synchronized void stop() { public synchronized void stop() {
log.info("Coordinator stopping"); log.info("Herder stopping");
// There's no coordination/hand-off to do here since this is all standalone. Instead, we // There's no coordination/hand-off to do here since this is all standalone. Instead, we
// should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
@ -62,7 +62,7 @@ public class StandaloneCoordinator implements Coordinator {
} }
connectors.clear(); connectors.clear();
log.info("Coordinator stopped"); log.info("Herder stopped");
} }
@Override @Override
@ -173,7 +173,7 @@ public class StandaloneCoordinator implements Coordinator {
for (int i = 0; i < taskConfigs.size(); i++) { for (int i = 0; i < taskConfigs.size(); i++) {
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i); ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
Properties config = taskConfigs.get(i); Properties config = taskConfigs.get(i);
// TODO: This probably shouldn't be in the Coordinator. It's nice to have Copycat ensure the list of topics // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
// is automatically provided to tasks since it is required by the framework, but this // is automatically provided to tasks since it is required by the framework, but this
String subscriptionTopics = Utils.join(state.inputTopics, ","); String subscriptionTopics = Utils.join(state.inputTopics, ",");
if (state.connector instanceof SinkConnector) { if (state.connector instanceof SinkConnector) {

View File

@ -44,13 +44,13 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({StandaloneCoordinator.class}) @PrepareForTest({StandaloneHerder.class})
@PowerMockIgnore("javax.management.*") @PowerMockIgnore("javax.management.*")
public class StandaloneCoordinatorTest { public class StandaloneHerderTest {
private static final String CONNECTOR_NAME = "test"; private static final String CONNECTOR_NAME = "test";
private static final String TOPICS_LIST_STR = "topic1,topic2"; private static final String TOPICS_LIST_STR = "topic1,topic2";
private StandaloneCoordinator coordinator; private StandaloneHerder herder;
@Mock protected Worker worker; @Mock protected Worker worker;
private Connector connector; private Connector connector;
@Mock protected Callback<String> createCallback; @Mock protected Callback<String> createCallback;
@ -61,12 +61,12 @@ public class StandaloneCoordinatorTest {
@Before @Before
public void setup() { public void setup() {
worker = PowerMock.createMock(Worker.class); worker = PowerMock.createMock(Worker.class);
coordinator = new StandaloneCoordinator(worker); herder = new StandaloneHerder(worker);
connectorProps = new Properties(); connectorProps = new Properties();
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME); connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR); connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
PowerMock.mockStatic(StandaloneCoordinator.class); PowerMock.mockStatic(StandaloneHerder.class);
// These can be anything since connectors can pass along whatever they want. // These can be anything since connectors can pass along whatever they want.
taskProps = new Properties(); taskProps = new Properties();
@ -79,7 +79,7 @@ public class StandaloneCoordinatorTest {
expectAdd(BogusSourceClass.class, BogusSourceTask.class, false); expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
PowerMock.replayAll(); PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback); herder.addConnector(connectorProps, createCallback);
PowerMock.verifyAll(); PowerMock.verifyAll();
} }
@ -91,7 +91,7 @@ public class StandaloneCoordinatorTest {
PowerMock.replayAll(); PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback); herder.addConnector(connectorProps, createCallback);
PowerMock.verifyAll(); PowerMock.verifyAll();
} }
@ -103,14 +103,14 @@ public class StandaloneCoordinatorTest {
expectDestroy(); expectDestroy();
PowerMock.replayAll(); PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback); herder.addConnector(connectorProps, createCallback);
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() { FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
@Override @Override
public void onCompletion(Throwable error, Void result) { public void onCompletion(Throwable error, Void result) {
} }
}); });
coordinator.deleteConnector(CONNECTOR_NAME, futureCb); herder.deleteConnector(CONNECTOR_NAME, futureCb);
futureCb.get(1000L, TimeUnit.MILLISECONDS); futureCb.get(1000L, TimeUnit.MILLISECONDS);
PowerMock.verifyAll(); PowerMock.verifyAll();
} }
@ -133,7 +133,7 @@ public class StandaloneCoordinatorTest {
boolean sink, boolean expectCallback) throws Exception { boolean sink, boolean expectCallback) throws Exception {
connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName()); connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
PowerMock.expectPrivate(StandaloneCoordinator.class, "instantiateConnector", connClass.getName()) PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
.andReturn(connector); .andReturn(connector);
if (expectCallback) { if (expectCallback) {
createCallback.onCompletion(null, CONNECTOR_NAME); createCallback.onCompletion(null, CONNECTOR_NAME);