我已经准备好,流程也正在工作。我正在使用lambda函数将数据从Kinesis流发送到MSK,消息格式如下
{
"data": {
"RequestID": 517082653,
"ContentTypeID": 9,
"OrgID": 16145,
"UserID": 4,
"PromotionStartDateTime": "2019-12-14T16:06:21Z",
"PromotionEndDateTime": "2019-12-14T16:16:04Z",
"SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
},
"metadata": {
"timestamp": "2019-12-29T10:37:31.502042Z",
"record-type": "data",
"operation": "insert",
"partition-key-type": "schema-table",
"schema-name": "dbo",
"table-name": "TRFSDIQueue"
}
}
我正在发送到如下所示的kafka主题的此json消息
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("producer.type", "async");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
System.out.println("Inside loop successfully");
try {
producer.send(
new ProducerRecord<String, String>(topicName, new String(rec.getKinesis().getData().array())));
Thread.sleep(1000);
System.out.println("Message sent successfully");
} catch (Exception e) {
System.out.println("------------Exception message=-------------" + e.toString());
}
finally {
producer.flush();
producer.close();
}
[当我启动kafka连接进行弹性搜索时,出现类似错误
DataException: Converting byte[] to Kafka Connect data failed due to serialization error
而且我还修改了quickstart-elasticsearch.properties并将键值序列化器更改为字符串。
当它是json时会引发错误。
我可以看到在弹性搜索中使用kafka主题名称创建了索引,但没有记录。
所以请帮助我解决一些困惑。1.我是否从生产者运动流正确发送了消息?我正在使用
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
或者我应该在这里使用json。但是没有这样的json。
或者我是否必须在quickstart-elasticsearch.properties
中使用json序列化程序?
如果要插入事件,那么它将在elastci搜索中插入记录,关于删除和更新,弹性搜索中的Kafka-connect句柄删除和更新呢?
预先感谢
对于30天的免费试用版,您可以使用Kinesis Source Connector,或者您可以学习如何编写自己的Source Connector并将其与Elasticsearch接收器一起部署,而不是完全使用lambda ...
回答您的问题
1)您正在发送JSON 作为字符串。除非您要发送在映射器接口内映射到JSON字符串的POJO类,否则不需要为JSON使用单独的序列化器。
您正在发送JSON记录,因此应该在连接中使用JSONConverter,是的。但是,除非您单击have a schema and payload,否则我认为不会自动创建Elasticsearch映射,因此,一种简单的解决方法是提前创建ES索引映射(但是,如果您已经知道,那么您已经设计了一个架构,因此最终,生产者代码有责任发送正确的记录)。
如果您提前定义了映射,则应该能够简单地在Connect中使用StringConverter
关于生产者代码,我唯一要更改的是重试次数大于0。并尝试使用资源,而不是明确关闭生产者。
//... parse input
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
//... send record
}
2)您可以搜索Github问题中的连接器,但最后我检查了一下,它会进行完整的文档更新和插入,没有部分更新或任何删除