Kafka: splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE


I am sending 10 messages. 2 messages are "correct", and 1 message is larger than 1 MB and is rejected by the Kafka broker with a RecordTooLargeException.

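I have two questions:

1) The MESSAGE_TOO_LARGE warning only appears the second time the scheduler calls the method. On the first call, "splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE" does not appear.

2) Why is the retry count not decreasing? I set retry=1.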

I call the Sender class through the Spring Boot scheduling mechanism.

@Scheduled(fixedDelay = 30000)
    public void process() {

        sender.sendThem();

    }

I am using the Spring Boot KafkaTemplate.

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}
@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(" message sucess : " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                LOG.error("Message Failed ");
                latch.countDown();
            }
        };

        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
            // Keep the future so the timeout branch below can look it up by index.
            futures.add(future);
        });

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }
    }
}

The logs I get are as follows:

2020-05-01 15:55:18.346  INFO 6476 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588328718345
2020-05-01 15:55:18.347  INFO 6476 --- [   scheduling-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to -1 with epoch -1
2020-05-01 15:55:18.351  INFO 6476 --- [oducer-prod-991] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-prod-991, transactionalId=prod-991] Cluster ID: bL-uhcXlRSWGaOaSeDpIog
2020-05-01 15:55:48.358  INFO 6476 --- [oducer-prod-991] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to 13000 with epoch 10
 Value of kafka template----- 1518752790
2020-05-01 15:55:48.377  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 8 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.379  INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  :  message sucess : TTTT0
2020-05-01 15:55:48.379  INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  :  message sucess : TTTT1
2020-05-01 15:55:48.511 ERROR 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  : Message Failed 
2020-05-01 15:55:48.512 ERROR 6476 --- [oducer-prod-991] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='

2020-05-01 15:55:48.514  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 10 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.518  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 11 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.523  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 12 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.527  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 13 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.531  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 14 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.534  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 15 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.538  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 16 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.542  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 17 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.546  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 18 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE

Then, some time later, once the program has finished, the following is logged:

Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for t_101-0:120000 ms has passed since batch creation

2020-05-01 16:18:31.322  WARN 17816 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7085a4dd, txId=prod-991]
2020-05-01 16:18:31.322  INFO 17816 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-991, transactionalId=prod-991] Closing the Kafka producer with timeoutMillis = 5000 ms.
2020-05-01 16:18:31.324  INFO 17816 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Aborting incomplete transaction due to shutdown
 error messahe here
------ processing done in parent class------
spring-boot apache-kafka spring-kafka kafka-producer-api
1 Answer

The rough workflow of the producer is shown in the diagram below.

[Image: producer workflow diagram]

By setting the RETRIES_CONFIG property, we can make sure that the producer tries to send the message again if a send fails.
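As a rough illustration of the settings involved, the sketch below builds the retry-related producer properties with plain `java.util.Properties`. The keys `retries`, `retry.backoff.ms`, and `delivery.timeout.ms` are the string names behind the corresponding `ProducerConfig` constants. The class name, the helper method, and the values are assumptions chosen for illustration; 120000 ms is used only because it matches the timeout reported in the question's `TimeoutException`.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys are real Kafka producer settings.
final class ProducerRetrySettingsSketch {

    // Collects the retry-related settings so they can be merged into a producer config.
    static Properties retrySettings(int retries, long retryBackoffMs, long deliveryTimeoutMs) {
        Properties props = new Properties();
        // Upper bound on how often a retriable send error is retried.
        props.setProperty("retries", Integer.toString(retries));
        // Pause between two attempts for the same batch.
        props.setProperty("retry.backoff.ms", Long.toString(retryBackoffMs));
        // Overall time budget for a record, counted from batch creation.
        props.setProperty("delivery.timeout.ms", Long.toString(deliveryTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values: one retry, 1 s backoff, 120 s delivery budget.
        Properties props = retrySettings(1, 1000L, 120_000L);
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

Whichever of the two limits is hit first ends the send: a record can fail after its retries are used up, or because its delivery time budget has run out while it is still waiting.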

If a batch is too large, we split the batch and send the resulting smaller batches again. In this case we do not decrement the retry count.
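The accounting described above can be sketched with a small stand-alone model. This is not the Kafka source: `SplitRetrySketch`, `Batch`, the pretend broker, and the choice to split a batch in half are all assumptions made for illustration. The model only shows that the split path leaves the attempt counter untouched, while an ordinary retriable error consumes one attempt.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of the producer's retry accounting.
// None of these types are real Kafka classes.
final class SplitRetrySketch {

    // Stand-in for a producer batch: record sizes in bytes plus attempts used so far.
    static final class Batch {
        final List<Integer> recordSizes;
        final int attempts;

        Batch(List<Integer> recordSizes, int attempts) {
            this.recordSizes = recordSizes;
            this.attempts = attempts;
        }

        int totalSize() {
            int sum = 0;
            for (int size : recordSizes) {
                sum += size;
            }
            return sum;
        }
    }

    final List<String> log = new ArrayList<>();
    int delivered = 0;
    int failed = 0;
    private int transientErrorsLeft;

    SplitRetrySketch(int transientErrors) {
        this.transientErrorsLeft = transientErrors;
    }

    // Pretend broker: answers the first few requests with a retriable error,
    // then rejects any batch larger than maxBytes.
    private String brokerResponse(Batch batch, int maxBytes) {
        if (transientErrorsLeft > 0) {
            transientErrorsLeft--;
            return "RETRIABLE";
        }
        return batch.totalSize() > maxBytes ? "MESSAGE_TOO_LARGE" : "NONE";
    }

    void send(List<Integer> recordSizes, int retries, int maxBytes) {
        Deque<Batch> queue = new ArrayDeque<>();
        queue.add(new Batch(recordSizes, 0));
        while (!queue.isEmpty()) {
            Batch batch = queue.poll();
            String error = brokerResponse(batch, maxBytes);
            int count = batch.recordSizes.size();
            if (error.equals("NONE")) {
                delivered += count;
            } else if (error.equals("MESSAGE_TOO_LARGE") && count > 1) {
                // Split path: attempts is passed on unchanged, so the logged
                // "attempts left" value never goes down here.
                log.add("splitting and retrying (" + (retries - batch.attempts) + " attempts left)");
                int mid = count / 2; // halving is an assumption of this sketch
                queue.add(new Batch(batch.recordSizes.subList(0, mid), batch.attempts));
                queue.add(new Batch(batch.recordSizes.subList(mid, count), batch.attempts));
            } else if (error.equals("RETRIABLE") && batch.attempts < retries) {
                // Ordinary retry path: this is where an attempt is consumed.
                log.add("retrying (" + (retries - batch.attempts - 1) + " attempts left)");
                queue.add(new Batch(batch.recordSizes, batch.attempts + 1));
            } else {
                // Either the retries are used up, or a single oversized record
                // cannot be split further and the error is not retriable.
                failed += count;
            }
        }
    }

    public static void main(String[] args) {
        SplitRetrySketch sketch = new SplitRetrySketch(0);
        sketch.send(Arrays.asList(100, 100, 2_000_000), 1, 1_000_000);
        sketch.log.forEach(System.out::println);
        System.out.println("delivered=" + sketch.delivered + ", failed=" + sketch.failed);
    }
}
```

With two small records and one 2 MB record against a 1 MB limit, the model logs the split message twice, both times with "1 attempts left", then delivers the two small records and fails the oversized one. That is the same pattern as in the question's log: the counter stays put because splitting is not counted as a retry.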

You can find the cases in which the retry count is decremented in the source code linked below.

https://github.com/apache/kafka/blob/68ac551966e2be5b13adb2f703a01211e6f7a34b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
