我在3节点中运行独立的ksql-server
与3节点的Kafka
集群交谈。从Stream
创建了一个Topic
,有15个分区,数据在Stream中进行一些浓缩。得到一段代码作为UDF
来查找IP2Location.bin文件,UDF
类看起来像:
import java.io.IOException;
import java.util.Map;
import com.google.gson.Gson;
import io.confluent.common.Configurable;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "Ip2Lookup", description = "Lookup class for IP2Location database.")
public class Ip2Lookup implements Configurable {
private IP2Location loc = null;
private Gson gson = null;
@Udf(description = "fetches the geoloc of the ipaddress.")
public synchronized String ip2lookup(String ip) {
String json = null;
if (loc != null) {
IP2LocationResult result = null;
try {
result = loc.query(ip);
System.out.println(result);
json = gson.toJson(result);
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
return ip;
}
@Override
public void configure(Map<String, ?> arg0) {
try {
String db_path = null;
String os = System.getProperty("os.name").toLowerCase();
db_path = "/data/md0/ip2loc/ipv4-bin/IP-COUNTRY-REGION-CITY-LATITUDE-LONGITUDE-ZIPCODE-TIMEZONE-ISP-DOMAIN-NETSPEED-AREACODE-WEATHER-MOBILE-ELEVATION-USAGETYPE.BIN";
loc = new IP2Location(db_path);
gson = new Gson();
} catch (IOException e) {
e.printStackTrace();
}
}
}
进入Topic
和Stream
的数据非常快(可能是每秒百万条记录)。使用synchronized
方法,每个ksql-server
节点的速度为每秒3000条记录/消息。有了这个速度你知道,它需要赶上速度的时间。没有synchronized
方法,我看到了损坏的数据,因为单个对象/方法被多个线程使用。
问题1:KSQL如何调用/调用udf
调用?
问题2:我可以使用线程来处理udf
中的请求吗?
问题3:作为主题/流是15个分区,我应该旋转15个ksql-servers
节点吗?
谢谢。
问题1:KSQL如何调用/调用udf调用?
不明白你的意思。一旦您的UDF可用于KSQL(请参阅https://docs.confluent.io/current/ksql/docs/developer-guide/udf.html#deploying),您就可以将KSQL语句中的UDF称为IP2LOOKUP
。您还可以在KSQL中运行SHOW FUNCTIONS
以确认您的UDF可供使用。
也许你是因为下一个问题而问的? KSQL会一次调用您的UDF一条消息。
问题2:我可以使用线程来处理udf中的请求吗?
你为什么想这么做?您是否担心使用当前UDF代码的KSQL无法处理传入的数据量?说到这一点,您尝试处理的预期数据量是多少,因为您可能尝试过早优化?
另外,在不知道更多细节的情况下,我认为UDF的多线程设置不会产生任何优势,因为UDF在被调用时仍然只能一次处理一条消息(每个KSQL服务器,或者更准确地说,每个流任务,其中每个KSQL服务器可以有很多;我提到这一点是为了清楚地说明KSQL中的UDF并不是通过在所有服务器上处理单个消息来阻止您的处理;处理当然是分布式的并且发生在平行)。
问题3:作为主题/流是15个分区,我应该启动15个ksql服务器节点吗?
这取决于您的数据量。您可以根据需要旋转任意数量的KSQL服务器。如果数据量很低,单个KSQL服务器可能就足够了。如果数据量较高,您可以开始启动其他KSQL服务器,最多15个服务器(因为输入主题有15个分区)。任何其他KSQL服务器都会空闲。
在15 KSQL服务器不足的情况下,您应该将输入主题的分区数从15增加到更高的数量,然后您还可以启动更多的KSQL服务器(从而增加设置的计算能力) )。