KAFKA-15387: Remove Connect's deprecated task configurations endpoint (#17412)

Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Yash Mayya 2024-11-21 21:13:54 +07:00 committed by GitHub
parent e9ccc2d6f5
commit 4f1688742e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 15 additions and 136 deletions

View File

@ -327,20 +327,6 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
);
}
protected Map<ConnectorTaskId, Map<String, String>> buildTasksConfig(String connector) {
final ClusterConfigState configState = configBackingStore.snapshot();
if (!configState.contains(connector))
return Collections.emptyMap();
Map<ConnectorTaskId, Map<String, String>> configs = new HashMap<>();
for (ConnectorTaskId cti : configState.tasks(connector)) {
configs.put(cti, configState.rawTaskConfig(cti));
}
return configs;
}
@Override
public ConnectorStateInfo connectorStatus(String connName) {
ConnectorStatus connector = statusBackingStore.get(connName);

View File

@ -102,13 +102,6 @@ public interface Herder {
*/
void connectorConfig(String connName, Callback<Map<String, String>> callback);
/**
* Get the configuration for all tasks of a connector.
* @param connName name of the connector
* @param callback callback to invoke with the configuration
*/
void tasksConfig(String connName, Callback<Map<ConnectorTaskId, Map<String, String>>> callback);
/**
* Set the configuration for a connector. This supports creation and updating.
* @param connName name of the connector

View File

@ -906,26 +906,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
);
}
@Override
public void tasksConfig(String connName, final Callback<Map<ConnectorTaskId, Map<String, String>>> callback) {
log.trace("Submitting tasks config request {}", connName);
addRequest(
() -> {
if (checkRebalanceNeeded(callback))
return null;
if (!configState.contains(connName)) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
} else {
callback.onCompletion(null, buildTasksConfig(connName));
}
return null;
},
forwardErrorAndTickThreadStages(callback)
);
}
@Override
protected Map<String, String> rawConfig(String connName) {
return configState.rawConnectorConfig(connName);

View File

@ -176,18 +176,6 @@ public class ConnectorsResource {
return requestHandler.completeRequest(cb);
}
@GET
@Path("/{connector}/tasks-config")
@Operation(deprecated = true, summary = "Get the configuration of all tasks for the specified connector")
public Map<ConnectorTaskId, Map<String, String>> getTasksConfig(
final @PathParam("connector") String connector) throws Throwable {
log.warn("The 'GET /connectors/{connector}/tasks-config' endpoint is deprecated and will be removed in the next major release. "
+ "Please use the 'GET /connectors/{connector}/tasks' endpoint instead.");
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>();
herder.tasksConfig(connector, cb);
return requestHandler.completeRequest(cb);
}
@GET
@Path("/{connector}/status")
@Operation(summary = "Get the status for the specified connector")

View File

@ -635,15 +635,4 @@ public final class StandaloneHerder extends AbstractHerder {
return Objects.hash(seq);
}
}
@Override
public void tasksConfig(String connName, Callback<Map<ConnectorTaskId, Map<String, String>>> callback) {
Map<ConnectorTaskId, Map<String, String>> tasksConfig = buildTasksConfig(connName);
if (tasksConfig.isEmpty()) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
return;
}
callback.onCompletion(null, tasksConfig);
}
}

View File

@ -32,7 +32,6 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@ -55,7 +54,6 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
import java.io.FileOutputStream;
@ -568,35 +566,6 @@ public class ConnectWorkerIntegrationTest {
);
}
/**
* The <strong><em>GET /connectors/{connector}/tasks-config</em></strong> endpoint was deprecated in
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint">KIP-970</a>
* and is slated for removal in the next major release. This test verifies that the deprecation warning log is emitted on trying to use the
* deprecated endpoint.
*/
@Test
public void testTasksConfigDeprecation() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME));
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector tasks did not start in time"
);
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(ConnectorsResource.class)) {
connect.requestGet(connect.endpointForResource("connectors/" + CONNECTOR_NAME + "/tasks-config"));
List<LogCaptureAppender.Event> logEvents = logCaptureAppender.getEvents();
assertEquals(1, logEvents.size());
assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel());
assertTrue(logEvents.get(0).getMessage().contains("deprecated"));
}
}
@Test
public void testCreateConnectorWithPausedInitialState() throws Exception {
connect = connectBuilder.build();

View File

@ -2333,8 +2333,6 @@ public class DistributedHerderTest {
herder.connectorConfig(CONN1, connectorConfigCb);
FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>();
herder.taskConfigs(CONN1, taskConfigsCb);
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = new FutureCallback<>();
herder.tasksConfig(CONN1, tasksConfigCb);
herder.tick();
assertTrue(listConnectorsCb.isDone());
@ -2351,11 +2349,6 @@ public class DistributedHerderTest {
new TaskInfo(TASK1, TASK_CONFIG),
new TaskInfo(TASK2, TASK_CONFIG)),
taskConfigsCb.get());
Map<ConnectorTaskId, Map<String, String>> tasksConfig = new HashMap<>();
tasksConfig.put(TASK0, TASK_CONFIG);
tasksConfig.put(TASK1, TASK_CONFIG);
tasksConfig.put(TASK2, TASK_CONFIG);
assertEquals(tasksConfig, tasksConfigCb.get());
// Config transformation should not occur when requesting connector or task info
verify(configTransformer, never()).transform(eq(CONN1), any());

View File

@ -486,7 +486,7 @@ public class ConnectorsResourceTest {
}
@Test
public void testGetTasksConfig() throws Throwable {
public void testGetTaskConfigs() throws Throwable {
final ConnectorTaskId connectorTask0 = new ConnectorTaskId(CONNECTOR_NAME, 0);
final Map<String, String> connectorTask0Configs = new HashMap<>();
connectorTask0Configs.put("connector-task0-config0", "123");
@ -498,31 +498,22 @@ public class ConnectorsResourceTest {
final ConnectorTaskId connector2Task0 = new ConnectorTaskId(CONNECTOR2_NAME, 0);
final Map<String, String> connector2Task0Configs = Collections.singletonMap("connector2-task0-config0", "789");
final Map<ConnectorTaskId, Map<String, String>> expectedTasksConnector = new HashMap<>();
expectedTasksConnector.put(connectorTask0, connectorTask0Configs);
expectedTasksConnector.put(connectorTask1, connectorTask1Configs);
final Map<ConnectorTaskId, Map<String, String>> expectedTasksConnector2 = new HashMap<>();
expectedTasksConnector2.put(connector2Task0, connector2Task0Configs);
final List<TaskInfo> expectedTasksConnector = new ArrayList<>();
expectedTasksConnector.add(new TaskInfo(connectorTask0, connectorTask0Configs));
expectedTasksConnector.add(new TaskInfo(connectorTask1, connectorTask1Configs));
final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb1 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb1, expectedTasksConnector).when(herder).tasksConfig(eq(CONNECTOR_NAME), cb1.capture());
final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb2 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb2, expectedTasksConnector2).when(herder).tasksConfig(eq(CONNECTOR2_NAME), cb2.capture());
final List<TaskInfo> expectedTasksConnector2 = new ArrayList<>();
expectedTasksConnector2.add(new TaskInfo(connector2Task0, connector2Task0Configs));
Map<ConnectorTaskId, Map<String, String>> tasksConfig = connectorsResource.getTasksConfig(CONNECTOR_NAME);
assertEquals(expectedTasksConnector, tasksConfig);
Map<ConnectorTaskId, Map<String, String>> tasksConfig2 = connectorsResource.getTasksConfig(CONNECTOR2_NAME);
assertEquals(expectedTasksConnector2, tasksConfig2);
}
final ArgumentCaptor<Callback<List<TaskInfo>>> cb1 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb1, expectedTasksConnector).when(herder).taskConfigs(eq(CONNECTOR_NAME), cb1.capture());
final ArgumentCaptor<Callback<List<TaskInfo>>> cb2 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb2, expectedTasksConnector2).when(herder).taskConfigs(eq(CONNECTOR2_NAME), cb2.capture());
@Test
public void testGetTasksConfigConnectorNotFound() {
final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new NotFoundException("not found"))
.when(herder).tasksConfig(eq(CONNECTOR_NAME), cb.capture());
assertThrows(NotFoundException.class, () ->
connectorsResource.getTasksConfig(CONNECTOR_NAME));
List<TaskInfo> taskConfigs = connectorsResource.getTaskConfigs(CONNECTOR_NAME);
assertEquals(expectedTasksConnector, taskConfigs);
List<TaskInfo> taskConfigs2 = connectorsResource.getTaskConfigs(CONNECTOR2_NAME);
assertEquals(expectedTasksConnector2, taskConfigs2);
}
@Test

View File

@ -677,16 +677,13 @@ public class StandaloneHerderTest {
Callback<ConnectorInfo> connectorInfoCb = mock(Callback.class);
Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
Callback<List<TaskInfo>> taskConfigsCb = mock(Callback.class);
Callback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = mock(Callback.class);
// Check accessors with empty worker
doNothing().when(listConnectorsCb).onCompletion(null, Collections.EMPTY_SET);
doNothing().when(connectorInfoCb).onCompletion(any(NotFoundException.class), isNull());
doNothing().when(taskConfigsCb).onCompletion(any(NotFoundException.class), isNull());
doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), isNull());
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull());
expectAdd(SourceSink.SOURCE);
expectConfigValidation(SourceSink.SOURCE, connConfig);
@ -699,16 +696,11 @@ public class StandaloneHerderTest {
TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE));
doNothing().when(taskConfigsCb).onCompletion(null, singletonList(taskInfo));
Map<ConnectorTaskId, Map<String, String>> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0),
taskConfig(SourceSink.SOURCE));
doNothing().when(tasksConfigCb).onCompletion(null, tasksConfig);
// All operations are synchronous for StandaloneHerder, so we don't need to actually wait after making each call
herder.connectors(listConnectorsCb);
herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);
herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
@ -719,7 +711,6 @@ public class StandaloneHerderTest {
herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);
// Config transformation should not occur when requesting connector or task info
verify(transformer, never()).transform(eq(CONNECTOR_NAME), any());
}

View File

@ -294,7 +294,6 @@ predicates.IsBar.pattern=bar</code></pre>
<li><code>PATCH /connectors/{name}/config</code> - patch the configuration parameters for a specific connector, where <code>null</code> values in the JSON body indicates removing of the key from the final configuration</li>
<li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li>
<li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector along with their configurations</li>
<li><code>GET /connectors/{name}/tasks-config</code> - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the <code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> for more details</li>
<li><code>GET /connectors/{name}/tasks/{taskid}/status</code> - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed</li>
<li><code>PUT /connectors/{name}/pause</code> - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed.</li>
<li id="connect_stopconnector"><code>PUT /connectors/{name}/stop</code> - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state</li>