KAFKA-10407: Have KafkaLog4jAppender support `linger.ms` and `batch.size` (#9189)

* KAFKA-10407: Have KafkaLog4jAppender `batch.size` and `linger.ms`

https://issues.apache.org/jira/browse/KAFKA-10407

Currently, KafkaLog4jAppender does not support `batch.size` or `linger.ms` which would otherwise be beneficial in some situations.
This commit is contained in:
huxi 2020-08-20 09:18:11 +08:00 committed by GitHub
parent b7856df21b
commit dc2b7d6f88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 0 deletions

View File

@ -35,9 +35,11 @@ import java.util.concurrent.Future;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@ -76,6 +78,8 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private int retries = Integer.MAX_VALUE;
private int requiredNumAcks = 1;
private int deliveryTimeoutMs = 120000;
private int lingerMs = 0;
private int batchSize = 16384;
private boolean ignoreExceptions = true;
private boolean syncSend;
private Producer<byte[], byte[]> producer;
@ -100,6 +104,22 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
this.requiredNumAcks = requiredNumAcks;
}
public int getLingerMs() {
return lingerMs;
}
public void setLingerMs(int lingerMs) {
this.lingerMs = lingerMs;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public int getRetries() {
return retries;
}
@ -269,6 +289,8 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
props.put(RETRIES_CONFIG, retries);
props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
props.put(LINGER_MS_CONFIG, lingerMs);
props.put(BATCH_SIZE_CONFIG, batchSize);
if (securityProtocol != null) {
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);