两个kafkaTemplate对象之间的动态切换

问题描述 投票:0回答:1

我有两个 Kafka 集群(主动-被动)。我创建了两个 KafkaTemplate 对象来发送事件(每个集群一个)。只要主集群运行状况良好,我就一直使用主 KafkaTemplate 发送事件;一旦出现问题,我希望能动态切换到第二个 KafkaTemplate。除非万不得已,我不想在每次发送时用 if 条件来选择要使用的对象。

是否有一种动态切换对象的方法?

在消费者一侧,我们有 MessageListenerContainer,并且可以通过 setAutoStartup API 来管理多个侦听器的启动和停止。

有人能帮我解决生产者(producer)这一侧的问题吗?

apache-kafka spring-kafka kafka-producer-api
1个回答
0
投票

只需实现您自己的KafkaOperations,即可委派给活动模板并在需要时进行故障转移。

编辑

例如:

/**
 * A {@link KafkaOperations} implementation that forwards every call to one of
 * two {@link KafkaTemplate} instances and lets the caller choose, at runtime,
 * which of the two is active.
 *
 * <p>The first template passed to the constructor is active initially. Call
 * {@link #switchTemplates(boolean)} to select the other one (or to switch
 * back). The active template is held in a {@code volatile} field, so a switch
 * made on one thread is visible to subsequent calls on other threads. Each
 * operation reads the active template once; calls that have already been
 * handed to the previously active template are not redirected.
 */
public class DelegatingTemplate implements KafkaOperations<String, String> {

    /** Template selected by {@code switchTemplates(true)}; also the initial delegate. */
    private final KafkaTemplate<String, String> template1;

    /** Template selected by {@code switchTemplates(false)}. */
    private final KafkaTemplate<String, String> template2;

    /** The template all operations are currently forwarded to. */
    private volatile KafkaTemplate<String, String> currentTemplate;

    /**
     * Creates a delegating template with {@code template1} active.
     *
     * @param template1 the primary template; must not be {@code null}
     * @param template2 the secondary template; must not be {@code null}
     * @throws NullPointerException if either template is {@code null}
     */
    public DelegatingTemplate(KafkaTemplate<String, String> template1, KafkaTemplate<String, String> template2) {
        // Fail fast: without these checks a missing template would only surface
        // as a NullPointerException on the first send after construction
        // (template1) or after a fail-over (template2).
        this.template1 = java.util.Objects.requireNonNull(template1, "template1 must not be null");
        this.template2 = java.util.Objects.requireNonNull(template2, "template2 must not be null");
        this.currentTemplate = this.template1;
    }

    /**
     * Selects which template subsequent operations are forwarded to.
     *
     * @param primary {@code true} to use the first template, {@code false} to
     *                use the second
     */
    public void switchTemplates(boolean primary) {
        this.currentTemplate = primary ? template1 : template2;
    }

    // ---------------------------------------------------------------------
    // KafkaOperations: every method below forwards to the active template.
    // ---------------------------------------------------------------------

    @Override
    public boolean isTransactional() {
        return this.currentTemplate.isTransactional();
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(String data) {
        return this.currentTemplate.sendDefault(data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(String key, String data) {
        return this.currentTemplate.sendDefault(key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(Integer partition, String key, String data) {
        return this.currentTemplate.sendDefault(partition, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(Integer partition, Long timestamp, String key,
            String data) {
        return this.currentTemplate.sendDefault(partition, timestamp, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
        return this.currentTemplate.send(topic, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, String key, String data) {
        return this.currentTemplate.send(topic, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, Integer partition, String key, String data) {
        return this.currentTemplate.send(topic, partition, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, Integer partition, Long timestamp,
            String key, String data) {
        return this.currentTemplate.send(topic, partition, timestamp, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(ProducerRecord<String, String> record) {
        return this.currentTemplate.send(record);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(Message<?> message) {
        return this.currentTemplate.send(message);
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.currentTemplate.partitionsFor(topic);
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this.currentTemplate.metrics();
    }

    @Override
    public <T> T execute(ProducerCallback<String, String, T> callback) {
        return this.currentTemplate.execute(callback);
    }

    @Override
    public <T> T executeInTransaction(OperationsCallback<String, String, T> callback) {
        return this.currentTemplate.executeInTransaction(callback);
    }

    /**
     * Returns the string form of the currently active template, not of this
     * wrapper.
     */
    @Override
    public String toString() {
        return this.currentTemplate.toString();
    }

    @Override
    public void flush() {
        this.currentTemplate.flush();
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.currentTemplate.sendOffsetsToTransaction(offsets);
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        this.currentTemplate.sendOffsetsToTransaction(offsets, consumerGroupId);
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.