如何在 Apache Flink 中连接 RabbitMQ“流”?

问题描述 投票:0回答:1

有没有办法在 Apache Flink 中连接 RabbitMQ“流”而不是队列?

谢谢!

我能够获取 RabbitMQ 队列作为在 Apache Flink 中工作的源,但如何获取在 Flink 中工作的 RabbitMQ 流?

rabbitmq apache-flink
1个回答
0
投票

Flink 支持官方 RabbitMQ 连接器,它应该支持您使用 RabbitMQ 作为源或接收器所需的任何实现。

如果您已经定义了一个从 RabbitMQ 作为队列读取的源,您可能可以通过创建您自己的

RMQSource
类扩展来覆盖它以支持您所需的行为,这将允许您更细粒度地控制和配置来源的行为。

根据来源,您应该能够在自己的实现中重写

RMQSource.setupQueue()
函数,以避免显式使用队列:

protected void setupQueue() throws IOException {
    Util.declareQueueDefaults(channel, queueName);
}

这篇博文虽然有点过时,但提供了类似内容的示例实现:

public class RabbitmqStreamProcessor extends RMQSource{

    // This is mainly because we have to bind our queue to an exchange. 
    // If you are using a queue directly, you may skip it.
    @Override
    protected void setupQueue() throws IOException {
        AMQP.Queue.DeclareOk result = channel.queueDeclare("simple_dev", true, false, false, null);
        channel.queueBind(result.getQueue(), "simple_exchange", "*");
    }
}

© www.soinside.com 2019 - 2024. All rights reserved.