From 553e6b4c6d912b3b619b5a1a5263b616b6d3f62e Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Thu, 24 Oct 2024 18:13:30 +0800 Subject: [PATCH] KAFKA-17860 Remove log4j-appender module (#17588) Reviewers: Chia-Ping Tsai --- .github/configs/labeler.yml | 1 - build.gradle | 26 -- checkstyle/import-control.xml | 7 - checkstyle/suppressions.xml | 9 - docs/upgrade.html | 4 + .../log4jappender/KafkaLog4jAppender.java | 394 ------------------ .../log4jappender/KafkaLog4jAppenderTest.java | 230 ---------- .../log4jappender/MockKafkaLog4jAppender.java | 61 --- settings.gradle | 1 - .../services/kafka_log4j_appender.py | 93 ----- .../tests/tools/log4j_appender_test.py | 97 ----- .../kafka/tools/VerifiableLog4jAppender.java | 261 ------------ 12 files changed, 4 insertions(+), 1180 deletions(-) delete mode 100644 log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java delete mode 100644 log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java delete mode 100644 log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java delete mode 100644 tests/kafkatest/services/kafka_log4j_appender.py delete mode 100644 tests/kafkatest/tests/tools/log4j_appender_test.py delete mode 100644 tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java diff --git a/.github/configs/labeler.yml b/.github/configs/labeler.yml index b4f740aac14..be6944c0ab9 100644 --- a/.github/configs/labeler.yml +++ b/.github/configs/labeler.yml @@ -73,7 +73,6 @@ tools: - 'trogdor/**' - 'vagrant/**' - 'committer-tools/**' - - 'log4j-appender/**' - 'shell/**' docs: diff --git a/build.gradle b/build.gradle index 0e2e3a9a014..f3f01f0f5c6 100644 --- a/build.gradle +++ b/build.gradle @@ -1319,9 +1319,6 @@ project(':core') { //By default gradle does not handle test dependencies between the sub-projects //This line is to include clients project test jar to dependant-testlibs from (project(':clients').testJar ) { "$buildDir/dependant-testlibs" } - // log4j-appender is not in core dependencies, - // so we add it to dependant-testlibs to avoid ClassNotFoundException in running kafka_log4j_appender.py - from (project(':log4j-appender').jar ) { "$buildDir/dependant-testlibs" } duplicatesStrategy 'exclude' } @@ -3351,29 +3348,6 @@ project(':jmh-benchmarks') { } } -project(':log4j-appender') { - base { - archivesName = "kafka-log4j-appender" - } - - dependencies { - implementation project(':clients') - implementation libs.slf4jReload4j - - testImplementation project(':clients').sourceSets.test.output - testImplementation libs.junitJupiter - testImplementation libs.hamcrest - testImplementation libs.mockitoCore - - testRuntimeOnly libs.junitPlatformLanucher - } - - javadoc { - enabled = false - } - -} - project(':connect:api') { base { archivesName = "connect-api" diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 12eaed2efce..2524bb92ebc 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -440,13 +440,6 @@ - - - - - - - diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 4b243252f67..03423e3e88c 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -295,15 +295,6 @@ - - - - - - diff --git a/docs/upgrade.html b/docs/upgrade.html index 352fe6ff4fc..fa0207917a6 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -142,6 +142,10 @@ Java 8 support has been removed in Apache Kafka 4.0 See KIP-750 for more details +
  • + KafkaLog4jAppender has been remove, users should migrate to the log4j2 appender + See KafkaAppender for more details +
  • The --delete-config option in the kafka-topics command line tool has been deprecated.
  • diff --git a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java b/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java deleted file mode 100644 index f77c18e2158..00000000000 --- a/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.log4jappender; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.serialization.ByteArraySerializer; - -import org.apache.log4j.AppenderSkeleton; -import org.apache.log4j.helpers.LogLog; -import org.apache.log4j.spi.LoggingEvent; - -import java.nio.charset.StandardCharsets; -import java.util.Date; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; -import static org.apache.kafka.common.config.SaslConfigs.SASL_KERBEROS_SERVICE_NAME; -import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; -import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; -import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; - -/** - * A log4j appender that produces log messages to Kafka. - * This appender is deprecated and users should migrate to the log4j2 appender - * @see KafkaAppender - */ -@Deprecated -public class KafkaLog4jAppender extends AppenderSkeleton { - - private String brokerList; - private String topic; - private String compressionType; - private String securityProtocol; - private String sslTruststoreLocation; - private String sslTruststorePassword; - private String sslKeystoreType; - private String sslKeystoreLocation; - private String sslKeystorePassword; - private String saslKerberosServiceName; - private String saslMechanism; - private String clientJaasConfPath; - private String clientJaasConf; - private String kerb5ConfPath; - private Integer maxBlockMs; - private String sslEngineFactoryClass; - - private int retries = Integer.MAX_VALUE; - private int requiredNumAcks = 1; - private int deliveryTimeoutMs = 120000; - private int lingerMs = 0; - private int batchSize = 16384; - private boolean ignoreExceptions = true; - private boolean syncSend; - private Producer producer; - - public Producer getProducer() { - return producer; - } - - public String getBrokerList() { - return brokerList; - } - - public void setBrokerList(String brokerList) { - this.brokerList = brokerList; - } - - public int getRequiredNumAcks() { - return requiredNumAcks; - } - - public void setRequiredNumAcks(int requiredNumAcks) { - this.requiredNumAcks = requiredNumAcks; - } - - public int getLingerMs() { - return lingerMs; - } - - public void setLingerMs(int lingerMs) { - this.lingerMs = lingerMs; - } - - public int getBatchSize() { - return batchSize; - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - public int getRetries() { - return retries; - } - - public void setRetries(int retries) { - this.retries = retries; - } - - public int getDeliveryTimeoutMs() { - return deliveryTimeoutMs; - } - - public void setDeliveryTimeoutMs(int deliveryTimeoutMs) { - this.deliveryTimeoutMs = deliveryTimeoutMs; - } - - public String getCompressionType() { - return compressionType; - } - - public void setCompressionType(String compressionType) { - this.compressionType = compressionType; - } - - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - - public boolean getIgnoreExceptions() { - return ignoreExceptions; - } - - public void setIgnoreExceptions(boolean ignoreExceptions) { - this.ignoreExceptions = ignoreExceptions; - } - - public boolean getSyncSend() { - return syncSend; - } - - public void setSyncSend(boolean syncSend) { - this.syncSend = syncSend; - } - - public String getSslTruststorePassword() { - return sslTruststorePassword; - } - - public String getSslTruststoreLocation() { - return sslTruststoreLocation; - } - - public String getSecurityProtocol() { - return securityProtocol; - } - - public void setSecurityProtocol(String securityProtocol) { - this.securityProtocol = securityProtocol; - } - - public void setSslTruststoreLocation(String sslTruststoreLocation) { - this.sslTruststoreLocation = sslTruststoreLocation; - } - - public void setSslTruststorePassword(String sslTruststorePassword) { - this.sslTruststorePassword = sslTruststorePassword; - } - - public void setSslKeystorePassword(String sslKeystorePassword) { - this.sslKeystorePassword = sslKeystorePassword; - } - - public void setSslKeystoreType(String sslKeystoreType) { - this.sslKeystoreType = sslKeystoreType; - } - - public void setSslKeystoreLocation(String sslKeystoreLocation) { - this.sslKeystoreLocation = sslKeystoreLocation; - } - - public void setSaslKerberosServiceName(String saslKerberosServiceName) { - this.saslKerberosServiceName = saslKerberosServiceName; - } - - public void setClientJaasConfPath(String clientJaasConfPath) { - this.clientJaasConfPath = clientJaasConfPath; - } - - public void setKerb5ConfPath(String kerb5ConfPath) { - this.kerb5ConfPath = kerb5ConfPath; - } - - public String getSslKeystoreLocation() { - return sslKeystoreLocation; - } - - public String getSslKeystoreType() { - return sslKeystoreType; - } - - public String getSslKeystorePassword() { - return sslKeystorePassword; - } - - public String getSaslKerberosServiceName() { - return saslKerberosServiceName; - } - - public String getClientJaasConfPath() { - return clientJaasConfPath; - } - - public void setSaslMechanism(String saslMechanism) { - this.saslMechanism = saslMechanism; - } - - public String getSaslMechanism() { - return this.saslMechanism; - } - - public void setClientJaasConf(final String clientJaasConf) { - this.clientJaasConf = clientJaasConf; - } - - public String getClientJaasConf() { - return this.clientJaasConf; - } - - public String getKerb5ConfPath() { - return kerb5ConfPath; - } - - public int getMaxBlockMs() { - return maxBlockMs; - } - - public void setMaxBlockMs(int maxBlockMs) { - this.maxBlockMs = maxBlockMs; - } - - public String getSslEngineFactoryClass() { - return sslEngineFactoryClass; - } - - public void setSslEngineFactoryClass(String sslEngineFactoryClass) { - this.sslEngineFactoryClass = sslEngineFactoryClass; - } - - @Override - public void activateOptions() { - // check for config parameter validity - Properties props = new Properties(); - if (brokerList != null) - props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList); - if (props.isEmpty()) - throw new ConfigException("The bootstrap servers property should be specified"); - if (topic == null) - throw new ConfigException("Topic must be specified by the Kafka log4j appender"); - if (compressionType != null) - props.put(COMPRESSION_TYPE_CONFIG, compressionType); - - props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks)); - props.put(RETRIES_CONFIG, retries); - props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); - props.put(LINGER_MS_CONFIG, lingerMs); - props.put(BATCH_SIZE_CONFIG, batchSize); - // Disable idempotence to avoid deadlock when the producer network thread writes a log line while interacting - // with the TransactionManager, see KAFKA-13761 for more information. - props.put(ENABLE_IDEMPOTENCE_CONFIG, false); - - if (securityProtocol != null) { - props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol); - } - - if (securityProtocol != null && (securityProtocol.contains("SSL") || securityProtocol.contains("SASL"))) { - if (sslEngineFactoryClass != null) { - props.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, sslEngineFactoryClass); - } - } - - if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && sslTruststorePassword != null) { - props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation); - props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword); - - if (sslKeystoreType != null && sslKeystoreLocation != null && - sslKeystorePassword != null) { - props.put(SSL_KEYSTORE_TYPE_CONFIG, sslKeystoreType); - props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation); - props.put(SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword); - } - } - - if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) { - props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName); - System.setProperty("java.security.auth.login.config", clientJaasConfPath); - } - if (kerb5ConfPath != null) { - System.setProperty("java.security.krb5.conf", kerb5ConfPath); - } - if (saslMechanism != null) { - props.put(SASL_MECHANISM, saslMechanism); - } - if (clientJaasConf != null) { - props.put(SASL_JAAS_CONFIG, clientJaasConf); - } - if (maxBlockMs != null) { - props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs); - } - - props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - this.producer = getKafkaProducer(props); - LogLog.warn("log4j-appender is deprecated and will be removed in Kafka 4.0."); - LogLog.debug("Kafka producer connected to " + brokerList); - LogLog.debug("Logging for topic: " + topic); - } - - protected Producer getKafkaProducer(Properties props) { - return new KafkaProducer<>(props); - } - - @Override - protected void append(LoggingEvent event) { - String message = subAppend(event); - LogLog.debug("[" + new Date(event.getTimeStamp()) + "]" + message); - Future response; - try { - response = producer.send(new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8))); - } catch (IllegalStateException e) { - // The producer has been closed - LogLog.debug("Exception while sending to Kafka", e); - return; - } - if (syncSend) { - try { - response.get(); - } catch (InterruptedException | ExecutionException ex) { - if (!ignoreExceptions) - throw new RuntimeException(ex); - LogLog.debug("Exception while getting response", ex); - } - } - } - - private String subAppend(LoggingEvent event) { - return (this.layout == null) ? event.getRenderedMessage() : this.layout.format(event); - } - - @Override - public void close() { - if (!this.closed) { - this.closed = true; - if (producer != null) { - producer.close(); - } - } - } - - @Override - public boolean requiresLayout() { - return true; - } -} diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java deleted file mode 100644 index 5895f370510..00000000000 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/KafkaLog4jAppenderTest.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.log4jappender; - -import org.apache.kafka.clients.producer.MockProducer; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.config.SaslConfigs; - -import org.apache.log4j.Appender; -import org.apache.log4j.Logger; -import org.apache.log4j.PropertyConfigurator; -import org.apache.log4j.helpers.LogLog; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.nio.charset.StandardCharsets; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; - -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.not; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class KafkaLog4jAppenderTest { - - private Logger logger = Logger.getLogger(KafkaLog4jAppenderTest.class); - - @BeforeEach - public void setup() { - LogLog.setInternalDebugging(true); - } - - @AfterEach - public void cleanup() { - Logger rootLogger = Logger.getRootLogger(); - Appender appender = rootLogger.getAppender("KAFKA"); - if (appender != null) { - // Tests which do not call PropertyConfigurator.configure don't create an appender to remove. - rootLogger.removeAppender(appender); - appender.close(); - } - } - - @Test - public void testKafkaLog4jConfigs() { - Properties hostMissingProps = new Properties(); - hostMissingProps.put("log4j.rootLogger", "INFO"); - hostMissingProps.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender"); - hostMissingProps.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout"); - hostMissingProps.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n"); - hostMissingProps.put("log4j.appender.KAFKA.Topic", "test-topic"); - hostMissingProps.put("log4j.logger.kafka.log4j", "INFO, KAFKA"); - - assertThrows(ConfigException.class, () -> PropertyConfigurator.configure(hostMissingProps), "Missing properties exception was expected !"); - - Properties topicMissingProps = new Properties(); - topicMissingProps.put("log4j.rootLogger", "INFO"); - topicMissingProps.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender"); - topicMissingProps.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout"); - topicMissingProps.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n"); - topicMissingProps.put("log4j.appender.KAFKA.brokerList", "127.0.0.1:9093"); - topicMissingProps.put("log4j.logger.kafka.log4j", "INFO, KAFKA"); - - assertThrows(ConfigException.class, () -> PropertyConfigurator.configure(topicMissingProps), "Missing properties exception was expected !"); - } - - @Test - public void testSetSaslMechanism() { - Properties props = getLog4jConfig(false); - props.put("log4j.appender.KAFKA.SaslMechanism", "PLAIN"); - PropertyConfigurator.configure(props); - - MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); - - assertEquals(mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_MECHANISM), "PLAIN"); - } - - @Test - public void testSaslMechanismNotSet() { - testProducerPropertyNotSet(SaslConfigs.SASL_MECHANISM); - } - - @Test - public void testSetJaasConfig() { - Properties props = getLog4jConfig(false); - props.put("log4j.appender.KAFKA.ClientJaasConf", "jaas-config"); - PropertyConfigurator.configure(props); - - MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); - assertEquals(mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG), "jaas-config"); - } - - @Test - public void testJaasConfigNotSet() { - testProducerPropertyNotSet(SaslConfigs.SASL_JAAS_CONFIG); - } - - private void testProducerPropertyNotSet(String name) { - PropertyConfigurator.configure(getLog4jConfig(false)); - MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); - assertThat(mockKafkaLog4jAppender.getProducerProperties().stringPropertyNames(), not(hasItem(name))); - } - - @Test - public void testLog4jAppends() { - PropertyConfigurator.configure(getLog4jConfig(false)); - - for (int i = 1; i <= 5; ++i) { - logger.error(getMessage(i)); - } - assertEquals(getMockKafkaLog4jAppender().getHistory().size(), 5); - } - - @Test - public void testSyncSendAndSimulateProducerFailShouldThrowException() { - Properties props = getLog4jConfig(true); - props.put("log4j.appender.KAFKA.IgnoreExceptions", "false"); - PropertyConfigurator.configure(props); - - MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); - replaceProducerWithMocked(mockKafkaLog4jAppender, false); - - assertThrows(RuntimeException.class, () -> logger.error(getMessage(0))); - } - - @Test - public void testSyncSendWithoutIgnoringExceptionsShouldNotThrowException() { - Properties props = getLog4jConfig(true); - props.put("log4j.appender.KAFKA.IgnoreExceptions", "false"); - PropertyConfigurator.configure(props); - - MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender(); - replaceProducerWithMocked(mockKafkaLog4jAppender, true); - - logger.error(getMessage(0)); - } - - @Test - public void testRealProducerConfigWithSyncSendShouldNotThrowException() { - Properties props = getLog4jConfigWithRealProducer(true); - PropertyConfigurator.configure(props); - - logger.error(getMessage(0)); - } - - @Test - public void testRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException() { - Properties props = getLog4jConfigWithRealProducer(false); - PropertyConfigurator.configure(props); - - assertThrows(RuntimeException.class, () -> logger.error(getMessage(0))); - } - - private void replaceProducerWithMocked(MockKafkaLog4jAppender mockKafkaLog4jAppender, boolean success) { - @SuppressWarnings("unchecked") - MockProducer producer = mock(MockProducer.class); - CompletableFuture future = new CompletableFuture<>(); - if (success) - future.complete(new RecordMetadata(new TopicPartition("tp", 0), 0, 0, 0, 0, 0)); - else - future.completeExceptionally(new TimeoutException("simulated timeout")); - when(producer.send(any())).thenReturn(future); - // reconfiguring mock appender - mockKafkaLog4jAppender.setKafkaProducer(producer); - mockKafkaLog4jAppender.activateOptions(); - } - - private MockKafkaLog4jAppender getMockKafkaLog4jAppender() { - return (MockKafkaLog4jAppender) Logger.getRootLogger().getAppender("KAFKA"); - } - - private byte[] getMessage(int i) { - return ("test_" + i).getBytes(StandardCharsets.UTF_8); - } - - private Properties getLog4jConfigWithRealProducer(boolean ignoreExceptions) { - Properties props = new Properties(); - props.put("log4j.rootLogger", "INFO, KAFKA"); - props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender"); - props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout"); - props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n"); - props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.2:9093"); - props.put("log4j.appender.KAFKA.Topic", "test-topic"); - props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1"); - props.put("log4j.appender.KAFKA.SyncSend", "true"); - // setting producer timeout (max.block.ms) to be low - props.put("log4j.appender.KAFKA.maxBlockMs", "10"); - // ignoring exceptions - props.put("log4j.appender.KAFKA.IgnoreExceptions", Boolean.toString(ignoreExceptions)); - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA"); - return props; - } - - private Properties getLog4jConfig(boolean syncSend) { - Properties props = new Properties(); - props.put("log4j.rootLogger", "INFO, KAFKA"); - props.put("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.MockKafkaLog4jAppender"); - props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout"); - props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n"); - props.put("log4j.appender.KAFKA.BrokerList", "127.0.0.1:9093"); - props.put("log4j.appender.KAFKA.Topic", "test-topic"); - props.put("log4j.appender.KAFKA.RequiredNumAcks", "-1"); - props.put("log4j.appender.KAFKA.SyncSend", Boolean.toString(syncSend)); - props.put("log4j.logger.kafka.log4j", "INFO, KAFKA"); - return props; - } -} diff --git a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java b/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java deleted file mode 100644 index 981c658b9a8..00000000000 --- a/log4j-appender/src/test/java/org/apache/kafka/log4jappender/MockKafkaLog4jAppender.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.log4jappender; - -import org.apache.kafka.clients.producer.MockProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.test.MockSerializer; - -import org.apache.log4j.spi.LoggingEvent; - -import java.util.List; -import java.util.Properties; - -@SuppressWarnings("deprecation") -public class MockKafkaLog4jAppender extends KafkaLog4jAppender { - private MockProducer mockProducer = - new MockProducer<>(false, new MockSerializer(), new MockSerializer()); - - private Properties producerProperties; - - @Override - protected Producer getKafkaProducer(Properties props) { - producerProperties = props; - return mockProducer; - } - - void setKafkaProducer(MockProducer producer) { - this.mockProducer = producer; - } - - @Override - protected void append(LoggingEvent event) { - if (super.getProducer() == null) { - activateOptions(); - } - super.append(event); - } - - List> getHistory() { - return mockProducer.history(); - } - - public Properties getProducerProperties() { - return producerProperties; - } -} diff --git a/settings.gradle b/settings.gradle index b2dd10b3a78..48f7924737e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -74,7 +74,6 @@ include 'clients', 'group-coordinator', 'group-coordinator:group-coordinator-api', 'jmh-benchmarks', - 'log4j-appender', 'metadata', 'raft', 'server', diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py deleted file mode 100644 index bd60cbcd188..00000000000 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.services.background_thread import BackgroundThreadService - -from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_DEPENDANT_TEST_LIBS_JAR_NAME -from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.services.kafka.util import fix_opts_for_new_jvm -from kafkatest.version import DEV_BRANCH - - -class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService): - logs = { - "producer_log": { - "path": "/mnt/kafka_log4j_appender.log", - "collect_default": False} - } - - def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT", - tls_version=None): - super(KafkaLog4jAppender, self).__init__(context, num_nodes) - - self.kafka = kafka - self.topic = topic - self.max_messages = max_messages - self.security_protocol = security_protocol - self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version) - self.stop_timeout_sec = 30 - - for node in self.nodes: - node.version = kafka.nodes[0].version - - def _worker(self, idx, node): - cmd = self.start_cmd(node) - self.logger.debug("VerifiableLog4jAppender %d command: %s" % (idx, cmd)) - self.security_config.setup_node(node) - node.account.ssh(cmd) - - def start_cmd(self, node): - # Since the core module does not contain the log4j-appender, we need to add it manually. - core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH) - cmd = fix_opts_for_new_jvm(node) - cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar - cmd += " export CLASSPATH;" - cmd += self.path.script("kafka-run-class.sh", node) - cmd += " " - cmd += self.java_class_name() - cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_protocol)) - - if self.max_messages > 0: - cmd += " --max-messages %s" % str(self.max_messages) - if self.security_protocol != SecurityConfig.PLAINTEXT: - cmd += " --security-protocol %s" % str(self.security_protocol) - if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL: - cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH) - cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores.truststore_passwd) - if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \ - self.security_protocol == SecurityConfig.SASL_SSL or \ - self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \ - self.security_protocol == SecurityConfig.SASL_MECHANISM_PLAIN: - cmd += " --sasl-kerberos-service-name %s" % str('kafka') - cmd += " --client-jaas-conf-path %s" % str(SecurityConfig.JAAS_CONF_PATH) - cmd += " --kerb5-conf-path %s" % str(SecurityConfig.KRB5CONF_PATH) - - cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log &" - return cmd - - def stop_node(self, node): - node.account.kill_java_processes(self.java_class_name(), allow_fail=False) - - stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) - assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ - (str(node.account), str(self.stop_timeout_sec)) - - def clean_node(self, node): - node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False, - allow_fail=False) - node.account.ssh("rm -rf /mnt/kafka_log4j_appender.log", allow_fail=False) - - def java_class_name(self): - return "org.apache.kafka.tools.VerifiableLog4jAppender" diff --git a/tests/kafkatest/tests/tools/log4j_appender_test.py b/tests/kafkatest/tests/tools/log4j_appender_test.py deleted file mode 100644 index 0287f2f4d0e..00000000000 --- a/tests/kafkatest/tests/tools/log4j_appender_test.py +++ /dev/null @@ -1,97 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ducktape.utils.util import wait_until -from ducktape.tests.test import Test -from ducktape.mark import matrix -from ducktape.mark.resource import cluster - -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka_log4j_appender import KafkaLog4jAppender - -TOPIC = "topic-log4j-appender" -MAX_MESSAGES = 100 - - -class Log4jAppenderTest(Test): - """ - Tests KafkaLog4jAppender using VerifiableKafkaLog4jAppender that appends increasing ints to a Kafka topic - """ - def __init__(self, test_context): - super(Log4jAppenderTest, self).__init__(test_context) - self.num_zk = 1 - self.num_brokers = 1 - self.messages_received_count = 0 - self.topics = { - TOPIC: {'partitions': 1, 'replication-factor': 1} - } - - self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None - - def setUp(self): - if self.zk: - self.zk.start() - - def start_kafka(self, security_protocol, interbroker_security_protocol): - self.kafka = KafkaService( - self.test_context, self.num_brokers, - self.zk, security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, - controller_num_nodes_override=self.num_zk) - self.kafka.start() - - def start_appender(self, security_protocol): - self.appender = KafkaLog4jAppender(self.test_context, self.num_brokers, self.kafka, TOPIC, MAX_MESSAGES, - security_protocol=security_protocol) - self.appender.start() - - def custom_message_validator(self, msg): - if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in msg: - self.logger.debug("Received message: %s" % msg) - self.messages_received_count += 1 - - def start_consumer(self): - self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, - consumer_timeout_ms=10000, - message_validator=self.custom_message_validator) - self.consumer.start() - - @cluster(num_nodes=4) - @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_non_upgrade) - @cluster(num_nodes=5) - @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_non_upgrade) - def test_log4j_appender(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk): - """ - Tests if KafkaLog4jAppender is producing to Kafka topic - :return: None - """ - self.start_kafka(security_protocol, security_protocol) - self.start_appender(security_protocol) - self.appender.wait() - - self.start_consumer() - node = self.consumer.nodes[0] - - wait_until(lambda: self.consumer.alive(node), - timeout_sec=20, backoff_sec=.2, err_msg="Consumer was too slow to start") - - # Verify consumed messages count - wait_until(lambda: self.messages_received_count == MAX_MESSAGES, timeout_sec=10, - err_msg="Timed out waiting to consume expected number of messages.") - - self.consumer.stop() diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java deleted file mode 100644 index 3cd3b0ec99a..00000000000 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.tools; - -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.utils.Exit; - -import net.sourceforge.argparse4j.ArgumentParsers; -import net.sourceforge.argparse4j.inf.ArgumentParser; -import net.sourceforge.argparse4j.inf.ArgumentParserException; -import net.sourceforge.argparse4j.inf.Namespace; - -import org.apache.log4j.Logger; -import org.apache.log4j.PropertyConfigurator; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Properties; - -import static net.sourceforge.argparse4j.impl.Arguments.store; - -/** - * Primarily intended for use with system testing, this appender produces message - * to Kafka on each "append" request. For example, this helps with end-to-end tests - * of KafkaLog4jAppender. - * - * When used as a command-line tool, it appends increasing integers. It will produce a - * fixed number of messages unless the default max-messages -1 is used, in which case - * it appends indefinitely. - */ - -public class VerifiableLog4jAppender { - Logger logger = Logger.getLogger(VerifiableLog4jAppender.class); - - // If maxMessages < 0, log until the process is killed externally - private long maxMessages = -1; - - // Hook to trigger logging thread to stop logging messages - private volatile boolean stopLogging = false; - - /** Get the command-line argument parser. */ - private static ArgumentParser argParser() { - ArgumentParser parser = ArgumentParsers - .newArgumentParser("verifiable-log4j-appender") - .defaultHelp(true) - .description("This tool produces increasing integers to the specified topic using KafkaLog4jAppender."); - - parser.addArgument("--topic") - .action(store()) - .required(true) - .type(String.class) - .metavar("TOPIC") - .help("Produce messages to this topic."); - - parser.addArgument("--broker-list") - .action(store()) - .required(true) - .type(String.class) - .metavar("HOST1:PORT1[,HOST2:PORT2[...]]") - .dest("brokerList") - .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,..."); - - parser.addArgument("--max-messages") - .action(store()) - .required(false) - .setDefault(-1) - .type(Integer.class) - .metavar("MAX-MESSAGES") - .dest("maxMessages") - .help("Produce this many messages. If -1, produce messages until the process is killed externally."); - - parser.addArgument("--acks") - .action(store()) - .required(false) - .setDefault("-1") - .type(String.class) - .choices("0", "1", "-1") - .metavar("ACKS") - .help("Acks required on each produced message. See Kafka docs on request.required.acks for details."); - - parser.addArgument("--security-protocol") - .action(store()) - .required(false) - .setDefault("PLAINTEXT") - .type(String.class) - .choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL") - .metavar("SECURITY-PROTOCOL") - .dest("securityProtocol") - .help("Security protocol to be used while communicating with Kafka brokers."); - - parser.addArgument("--ssl-truststore-location") - .action(store()) - .required(false) - .type(String.class) - .metavar("SSL-TRUSTSTORE-LOCATION") - .dest("sslTruststoreLocation") - .help("Location of SSL truststore to use."); - - parser.addArgument("--ssl-truststore-password") - .action(store()) - .required(false) - .type(String.class) - .metavar("SSL-TRUSTSTORE-PASSWORD") - .dest("sslTruststorePassword") - .help("Password for SSL truststore to use."); - - parser.addArgument("--appender.config") - .action(store()) - .required(false) - .type(String.class) - .metavar("CONFIG_FILE") - .help("Log4jAppender config properties file."); - - parser.addArgument("--sasl-kerberos-service-name") - .action(store()) - .required(false) - .type(String.class) - .metavar("SASL-KERBEROS-SERVICE-NAME") - .dest("saslKerberosServiceName") - .help("Name of sasl kerberos service."); - - parser.addArgument("--client-jaas-conf-path") - .action(store()) - .required(false) - .type(String.class) - .metavar("CLIENT-JAAS-CONF-PATH") - .dest("clientJaasConfPath") - .help("Path of JAAS config file of Kafka client."); - - parser.addArgument("--kerb5-conf-path") - .action(store()) - .required(false) - .type(String.class) - .metavar("KERB5-CONF-PATH") - .dest("kerb5ConfPath") - .help("Path of Kerb5 config file."); - - return parser; - } - - /** - * Read a properties file from the given path - * @param filename The path of the file to read - * - * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate - * but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests, - * we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars. - * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate. - */ - public static Properties loadProps(String filename) throws IOException { - Properties props = new Properties(); - try (InputStream propStream = Files.newInputStream(Paths.get(filename))) { - props.load(propStream); - } - return props; - } - - /** Construct a VerifiableLog4jAppender object from command-line arguments. */ - public static VerifiableLog4jAppender createFromArgs(String[] args) { - ArgumentParser parser = argParser(); - VerifiableLog4jAppender producer = null; - - try { - Namespace res = parser.parseArgs(args); - - int maxMessages = res.getInt("maxMessages"); - String topic = res.getString("topic"); - String configFile = res.getString("appender.config"); - - Properties props = new Properties(); - props.setProperty("log4j.rootLogger", "INFO, KAFKA"); - props.setProperty("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender"); - props.setProperty("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout"); - props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n"); - props.setProperty("log4j.appender.KAFKA.BrokerList", res.getString("brokerList")); - props.setProperty("log4j.appender.KAFKA.Topic", topic); - props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks")); - props.setProperty("log4j.appender.KAFKA.SyncSend", "true"); - final String securityProtocol = res.getString("securityProtocol"); - if (securityProtocol != null && !securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) { - props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol); - } - if (securityProtocol != null && securityProtocol.contains("SSL")) { - props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation")); - props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword")); - } - if (securityProtocol != null && securityProtocol.contains("SASL")) { - props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", res.getString("saslKerberosServiceName")); - props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", res.getString("clientJaasConfPath")); - props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", res.getString("kerb5ConfPath")); - } - props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA"); - // Changing log level from INFO to WARN as a temporary workaround for KAFKA-6415. This is to - // avoid deadlock in system tests when producer network thread appends to log while updating metadata. - props.setProperty("log4j.logger.org.apache.kafka.clients.Metadata", "WARN, KAFKA"); - - if (configFile != null) { - try { - props.putAll(loadProps(configFile)); - } catch (IOException e) { - throw new ArgumentParserException(e.getMessage(), parser); - } - } - - producer = new VerifiableLog4jAppender(props, maxMessages); - } catch (ArgumentParserException e) { - if (args.length == 0) { - parser.printHelp(); - Exit.exit(0); - } else { - parser.handleError(e); - Exit.exit(1); - } - } - - return producer; - } - - - public VerifiableLog4jAppender(Properties props, int maxMessages) { - this.maxMessages = maxMessages; - PropertyConfigurator.configure(props); - } - - public static void main(String[] args) { - - final VerifiableLog4jAppender appender = createFromArgs(args); - boolean infinite = appender.maxMessages < 0; - - // Trigger main thread to stop producing messages when shutting down - Exit.addShutdownHook("verifiable-log4j-appender-shutdown-hook", () -> appender.stopLogging = true); - - long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages; - for (long i = 0; i < maxMessages; i++) { - if (appender.stopLogging) { - break; - } - appender.append(String.format("%d", i)); - } - } - - private void append(String msg) { - logger.info(msg); - } -}