Kafka文件流连接和流API

问题描述 投票:0回答:1

正在使用文件流连接器,我在文件中有超过一千万条记录(这不是一个文件,它是按帐户#划分的)。我必须将这些文件加载​​到主题中并更新我的流。已经经历了独立的流,我有以下问题,需要帮助才能实现。

  1. 看一下数据集,我有两个帐户#,每个帐户有5行,我需要将它们分为两行并将键作为acctNbr。

如何编写我的源连接器以读取文件并获得分组逻辑?

  1. 我的经纪人正在Linux机器X,Y,Z ..中运行。源连接器的开发后,我的jar文件应该部署在每个经纪人中(如果我开始在分布式代理中运行)?

  2. 我只有30分钟的时间将文件提取到主题中?调整逻辑以降低工作窗口的所有参数有哪些?仅供参考,此主题将有50个以上的分区和3个代理设置。

数据集:

{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}
apache-kafka apache-kafka-streams apache-kafka-connect kafka-producer-api
1个回答
0
投票

如何编写我的源连接器以读取文件并获得分组逻辑

FileSream连接器无法执行此操作,除用于编写自己的连接器的示例外,无其他目的。换句话说,不要在生产中使用。

话虽如此,您可以使用Flume,Filebeat,Fluentd,NiFi,Streamsets等替代解决方案来glob您的文件路径,然后将所有记录逐行发送到Kafka主题中。

源连接器的后期开发,我的jar文件应该部署在每个代理中

您不应在任何代理上运行Connect。连接服务器称为workers

只有30分钟的窗口可将文件拖放到主题中?

不清楚此数字来自何处。上面列出的任何上述方法都将监视所有new files,而没有任何定义的窗口。

© www.soinside.com 2019 - 2024. All rights reserved.