在flink中如何跟踪扩展UserDefinedFunction的类中的自定义指标

问题描述 投票:0回答:1

我有一个简单的函数来处理一些 json 字符串

public class ArraySizeUdf extends ScalarFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ArraySizeUdf.class);
    private final static ObjectMapper mapper = new ObjectMapper();

    public int eval(String stringJsonArray) {
        try {

            if (stringJsonArray == null) {
                return 0;
            }

            JsonNode actualObj = mapper.readTree(stringJsonArray);
            ArrayNode aa = (ArrayNode) actualObj;
            return aa.size();
        } catch (Exception e) {
            LOG.error("Error deserializing json to find size : {}", stringJsonArray);
            return -1;
        }
    }
}

我想跟踪发生的异常数量。由于我无权访问 getRuntimeContext(),我可以使用什么来跟踪我在此处定义的任何自定义指标?

我已经使用

streamTableEnvironment.createTemporaryFunction("array_size", ArraySizeUdf.class);

注册了 udf

并将其用作

select array_size('["some", "other"]') from table;

apache-flink flink-streaming flink-sql
1个回答
0
投票

你的

ScalarFunction
可以定义一个
open(FunctionContext context)
方法,传入的上下文有一个
getMetricGroup
方法。

© www.soinside.com 2019 - 2024. All rights reserved.