Stack Overflow 社区,
我目前正在开发一个数据处理管道,我从 Kafka 接收以下格式的数据:
{
"id": "XYZ",
"index": "original_data_index",
"updated_data": {
"id": "XYZ1",
"index": "updated_data_index"
}
}
我的目标是将数据存储在两个单独的 Elasticsearch 索引中:
原始数据应存储在index1中,如下:
{
"id": "XYZ",
"index": "original_data_index"
}
更新后的数据应存储在index2中,如下所示:
{
"id": "XYZ1",
"index": "updated_data_index"
}
我目前正在使用 Logstash 作为我的管道的一部分,我想知道如何实现这种转变。有人可以提供有关配置 Logstash 管道以处理此特定数据转换场景的指导吗?
此外,如果有任何在 Elasticsearch 索引背景下使用 Logstash 处理 Kafka 数据转换的最佳实践或注意事项,我将不胜感激。
提前感谢您的帮助!
您可以使用具有“分叉路径模式”的管道到管道通信,以两种不同的方式处理事件。在每个路径中,您可以使用 prune 过滤器或 mutate+remove_field 删除路径输出到的索引中不需要的字段。