KAFKA-15387: Deprecate Connect's redundant task configurations endpoint (#14361)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Sagar Rao <sagarmeansocean@gmail.com>
This commit is contained in:
Yash Mayya 2023-10-14 10:16:50 +01:00 committed by GitHub
parent cd1b7639cb
commit 1c8bb61a43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 55 additions and 12 deletions

View File

@@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.connect.runtime.rest.entities; package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -26,7 +27,8 @@ public class TaskInfo {
private final ConnectorTaskId id; private final ConnectorTaskId id;
private final Map<String, String> config; private final Map<String, String> config;
public TaskInfo(ConnectorTaskId id, Map<String, String> config) { @JsonCreator
public TaskInfo(@JsonProperty("id") ConnectorTaskId id, @JsonProperty("config") Map<String, String> config) {
this.id = id; this.id = id;
this.config = config; this.config = config;
} }

View File

@@ -173,9 +173,11 @@ public class ConnectorsResource implements ConnectResource {
@GET @GET
@Path("/{connector}/tasks-config") @Path("/{connector}/tasks-config")
@Operation(summary = "Get the configuration of all tasks for the specified connector") @Operation(deprecated = true, summary = "Get the configuration of all tasks for the specified connector")
public Map<ConnectorTaskId, Map<String, String>> getTasksConfig( public Map<ConnectorTaskId, Map<String, String>> getTasksConfig(
final @PathParam("connector") String connector) throws Throwable { final @PathParam("connector") String connector) throws Throwable {
log.warn("The 'GET /connectors/{connector}/tasks-config' endpoint is deprecated and will be removed in the next major release. "
+ "Please use the 'GET /connectors/{connector}/tasks' endpoint instead.");
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>(); FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>();
herder.tasksConfig(connector, cb); herder.tasksConfig(connector, cb);
return requestHandler.completeRequest(cb); return requestHandler.completeRequest(cb);

View File

@@ -16,7 +16,9 @@
*/ */
package org.apache.kafka.connect.integration; package org.apache.kafka.connect.integration;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle; import org.apache.kafka.connect.util.clusters.WorkerHandle;
@@ -29,9 +31,11 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
@@ -51,6 +55,8 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_F
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS; import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@@ -382,7 +388,7 @@ public class ConnectWorkerIntegrationTest {
); );
// If the connector is truly stopped, we should also see an empty set of tasks and task configs // If the connector is truly stopped, we should also see an empty set of tasks and task configs
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks()); assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyMap(), connect.taskConfigs(CONNECTOR_NAME)); assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Transition to RUNNING // Transition to RUNNING
connect.resumeConnector(CONNECTOR_NAME); connect.resumeConnector(CONNECTOR_NAME);
@@ -411,7 +417,7 @@ public class ConnectWorkerIntegrationTest {
"Connector did not stop in time" "Connector did not stop in time"
); );
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks()); assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyMap(), connect.taskConfigs(CONNECTOR_NAME)); assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Transition to PAUSED // Transition to PAUSED
connect.pauseConnector(CONNECTOR_NAME); connect.pauseConnector(CONNECTOR_NAME);
@@ -471,7 +477,7 @@ public class ConnectWorkerIntegrationTest {
); );
// If the connector is truly stopped, we should also see an empty set of tasks and task configs // If the connector is truly stopped, we should also see an empty set of tasks and task configs
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks()); assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyMap(), connect.taskConfigs(CONNECTOR_NAME)); assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Can resume a connector after its Connector has failed before shutdown after receiving a stop request // Can resume a connector after its Connector has failed before shutdown after receiving a stop request
props.remove("connector.start.inject.error"); props.remove("connector.start.inject.error");
@@ -493,7 +499,7 @@ public class ConnectWorkerIntegrationTest {
"Connector did not stop in time" "Connector did not stop in time"
); );
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks()); assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyMap(), connect.taskConfigs(CONNECTOR_NAME)); assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Can resume a connector after its Connector has failed during shutdown after receiving a stop request // Can resume a connector after its Connector has failed during shutdown after receiving a stop request
connect.resumeConnector(CONNECTOR_NAME); connect.resumeConnector(CONNECTOR_NAME);
@@ -511,6 +517,37 @@ public class ConnectWorkerIntegrationTest {
); );
} }
/**
* The <strong><em>GET /connectors/{connector}/tasks-config</em></strong> endpoint was deprecated in
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint">KIP-970</a>
* and is slated for removal in the next major release. This test verifies that the deprecation warning log is emitted on trying to use the
* deprecated endpoint.
*/
@Test
public void testTasksConfigDeprecation() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME));
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector tasks did not start in time"
);
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(ConnectorsResource.class)) {
connect.requestGet(connect.endpointForResource("connectors/" + CONNECTOR_NAME + "/tasks-config"));
List<LogCaptureAppender.Event> logEvents = logCaptureAppender.getEvents();
assertEquals(1, logEvents.size());
assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel());
assertThat(logEvents.get(0).getMessage(), containsString("deprecated"));
}
}
private Map<String, String> defaultSourceConnectorProps(String topic) { private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup up props for the source connector // setup up props for the source connector
Map<String, String> props = new HashMap<>(); Map<String, String> props = new HashMap<>();

View File

@@ -31,6 +31,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.SinkUtils;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
@@ -611,19 +612,19 @@ public class EmbeddedConnectCluster {
/** /**
* Get the task configs of a connector running in this cluster. * Get the task configs of a connector running in this cluster.
*
* @param connectorName name of the connector * @param connectorName name of the connector
* @return a map from task ID (connector name + "-" + task number) to task config * @return a list of task configurations for the connector
*/ */
public Map<String, Map<String, String>> taskConfigs(String connectorName) { public List<TaskInfo> taskConfigs(String connectorName) {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
String url = endpointForResource(String.format("connectors/%s/tasks-config", connectorName)); String url = endpointForResource(String.format("connectors/%s/tasks", connectorName));
Response response = requestGet(url); Response response = requestGet(url);
try { try {
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
// We use String instead of ConnectorTaskId as the key here since the latter can't be automatically // We use String instead of ConnectorTaskId as the key here since the latter can't be automatically
// deserialized by Jackson when used as a JSON object key (i.e., when it's serialized as a JSON string) // deserialized by Jackson when used as a JSON object key (i.e., when it's serialized as a JSON string)
return mapper.readValue(responseToString(response), new TypeReference<Map<String, Map<String, String>>>() { }); return mapper.readValue(responseToString(response), new TypeReference<List<TaskInfo>>() { });
} }
} catch (IOException e) { } catch (IOException e) {
log.error("Could not read task configs from response: {}", log.error("Could not read task configs from response: {}",

View File

@@ -298,7 +298,8 @@ listeners=http://localhost:8080,https://localhost:8443</pre>
<li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li> <li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li>
<li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li> <li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li>
<li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li> <li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li>
<li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector</li> <li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector along with their configurations</li>
<li><code>GET /connectors/{name}/tasks-config</code> - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the <code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> for more details</li>
<li><code>GET /connectors/{name}/tasks/{taskid}/status</code> - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed</li> <li><code>GET /connectors/{name}/tasks/{taskid}/status</code> - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed</li>
<li><code>PUT /connectors/{name}/pause</code> - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed.</li> <li><code>PUT /connectors/{name}/pause</code> - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed.</li>
<li id="connect_stopconnector"><code>PUT /connectors/{name}/stop</code> - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state</li> <li id="connect_stopconnector"><code>PUT /connectors/{name}/stop</code> - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state</li>