mirror of https://github.com/apache/kafka.git
Back-port KAFKA-16230 to 3.7 branch (#16951)
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lianetmr@gmail.com>
This commit is contained in:
parent
57b6c2ef98
commit
c7c3e609c0
|
@ -21,7 +21,7 @@ from ducktape.services.background_thread import BackgroundThreadService
|
||||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||||
from kafkatest.services.kafka import TopicPartition
|
from kafkatest.services.kafka import TopicPartition
|
||||||
from kafkatest.services.verifiable_client import VerifiableClientMixin
|
from kafkatest.services.verifiable_client import VerifiableClientMixin
|
||||||
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_0_10_0_0
|
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0
|
||||||
|
|
||||||
|
|
||||||
class ConsumerState:
|
class ConsumerState:
|
||||||
|
@ -167,7 +167,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
||||||
|
|
||||||
def __init__(self, context, num_nodes, kafka, topic, group_id,
|
def __init__(self, context, num_nodes, kafka, topic, group_id,
|
||||||
static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
|
static_membership=False, max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
|
||||||
assignment_strategy=None,
|
assignment_strategy=None, group_protocol=None, group_remote_assignor=None,
|
||||||
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
|
version=DEV_BRANCH, stop_timeout_sec=30, log_level="INFO", jaas_override_variables=None,
|
||||||
on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
|
on_record_consumed=None, reset_policy="earliest", verify_offsets=True):
|
||||||
"""
|
"""
|
||||||
|
@ -184,6 +184,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
||||||
self.session_timeout_sec = session_timeout_sec
|
self.session_timeout_sec = session_timeout_sec
|
||||||
self.enable_autocommit = enable_autocommit
|
self.enable_autocommit = enable_autocommit
|
||||||
self.assignment_strategy = assignment_strategy
|
self.assignment_strategy = assignment_strategy
|
||||||
|
self.group_protocol = group_protocol
|
||||||
|
self.group_remote_assignor = group_remote_assignor
|
||||||
self.prop_file = ""
|
self.prop_file = ""
|
||||||
self.stop_timeout_sec = stop_timeout_sec
|
self.stop_timeout_sec = stop_timeout_sec
|
||||||
self.on_record_consumed = on_record_consumed
|
self.on_record_consumed = on_record_consumed
|
||||||
|
@ -306,8 +308,20 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
||||||
# if `None` is passed as the argument value
|
# if `None` is passed as the argument value
|
||||||
cmd += " --group-instance-id None"
|
cmd += " --group-instance-id None"
|
||||||
|
|
||||||
if self.assignment_strategy:
|
# 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.
|
||||||
cmd += " --assignment-strategy %s" % self.assignment_strategy
|
# The two implementations use slightly different configuration, hence these arguments are conditional.
|
||||||
|
#
|
||||||
|
# See the Java class/method VerifiableConsumer.createFromArgs() for how the command line arguments are
|
||||||
|
# parsed and used as configuration in the runner.
|
||||||
|
if node.version >= V_3_7_0 and self.is_consumer_group_protocol_enabled():
|
||||||
|
cmd += " --group-protocol %s" % self.group_protocol
|
||||||
|
|
||||||
|
if self.group_remote_assignor:
|
||||||
|
cmd += " --group-remote-assignor %s" % self.group_remote_assignor
|
||||||
|
else:
|
||||||
|
# Either we're an older consumer version or we're using the old consumer group protocol.
|
||||||
|
if self.assignment_strategy:
|
||||||
|
cmd += " --assignment-strategy %s" % self.assignment_strategy
|
||||||
|
|
||||||
if self.enable_autocommit:
|
if self.enable_autocommit:
|
||||||
cmd += " --enable-autocommit "
|
cmd += " --enable-autocommit "
|
||||||
|
@ -416,3 +430,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
|
||||||
with self.lock:
|
with self.lock:
|
||||||
return [handler.node for handler in self.event_handlers.values()
|
return [handler.node for handler in self.event_handlers.values()
|
||||||
if handler.state != ConsumerState.Dead]
|
if handler.state != ConsumerState.Dead]
|
||||||
|
|
||||||
|
def is_consumer_group_protocol_enabled(self):
|
||||||
|
return self.group_protocol and self.group_protocol.upper() == "CONSUMER"
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||||
|
import org.apache.kafka.clients.consumer.GroupProtocol;
|
||||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||||
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
|
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
|
||||||
|
@ -529,6 +530,24 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
|
||||||
.metavar("TOPIC")
|
.metavar("TOPIC")
|
||||||
.help("Consumes messages from this topic.");
|
.help("Consumes messages from this topic.");
|
||||||
|
|
||||||
|
parser.addArgument("--group-protocol")
|
||||||
|
.action(store())
|
||||||
|
.required(false)
|
||||||
|
.type(String.class)
|
||||||
|
.setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL)
|
||||||
|
.metavar("GROUP_PROTOCOL")
|
||||||
|
.dest("groupProtocol")
|
||||||
|
.help(String.format("Group protocol (must be one of %s)", Utils.join(GroupProtocol.values(), ", ")));
|
||||||
|
|
||||||
|
parser.addArgument("--group-remote-assignor")
|
||||||
|
.action(store())
|
||||||
|
.required(false)
|
||||||
|
.type(String.class)
|
||||||
|
.setDefault(ConsumerConfig.DEFAULT_GROUP_REMOTE_ASSIGNOR)
|
||||||
|
.metavar("GROUP_REMOTE_ASSIGNOR")
|
||||||
|
.dest("groupRemoteAssignor")
|
||||||
|
.help(String.format("Group remote assignor; only used if the group protocol is %s", GroupProtocol.CONSUMER.name()));
|
||||||
|
|
||||||
parser.addArgument("--group-id")
|
parser.addArgument("--group-id")
|
||||||
.action(store())
|
.action(store())
|
||||||
.required(true)
|
.required(true)
|
||||||
|
@ -590,7 +609,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
|
||||||
.setDefault(RangeAssignor.class.getName())
|
.setDefault(RangeAssignor.class.getName())
|
||||||
.type(String.class)
|
.type(String.class)
|
||||||
.dest("assignmentStrategy")
|
.dest("assignmentStrategy")
|
||||||
.help("Set assignment strategy (e.g. " + RoundRobinAssignor.class.getName() + ")");
|
.help(String.format("Set assignment strategy (e.g. %s); only used if the group protocol is %s", RoundRobinAssignor.class.getName(), GroupProtocol.CLASSIC.name()));
|
||||||
|
|
||||||
parser.addArgument("--consumer.config")
|
parser.addArgument("--consumer.config")
|
||||||
.action(store())
|
.action(store())
|
||||||
|
@ -618,6 +637,21 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String groupProtocol = res.getString("groupProtocol");
|
||||||
|
|
||||||
|
// 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.
|
||||||
|
// The two implementations use slightly different configuration, hence these arguments are conditional.
|
||||||
|
//
|
||||||
|
// See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the
|
||||||
|
// command line arguments are passed in by the system test framework.
|
||||||
|
if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
|
||||||
|
consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
|
||||||
|
consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, res.getString("groupRemoteAssignor"));
|
||||||
|
} else {
|
||||||
|
// This means we're using the old consumer group protocol.
|
||||||
|
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
|
||||||
|
}
|
||||||
|
|
||||||
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
|
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
|
||||||
|
|
||||||
String groupInstanceId = res.getString("groupInstanceId");
|
String groupInstanceId = res.getString("groupInstanceId");
|
||||||
|
@ -640,7 +674,6 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
|
||||||
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
|
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
|
||||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
|
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
|
||||||
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
|
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
|
||||||
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
|
|
||||||
|
|
||||||
StringDeserializer deserializer = new StringDeserializer();
|
StringDeserializer deserializer = new StringDeserializer();
|
||||||
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
|
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
|
||||||
|
|
Loading…
Reference in New Issue