KSQL:我可以在KSQL UDF函数中使用线程来加速这个过程吗?

问题描述 投票:1回答:1

我在3节点中运行独立的ksql-server与3节点的Kafka集群交谈。从Stream创建了一个Topic,有15个分区,数据在Stream中进行一些浓缩。得到一段代码作为UDF来查找IP2Location.bin文件,UDF类看起来像:

import java.io.IOException;
import java.util.Map;

import com.google.gson.Gson;

import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {

    private IP2Location loc = null;
    private Gson gson = null;

    @Udf(description = "fetches the geoloc of the ipaddress.")
    public synchronized String ip2lookup(String ip) {

        String json = null;
        if (loc != null) {
            IP2LocationResult result = null;
            try {
                result = loc.query(ip);
                System.out.println(result);
                json = gson.toJson(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return json;
        }
        return ip;
    }

    @Override
    public void configure(Map<String, ?> arg0) {

        try {
            String db_path = null;
            String os = System.getProperty("os.name").toLowerCase();

            db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";

            loc = new IP2Location(db_path);
            gson = new Gson();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

进入TopicStream的数据非常快(可能是每秒百万条记录)。使用synchronized方法,每个ksql-server节点的速度为每秒3000条记录/消息。有了这个速度你知道,它需要赶上速度的时间。没有synchronized方法,我看到了损坏的数据,因为单个对象/方法被多个线程使用。

问题1:KSQL如何调用/调用udf调用?

问题2:我可以使用线程来处理udf中的请求吗?

问题3:作为主题/流是15个分区,我应该旋转15个ksql-servers节点吗?

谢谢。

java apache-kafka user-defined-functions ksql
1个回答
1
投票

问题1:KSQL如何调用/调用udf调用?

不明白你的意思。一旦您的UDF可用于KSQL(请参阅https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您就可以将KSQL语句中的UDF称为IP2LOOKUP。您还可以在KSQL中运行SHOW FUNCTIONS以确认您的UDF可供使用。

也许你是因为下一个问题而问的? KSQL会一次调用您的UDF一条消息。

问题2:我可以使用线程来处理udf中的请求吗?

你为什么想这么做?您是否担心使用当前UDF代码的KSQL无法处理传入的数据量?说到这一点,您尝试处理的预期数据量是多少,因为您可能尝试过早优化?

另外,在不知道更多细节的情况下,我认为UDF的多线程设置不会产生任何优势,因为UDF在被调用时仍然只能一次处理一条消息(每个KSQL服务器,或者更准确地说,每个流任务,其中每个KSQL服务器可以有很多;我提到这一点是为了清楚地说明KSQL中的UDF并不是通过在所有服务器上处理单个消息来阻止您的处理;处理当然是分布式的并且发生在平行)。

问题3:作为主题/流是15个分区,我应该启动15个ksql服务器节点吗?

这取决于您的数据量。您可以根据需要旋转任意数量的KSQL服务器。如果数据量很低,单个KSQL服务器可能就足够了。如果数据量较高,您可以开始启动其他KSQL服务器,最多15个服务器(因为输入主题有15个分区)。任何其他KSQL服务器都会空闲。

在15 KSQL服务器不足的情况下,您应该将输入主题的分区数从15增加到更高的数量,然后您还可以启动更多的KSQL服务器(从而增加设置的计算能力) )。

© www.soinside.com 2019 - 2024. All rights reserved.