KAFKA-9832: Extend Streams system tests for EOS-beta (#8443)

Reviewers: Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Matthias J. Sax 2020-04-10 11:55:01 -07:00 committed by GitHub
parent ea1e634664
commit 20e4a74c35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 60 additions and 56 deletions

View File

@ -62,7 +62,6 @@ class ActiveTaskCreator {
private final StreamsProducer threadProducer;
private final Map<TaskId, StreamsProducer> taskProducers;
private final StreamThread.ProcessingMode processingMode;
private final String transactionalId;
ActiveTaskCreator(final InternalTopologyBuilder builder,
final StreamsConfig config,
@ -87,8 +86,6 @@ class ActiveTaskCreator {
this.log = log;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
transactionalId = applicationId + "-" + processId + "-StreamThread-" + threadId.split("-StreamThread-")[1];
processingMode = StreamThread.processingMode(config);
if (processingMode == EXACTLY_ONCE_ALPHA) {

View File

@ -113,7 +113,9 @@ public class StreamsProducer {
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
producerConfigs.put(
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
applicationId + "-" + Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta"));
applicationId + "-" +
Objects.requireNonNull(processId, "processId cannot be null for exactly-once beta") +
"-" + threadId.split("-StreamThread-")[1]);
eosBetaProducerConfigs = producerConfigs;

View File

@ -142,7 +142,7 @@ public class StreamsProducerTest {
nonEosStreamsProducer =
new StreamsProducer(
nonEosConfig,
"threadId",
"threadId-StreamThread-0",
mockClientSupplier,
null,
null,
@ -155,7 +155,7 @@ public class StreamsProducerTest {
eosAlphaStreamsProducer =
new StreamsProducer(
eosAlphaConfig,
"threadId",
"threadId-StreamThread-0",
eosAlphaMockClientSupplier,
new TaskId(0, 0),
null,
@ -169,7 +169,7 @@ public class StreamsProducerTest {
eosBetaStreamsProducer =
new StreamsProducer(
eosBetaConfig,
"threadId",
"threadId-StreamThread-0",
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
@ -470,11 +470,11 @@ public class StreamsProducerTest {
final UUID processId = UUID.randomUUID();
final Map<String, Object> mockMap = mock(Map.class);
expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + processId)).andReturn(null);
expect(mockMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "appId-" + processId + "-0")).andReturn(null);
expect(mockMap.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)).andReturn("appId-" + processId);
final StreamsConfig mockConfig = mock(StreamsConfig.class);
expect(mockConfig.getProducerConfigs("threadId-producer")).andReturn(mockMap);
expect(mockConfig.getProducerConfigs("threadId-StreamThread-0-producer")).andReturn(mockMap);
expect(mockConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("appId");
expect(mockConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)).andReturn(StreamsConfig.EXACTLY_ONCE_BETA).anyTimes();
@ -482,7 +482,7 @@ public class StreamsProducerTest {
new StreamsProducer(
mockConfig,
"threadId",
"threadId-StreamThread-0",
eosAlphaMockClientSupplier,
null,
processId,
@ -605,7 +605,7 @@ public class StreamsProducerTest {
final StreamsProducer streamsProducer = new StreamsProducer(
eosBetaConfig,
"threadId",
"threadId-StreamThread-0",
clientSupplier,
null,
UUID.randomUUID(),
@ -737,7 +737,7 @@ public class StreamsProducerTest {
final StreamsProducer streamsProducer =
new StreamsProducer(
eosBetaConfig,
"threadId",
"threadId-StreamThread-0",
eosBetaMockClientSupplier,
null,
UUID.randomUUID(),
@ -1076,7 +1076,7 @@ public class StreamsProducerTest {
public void shouldResetTransactionInitializedOnResetProducer() {
final StreamsProducer streamsProducer = new StreamsProducer(
eosBetaConfig,
"threadId",
"threadId-StreamThread-0",
clientSupplier,
null,
UUID.randomUUID(),

View File

@ -637,13 +637,14 @@ public class RelationalSmokeTest extends SmokeTestUtil {
public static Properties getConfig(final String broker,
final String application,
final String id,
final String processingGuarantee,
final String stateDir) {
return mkProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, application),
mkEntry(StreamsConfig.CLIENT_ID_CONFIG, id),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000"),
mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir)
@ -654,9 +655,10 @@ public class RelationalSmokeTest extends SmokeTestUtil {
public static KafkaStreams startSync(final String broker,
final String application,
final String id,
final String processingGuarantee,
final String stateDir) throws InterruptedException {
final KafkaStreams kafkaStreams =
new KafkaStreams(getTopology(), getConfig(broker, application, id, stateDir));
new KafkaStreams(getTopology(), getConfig(broker, application, id, processingGuarantee, stateDir));
final CountDownLatch startUpLatch = new CountDownLatch(1);
kafkaStreams.setStateListener((newState, oldState) -> {
if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
@ -983,8 +985,9 @@ public class RelationalSmokeTest extends SmokeTestUtil {
}
case "application": {
final String nodeId = args[2];
final String stateDir = args[3];
App.startSync(kafka, UUID.randomUUID().toString(), nodeId, stateDir);
final String processingGuarantee = args[3];
final String stateDir = args[4];
App.startSync(kafka, UUID.randomUUID().toString(), nodeId, processingGuarantee, stateDir);
break;
}
default:

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
@ -31,6 +32,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class RelationalSmokeTestTest extends SmokeTestUtil {
@Test
public void verifySmokeTestLogic() {
try (final TopologyTestDriver driver =
@ -39,6 +41,7 @@ public class RelationalSmokeTestTest extends SmokeTestUtil {
"nothing:0",
"test",
"test",
StreamsConfig.AT_LEAST_ONCE,
TestUtils.tempDirectory().getAbsolutePath()
))) {

View File

@ -49,12 +49,25 @@ public class StreamsSmokeTest {
final Properties streamsProperties = Utils.loadProps(propFileName);
final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
}
if ("process".equals(command)) {
if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee)) {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + ", " +
StreamsConfig.EXACTLY_ONCE + ", or " + StreamsConfig.EXACTLY_ONCE_BETA);
System.exit(1);
}
}
System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("props=" + streamsProperties);
@ -79,11 +92,6 @@ public class StreamsSmokeTest {
// this starts the stream processing app
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
break;
case "process-eos":
// this starts the stream processing app with EOS
streamsProperties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
break;
case "close-deadlock-test":
final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
test.start();

View File

@ -302,16 +302,18 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
class StreamsSmokeTestBaseService(StreamsTestBaseService):
"""Base class for Streams Smoke Test services providing some common settings and functionality"""
def __init__(self, test_context, kafka, command, num_threads = 3):
def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3):
super(StreamsSmokeTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsSmokeTest",
command)
self.NUM_THREADS = num_threads
self.PROCESSING_GUARANTEE = processing_guarantee
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE,
streams_property.NUM_THREADS: self.NUM_THREADS}
cfg = KafkaConfig(**properties)
@ -359,13 +361,8 @@ class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka, num_threads = 3):
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", num_threads)
class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos")
def __init__(self, test_context, kafka, processing_guarantee, num_threads = 3):
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, num_threads)
class StreamsEosTestDriverService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):

View File

@ -20,3 +20,4 @@ Define Streams configuration property names here.
STATE_DIR = "state.dir"
KAFKA_SERVERS = "bootstrap.servers"
NUM_THREADS = "num.stream.threads"
PROCESSING_GUARANTEE = "processing.guarantee"

View File

@ -22,7 +22,7 @@ from kafkatest.tests.kafka_test import KafkaTest
class StreamsRelationalSmokeTestService(StreamsTestBaseService):
def __init__(self, test_context, kafka, mode, nodeId):
def __init__(self, test_context, kafka, mode, nodeId, processing_guarantee):
super(StreamsRelationalSmokeTestService, self).__init__(
test_context,
kafka,
@ -31,18 +31,20 @@ class StreamsRelationalSmokeTestService(StreamsTestBaseService):
)
self.mode = mode
self.nodeId = nodeId
self.processing_guarantee = processing_guarantee
self.log4j_template = 'log4j_template.properties'
def start_cmd(self, node):
return "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.tests.RelationalSmokeTest " \
" %(mode)s %(kafka)s %(nodeId)s %(state_dir)s" \
" %(mode)s %(kafka)s %(nodeId)s %(processing_guarantee)s %(state_dir)s" \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % {
"log4j": self.LOG4J_CONFIG_FILE,
"kafka_run_class": self.path.script("kafka-run-class.sh", node),
"mode": self.mode,
"nodeId": self.nodeId,
"kafka": self.kafka.bootstrap_servers(),
"nodeId": self.nodeId,
"processing_guarantee": self.processing_guarantee,
"state_dir": self.PERSISTENT_ROOT,
"stdout": self.STDOUT_FILE,
"stderr": self.STDERR_FILE,
@ -82,14 +84,15 @@ class StreamsRelationalSmokeTest(KafkaTest):
self.test_context = test_context
@cluster(num_nodes=8)
@matrix(crash=[False, True])
def test_streams(self, crash):
driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored")
@matrix(crash=[False, True],
processing_guarantee=['exactly_once', 'exactly_once_beta'])
def test_streams(self, crash, processing_guarantee):
driver = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "driver", "ignored", "ignored")
LOG_FILE = driver.LOG_FILE # this is the same for all instaces of the service, so we can just declare a "constant"
processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1")
processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2")
processor1 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor1", processing_guarantee)
processor2 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor2", processing_guarantee)
processor1.start()
processor2.start()
@ -104,7 +107,7 @@ class StreamsRelationalSmokeTest(KafkaTest):
processor1.stop_nodes(not crash)
processor3 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor3")
processor3 = StreamsRelationalSmokeTestService(self.test_context, self.kafka, "application", "processor3", processing_guarantee)
processor3.start()
processor3.await_command("grep -q 'Streams has started' %s" % LOG_FILE)

View File

@ -13,13 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsSmokeTestEOSJobRunnerService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
class StreamsSmokeTest(KafkaTest):
"""
@ -48,19 +46,11 @@ class StreamsSmokeTest(KafkaTest):
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
@cluster(num_nodes=8)
@matrix(eos=[True, False], crash=[True, False])
def test_streams(self, eos, crash):
#
if eos:
processor1 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
processor2 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
processor3 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
else:
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
@matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False])
def test_streams(self, processing_guarantee, crash):
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
with processor1.node.account.monitor_log(processor1.STDOUT_FILE) as monitor1:
processor1.start()
@ -114,7 +104,7 @@ class StreamsSmokeTest(KafkaTest):
processor3.stop()
if crash and not eos:
if crash and processing_guarantee == 'at_least_once':
self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
else:
self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)