创建想要STREAM的Kapacitor UDF并提供BATCH(Python)

问题描述 投票:0回答:1

我在制作想要STREAM并提供BATCH的UDF时遇到了麻烦。

这条路:

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.STREAM
    response.info.provides = udf_pb2.BATCH
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    return response

有没有人有一个示例代码?我搜索了网络(论坛,文档),但所有的例子都是针对BATCH-BACH,STREAM-STREAM或BATCH-STREAM。

我在示例中看到,在向“Kapacitor”写入响应时,在“end_batch(self,end_req)”方法中,有必要“告知”BATCH已经结束,在一个示例中,这是通过这种方式实现的:

def end_batch(self, end_req):
    # Send begin batch with count of outliers
    self._begin_response.begin.size = len(self._batch)
    self._agent.write_response(self._begin_response)

    response = udf_pb2.Response()


                                  ...    


    # Send an identical end batch back to Kapacitor
        # HERE
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)

为了发送BATCH,我必须从“point(self,point)”方法发送它,但是不能访问end req对象而不知道如何创建它。

提前致谢!再见!

user-defined-functions influxdb kapacitor influxdb-python
1个回答
0
投票

希望这仍然相关,我会创建一个STREAM-STREAM UDF并将其传输到一个窗口节点。您可以保留数据窗口的副本,例如移动平均值示例,并对其进行任何批处理分析。如果你想出如何编写一个STREAM-BATCH UDF,我很乐意看到它,但不如我的回答那么难看。

编辑

jdv绝对是对的,我的最后一个答案肯定是更多的评论。这是python中的STREAM-BATCH UDF,它只是回显批处理流中的数据。它仍然有点坏,因为它无法在处理程序快照方法中序列化点类。因此,无论何时需要拍摄快照,它都会崩溃,可以通过使用不同的序列化方法(如酸洗)或通过编写JSON编码器/解码器来解决问题。我会在某个时候解决这个问题,但我的工作周已接近完成。制作STREAM-BATCH UDF所需要做的主要事情是构造批处理开始和结束消息,这分别在createEndBatch和createStartBatch方法中完成。

编辑2

通过使用protobufs方法和json的组合修复了序列化。

© www.soinside.com 2019 - 2024. All rights reserved.