使用Logstash将json字符串注入到不同的Elasticsearch索引中。

问题描述 投票:1回答:1

我有这些样本数据存储在postgresql中。events 表。

{"playhead":0,"metadata":"{\"class\":\"Broadway\\\\Domain\\\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\\\Bundle\\\\FixturesBundle\\\\Command\\\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.295Z","payload":"{\"class\":\"OpenLoyalty\\\\Component\\\\Transaction\\\\Domain\\\\Event\\\\TransactionWasRegistered\",\"payload\":{\"transactionId\":\"91d83147-0280-481e-a984-c899e9720ec2\",\"transactionData\":{\"documentNumber\":\"not-matched\",\"purchasePlace\":\"wroclaw\",\"purchaseDate\":1582622706,\"documentType\":\"sell\"},\"customerData\":{\"name\":\"Jan Nowak\",\"email\":\"[email protected]\",\"nip\":\"aaa\",\"phone\":\"+0954730076810\",\"loyaltyCardNumber\":\"not_matched_card_number\",\"address\":{\"street\":\"Ko\\u015bciuszki\",\"address1\":\"12\",\"city\":\"Warsaw\",\"country\":\"PL\",\"province\":\"Mazowieckie\",\"postal\":\"00-800\"}},\"items\":[{\"sku\":{\"code\":\"SKU1\"},\"name\":\"item 1\",\"quantity\":1,\"grossValue\":1,\"category\":\"aaa\",\"labels\":[{\"key\":\"test\",\"value\":\"label\"},{\"key\":\"test\",\"value\":\"label2\"}],\"maker\":\"sss\"},{\"sku\":{\"code\":\"SKU2\"},\"name\":\"item 2\",\"quantity\":2,\"grossValue\":2,\"category\":\"bbb\",\"labels\":[],\"maker\":\"ccc\"}],\"posId\":null,\"excludedDeliverySKUs\":null,\"excludedLevelSKUs\":null,\"excludedLevelCategories\":null,\"revisedDocument\":null,\"labels\":[]}}","@version":"1","id":1,"type":"OpenLoyalty.Component.Transaction.Domain.Event.TransactionWasRegistered","uuid":"91d83147-0280-481e-a984-c899e9720ec2","recorded_on":"2020-02-24T09:25:06.222986+00:00"}

 {"playhead":1,"metadata":"{\"class\":\"Broadway\\\\Domain\\\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\\\Bundle\\\\FixturesBundle\\\\Command\\\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\\\Component\\\\Customer\\\\Domain\\\\Event\\\\AssignedAccountToCustomer\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"accountId\":\"3c067099-486a-41a8-87ca-343305126c5e\"}}","@version":"1","id":80,"type":"OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.012550+00:00"}

{"playhead":2,"metadata":"{\"class\":\"Broadway\\\\Domain\\\\Metadata\",\"payload\":{\"console\":{\"command\":\"Doctrine\\\\Bundle\\\\FixturesBundle\\\\Command\\\\LoadDataFixturesDoctrineCommand\",\"arguments\":\"'doctrine:fixtures:load' --env=prod -n\"}}}","@timestamp":"2020-05-13T18:03:24.430Z","payload":"{\"class\":\"OpenLoyalty\\\\Component\\\\Customer\\\\Domain\\\\Event\\\\CustomerWasMovedToLevel\",\"payload\":{\"customerId\":\"00000000-0000-474c-b092-b0dd880c07aa\",\"levelId\":\"e82c96cf-32a3-43bd-9034-4df343e50000\",\"oldLevelId\":null,\"updatedAt\":1582536320,\"manually\":false,\"removeLevelManually\":false}}","@version":"1","id":81,"type":"OpenLoyalty.Component.Customer.Domain.Event.CustomerWasMovedToLevel","uuid":"00000000-0000-474c-b092-b0dd880c07aa","recorded_on":"2020-02-24T09:25:20.014810+00:00"}

和许多其他不同的人 type 值,我想要的是插入的是 payload的数据放入弹性搜索的相应索引中,例如带有 type = OpenLoyalty.Component.Customer.Domain.Event.AssignedAccountToCustomer必须进入。oloy.account_details 索引应该是这样的。

    {
    "_index": "oloy.account_details",
    "_type": "OpenLoyalty\Component\Account\Domain\ReadModel\AccountDetails",
    "_id": "AXGjvhDQUCC0J4ATWiCP",
    "_version": 1,
    "_score": 1,
    "_source": {
    "accountId": "5a9bdb83-f7e8-442c-b06f-387e1b1e95a7",
    "customerId": "11111111-0000-474c-b092-b0dd880c07e1"
    }
  }

我想知道我应该为Logstash设置什么样的配置 目前我的配置是这样的:

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://localhost:5432/openloyalty"
        jdbc_user => "openloyalty"
        jdbc_password => "openloyalty"
        jdbc_driver_library => "C:\logstash\postgresql-42.2.12.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        statement => "SELECT * from events"
    }
}

output {
    elasticsearch {
            index => "???"
            document_type => "???"
            document_id => "???"
            hosts => ["localhost:9200"]
    }
}
elasticsearch logstash logstash-configuration logstash-filter
1个回答
1
投票

你的答案在这里

input {
    file{
        path => "C:/Users/Khatere Tajfar/Desktop/Log/application.log"
 start_position => "beginning"
        ignore_older => 0  
    }
}
filter {
    grok {
        match=>{"message"=>"%{GREEDYDATA:part1} INFO %{GREEDYDATA:part2}\{\"indexType\"\:\"%{GREEDYDATA:type}\"\,\"indexName\"\:\"%{GREEDYDATA:index}\"\,\"model\"\:\"%{GREEDYDATA:jsonpart}\"\,\"id\"\:\"%{GREEDYDATA:id}\"\}"}

    }

    mutate {
             remove_field => [ "part1","part2","message","path","host","@timestamp","@version"]
         gsub  => ["jsonpart","[\\\\\\]+","X"]
         gsub  => ["jsonpart","[\\]","X"]
         gsub  => ["jsonpart","X",""]
        }
    json {
        source => "jsonpart"
    }
    mutate {
        remove_field => [ "jsonpart"]   
    }
     mutate { add_field => { "[@metadata][type]" => "%{type}" } }
         mutate { remove_field => ["type"] }

     mutate { add_field => { "[@metadata][index]" => "%{index}" } }
         mutate { remove_field => ["index"] }

     mutate { add_field => { "[@metadata][id]" => "%{id}" } }
         mutate { remove_field => ["id"] }

}
output {
    stdout { codec => json_lines} 
    elasticsearch {
            index =>  "%{[@metadata][index]}" 
            document_type =>  "%{[@metadata][type]}" 
            document_id => "%{[@metadata][id]}" 
            hosts => ["localhost:9200"]
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.