KAFKA-18811: Added command configs to admin client as well in VerifiableShareConsumer (#19130)

This PR includes a new flag in VerifiableShareConsumer.java called
command.config to include a properties file for admin client
related configs

Co-authored-by: Andrew Schofield <aschofield@confluent.io>

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Chirag Wadhwa 2025-03-06 18:57:15 +05:30 committed by GitHub
parent 6ad9ca518c
commit 899cdb598a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 22 additions and 14 deletions

View File

@ -84,13 +84,13 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
private final ObjectMapper mapper = new ObjectMapper();
private final PrintStream out;
private final KafkaShareConsumer<String, String> consumer;
private final Admin adminClient;
private final String topic;
private final AcknowledgementMode acknowledgementMode;
private final String offsetResetStrategy;
private final Boolean verbose;
private final int maxMessages;
private Integer totalAcknowledged = 0;
private final String brokerHostandPort;
private final String groupId;
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
@ -317,22 +317,22 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
}
public VerifiableShareConsumer(KafkaShareConsumer<String, String> consumer,
Admin adminClient,
PrintStream out,
Integer maxMessages,
String topic,
AcknowledgementMode acknowledgementMode,
String offsetResetStrategy,
String brokerHostandPort,
String groupId,
Boolean verbose) {
this.out = out;
this.consumer = consumer;
this.adminClient = adminClient;
this.topic = topic;
this.acknowledgementMode = acknowledgementMode;
this.offsetResetStrategy = offsetResetStrategy;
this.verbose = verbose;
this.maxMessages = maxMessages;
this.brokerHostandPort = brokerHostandPort;
this.groupId = groupId;
addKafkaSerializerModule();
}
@ -407,11 +407,6 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
ShareGroupAutoOffsetResetStrategy offsetResetStrategy =
ShareGroupAutoOffsetResetStrategy.fromString(this.offsetResetStrategy);
Properties adminClientProps = new Properties();
adminClientProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
Admin adminClient = Admin.create(adminClientProps);
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new HashMap<>();
alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
@ -555,12 +550,13 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
.dest("offsetResetStrategy")
.help("Set share group reset strategy (must be either 'earliest' or 'latest')");
parser.addArgument("--consumer.config")
parser.addArgument("--command-config")
.action(store())
.required(false)
.type(String.class)
.dest("commandConfig")
.metavar("CONFIG_FILE")
.help("Consumer config properties file (config options shared with command line parameters will be overridden).");
.help("Config properties file (config options shared with command line parameters will be overridden).");
return parser;
}
@ -571,8 +567,8 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
AcknowledgementMode acknowledgementMode =
AcknowledgementMode.valueOf(res.getString("acknowledgementMode").toUpperCase(Locale.ROOT));
String offsetResetStrategy = res.getString("offsetResetStrategy").toLowerCase(Locale.ROOT);
String configFile = res.getString("consumer.config");
String brokerHostandPort = res.getString("bootstrapServer");
String configFile = res.getString("commandConfig");
String brokerHostAndPort = res.getString("bootstrapServer");
Properties consumerProps = new Properties();
if (configFile != null) {
@ -587,7 +583,7 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostAndPort);
String topic = res.getString("topic");
int maxMessages = res.getInt("maxMessages");
@ -596,14 +592,26 @@ public class VerifiableShareConsumer implements Closeable, AcknowledgementCommit
StringDeserializer deserializer = new StringDeserializer();
KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(consumerProps, deserializer, deserializer);
Properties adminClientProps = new Properties();
if (configFile != null) {
try {
adminClientProps.putAll(Utils.loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
}
adminClientProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerHostAndPort);
Admin adminClient = Admin.create(adminClientProps);
return new VerifiableShareConsumer(
consumer,
adminClient,
System.out,
maxMessages,
topic,
acknowledgementMode,
offsetResetStrategy,
brokerHostandPort,
groupId,
verbose);
}