因此,问题是关于在流函数中使用外部状态存储的安全性,如过滤器,映射等。
做这样的事情是可以的:
JedisPool pool = ...;
KStream stream = ...;
stream.map((k, v) -> {
JedisClient client = pool.getResource();
....
client.close();
});
...
KafkaStreams streams = ...;
由于在多个流媒体任务中使用单个池,它会导致错误吗?在apache flink中我可以使用Rich*Function<>
,我可以在open
方法中只将一次连接池配置到任何存储。在apache spark中我也可以配置全局连接。我是否需要使用kafka流来做同样的事情?
相当于Rich*Function
将使用transform()
而不是map()
,允许你init()
和close()
一个Transformer
。
你的方法也应该有用,即使你可能想要try-catch
来确保执行close()
。但是,这不是推荐的模式。
根据您的使用情况,最好将Redis中的数据加载到Kafka主题中(不确定是否有Redis连接器)并将数据加载到KTable中。你可以做一个流表加入而不是map()
或transform()
。
不建议将Redis用于Spring Cloud Stream生产 - 绑定器功能不全,可能会丢失消息。