向AWS ElasticSearch添加5000万条记录的最快方法

问题描述 投票:1回答:2

我正在尝试确定一种方法,我可以将数据加载到弹性搜索中。 AWS-ES提供批量API,但是有效负载大小的aws设置了限制。

您能否告诉我从DB2获取数据并将其放入AWS-ES的最快方法。记录数约为5000万,每条记录的有效载荷约为1至3 KB。

我已经尝试过使用java模块并通过API调用将数据放到ES中,但速度非常慢。

是否有任何ETL工具或任何可用的服务可以读取JSON或csv并将数据放入ES?

java database elasticsearch pentaho aws-elasticsearch
2个回答
2
投票

您可以使用Logstash一次或连续从DB获取数据到Elasticsearch。关注如何安装Logstash的instructions然后你只需要一个用于db和Logstash配置文件的JDBC jar。配置文件的模板:

input {
  jdbc {
    jdbc_driver_library => "LOCATION_OF_db2jcc4.jar"
    jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
    jdbc_connection_string => "jdbc:db2://_DB_CONNECTION_DETAILS"
    jdbc_user => "user"
    jdbc_password => "pass"
    jdbc_paging_enabled => "true" #Useful for big data sets
    jdbc_fetch_size => "50000"
    jdbc_page_size => 100000
    #schedule => "* * * * *" #Uncomment if you want to run your query continuously 
    statement => "SELECT * from table" #Query that selects data which you want to download
  }
}
output{
    elasticsearch {
    index => "YOUR_INDEX_NAME-%{+YYYY.MM.dd}"
    hosts => ["localhost:9200"]
}

创建配置文件后,启动Logstash,它将从DB启动数据导入。导入大数据集可能会导致一些问题,因此您应该至少为Logstash分配ram的5 GB,这样会更好。如果出现一些问题,那么你应该调整jdbc_fetch_sizejdbc_page_size参数。

如果要连续从数据库下载数据,例如只读取最新数据,请阅读有关sql_last_value参数的信息。

编辑:您还可以使用Amazon Elasticsearch Output插件将索引输出到AWS ES,这样您就不必配置端点,您可以阅读如何安装插件here。使用插件输出配置:

output {
    amazon_es {
        hosts => ["foo.us-east-1.es.amazonaws.com"]
        region => "us-east-1"
        aws_access_key_id => 'ACCESS_KEY'
        aws_secret_access_key => 'SECRET_KEY'
        index => "YOUR_INDEX_NAME-%{+YYYY.MM.dd}"
        }
}

一个视频,解释如何使用此插件集成Logstash:https://www.oreilly.com/learning/how_do_i_integrate_logstash_with_amazons_elasticsearch_service_es


0
投票

您好,您可以使用简单的节点js示例应用程序。我尝试了很多解决方案(包括logstash),但我发现最有用的是使用nodejs编写一个小代码。

我使用10K记录的JSON文件在本地计算机(低性能)上进行了测试,总计35MB。

要求:nodejs,npm

  1. 创建一个新的工作文件夹。
  2. 转到创建的文件夹。
  3. 输入npm install fs
  4. 输入npm install etl
  5. 输入npm install JSONStream
  6. 输入npm install elasticsearch
  7. 创建一个新的index.js文件,并将以下代码粘贴到您的数据(elasticsearch服务器,json文件)。在这种情况下,我将json数据文件放在同一个文件夹中。

在index.js中粘贴以下代码

var etl = require('etl');

var fs = require('fs');

var JSONStream = require('JSONStream');

var elasticsearch = require('elasticsearch');

//change with your data
var client = new elasticsearch.Client({
  host: 'localhost:9200', 
  log: 'trace'
});


var readStream = fs.createReadStream('file.json') //change with your filename

readStream    
  .pipe(JSONStream.parse('*'))    
  .pipe(etl.collect(100))    
  .pipe(etl.elastic.index(client,'testindex','testtype')) //testindex(your index)- testtype your es type

运行node index.js

这个怎么运作?

  1. 声明所需的模块
  2. 创建ES客户端并连接
  3. 读取创建流的json文件
  4. 管道流,解析每个JSON对象(我的文件包含10K对象)
  5. 使用etl收集100个对象
  6. index elasticsearch添加了100个收集的对象

使用ETL你也可以导入csv(和其他格式)

更多信息和规格:ETLJSONStreamElasticsearch(nodejs)

© www.soinside.com 2019 - 2024. All rights reserved.