mirror of https://github.com/apache/kafka.git
KAFKA-17235 system test test_performance_service.py failed (#16789)
related to https://issues.apache.org/jira/browse/KAFKA-17235 The root cause of this issue is a change we introduced in KAFKA-16879, where we modified the PushHttpMetricsReporter constructor to use Time.System [1]. However, Time.System doesn't exist in Kafka versions 0.8.2 and 0.9. In test_performance_services.py, we have system tests for Kafka versions 0.8.2 and 0.9 [2]. These tests always use the tools JAR from the trunk branch, regardless of the Kafka version being tested [3], while the client JAR aligns with the Kafka version specified in the test suite [4]. This discrepancy is what causes the issue to arise. To resolve this issue, we have a few options: 1) Add Time.System to Kafka 0.8.2 and 0.9: This isn't practical, as we no longer maintain these versions. 2) Modify the PushHttpMetricsReporter constructor to use new SystemTime() instead of Time.System: This would contradict the intent of KAFKA-16879, which aims to make SystemTime a singleton. 3) Implement Time in PushHttpMetricsReporter use the time to get current time 4) Remove system tests for Kafka 0.8.2 and 0.9 from test_performance_services.py Given that we no longer maintain Kafka 0.8.2 and 0.9, and altering the constructor goes against the design goals of KAFKA-16879, option 4 appears to be the most feasible solution. However, I'm not sure whether it's acceptable to remove these old version tests. Maybe someone else has a better solution "We'll proceed with option 3 since support for versions 0.8 and 0.9 is still required, meaning we can't remove those Kafka versions from the system tests." Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
81edb74c5e
commit
4537c8af5b
|
@ -50,6 +50,7 @@ import java.util.Scanner;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
|
||||
|
@ -72,7 +73,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
|
|||
}
|
||||
|
||||
private final Object lock = new Object();
|
||||
private final Time time;
|
||||
private final Supplier<Long> currentTimeMillis;
|
||||
private final ScheduledExecutorService executor;
|
||||
// The set of metrics are updated in init/metricChange/metricRemoval
|
||||
private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
|
||||
|
@ -96,12 +97,17 @@ public class PushHttpMetricsReporter implements MetricsReporter {
|
|||
"producer/consumer/streams/connect instance");
|
||||
|
||||
public PushHttpMetricsReporter() {
|
||||
time = Time.SYSTEM;
|
||||
// In test_performance_services.py, we have system tests for Kafka versions 0.8.2 and 0.9.
|
||||
// These tests always use the tools jar from the trunk branch, regardless of the Kafka version being tested,
|
||||
// while the client jar aligns with the Kafka version specified in the test suite. To ensure these system test
|
||||
// passed, we need to make this class compatible with older client jars. This discrepancy force us not to use
|
||||
// `Time.SYSTEM` here as there is no such field in the older Kafka version.
|
||||
currentTimeMillis = System::currentTimeMillis;
|
||||
executor = Executors.newSingleThreadScheduledExecutor();
|
||||
}
|
||||
|
||||
PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
|
||||
time = mockTime;
|
||||
currentTimeMillis = mockTime::milliseconds;
|
||||
executor = mockExecutor;
|
||||
}
|
||||
|
||||
|
@ -169,7 +175,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
|
|||
private class HttpReporter implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
long now = time.milliseconds();
|
||||
long now = currentTimeMillis.get();
|
||||
final List<MetricValue> samples;
|
||||
synchronized (lock) {
|
||||
samples = new ArrayList<>(metrics.size());
|
||||
|
|
Loading…
Reference in New Issue