我正在处理流口水和火花流。我想在Spark Streaming上下文中的整个工作中维护KieSession。每个工作节点的每个会话都在执行spark。我知道,插入事实是查询事实的基础。根据我的理解,会话实际上是建立网状网络并将事实插入到alpha和beta内存中的会话。因此,我的想法是为整个工作中的每个工作名称创建一个每个kiesession,以便在kiesession中保持状态。但是我无法广播kiesession,因为它没有序列化。是否还有其他方法可以在Spark流上下文中为每个工作程序节点仅实现一个有状态会话(KieSession)。
您可以使用org.kie.api.marshalling.Marshaller
编组/解组KieSession,这是org.kie.internal.marshalling.MarshallerFactory
的Javadoc
MarshallerFactory用于封送和取消封送StatefulKnowledgeSessions。最简单的方法如下:
// ksession is the StatefulKnowledgeSession // kbase is the KnowledgeBase ByteArrayOutputStream baos = new ByteArrayOutputStream(); Marshaller marshaller = MarshallerFactory.newMarshaller( kbase ); marshaller.marshall( baos, ksession ); baos.close();