How can I disable Kafka's internal logging?

Problem description (votes: 0 · answers: 2)

I want to disable the internal logs that Kafka writes by itself, not the ones I wrote. The code below is my basic Kafka producer.

package me.sclee.kafka.basic.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.UUID;

import static me.sclee.kafka.basic.config.BasicKafkaConfig.*;

public class BasicKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(BasicKafkaProducer.class);

    private static KafkaProducer<String, String> getKafkaProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.ACKS_CONFIG, ACK);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
        props.put(ProducerConfig.RETRIES_CONFIG, RETRIES);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
        props.put(ProducerConfig.LINGER_MS_CONFIG, LINGER_MS);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, BUFFER_MEMORY);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SER);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SER);
        return new KafkaProducer<>(props);
    }

    public void send() throws Exception {
        logger.info("Sending kafka message..");

        KafkaProducer<String, String> producer = null;

        int line = 3;
        try {
            for (int n = 0; n < line; n++) {
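                // Note: a new KafkaProducer is created on every loop iteration,
                // which is why the ProducerConfig dump appears three times in the console output below.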
                producer = getKafkaProducer();
                String uuid = UUID.randomUUID().toString();
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, uuid);
                producer.send(record, new ProducerCallBack(n));
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            logger.error("There was a problem while sending a message in producer: {}", e.getMessage());
            throw e;
        } finally {
            if (producer != null) {
                producer.flush();
                producer.close();
            }
        }
        logger.info("Exited the Kafka sending..");
    }

    public static void main(String[] args) throws Exception {
        BasicKafkaProducer basicKafkaProducer = new BasicKafkaProducer();
        basicKafkaProducer.send();
    }
}

As you can see, I added a few logs of my own for tracing, but when I run the program I also see a large number of other logs generated by Kafka's internals, as shown below.

I want to see only my own logs in the output so it is easier to follow. How can I achieve this?

I am using the following dependencies (the Kafka client plus SLF4J):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.0-alpha1</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.0-alpha1</version>
</dependency>

Console output:

    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = snappy
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 5
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1642705323290
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: oXvU2NlRS8m6jKC3Zh1FcA
[kafka-producer-network-thread | producer-1] INFO me.sclee.kafka.basic.producer.ProducerCallBack - Producer sends a message. Topic : 1642705323550, partition : 0, offset : 139, line : 0
[Timer-0] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    acks = -1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-2
    compression.type = snappy
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 5
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1642705324831
[kafka-producer-network-thread | producer-2] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-2] Cluster ID: oXvU2NlRS8m6jKC3Zh1FcA
[kafka-producer-network-thread | producer-2] INFO me.sclee.kafka.basic.producer.ProducerCallBack - Producer sends a message. Topic : 1642705324834, partition : 0, offset : 140, line : 1
[Timer-0] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    acks = -1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-3
    compression.type = snappy
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 5
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[Timer-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1642705325844
[kafka-producer-network-thread | producer-3] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-3] Cluster ID: oXvU2NlRS8m6jKC3Zh1FcA
[kafka-producer-network-thread | producer-3] INFO me.sclee.kafka.basic.producer.ProducerCallBack - Producer sends a message. Topic : 1642705325848, partition : 0, offset : 141, line : 2
Tags: java, apache-kafka, log4j
2 Answers

Answer 1 (2 votes)

You can create your own src/main/resources/log4j.properties file and configure whatever log levels, packages, and formats you want.

For example:

log4j.logger.kafka=OFF
log4j.logger.org.apache.kafka=OFF
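
Those two lines only set logger levels; a standalone log4j.properties also needs a root logger and an appender to do anything. A minimal sketch, assuming a log4j 1.x binding (e.g. slf4j-log4j12 or reload4j) is on the classpath; the appender name stdout and the pattern are my own choices, not from the original answer:

# src/main/resources/log4j.properties (minimal sketch)
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%t] %-5p %c - %m%n

# Silence Kafka's internal loggers entirely
log4j.logger.kafka=OFF
log4j.logger.org.apache.kafka=OFF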

The default values (for both brokers and clients) are defined here: https://github.com/apache/kafka/blob/trunk/config/log4j.properties
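
Note that a log4j.properties file only takes effect when a log4j binding is actually on the classpath. The pom.xml in the question uses slf4j-simple instead; to the best of my knowledge its equivalent (verify against the SLF4J SimpleLogger docs for your version) is a simplelogger.properties file on the classpath:

# src/main/resources/simplelogger.properties (sketch, read by slf4j-simple)
org.slf4j.simpleLogger.defaultLogLevel=info
# Silence (or lower) Kafka's internal client loggers
org.slf4j.simpleLogger.log.org.apache.kafka=off

The same keys can also be passed as JVM system properties, e.g. -Dorg.slf4j.simpleLogger.log.org.apache.kafka=off.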


Answer 2 (0 votes)

In our case, the extra logs were written by TracingProducerInterceptor, which had been registered in the ReactiveKafkaProducerTemplate bean:
@Bean
public ReactiveKafkaProducerTemplate<YourModelEventKey, YourModelEvent> reactiveKafkaProducerTemplate(
        final KafkaProperties properties) {
    var props = properties.buildProducerProperties();
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            Collections.singletonList(TracingProducerInterceptor.class));
    return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
}

Solution:

For application.properties:

logging.level.brave.Tracer=warn

For application.yaml, use the following:

logging:
  level:
    brave.Tracer: warn

Note: we were only interested in warnings and errors. If you want, you can disable these logs completely by setting the level to OFF.
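
For example, in application.properties (a one-line sketch, assuming the same Spring Boot logging.level binding as above):

logging.level.brave.Tracer=OFF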