xxxxx 的 1 条记录即将过期:自批创建以来已过去 30030 毫秒加上逗留时间

问题描述 投票:0回答:1

我的用例: 使用 Postman,我调用了一个 Spring boot soap 端点。端点创建一个 KafkaProducer 并将消息发送到特定主题。我还有一个 TaskScheduler 来使用该主题。

问题: 调用 soap 向主题推送消息时,出现此错误:

2017-11-14 21:29:31.463 错误 6389 --- [广告 |制作人-3] DomainEntityProducer : 1 条记录即将过期 DomainEntityCommandStream-0:自批次创建以来已过去 30030 毫秒 加上逗留时间 2017-11-14 21:29:31.464 错误 6389 --- [nio-8080-exec-6] 域实体生产者: org.apache.kafka.common.errors.TimeoutException:1 条记录即将过期 对于 DomainEntityCommandStream-0:自批次以来已过去 30030 毫秒 创作加逗留时间

下面是我推入主题的方法:

public DomainEntity push(DomainEntity pDomainEntity) throws Exception {
    logger.log(Level.INFO, "streaming...");
    wKafkaProperties.put("bootstrap.servers", "localhost:9092");
    wKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    wKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer wKafkaProducer = new KafkaProducer(wKafkaProperties);
    ProducerRecord wProducerRecord = new ProducerRecord("DomainEntityCommandStream", getJSON(pDomainEntity));
    wKafkaProducer.send(wProducerRecord, (RecordMetadata r, Exception e) -> {
        if (e != null) {
            logger.log(Level.SEVERE, e.getMessage());
        }
    }).get();
    return pDomainEntity;
}

使用命令 shell 脚本

./kafka-console-producer.sh --broker-list 10.0.1.15:9092 --topic 域实体命令流

./kafka-console-consumer.sh --boostrap-server 10.0.1.15:9092 --topic DomainEntityCommandStream——从头开始

效果很好。

在Stackoverflow上经历了一些相关问题,我试图清除主题:

./kafka-topics.sh --zookeeper 10.0.1.15:9092 --alter --topic DomainEntityCommandStream --config retention.ms=1000

查看 kafka 日志,我发现保留时间已更改。

但是,不走运,我得到了同样的错误。

payload 小得离谱,为什么我要改变 batch.size?

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:gs="http://soap.problem.com">
   <soapenv:Header/>
   <soapenv:Body>
      <gs:streamDomainEntityRequest>
         <gs:domainEntity>
                <gs:name>12345</gs:name>
                <gs:value>Quebec</gs:value>
                <gs:version>666</gs:version>
            </gs:domainEntity>
      </gs:streamDomainEntityRequest>
   </soapenv:Body>
</soapenv:Envelope>
java docker apache-kafka kafka-producer-api
1个回答
-1
投票

使用 Docker 和 Kafka 镜像,您需要将以下环境参数添加到容器中:

KAFKA_ZOOKEEPER_CONNECT = X.X.X.X:XXXX(你的 zookeeper IP 或域名:端口默认 2181)

KAFKA_ADVERTISED_HOST_NAME = X.X.X.X(您的 kafka IP 或域名)

KAFKA_ADVERTISED_PORT = XXXX(你的kafka PORT号默认9092)

可选:

KAFKA_BROKER_ID = 999(一些值)

KAFKA_CREATE_TOPICS=test:1:1(开始时创建的主题名称)

如果它不起作用并且您仍然收到相同的消息(“xxxxx 的 X 记录过期:自批创建以来已经过去了 XXXXX 毫秒加上逗留时间”)您可以尝试从 zookeeper 中清理 kafka 数据。

© www.soinside.com 2019 - 2024. All rights reserved.