KAFKA-17860 Remove log4j-appender module (#17588)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
TengYao Chi 2024-10-24 18:13:30 +08:00 committed by GitHub
parent 57053ef47d
commit 553e6b4c6d
12 changed files with 4 additions and 1180 deletions

View File

@@ -73,7 +73,6 @@ tools:
- 'trogdor/**'
- 'vagrant/**'
- 'committer-tools/**'
- 'log4j-appender/**'
- 'shell/**'
docs:

View File

@@ -1319,9 +1319,6 @@ project(':core') {
//By default gradle does not handle test dependencies between the sub-projects
//This line is to include clients project test jar to dependant-testlibs
from (project(':clients').testJar ) { "$buildDir/dependant-testlibs" }
// log4j-appender is not in core dependencies,
// so we add it to dependant-testlibs to avoid ClassNotFoundException in running kafka_log4j_appender.py
from (project(':log4j-appender').jar ) { "$buildDir/dependant-testlibs" }
duplicatesStrategy 'exclude'
}
@@ -3351,29 +3348,6 @@ project(':jmh-benchmarks') {
}
}
project(':log4j-appender') {
base {
archivesName = "kafka-log4j-appender"
}
dependencies {
implementation project(':clients')
implementation libs.slf4jReload4j
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.hamcrest
testImplementation libs.mockitoCore
testRuntimeOnly libs.junitPlatformLanucher
}
javadoc {
enabled = false
}
}
project(':connect:api') {
base {
archivesName = "connect-api"

View File

@@ -440,13 +440,6 @@
</subpackage>
</subpackage>
<subpackage name="log4jappender">
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="test">
<allow pkg="org.apache.kafka" />
<allow pkg="org.bouncycastle" />

View File

@@ -295,15 +295,6 @@
<suppress checks="JavaNCSS"
files="(MetadataNodeManager).java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"
files="KafkaLog4jAppender.java"/>
<suppress checks="NPathComplexity"
files="KafkaLog4jAppender.java"/>
<suppress checks="JavaNCSS"
files="RequestResponseTest.java"/>
<!-- metadata -->
<suppress checks="ClassDataAbstractionCoupling"
files="(QuorumController|QuorumControllerTest|ReplicationControlManager|ReplicationControlManagerTest|ClusterControlManagerTest|KRaftMigrationDriverTest).java"/>

View File

@@ -142,6 +142,10 @@
Java 8 support has been removed in Apache Kafka 4.0
See <a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223">KIP-750</a> for more details
</li>
<li>
KafkaLog4jAppender has been removed; users should migrate to the log4j2 appender
See <a href="https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender">KafkaAppender</a> for more details
</li>
<li>The <code>--delete-config</code> option in the <code>kafka-topics</code> command line tool has been deprecated.
</li>
</ul>
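
The migration target named in the note above is the log4j2 KafkaAppender. As a minimal sketch (topic name and broker address are illustrative placeholders; the pattern mirrors the one used by the removed module's tests), an equivalent log4j2.xml could look roughly like:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
  <Appenders>
    <!-- log4j2's built-in Kafka appender; "topic" is the destination Kafka topic -->
    <Kafka name="KAFKA" topic="app-logs">
      <PatternLayout pattern="%-5p: %c - %m%n"/>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <!-- Route the Kafka client's own logging elsewhere to avoid recursive logging -->
    <Logger name="org.apache.kafka" level="WARN"/>
    <Root level="INFO">
      <AppenderRef ref="KAFKA"/>
    </Root>
  </Loggers>
</Configuration>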

View File

@@ -1,394 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.log4jappender;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_KERBEROS_SERVICE_NAME;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
/**
* A log4j appender that produces log messages to Kafka.
* This appender is deprecated; users should migrate to the log4j2 appender.
* @see <a href="https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender">KafkaAppender</a>
*/
@Deprecated
public class KafkaLog4jAppender extends AppenderSkeleton {
private String brokerList;
private String topic;
private String compressionType;
private String securityProtocol;
private String sslTruststoreLocation;
private String sslTruststorePassword;
private String sslKeystoreType;
private String sslKeystoreLocation;
private String sslKeystorePassword;
private String saslKerberosServiceName;
private String saslMechanism;
private String clientJaasConfPath;
private String clientJaasConf;
private String kerb5ConfPath;
private Integer maxBlockMs;
private String sslEngineFactoryClass;
private int retries = Integer.MAX_VALUE;
private int requiredNumAcks = 1;
private int deliveryTimeoutMs = 120000;
private int lingerMs = 0;
private int batchSize = 16384;
private boolean ignoreExceptions = true;
private boolean syncSend;
private Producer<byte[], byte[]> producer;
public Producer<byte[], byte[]> getProducer() {
return producer;
}
public String getBrokerList() {
return brokerList;
}
public void setBrokerList(String brokerList) {
this.brokerList = brokerList;
}
public int getRequiredNumAcks() {
return requiredNumAcks;
}
public void setRequiredNumAcks(int requiredNumAcks) {
this.requiredNumAcks = requiredNumAcks;
}
public int getLingerMs() {
return lingerMs;
}
public void setLingerMs(int lingerMs) {
this.lingerMs = lingerMs;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public int getRetries() {
return retries;
}
public void setRetries(int retries) {
this.retries = retries;
}
public int getDeliveryTimeoutMs() {
return deliveryTimeoutMs;
}
public void setDeliveryTimeoutMs(int deliveryTimeoutMs) {
this.deliveryTimeoutMs = deliveryTimeoutMs;
}
public String getCompressionType() {
return compressionType;
}
public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public boolean getIgnoreExceptions() {
return ignoreExceptions;
}
public void setIgnoreExceptions(boolean ignoreExceptions) {
this.ignoreExceptions = ignoreExceptions;
}
public boolean getSyncSend() {
return syncSend;
}
public void setSyncSend(boolean syncSend) {
this.syncSend = syncSend;
}
public String getSslTruststorePassword() {
return sslTruststorePassword;
}
public String getSslTruststoreLocation() {
return sslTruststoreLocation;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public void setSslTruststoreLocation(String sslTruststoreLocation) {
this.sslTruststoreLocation = sslTruststoreLocation;
}
public void setSslTruststorePassword(String sslTruststorePassword) {
this.sslTruststorePassword = sslTruststorePassword;
}
public void setSslKeystorePassword(String sslKeystorePassword) {
this.sslKeystorePassword = sslKeystorePassword;
}
public void setSslKeystoreType(String sslKeystoreType) {
this.sslKeystoreType = sslKeystoreType;
}
public void setSslKeystoreLocation(String sslKeystoreLocation) {
this.sslKeystoreLocation = sslKeystoreLocation;
}
public void setSaslKerberosServiceName(String saslKerberosServiceName) {
this.saslKerberosServiceName = saslKerberosServiceName;
}
public void setClientJaasConfPath(String clientJaasConfPath) {
this.clientJaasConfPath = clientJaasConfPath;
}
public void setKerb5ConfPath(String kerb5ConfPath) {
this.kerb5ConfPath = kerb5ConfPath;
}
public String getSslKeystoreLocation() {
return sslKeystoreLocation;
}
public String getSslKeystoreType() {
return sslKeystoreType;
}
public String getSslKeystorePassword() {
return sslKeystorePassword;
}
public String getSaslKerberosServiceName() {
return saslKerberosServiceName;
}
public String getClientJaasConfPath() {
return clientJaasConfPath;
}
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
public String getSaslMechanism() {
return this.saslMechanism;
}
public void setClientJaasConf(final String clientJaasConf) {
this.clientJaasConf = clientJaasConf;
}
public String getClientJaasConf() {
return this.clientJaasConf;
}
public String getKerb5ConfPath() {
return kerb5ConfPath;
}
public int getMaxBlockMs() {
return maxBlockMs;
}
public void setMaxBlockMs(int maxBlockMs) {
this.maxBlockMs = maxBlockMs;
}
public String getSslEngineFactoryClass() {
return sslEngineFactoryClass;
}
public void setSslEngineFactoryClass(String sslEngineFactoryClass) {
this.sslEngineFactoryClass = sslEngineFactoryClass;
}
@Override
public void activateOptions() {
// check for config parameter validity
Properties props = new Properties();
if (brokerList != null)
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
if (props.isEmpty())
throw new ConfigException("The bootstrap servers property should be specified");
if (topic == null)
throw new ConfigException("Topic must be specified by the Kafka log4j appender");
if (compressionType != null)
props.put(COMPRESSION_TYPE_CONFIG, compressionType);
props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
props.put(RETRIES_CONFIG, retries);
props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
props.put(LINGER_MS_CONFIG, lingerMs);
props.put(BATCH_SIZE_CONFIG, batchSize);
// Disable idempotence to avoid deadlock when the producer network thread writes a log line while interacting
// with the TransactionManager, see KAFKA-13761 for more information.
props.put(ENABLE_IDEMPOTENCE_CONFIG, false);
if (securityProtocol != null) {
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
}
if (securityProtocol != null && (securityProtocol.contains("SSL") || securityProtocol.contains("SASL"))) {
if (sslEngineFactoryClass != null) {
props.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, sslEngineFactoryClass);
}
}
if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && sslTruststorePassword != null) {
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
if (sslKeystoreType != null && sslKeystoreLocation != null &&
sslKeystorePassword != null) {
props.put(SSL_KEYSTORE_TYPE_CONFIG, sslKeystoreType);
props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword);
}
}
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
System.setProperty("java.security.auth.login.config", clientJaasConfPath);
}
if (kerb5ConfPath != null) {
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
}
if (saslMechanism != null) {
props.put(SASL_MECHANISM, saslMechanism);
}
if (clientJaasConf != null) {
props.put(SASL_JAAS_CONFIG, clientJaasConf);
}
if (maxBlockMs != null) {
props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
}
props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.producer = getKafkaProducer(props);
LogLog.warn("log4j-appender is deprecated and will be removed in Kafka 4.0.");
LogLog.debug("Kafka producer connected to " + brokerList);
LogLog.debug("Logging for topic: " + topic);
}
protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
return new KafkaProducer<>(props);
}
@Override
protected void append(LoggingEvent event) {
String message = subAppend(event);
LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message);
Future<RecordMetadata> response;
try {
response = producer.send(new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8)));
} catch (IllegalStateException e) {
// The producer has been closed
LogLog.debug("Exception while sending to Kafka", e);
return;
}
if (syncSend) {
try {
response.get();
} catch (InterruptedException | ExecutionException ex) {
if (!ignoreExceptions)
throw new RuntimeException(ex);
LogLog.debug("Exception while getting response", ex);
}
}
}
private String subAppend(LoggingEvent event) {
return (this.layout == null) ? event.getRenderedMessage() : this.layout.format(event);
}
@Override
public void close() {
if (!this.closed) {
this.closed = true;
if (producer != null) {
producer.close();
}
}
}
@Override
public boolean requiresLayout() {
return true;
}
}
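
For reference, a minimal sketch of how the removed appender was typically wired up programmatically, using only the setters and lifecycle methods defined above (the broker address, topic, and logger name are assumed placeholders, not values from this commit):

KafkaLog4jAppender appender = new KafkaLog4jAppender();
appender.setBrokerList("localhost:9092");   // mapped to bootstrap.servers in activateOptions()
appender.setTopic("app-logs");              // required; activateOptions() throws ConfigException without it
appender.setSyncSend(false);                // fire-and-forget; true blocks on each send's Future
appender.setLayout(new org.apache.log4j.PatternLayout("%-5p: %c - %m%n"));
appender.activateOptions();                 // validates the config and creates the KafkaProducer
org.apache.log4j.Logger.getRootLogger().addAppender(appender);
org.apache.log4j.Logger.getLogger("example").info("hello");  // each event becomes a ProducerRecord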

View File

@@ -1,230 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.log4jappender;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.helpers.LogLog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class KafkaLog4jAppenderTest {
private Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class);
@BeforeEach
public void setup() {
LogLog.setInternalDebugging(true);
}
@AfterEach
public void cleanup() {
Logger rootLogger = Logger.getRootLogger();
Appender appender = rootLogger.getAppender("KAFKA");
if (appender != null) {
// Tests which do not call PropertyConfigurator.configure don't create an appender to remove.
rootLogger.removeAppender(appender);
appender.close();
}
}
@Test
public void testKafkaLog4jConfigs() {
Properties hostMissingProps = new Properties();
hostMissingProps.put("log4j.rootLogger", "INFO");
hostMissingProps.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
hostMissingProps.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
hostMissingProps.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
hostMissingProps.put("log4j.appender.KAFKA.Topic", "test-topic");
hostMissingProps.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
assertThrows(ConfigException.class, () -> PropertyConfigurator.configure(hostMissingProps), "Missing properties exception was expected!");
Properties topicMissingProps = new Properties();
topicMissingProps.put("log4j.rootLogger", "INFO");
topicMissingProps.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
topicMissingProps.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
topicMissingProps.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
topicMissingProps.put("log4j.appender.KAFKA.brokerList", "127.0.0.1:9093");
topicMissingProps.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
assertThrows(ConfigException.class, () -> PropertyConfigurator.configure(topicMissingProps), "Missing properties exception was expected!");
}
@Test
public void testSetSaslMechanism() {
Properties props = getLog4jConfig(false);
props.put("log4j.appender.KAFKA.SaslMechanism", "PLAIN");
PropertyConfigurator.configure(props);
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
assertEquals(mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_MECHANISM), "PLAIN");
}
@Test
public void testSaslMechanismNotSet() {
testProducerPropertyNotSet(SaslConfigs.SASL_MECHANISM);
}
@Test
public void testSetJaasConfig() {
Properties props = getLog4jConfig(false);
props.put("log4j.appender.KAFKA.ClientJaasConf", "jaas-config");
PropertyConfigurator.configure(props);
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
assertEquals(mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG), "jaas-config");
}
@Test
public void testJaasConfigNotSet() {
testProducerPropertyNotSet(SaslConfigs.SASL_JAAS_CONFIG);
}
private void testProducerPropertyNotSet(String name) {
PropertyConfigurator.configure(getLog4jConfig(false));
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
assertThat(mockKafkaLog4jAppender.getProducerProperties().stringPropertyNames(), not(hasItem(name)));
}
@Test
public void testLog4jAppends() {
PropertyConfigurator.configure(getLog4jConfig(false));
for (int i = 1; i <= 5; ++i) {
logger.error(getMessage(i));
}
assertEquals(getMockKafkaLog4jAppender().getHistory().size(), 5);
}
@Test
public void testSyncSendAndSimulateProducerFailShouldThrowException() {
Properties props = getLog4jConfig(true);
props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
PropertyConfigurator.configure(props);
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
replaceProducerWithMocked(mockKafkaLog4jAppender, false);
assertThrows(RuntimeException.class, () -> logger.error(getMessage(0)));
}
@Test
public void testSyncSendWithoutIgnoringExceptionsShouldNotThrowException() {
Properties props = getLog4jConfig(true);
props.put("log4j.appender.KAFKA.IgnoreExceptions", "false");
PropertyConfigurator.configure(props);
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
replaceProducerWithMocked(mockKafkaLog4jAppender, true);
logger.error(getMessage(0));
}
@Test
public void testRealProducerConfigWithSyncSendShouldNotThrowException() {
Properties props = getLog4jConfigWithRealProducer(true);
PropertyConfigurator.configure(props);
logger.error(getMessage(0));
}
@Test
public void testRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException() {
Properties props = getLog4jConfigWithRealProducer(false);
PropertyConfigurator.configure(props);
assertThrows(RuntimeException.class, () -> logger.error(getMessage(0)));
}
private void replaceProducerWithMocked(MockKafkaLog4jAppender mockKafkaLog4jAppender, boolean success) {
@SuppressWarnings("unchecked")
MockProducer<byte[], byte[]> producer = mock(MockProducer.class);
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
if (success)
future.complete(new RecordMetadata(new TopicPartition("tp", 0), 0, 0, 0, 0, 0));
else
future.completeExceptionally(new TimeoutException("simulated timeout"));
when(producer.send(any())).thenReturn(future);
// reconfiguring mock appender
mockKafkaLog4jAppender.setKafkaProducer(producer);
mockKafkaLog4jAppender.activateOptions();
}
private MockKafkaLog4jAppender getMockKafkaLog4jAppender() {
return (MockKafkaLog4jAppender) Logger.getRootLogger().getAppender("KAFKA");
}
private byte[] getMessage(int i) {
return ("test_" + i).getBytes(StandardCharsets.UTF_8);
}
private Properties getLog4jConfigWithRealProducer(boolean ignoreExceptions) {
Properties props = new Properties();
props.put("log4j.rootLogger", "INFO, KAFKA");
props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.2:9093");
props.put("log4j.appender.KAFKA.Topic", "test-topic");
props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1");
props.put("log4j.appender.KAFKA.SyncSend", "true");
// setting producer timeout (max.block.ms) to be low
props.put("log4j.appender.KAFKA.maxBlockMs", "10");
// ignoring exceptions
props.put("log4j.appender.KAFKA.IgnoreExceptions", Boolean.toString(ignoreExceptions));
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
return props;
}
private Properties getLog4jConfig(boolean syncSend) {
Properties props = new Properties();
props.put("log4j.rootLogger", "INFO, KAFKA");
props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.MockKafkaLog4jAppender");
props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093");
props.put("log4j.appender.KAFKA.Topic", "test-topic");
props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1");
props.put("log4j.appender.KAFKA.SyncSend", Boolean.toString(syncSend));
props.put("log4j.logger.kafka.log4j", "INFO, KAFKA");
return props;
}
}

View File

@@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.log4jappender;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.test.MockSerializer;
import org.apache.log4j.spi.LoggingEvent;
import java.util.List;
import java.util.Properties;
@SuppressWarnings("deprecation")
public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
private MockProducer<byte[], byte[]> mockProducer =
new MockProducer<>(false, new MockSerializer(), new MockSerializer());
private Properties producerProperties;
@Override
protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
producerProperties = props;
return mockProducer;
}
void setKafkaProducer(MockProducer<byte[], byte[]> producer) {
this.mockProducer = producer;
}
@Override
protected void append(LoggingEvent event) {
if (super.getProducer() == null) {
activateOptions();
}
super.append(event);
}
List<ProducerRecord<byte[], byte[]>> getHistory() {
return mockProducer.history();
}
public Properties getProducerProperties() {
return producerProperties;
}
}

View File

@@ -74,7 +74,6 @@ include 'clients',
'group-coordinator',
'group-coordinator:group-coordinator-api',
'jmh-benchmarks',
'log4j-appender',
'metadata',
'raft',
'server',

View File

@@ -1,93 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
from kafkatest.version import DEV_BRANCH
class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
logs = {
"producer_log": {
"path": "/mnt/kafka_log4j_appender.log",
"collect_default": False}
}
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT",
tls_version=None):
super(KafkaLog4jAppender, self).__init__(context, num_nodes)
self.kafka = kafka
self.topic = topic
self.max_messages = max_messages
self.security_protocol = security_protocol
self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
self.stop_timeout_sec = 30
for node in self.nodes:
node.version = kafka.nodes[0].version
def _worker(self, idx, node):
cmd = self.start_cmd(node)
self.logger.debug("VerifiableLog4jAppender %d command: %s" % (idx, cmd))
self.security_config.setup_node(node)
node.account.ssh(cmd)
def start_cmd(self, node):
# Since the core module does not contain the log4j-appender, we need to add it manually.
core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
cmd = fix_opts_for_new_jvm(node)
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar
cmd += " export CLASSPATH;"
cmd += self.path.script("kafka-run-class.sh", node)
cmd += " "
cmd += self.java_class_name()
cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol))
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
if self.security_protocol != SecurityConfig.PLAINTEXT:
cmd += " --security-protocol %s" % str(self.security_protocol)
if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL:
cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH)
cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores.truststore_passwd)
if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \
self.security_protocol == SecurityConfig.SASL_SSL or \
self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \
self.security_protocol == SecurityConfig.SASL_MECHANISM_PLAIN:
cmd += " --sasl-kerberos-service-name %s" % str('kafka')
cmd += " --client-jaas-conf-path %s" % str(SecurityConfig.JAAS_CONF_PATH)
cmd += " --kerb5-conf-path %s" % str(SecurityConfig.KRB5CONF_PATH)
cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log &"
return cmd
def stop_node(self, node):
node.account.kill_java_processes(self.java_class_name(), allow_fail=False)
stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
(str(node.account), str(self.stop_timeout_sec))
def clean_node(self, node):
node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False,
allow_fail=False)
node.account.ssh("rm -rf /mnt/kafka_log4j_appender.log", allow_fail=False)
def java_class_name(self):
return "org.apache.kafka.tools.VerifiableLog4jAppender"

View File

@@ -1,97 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender
TOPIC = "topic-log4j-appender"
MAX_MESSAGES = 100
class Log4jAppenderTest(Test):
"""
Tests KafkaLog4jAppender using VerifiableLog4jAppender, which appends increasing ints to a Kafka topic
"""
def __init__(self, test_context):
super(Log4jAppenderTest, self).__init__(test_context)
self.num_zk = 1
self.num_brokers = 1
self.messages_received_count = 0
self.topics = {
TOPIC: {'partitions': 1, 'replication-factor': 1}
}
self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
def setUp(self):
if self.zk:
self.zk.start()
def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
self.zk, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
controller_num_nodes_override=self.num_zk)
self.kafka.start()
def start_appender(self, security_protocol):
self.appender = KafkaLog4jAppender(self.test_context, self.num_brokers, self.kafka, TOPIC, MAX_MESSAGES,
security_protocol=security_protocol)
self.appender.start()
def custom_message_validator(self, msg):
if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in msg:
self.logger.debug("Received message: %s" % msg)
self.messages_received_count += 1
def start_consumer(self):
self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
consumer_timeout_ms=10000,
message_validator=self.custom_message_validator)
self.consumer.start()
@cluster(num_nodes=4)
@matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade)
@cluster(num_nodes=5)
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_non_upgrade)
def test_log4j_appender(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk):
"""
Tests if KafkaLog4jAppender is producing to Kafka topic
:return: None
"""
self.start_kafka(security_protocol, security_protocol)
self.start_appender(security_protocol)
self.appender.wait()
self.start_consumer()
node = self.consumer.nodes[0]
wait_until(lambda: self.consumer.alive(node),
timeout_sec=20, backoff_sec=.2, err_msg="Consumer was too slow to start")
# Verify consumed messages count
wait_until(lambda: self.messages_received_count == MAX_MESSAGES, timeout_sec=10,
err_msg="Timed out waiting to consume expected number of messages.")
self.consumer.stop()

View File

@@ -1,261 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import static net.sourceforge.argparse4j.impl.Arguments.store;
/**
* Primarily intended for use with system testing, this appender produces a message
* to Kafka on each "append" request. For example, this helps with end-to-end tests
* of KafkaLog4jAppender.
*
* When used as a command-line tool, it appends increasing integers. It will produce a
* fixed number of messages unless the default max-messages -1 is used, in which case
* it appends indefinitely.
*/
public class VerifiableLog4jAppender {
Logger logger = Logger.getLogger(VerifiableLog4jAppender.class);
// If maxMessages < 0, log until the process is killed externally
private long maxMessages = -1;
// Hook to trigger logging thread to stop logging messages
private volatile boolean stopLogging = false;
/** Get the command-line argument parser. */
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("verifiable-log4j-appender")
.defaultHelp(true)
.description("This tool produces increasing integers to the specified topic using KafkaLog4jAppender.");
parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Produce messages to this topic.");
parser.addArgument("--broker-list")
.action(store())
.required(true)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("brokerList")
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
parser.addArgument("--max-messages")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.help("Produce this many messages. If -1, produce messages until the process is killed externally.");
parser.addArgument("--acks")
.action(store())
.required(false)
.setDefault("-1")
.type(String.class)
.choices("0", "1", "-1")
.metavar("ACKS")
.help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
parser.addArgument("--security-protocol")
.action(store())
.required(false)
.setDefault("PLAINTEXT")
.type(String.class)
.choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")
.metavar("SECURITY-PROTOCOL")
.dest("securityProtocol")
.help("Security protocol to be used while communicating with Kafka brokers.");
parser.addArgument("--ssl-truststore-location")
.action(store())
.required(false)
.type(String.class)
.metavar("SSL-TRUSTSTORE-LOCATION")
.dest("sslTruststoreLocation")
.help("Location of SSL truststore to use.");
parser.addArgument("--ssl-truststore-password")
.action(store())
.required(false)
.type(String.class)
.metavar("SSL-TRUSTSTORE-PASSWORD")
.dest("sslTruststorePassword")
.help("Password for SSL truststore to use.");
parser.addArgument("--appender.config")
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG_FILE")
.help("Log4jAppender config properties file.");
parser.addArgument("--sasl-kerberos-service-name")
.action(store())
.required(false)
.type(String.class)
.metavar("SASL-KERBEROS-SERVICE-NAME")
.dest("saslKerberosServiceName")
.help("Name of sasl kerberos service.");
parser.addArgument("--client-jaas-conf-path")
.action(store())
.required(false)
.type(String.class)
.metavar("CLIENT-JAAS-CONF-PATH")
.dest("clientJaasConfPath")
.help("Path of JAAS config file of Kafka client.");
parser.addArgument("--kerb5-conf-path")
.action(store())
.required(false)
.type(String.class)
.metavar("KERB5-CONF-PATH")
.dest("kerb5ConfPath")
.help("Path of Kerb5 config file.");
return parser;
}
/**
* Read a properties file from the given path
* @param filename The path of the file to read
*
* Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
* but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
* we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
*/
public static Properties loadProps(String filename) throws IOException {
Properties props = new Properties();
try (InputStream propStream = Files.newInputStream(Paths.get(filename))) {
props.load(propStream);
}
return props;
}
/** Construct a VerifiableLog4jAppender object from command-line arguments. */
public static VerifiableLog4jAppender createFromArgs(String[] args) {
ArgumentParser parser = argParser();
VerifiableLog4jAppender producer = null;
try {
Namespace res = parser.parseArgs(args);
int maxMessages = res.getInt("maxMessages");
String topic = res.getString("topic");
String configFile = res.getString("appender.config");
Properties props = new Properties();
props.setProperty("log4j.rootLogger", "INFO, KAFKA");
props.setProperty("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
props.setProperty("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
props.setProperty("log4j.appender.KAFKA.BrokerList", res.getString("brokerList"));
props.setProperty("log4j.appender.KAFKA.Topic", topic);
props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks"));
props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
final String securityProtocol = res.getString("securityProtocol");
if (securityProtocol != null && !securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) {
props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol);
}
if (securityProtocol != null && securityProtocol.contains("SSL")) {
props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation"));
props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword"));
}
if (securityProtocol != null && securityProtocol.contains("SASL")) {
props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", res.getString("saslKerberosServiceName"));
props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", res.getString("clientJaasConfPath"));
props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", res.getString("kerb5ConfPath"));
}
props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
// Changing log level from INFO to WARN as a temporary workaround for KAFKA-6415. This is to
// avoid deadlock in system tests when producer network thread appends to log while updating metadata.
props.setProperty("log4j.logger.org.apache.kafka.clients.Metadata", "WARN, KAFKA");
if (configFile != null) {
try {
props.putAll(loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
}
producer = new VerifiableLog4jAppender(props, maxMessages);
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
} else {
parser.handleError(e);
Exit.exit(1);
}
}
return producer;
}
public VerifiableLog4jAppender(Properties props, int maxMessages) {
this.maxMessages = maxMessages;
PropertyConfigurator.configure(props);
}
public static void main(String[] args) {
final VerifiableLog4jAppender appender = createFromArgs(args);
boolean infinite = appender.maxMessages < 0;
// Trigger main thread to stop producing messages when shutting down
Exit.addShutdownHook("verifiable-log4j-appender-shutdown-hook", () -> appender.stopLogging = true);
long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
for (long i = 0; i < maxMessages; i++) {
if (appender.stopLogging) {
break;
}
appender.append(String.format("%d", i));
}
}
private void append(String msg) {
logger.info(msg);
}
}
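
As a usage illustration, the system tests launched this tool through kafka-run-class.sh with the arguments defined in argParser() above; a sketch with placeholder host, topic, and message count:

bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender \
    --topic test-topic \
    --broker-list localhost:9092 \
    --max-messages 100

With --max-messages 100 the tool appends the integers 0 through 99 and exits; with the default of -1 it keeps appending until the process is killed.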