使用kafka connect时hdfs中没有avro数据

问题描述 投票:0回答:2

我正在使用 kafka connect 发行版。 命令是:bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties

worker配置为:


    bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092
    group.id=连接集群
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

kafka 连接重新启动,没有错误!

主题 connect-configs,connect-offsets,connect-statuses 已创建。 主题 mysiteview 已创建。

然后我使用 RESTful API 创建 kafka 连接器,如下所示:


    curl -X POST -H "Content-Type: application/json" --data '{"name":"hdfs-sink-mysiteview","config":{"connector.class":"io.confluence.connect. hdfs.HdfsSinkConnector","tasks.max":"3","topics":"mysiteview","hdfs.url":"hdfs://master1:8020","topics.dir":"/kafka/topics ","logs.dir":"/kafka/logs","format.class":"io.confluence.connect.hdfs.avro.AvroFormat","flush.size":"1000","旋转.间隔。 ms":"1000","partitioner.class":"io.confluence.connect.hdfs.partitioner.DailyPartitioner","path.format":"YYYY-MM-dd","schema.compatibility":"向后" ,"locale":"zh_CN","timezone":"亚洲/上海"}}' http://kafka1:8083/connectors

当我向主题“mysiteview”生成数据时,如下所示:


    {"f1":"192.168.1.1","f2":"aa.example.com"}

java代码如下:



Properties props = new Properties();
props.put("bootstrap.servers","kafka1:9092");
props.put("acks","all");
props.put("retries",3);
props.put("batch.size", 16384);
props.put("linger.ms",30);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Random rnd = new Random();
for(long nEvents = 0; nEvents < events; nEvents++) {
    long runtime = new Date().getTime();
    String site = "www.example.com";
    String ipString = "192.168.2." + rnd.nextInt(255);
    String key = "" + rnd.nextInt(255);
    User u = new User();
    u.setF1(ipString);
    u.setF2(site+" "+rnd.nextInt(255));
    System.out.println(JSON.toJSONString(u));
    producer.send(new ProducerRecord<String,String>("mysiteview",JSON.toJSONString(u)));
    Thread.sleep(50);
}

producer.flush();
producer.close();

Properties props = new Properties();
props.put("bootstrap.servers","kafka1:9092");
props.put("acks","all");
props.put("retries",3);
props.put("batch.size", 16384);
props.put("linger.ms",30);
props.put("buffer.memory",33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Random rnd = new Random();
for(long nEvents = 0; nEvents < events; nEvents++) {
    long runtime = new Date().getTime();
    String site = "www.example.com";
    String ipString = "192.168.2." + rnd.nextInt(255);
    String key = "" + rnd.nextInt(255);
    User u = new User();
    u.setF1(ipString);
    u.setF2(site+" "+rnd.nextInt(255));
    System.out.println(JSON.toJSONString(u));
    producer.send(new ProducerRecord<String,String>("mysiteview",JSON.toJSONString(u)));
    Thread.sleep(50);
}

producer.flush();
producer.close();

奇怪的事情发生了。 我从 kafka-logs 获取数据,但 hdfs 中没有数据(没有主题目录)。 我尝试连接器命令:


    卷曲-X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/status

输出为:

 {“名称”:“hdfs-sink-mysiteview”,“连接器”:{“状态”:“正在运行”,“worker_id”:“10.255.223.178:8083”},“任务”:[{“状态”: "RUNNING","id":0,"worker_id":"10.255.223.178:8083"},{"state":"RUNNING","id":1,"worker_id":"10.255.223.178:8083"} ,{"state":"正在运行","id":2,"worker_id":"10.255.223.178:8083"}]}

但是当我使用以下命令检查任务状态时:

curl -X GET http://kafka1:8083/connectors/hdfs-sink-mysiteview/hdfs-sink-siteview-1

我得到的结果是:“Error 404”。三个任务都是同样的错误!

出了什么问题?

apache-kafka apache-kafka-connect confluent-platform
2个回答
0
投票

在没有看到工作人员日志的情况下,我不确定当您使用上面描述的设置时,您的 HDFS 连接器实例到底是因哪种异常而失败。不过,我可以发现配置中的一些问题:

  1. 您提到您使用以下方式启动 Connect 工作线程:
    。这些属性默认将键和值转换器设置为
    bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
    ,并要求您运行
    AvroConverter
    服务。如果您确实编辑了
    schema-registry
    中的配置以使用
    connect-avro-distributed.properties
    ,那么在将 Kafka 记录转换为 Connect 的
    JsonConverter
    数据类型期间(就在尝试将数据导出到 HDFS 之前),您的 HDFS 连接器可能会失败。
  2. 直到最近,HDFS 连接器只能将 Avro 记录导出为 Avro 或 Parquet 格式的文件。这需要使用上面提到的
    SinkRecord
    。最近添加了将记录导出为 JSON 文本文件的功能,并将出现在连接器的
    AvroConverter
    版本中(您可以通过从源代码签出并构建连接器来尝试此功能)。

此时,我的第一个建议是尝试使用

4.0.0
导入数据。定义其架构,使用
bin/kafka-avro-console-producer
确认数据已成功导入,然后将 HDFS 连接器设置为使用
bin/kafka-avro-console-consumer
,如上所述。连接器页面上的quickstart描述了一个非常相似的过程,也许这将是您的用例的一个很好的起点。


0
投票

也许您只是错误地使用了 REST-Api。 根据文档,调用应该是

AvroFormat

https://docs.confluence.io/3.3.1/connect/restapi.html#get--connectors-(string-name)-tasks-(int-taskid)-status

© www.soinside.com 2019 - 2024. All rights reserved.