KAFKA-17985: Set share.auto.offset.reset to earliest in ShareRoundTripWorker (#17758)

After the share.auto.offset.reset dynamic config was added for share groups in this commit - 9db5ed0, we needed to update this config value to "earliest" in ShareRoundTripWorker when it creates the consumer.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
ShivsundarR 2024-11-12 11:54:28 -05:00 committed by GitHub
parent c8f360c5f5
commit 6cf4081540
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 49 additions and 6 deletions

View File

@ -2469,11 +2469,16 @@ project(':trogdor') {
implementation libs.jettyServlet
implementation libs.jettyServlets
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
testImplementation project(':clients')
testImplementation libs.junitJupiter
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.mockitoCore
testImplementation project(':group-coordinator')
testRuntimeOnly runtimeTestLibs
}

View File

@ -368,6 +368,7 @@
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="org.apache.kafka.trogdor" />
<allow pkg="org.apache.kafka.coordinator" />
<allow pkg="org.eclipse.jetty" />
<allow pkg="org.glassfish.jersey" />
</subpackage>

View File

@ -333,7 +333,7 @@ public final class WorkerUtils {
return out;
}
private static Admin createAdminClient(
public static Admin createAdminClient(
String bootstrapServers,
Map<String, String> commonClientConf, Map<String, String> adminClientConf) {
Properties props = new Properties();

View File

@ -17,22 +17,35 @@
package org.apache.kafka.trogdor.workload;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.trogdor.common.WorkerUtils;
import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class ShareRoundTripWorker extends RoundTripWorkerBase {
private static final Logger log = LoggerFactory.getLogger(ShareRoundTripWorker.class);
KafkaShareConsumer<byte[], byte[]> consumer;
ShareRoundTripWorker(String id, RoundTripWorkloadSpec spec) {
this.id = id;
this.spec = spec;
@ -49,7 +62,16 @@ public class ShareRoundTripWorker extends RoundTripWorkerBase {
// user may over-write the defaults with common client config and consumer config
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-share-group-" + id);
String groupId = "round-trip-share-group-" + id;
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
try (Admin adminClient = WorkerUtils.createAdminClient(spec.bootstrapServers(), spec.commonClientConf(), spec.adminClientConf())) {
alterShareAutoOffsetReset(groupId, "earliest", adminClient);
} catch (Exception e) {
log.warn("Failed to set share.auto.offset.reset config to 'earliest' mode", e);
throw e;
}
consumer = new KafkaShareConsumer<>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
consumer.subscribe(spec.activeTopics().materialize().keySet());
@ -65,4 +87,19 @@ public class ShareRoundTripWorker extends RoundTripWorkerBase {
Utils.closeQuietly(consumer, "consumer");
consumer = null;
}
private void alterShareAutoOffsetReset(String groupId, String newValue, Admin adminClient) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), AlterConfigOp.OpType.SET)));
AlterConfigsOptions alterOptions = new AlterConfigsOptions();
try {
adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
.all()
.get(60, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Exception was thrown while attempting to set share.auto.offset.reset config: ", e);
}
}
}