mirror of https://github.com/apache/kafka.git
lower -> uperrcase constants
This commit is contained in:
parent
e67f55423b
commit
66d6f4f2dc
|
@ -56,13 +56,13 @@ class ConsoleConsumerTest(Test):
|
|||
self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
|
||||
|
||||
# Verify that log output is happening
|
||||
if not wait_until(lambda: file_exists(node, ConsoleConsumer.log_file), timeout_sec=10):
|
||||
if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10):
|
||||
raise Exception("Timed out waiting for log file to exist")
|
||||
consumer_log_lines = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.log_file)]
|
||||
consumer_log_lines = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.LOG_FILE)]
|
||||
assert len(consumer_log_lines) > 0
|
||||
|
||||
# Verify no consumed messages
|
||||
consumed = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.stdout_capture)]
|
||||
consumed = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.STDOUT_CAPTURE)]
|
||||
assert len(consumed) == 0
|
||||
|
||||
self.consumer.stop_node(node)
|
||||
|
|
|
@ -17,6 +17,7 @@ from ducktape.services.background_thread import BackgroundThreadService
|
|||
|
||||
import os
|
||||
|
||||
|
||||
def is_int(msg):
|
||||
"""Default method used to check whether text pulled from console consumer is a message.
|
||||
|
||||
|
@ -27,7 +28,6 @@ def is_int(msg):
|
|||
except:
|
||||
return None
|
||||
|
||||
|
||||
"""
|
||||
0.8.2.1 ConsoleConsumer options
|
||||
|
||||
|
@ -71,23 +71,23 @@ Option Description
|
|||
|
||||
class ConsoleConsumer(BackgroundThreadService):
|
||||
# Root directory for persistent output
|
||||
persistent_root = "/mnt/console_consumer"
|
||||
stdout_capture = os.path.join(persistent_root, "console_consumer.stdout")
|
||||
stderr_capture = os.path.join(persistent_root, "console_consumer.stderr")
|
||||
log_dir = os.path.join(persistent_root, "logs")
|
||||
log_file = os.path.join(log_dir, "console_consumer.log")
|
||||
log4j_config = os.path.join(persistent_root, "tools-log4j.properties")
|
||||
config_file = os.path.join(persistent_root, "console_consumer.properties")
|
||||
PERSISTENT_ROOT = "/mnt/console_consumer"
|
||||
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
|
||||
STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stderr")
|
||||
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
|
||||
LOG_FILE = os.path.join(LOG_DIR, "console_consumer.log")
|
||||
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
|
||||
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "console_consumer.properties")
|
||||
|
||||
logs = {
|
||||
"consumer_stdout": {
|
||||
"path": stdout_capture,
|
||||
"path": STDOUT_CAPTURE,
|
||||
"collect_default": False},
|
||||
"consumer_stderr": {
|
||||
"path": stderr_capture,
|
||||
"path": STDERR_CAPTURE,
|
||||
"collect_default": False},
|
||||
"consumer_log": {
|
||||
"path": log_file,
|
||||
"path": LOG_FILE,
|
||||
"collect_default": True}
|
||||
}
|
||||
|
||||
|
@ -121,12 +121,12 @@ class ConsoleConsumer(BackgroundThreadService):
|
|||
def start_cmd(self):
|
||||
args = self.args.copy()
|
||||
args['zk_connect'] = self.kafka.zk.connect_setting()
|
||||
args['stdout'] = ConsoleConsumer.stdout_capture
|
||||
args['stderr'] = ConsoleConsumer.stderr_capture
|
||||
args['config_file'] = ConsoleConsumer.config_file
|
||||
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
|
||||
args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
|
||||
args['config_file'] = ConsoleConsumer.CONFIG_FILE
|
||||
|
||||
cmd = "export LOG_DIR=%s;" % ConsoleConsumer.log_dir
|
||||
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.log4j_config
|
||||
cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
|
||||
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
|
||||
cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
|
||||
" --consumer.config %(config_file)s" % args
|
||||
|
||||
|
@ -148,7 +148,7 @@ class ConsoleConsumer(BackgroundThreadService):
|
|||
return len(self.pids(node)) > 0
|
||||
|
||||
def _worker(self, idx, node):
|
||||
node.account.ssh("mkdir -p %s" % ConsoleConsumer.persistent_root, allow_fail=False)
|
||||
node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
||||
# Create and upload config file
|
||||
if self.consumer_timeout_ms is not None:
|
||||
|
@ -158,11 +158,11 @@ class ConsoleConsumer(BackgroundThreadService):
|
|||
|
||||
self.logger.info("console_consumer.properties:")
|
||||
self.logger.info(prop_file)
|
||||
node.account.create_file(ConsoleConsumer.config_file, prop_file)
|
||||
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
|
||||
|
||||
# Create and upload log properties
|
||||
log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.log_file)
|
||||
node.account.create_file(ConsoleConsumer.log4j_config, log_config)
|
||||
log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
|
||||
node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
|
||||
|
||||
# Run and capture output
|
||||
cmd = self.start_cmd
|
||||
|
@ -181,5 +181,5 @@ class ConsoleConsumer(BackgroundThreadService):
|
|||
node.account.kill_process("java", allow_fail=True)
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.ssh("rm -rf %s" % ConsoleConsumer.persistent_root, allow_fail=False)
|
||||
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
||||
|
|
Loading…
Reference in New Issue