KAFKA-15888: DistributedHerder log context should not use the same client ID for each Connect worker by default (#14825)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Yash Mayya 2023-11-23 10:01:39 +00:00 committed by GitHub
parent 75572f904b
commit ebf435e718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 5 additions and 7 deletions

View File

@ -51,14 +51,13 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.storage.PrivilegedWriteException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@ -69,6 +68,7 @@ import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.PrivilegedWriteException;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
@ -100,12 +100,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -147,7 +146,6 @@ import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativ
* </p>
*/
public class DistributedHerder extends AbstractHerder implements Runnable {
private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private final Logger log;
private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
@ -296,7 +294,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.uponShutdown = Arrays.asList(uponShutdown);
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
String clientId = clientIdConfig.isEmpty() ? "connect-" + workerId : clientIdConfig;
// Thread factory uses String.format and '%' is handled as a placeholder
// need to escape if the client.id contains an actual % character
String escapedClientIdForThreadNameFormat = clientId.replace("%", "%%");