mirror of https://github.com/apache/kafka.git
lower -> uperrcase constants
This commit is contained in:
parent
e67f55423b
commit
66d6f4f2dc
|
@ -56,13 +56,13 @@ class ConsoleConsumerTest(Test):
|
||||||
self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
|
self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
|
||||||
|
|
||||||
# Verify that log output is happening
|
# Verify that log output is happening
|
||||||
if not wait_until(lambda: file_exists(node, ConsoleConsumer.log_file), timeout_sec=10):
|
if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10):
|
||||||
raise Exception("Timed out waiting for log file to exist")
|
raise Exception("Timed out waiting for log file to exist")
|
||||||
consumer_log_lines = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.log_file)]
|
consumer_log_lines = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.LOG_FILE)]
|
||||||
assert len(consumer_log_lines) > 0
|
assert len(consumer_log_lines) > 0
|
||||||
|
|
||||||
# Verify no consumed messages
|
# Verify no consumed messages
|
||||||
consumed = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.stdout_capture)]
|
consumed = [line for line in node.account.ssh_capture("cat %s" % ConsoleConsumer.STDOUT_CAPTURE)]
|
||||||
assert len(consumed) == 0
|
assert len(consumed) == 0
|
||||||
|
|
||||||
self.consumer.stop_node(node)
|
self.consumer.stop_node(node)
|
||||||
|
|
|
@ -17,6 +17,7 @@ from ducktape.services.background_thread import BackgroundThreadService
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
def is_int(msg):
|
def is_int(msg):
|
||||||
"""Default method used to check whether text pulled from console consumer is a message.
|
"""Default method used to check whether text pulled from console consumer is a message.
|
||||||
|
|
||||||
|
@ -27,7 +28,6 @@ def is_int(msg):
|
||||||
except:
|
except:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
0.8.2.1 ConsoleConsumer options
|
0.8.2.1 ConsoleConsumer options
|
||||||
|
|
||||||
|
@ -71,23 +71,23 @@ Option Description
|
||||||
|
|
||||||
class ConsoleConsumer(BackgroundThreadService):
|
class ConsoleConsumer(BackgroundThreadService):
|
||||||
# Root directory for persistent output
|
# Root directory for persistent output
|
||||||
persistent_root = "/mnt/console_consumer"
|
PERSISTENT_ROOT = "/mnt/console_consumer"
|
||||||
stdout_capture = os.path.join(persistent_root, "console_consumer.stdout")
|
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
|
||||||
stderr_capture = os.path.join(persistent_root, "console_consumer.stderr")
|
STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stderr")
|
||||||
log_dir = os.path.join(persistent_root, "logs")
|
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
|
||||||
log_file = os.path.join(log_dir, "console_consumer.log")
|
LOG_FILE = os.path.join(LOG_DIR, "console_consumer.log")
|
||||||
log4j_config = os.path.join(persistent_root, "tools-log4j.properties")
|
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
|
||||||
config_file = os.path.join(persistent_root, "console_consumer.properties")
|
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "console_consumer.properties")
|
||||||
|
|
||||||
logs = {
|
logs = {
|
||||||
"consumer_stdout": {
|
"consumer_stdout": {
|
||||||
"path": stdout_capture,
|
"path": STDOUT_CAPTURE,
|
||||||
"collect_default": False},
|
"collect_default": False},
|
||||||
"consumer_stderr": {
|
"consumer_stderr": {
|
||||||
"path": stderr_capture,
|
"path": STDERR_CAPTURE,
|
||||||
"collect_default": False},
|
"collect_default": False},
|
||||||
"consumer_log": {
|
"consumer_log": {
|
||||||
"path": log_file,
|
"path": LOG_FILE,
|
||||||
"collect_default": True}
|
"collect_default": True}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,12 +121,12 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
def start_cmd(self):
|
def start_cmd(self):
|
||||||
args = self.args.copy()
|
args = self.args.copy()
|
||||||
args['zk_connect'] = self.kafka.zk.connect_setting()
|
args['zk_connect'] = self.kafka.zk.connect_setting()
|
||||||
args['stdout'] = ConsoleConsumer.stdout_capture
|
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
|
||||||
args['stderr'] = ConsoleConsumer.stderr_capture
|
args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
|
||||||
args['config_file'] = ConsoleConsumer.config_file
|
args['config_file'] = ConsoleConsumer.CONFIG_FILE
|
||||||
|
|
||||||
cmd = "export LOG_DIR=%s;" % ConsoleConsumer.log_dir
|
cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
|
||||||
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.log4j_config
|
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
|
||||||
cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
|
cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
|
||||||
" --consumer.config %(config_file)s" % args
|
" --consumer.config %(config_file)s" % args
|
||||||
|
|
||||||
|
@ -148,7 +148,7 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
return len(self.pids(node)) > 0
|
return len(self.pids(node)) > 0
|
||||||
|
|
||||||
def _worker(self, idx, node):
|
def _worker(self, idx, node):
|
||||||
node.account.ssh("mkdir -p %s" % ConsoleConsumer.persistent_root, allow_fail=False)
|
node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
|
||||||
|
|
||||||
# Create and upload config file
|
# Create and upload config file
|
||||||
if self.consumer_timeout_ms is not None:
|
if self.consumer_timeout_ms is not None:
|
||||||
|
@ -158,11 +158,11 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
|
|
||||||
self.logger.info("console_consumer.properties:")
|
self.logger.info("console_consumer.properties:")
|
||||||
self.logger.info(prop_file)
|
self.logger.info(prop_file)
|
||||||
node.account.create_file(ConsoleConsumer.config_file, prop_file)
|
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
|
||||||
|
|
||||||
# Create and upload log properties
|
# Create and upload log properties
|
||||||
log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.log_file)
|
log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
|
||||||
node.account.create_file(ConsoleConsumer.log4j_config, log_config)
|
node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
|
||||||
|
|
||||||
# Run and capture output
|
# Run and capture output
|
||||||
cmd = self.start_cmd
|
cmd = self.start_cmd
|
||||||
|
@ -181,5 +181,5 @@ class ConsoleConsumer(BackgroundThreadService):
|
||||||
node.account.kill_process("java", allow_fail=True)
|
node.account.kill_process("java", allow_fail=True)
|
||||||
|
|
||||||
def clean_node(self, node):
|
def clean_node(self, node):
|
||||||
node.account.ssh("rm -rf %s" % ConsoleConsumer.persistent_root, allow_fail=False)
|
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue