如何在kafka生产者和消费者中重试短暂的Avro序列化/反序列化问题?

问题描述 投票:0回答:2

由于基础设施问题,我偶尔会看到从 avro 架构注册表获取架构超时。

我没有完整的堆栈跟踪,但典型的错误消息是:

org.apache.kafka.common.errors.SerializationException: Could not register new Avro schema: Could not retrieve schema for subject 'my-topic-name': connect timed out

这会将问题定位到

IOException
中的
com.tibco.messaging.kafka.avro.AvroSerializer.getSchemaID

我已经编写了一个重试序列化程序来解决该问题(并且我需要对反序列化执行相同的操作),但我想知道是否可以进行任何配置来避免编写此代码。

import com.tibco.messaging.kafka.avro.AvroSerializer;
import com.tibco.messaging.schema.registry.SchemaRegistryCache;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class RetryingAvroSerializer implements Serializer<Object> {
    private static Logger log = LoggerFactory.getLogger(RetryingAvroSerializer.class);
    private final AvroSerializer avroSerializer = new AvroSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        avroSerializer.configure(configs, isKey);
    }

    public void configure(Map<String, ?> configs, boolean isKey, SchemaRegistryCache sharedCache) {
        avroSerializer.configure(configs, isKey, sharedCache);
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        int retries = 5;
        int delay = 10;
        while (true) {
            try {
                return avroSerializer.serialize(topic, data);
            } catch (org.apache.kafka.common.errors.SerializationException e) {
                if (retries == 0) {
                    throw e;
                }
                log.warn("Retrying serialization for {}", topic);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                retries--;
                delay *= 2;
            }
        }
    }
}
java apache-kafka avro
2个回答
1
投票

仅 Kafka 生产者就已经拥有整个请求的

retries
属性。

所以,不,Kafka(或者更确切地说,Confluence)序列化器没有重试机制,因此如果您只想重试序列化,则需要像您所编写的代码。

或者,我建议调整架构注册表以获得更高的可用性(运行更多实例?使用负载均衡器?)


0
投票

现在看来,您可以执行如下操作,仅通过配置而不需要自定义代码即可获得完全相同的结果。

  @Bean
  public RetryTemplate retryForeverNetworkErrorsTemplate() {
    return RetryTemplate.builder()
        .infiniteRetry()
        .retryOn(IOException.class)
        .traversingCauses()
        .build();
  }


  @Bean
  public RetryingDeserializer<Object> retryingValueDeserializer(
      RetryTemplate retryForeverNetworkErrorsTemplate) {
    return new RetryingDeserializer<>(
        new ErrorHandlingDeserializer<>(new KafkaAvroDeserializer()),
        retryForeverNetworkErrorsTemplate);
  }

  @Bean
  public ConsumerFactory<String, Object> consumerFactory(
      RetryingDeserializer<String> retryingKeyDeserializer,
      RetryingDeserializer<Object> retryingValueDeserializer) {
    return new DefaultKafkaConsumerFactory<>(
            consumerConfigs(), retryingKeyDeserializer, retryingValueDeserializer);
  }
© www.soinside.com 2019 - 2024. All rights reserved.