Ignite Streaming.addData可以在与StreamReceiver / Visitor不同的节点上执行吗?

问题描述 投票:-3回答:1

是否可以从客户端节点执行流注入并拦截服务器节点中的相同流以在插入缓存之前处理流?

这样做的原因是客户端节点从外部源接收流,并且需要在多个服务器节点上基于AffinityKey将其注入分区缓存。需要在每个节点上截获流并以最低延迟进行处理。我可以使用缓存事件来做到这一点,但StreamVisitor应该更快。

以下是我尝试执行的示例。启动2个节点:一个包含流光,另一个包含streamReciever:

公共类StreamerNode { public static void main(String [] args){...... Ignition.setClientMode(false); Ignite ignite = Ignition.start(igniteConfiguration);

    CacheConfiguration<SeqKey, String> myCfg = new CacheConfiguration<SeqKey, String>("myCache");
    ......
    IgniteCache<SeqKey, String> myCache = ignite.getOrCreateCache(myCfg);
    IgniteDataStreamer<SeqKey, String> myStreamer = ignite.dataStreamer(myCache.getName()); // Create Ignite Streamer for windowing data

    for (int i = 51; i <= 100; i++) {
        String paddedString = org.apache.commons.lang.StringUtils.leftPad(i+"", 7, "0") ;
        String word = "TEST_" + paddedString;
        SeqKey seqKey = new SeqKey("TEST", counter++ );
        myStreamer.addData(seqKey, word) ;
    }
}

}

公共类VisitorNode { public static void main(String [] args){...... Ignition.setClientMode(false); Ignite ignite = Ignition.start(igniteConfiguration);

    CacheConfiguration<SeqKey, String> myCfg = new CacheConfiguration<SeqKey, String>("myCache");
    ......
    IgniteCache<SeqKey, String> myCache = ignite.getOrCreateCache(myCfg);
    IgniteDataStreamer<SeqKey, String> myStreamer = ignite.dataStreamer(myCache.getName()); // Create Ignite Streamer for windowing data

    myStreamer.receiver(new StreamVisitor<SeqKey, String>() {
        int i=1 ;
        @Override
        public void apply(IgniteCache<SeqKey, String> cache, Map.Entry<SeqKey, String> e) {
            String tradeGetData = e.getValue();
            System.out.println(nodeID+" : visitorNode ..count="+ i++ + " received key="+e.getKey() + " : val="+ e.getValue());
            //do some processing here before inserting in the cache .. 
            cache.put(e.getKey(), tradeGetData);
        }
    });
}

}

ignite
1个回答
1
投票

当然,它可以在不同的节点上执行。通常,addData()在客户端节点上执行,StreamReceiver在服务器节点上运行。你没有必要做任何特别的事情来实现它。

至于你的其他帖子,你能否详细说明更多细节和样本?我无法理解所需的设置。

如果您不需要修改数据,只能对其进行操作,则可以使用连续查询。

© www.soinside.com 2019 - 2024. All rights reserved.