Kafka Connector HDFS Sink 5.3.1无法生成所有JSON记录

问题描述 投票:0回答:1

用例

我正在阅读一个已经创建的Kafka主题,在该主题上一个单独的集群正在生成一些键和值。我的最终目标是以JSON格式写入HDFS,为此我一直在尝试使用Kafka HDFS Sink 5.3。我面临的问题是我无法提取主题中的所有记录并将其写入HDFS。到目前为止,如果我的主题包含数百万条记录的每小时数据,那么我只能写10万条记录。

以下是我用于kafka-connect-standalone.properties和我的HDFS quickstart-hdfs.properties]的配置>

kafka-connect-standalone.properties
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false

offset.flush.interval.ms=10000

group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000

quickstart-hdfs.properties
name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all 

format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC

如果我不使用errors.tolerance = all属性,那么我只产生约500条记录。

就工作者日志而言,我没有收到任何错误,因此我不确定自己缺少什么。

由于我是Kafka Connector的新手,并且已经尝试了一段时间,如果有人可以提供一些有关我做错事情的见解,我将非常感激。

用例,我从一个已经创建的Kafka主题中读取,在该主题上一个单独的集群正在生成一些键和值。我的最终目标是以JSON格式写入HDFS,对此我一直......>

apache-kafka hdfs apache-kafka-connect confluent
1个回答
0
投票

为了将主题中的所有现有记录获取到接收器连接器,请将其添加到工作器属性中,然后重新启动连接

consumer.auto.offset.reset=earliest
© www.soinside.com 2019 - 2024. All rights reserved.