由于基础设施问题,我偶尔会看到从 avro 架构注册表获取架构超时。
我没有完整的堆栈跟踪,但典型的错误消息是:
org.apache.kafka.common.errors.SerializationException: Could not register new Avro schema: Could not retrieve schema for subject 'my-topic-name': connect timed out
这会将问题定位到
IOException
中的 com.tibco.messaging.kafka.avro.AvroSerializer.getSchemaID
。
我已经编写了一个重试序列化程序来解决该问题(并且我需要对反序列化执行相同的操作),但我想知道是否可以进行任何配置来避免编写此代码。
import com.tibco.messaging.kafka.avro.AvroSerializer;
import com.tibco.messaging.schema.registry.SchemaRegistryCache;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class RetryingAvroSerializer implements Serializer<Object> {
private static Logger log = LoggerFactory.getLogger(RetryingAvroSerializer.class);
private final AvroSerializer avroSerializer = new AvroSerializer();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
avroSerializer.configure(configs, isKey);
}
public void configure(Map<String, ?> configs, boolean isKey, SchemaRegistryCache sharedCache) {
avroSerializer.configure(configs, isKey, sharedCache);
}
@Override
public byte[] serialize(String topic, Object data) {
int retries = 5;
int delay = 10;
while (true) {
try {
return avroSerializer.serialize(topic, data);
} catch (org.apache.kafka.common.errors.SerializationException e) {
if (retries == 0) {
throw e;
}
log.warn("Retrying serialization for {}", topic);
try {
Thread.sleep(delay);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
retries--;
delay *= 2;
}
}
}
}
仅 Kafka 生产者就已经拥有整个请求的
retries
属性。
所以,不,Kafka(或者更确切地说,Confluence)序列化器没有重试机制,因此如果您只想重试序列化,则需要像您所编写的代码。
或者,我建议调整架构注册表以获得更高的可用性(运行更多实例?使用负载均衡器?)
现在看来,您可以执行如下操作,仅通过配置而不需要自定义代码即可获得完全相同的结果。
@Bean
public RetryTemplate retryForeverNetworkErrorsTemplate() {
return RetryTemplate.builder()
.infiniteRetry()
.retryOn(IOException.class)
.traversingCauses()
.build();
}
@Bean
public RetryingDeserializer<Object> retryingValueDeserializer(
RetryTemplate retryForeverNetworkErrorsTemplate) {
return new RetryingDeserializer<>(
new ErrorHandlingDeserializer<>(new KafkaAvroDeserializer()),
retryForeverNetworkErrorsTemplate);
}
@Bean
public ConsumerFactory<String, Object> consumerFactory(
RetryingDeserializer<String> retryingKeyDeserializer,
RetryingDeserializer<Object> retryingValueDeserializer) {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(), retryingKeyDeserializer, retryingValueDeserializer);
}