KAFKA-15045: (KIP-924 pt. 20) Custom task assignment configuration fix (#16245)

The StreamsConfig class was not parsing the new task assignment
configuration flag properly, which made it impossible to properly
configure a custom task assignor.

This PR fixes this and adds a bit of INFO logging to help users diagnose
assignor misconfiguration issues.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Antoine Pourchet 2024-06-08 20:55:13 -06:00 committed by A. Sophie Blee-Goldman
parent 81196bc6e2
commit b0333f2ad5
2 changed files with 2 additions and 0 deletions

View File

@ -1649,6 +1649,7 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, getString(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG));
// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");

View File

@ -256,6 +256,7 @@ public final class AssignorConfiguration {
public Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor> customTaskAssignor() {
final String userTaskAssignorClassname = streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
if (userTaskAssignorClassname == null) {
log.info("No custom task assignors found, defaulting to internal task assignment with {}", INTERNAL_TASK_ASSIGNOR_CLASS);
return Optional.empty();
}
try {