Apache ignite 数据流传输器

问题描述 投票:0回答:1

我对 Ignite 的流媒体部分有疑问。

我的理解是,这是将数据导入缓存的方法,但我也看到我们可以配置流接收器来应用一些其他自定义逻辑。

因此,我尝试创建一个带有接收器的类和一个将数据注入流中的类(因此服务器模式下有 2 个主实例和 2 个 Ignite 实例),但我“只是”将数据放入流媒体的缓存中(没有任何自定义逻辑处理到接收器中)。所以,我想问一下我是否错过了一些东西,或者我是否不太了解什么是 Streams into Ignite。

如果我将发送器部分放入接收器中,我就可以打印出来。

有人知道我做(或理解)错了什么吗?

接收器类:

public class Receiver {
    public static void main(String[] args){
        IgniteConfiguration igniteConfig = new IgniteConfiguration();
        CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");   


        igniteConfig.setCacheConfiguration(cacheConfig);

        
        Ignite ignite = Ignition.getOrStart(igniteConfig);
        
        IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
        
        streamer.receiver(StreamVisitor.from((cacheLambda, e) -> {
            System.out.println("Value : " + e.getValue());
        }));
    }
}

发送者类别:

public class Sender {
    public static void main(String[] args){
        IgniteConfiguration igniteConfig = new IgniteConfiguration();
        CacheConfiguration<String, String> cacheConfig = new CacheConfiguration<>("CacheStream");

        igniteConfig.setCacheConfiguration(cacheConfig);

        Ignite ignite = Ignition.getOrStart(igniteConfig);
        
        IgniteDataStreamer<String, String> streamer = ignite.dataStreamer("CacheStream");
        
        for(int i = 0 ; i < 10 ; i++){
            streamer.addData("key-"+i, "value-"+i);
        }
        streamer.flush();
    }
}
java stream ignite
1个回答
1
投票

ignite.dataStreamer("CacheStream") 不会返回您之前创建的相同数据流,它每次都会创建新的数据流。

因此,在您的情况下,您配置了 2 个不同的数据流传输器,并且使用未配置接收器的流传输器上传数据。

© www.soinside.com 2019 - 2024. All rights reserved.