有没有办法在 Apache Flink 中连接 RabbitMQ“流”而不是队列?
谢谢!
我能够获取 RabbitMQ 队列作为在 Apache Flink 中工作的源,但如何获取在 Flink 中工作的 RabbitMQ 流?
Flink 支持官方 RabbitMQ 连接器,它应该支持您使用 RabbitMQ 作为源或接收器所需的任何实现。
如果您已经定义了一个从 RabbitMQ 作为队列读取的源,您可能可以通过创建您自己的
RMQSource
类扩展来覆盖它以支持您所需的行为,这将允许您更细粒度地控制和配置来源的行为。
根据来源,您应该能够在自己的实现中重写
RMQSource.setupQueue()
函数,以避免显式使用队列:
protected void setupQueue() throws IOException {
Util.declareQueueDefaults(channel, queueName);
}
这篇博文虽然有点过时,但提供了类似内容的示例实现:
public class RabbitmqStreamProcessor extends RMQSource{
// This is mainly because we have to bind our queue to an exchange.
// If you are using a queue directly, you may skip it.
@Override
protected void setupQueue() throws IOException {
AMQP.Queue.DeclareOk result = channel.queueDeclare("simple_dev", true, false, false, null);
channel.queueBind(result.getQueue(), "simple_exchange", "*");
}
}