Rename Coordinator to Herder to avoid confusion with the consumer coordinator.

This commit is contained in:
Ewen Cheslack-Postava 2015-08-14 14:14:44 -07:00
parent 7bf807596a
commit 8c108b0cac
7 changed files with 43 additions and 43 deletions

View File

@ -19,9 +19,9 @@ package org.apache.kafka.copycat.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.runtime.Coordinator;
import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.runtime.standalone.StandaloneCoordinator;
import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.FutureCallback;
import org.slf4j.Logger;
@ -59,8 +59,8 @@ public class CopycatStandalone {
WorkerConfig workerConfig = new WorkerConfig(workerProps);
Worker worker = new Worker(workerConfig);
Coordinator coordinator = new StandaloneCoordinator(worker);
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, coordinator);
Herder herder = new StandaloneHerder(worker);
final org.apache.kafka.copycat.runtime.Copycat copycat = new org.apache.kafka.copycat.runtime.Copycat(worker, herder);
copycat.start();
try {
@ -73,7 +73,7 @@ public class CopycatStandalone {
log.error("Failed to create job for {}", connectorPropsFile);
}
});
coordinator.addConnector(connectorProps, cb);
herder.addConnector(connectorProps, cb);
cb.get();
}
} catch (Throwable t) {

View File

@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class ties together all the components of a Copycat process (coordinator, worker,
* This class ties together all the components of a Copycat process (herder, worker,
* storage, command interface), managing their lifecycle.
*/
@InterfaceStability.Unstable
@ -33,16 +33,16 @@ public class Copycat {
private static final Logger log = LoggerFactory.getLogger(Copycat.class);
private final Worker worker;
private final Coordinator coordinator;
private final Herder herder;
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook;
public Copycat(Worker worker, Coordinator coordinator) {
public Copycat(Worker worker, Herder herder) {
log.debug("Copycat created");
this.worker = worker;
this.coordinator = coordinator;
this.herder = herder;
shutdownHook = new ShutdownHook();
}
@ -51,7 +51,7 @@ public class Copycat {
Runtime.getRuntime().addShutdownHook(shutdownHook);
worker.start();
coordinator.start();
herder.start();
log.info("Copycat started");
@ -63,7 +63,7 @@ public class Copycat {
if (!wasShuttingDown) {
log.info("Copycat stopping");
coordinator.stop();
herder.stop();
worker.stop();
log.info("Copycat stopped");

View File

@ -23,7 +23,7 @@ import java.util.Properties;
/**
* <p>
* The coordinator interface manages the interaction between workers and is the main interface for external components
* The herder interface tracks and manages workers and connectors. It is the main interface for external components
* to make changes to the state of the cluster. For example, in distributed mode, an implementation of this class
* knows how to accept a connector configuration, may need to route it to the current leader worker for the cluster so
* the config can be written to persistent storage, and then ensures the new connector is correctly instantiated on one
@ -38,11 +38,11 @@ import java.util.Properties;
* In standalone mode, this implementation of this class will be trivial because no coordination is needed. In that case,
* the implementation will mainly be delegating tasks directly to other components. For example, when creating a new
* connector in standalone mode, there is no need to persist the config and the connector and its tasks must run in the
* same process, so the standalone coordinator implementation can immediately instantiate and start the connector and its
* same process, so the standalone herder implementation can immediately instantiate and start the connector and its
* tasks.
* </p>
*/
public interface Coordinator {
public interface Herder {
void start();
@ -50,7 +50,7 @@ public interface Coordinator {
/**
* Submit a connector job to the cluster. This works from any node by forwarding the request to
* the leader coordinator if necessary.
* the leader herder if necessary.
*
* @param connectorProps user-specified properties for this job
* @param callback callback to invoke when the request completes

View File

@ -141,7 +141,7 @@ public class Worker<K, V> {
for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
WorkerTask task = entry.getValue();
log.warn("Shutting down task {} uncleanly; coordinator should have shut down "
log.warn("Shutting down task {} uncleanly; herder should have shut down "
+ "tasks before the Worker is stopped.", task);
try {
task.stop();
@ -178,7 +178,7 @@ public class Worker<K, V> {
public void addTask(ConnectorTaskId id, String taskClassName, Properties props)
throws CopycatException {
if (tasks.containsKey(id)) {
String msg = "Task already exists in this worker; the coordinator should not have requested "
String msg = "Task already exists in this worker; the herder should not have requested "
+ "that this : " + id;
log.error(msg);
throw new CopycatRuntimeException(msg);

View File

@ -20,23 +20,23 @@ package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.connector.ConnectorContext;
/**
* ConnectorContext for use with the StandaloneCoordinator, which maintains all connectors and tasks
* ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
* in a single process.
*/
class StandaloneConnectorContext implements ConnectorContext {
private StandaloneCoordinator coordinator;
private StandaloneHerder herder;
private String connectorName;
public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName) {
this.coordinator = coordinator;
public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) {
this.herder = herder;
this.connectorName = connectorName;
}
@Override
public void requestTaskReconfiguration() {
// This is trivial to forward since there is only one coordinator and it's in memory in this
// This is trivial to forward since there is only one herder and it's in memory in this
// process
coordinator.requestTaskReconfiguration(connectorName);
herder.requestTaskReconfiguration(connectorName);
}
}

View File

@ -22,7 +22,7 @@ import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Coordinator;
import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.sink.SinkTask;
@ -33,25 +33,25 @@ import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Single process, in-memory "coordinator". Useful for a standalone copycat process.
* Single process, in-memory "herder". Useful for a standalone copycat process.
*/
public class StandaloneCoordinator implements Coordinator {
private static final Logger log = LoggerFactory.getLogger(StandaloneCoordinator.class);
public class StandaloneHerder implements Herder {
private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
private Worker worker;
private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneCoordinator(Worker worker) {
public StandaloneHerder(Worker worker) {
this.worker = worker;
}
public synchronized void start() {
log.info("Coordinator starting");
log.info("Coordinator started");
log.info("Herder starting");
log.info("Herder started");
}
public synchronized void stop() {
log.info("Coordinator stopping");
log.info("Herder stopping");
// There's no coordination/hand-off to do here since this is all standalone. Instead, we
// should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
@ -62,7 +62,7 @@ public class StandaloneCoordinator implements Coordinator {
}
connectors.clear();
log.info("Coordinator stopped");
log.info("Herder stopped");
}
@Override
@ -173,7 +173,7 @@ public class StandaloneCoordinator implements Coordinator {
for (int i = 0; i < taskConfigs.size(); i++) {
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
Properties config = taskConfigs.get(i);
// TODO: This probably shouldn't be in the Coordinator. It's nice to have Copycat ensure the list of topics
// TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
// is automatically provided to tasks since it is required by the framework, but this
String subscriptionTopics = Utils.join(state.inputTopics, ",");
if (state.connector instanceof SinkConnector) {

View File

@ -44,13 +44,13 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StandaloneCoordinator.class})
@PrepareForTest({StandaloneHerder.class})
@PowerMockIgnore("javax.management.*")
public class StandaloneCoordinatorTest {
public class StandaloneHerderTest {
private static final String CONNECTOR_NAME = "test";
private static final String TOPICS_LIST_STR = "topic1,topic2";
private StandaloneCoordinator coordinator;
private StandaloneHerder herder;
@Mock protected Worker worker;
private Connector connector;
@Mock protected Callback<String> createCallback;
@ -61,12 +61,12 @@ public class StandaloneCoordinatorTest {
@Before
public void setup() {
worker = PowerMock.createMock(Worker.class);
coordinator = new StandaloneCoordinator(worker);
herder = new StandaloneHerder(worker);
connectorProps = new Properties();
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
PowerMock.mockStatic(StandaloneCoordinator.class);
PowerMock.mockStatic(StandaloneHerder.class);
// These can be anything since connectors can pass along whatever they want.
taskProps = new Properties();
@ -79,7 +79,7 @@ public class StandaloneCoordinatorTest {
expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback);
herder.addConnector(connectorProps, createCallback);
PowerMock.verifyAll();
}
@ -91,7 +91,7 @@ public class StandaloneCoordinatorTest {
PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback);
herder.addConnector(connectorProps, createCallback);
PowerMock.verifyAll();
}
@ -103,14 +103,14 @@ public class StandaloneCoordinatorTest {
expectDestroy();
PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback);
herder.addConnector(connectorProps, createCallback);
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
}
});
coordinator.deleteConnector(CONNECTOR_NAME, futureCb);
herder.deleteConnector(CONNECTOR_NAME, futureCb);
futureCb.get(1000L, TimeUnit.MILLISECONDS);
PowerMock.verifyAll();
}
@ -133,7 +133,7 @@ public class StandaloneCoordinatorTest {
boolean sink, boolean expectCallback) throws Exception {
connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
PowerMock.expectPrivate(StandaloneCoordinator.class, "instantiateConnector", connClass.getName())
PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
.andReturn(connector);
if (expectCallback) {
createCallback.onCompletion(null, CONNECTOR_NAME);