KAFKA-9747: Creating connect reconfiguration URL safely (#11174)

* URL wasn't urlencoded when forwarded reconfiguration to leader connect worker
* handling previously swallowed errors in connect RestClient

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Viktor Somogyi-Vass <viktorsomogyi@gmail.com>

Co-authored-by: Andras Katona  <akatona@cloudera.com>
Co-authored-by: Daniel Urban <durban@cloudera.com>
This commit is contained in:
Andras Katona 2021-09-02 10:09:55 +02:00 committed by GitHub
parent 1a33b65e0f
commit 0093b19e2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 10 additions and 9 deletions

View File

@ -52,7 +52,6 @@ import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@ -69,6 +68,7 @@ import org.slf4j.Logger;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -1597,7 +1597,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
"because the URL of the leader's REST interface is empty!"), null);
return;
}
String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
String reconfigUrl = UriBuilder.fromUri(leaderUrl)
.path("connectors")
.path(connName)
.path("tasks")
.build()
.toString();
log.trace("Forwarding task configurations for connector {} to leader", connName);
RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
cb.onCompletion(null, null);

View File

@ -142,6 +142,9 @@ public class RestClient {
} catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {
log.error("IO error forwarding REST request: ", e);
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
} catch (Throwable t) {
log.error("Error forwarding REST request", t);
throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "Error trying to forward REST request: " + t.getMessage(), t);
} finally {
try {
client.stop();

View File

@ -455,13 +455,6 @@ public class RestServer {
}
public static String urlJoin(String base, String path) {
if (base.endsWith("/") && path.startsWith("/"))
return base + path.substring(1);
else
return base + path;
}
/**
* Register header filter to ServletContextHandler.
* @param context The serverlet context handler