我正在尝试确定一种方法,我可以将数据加载到弹性搜索中。 AWS-ES提供批量API,但是有效负载大小的aws设置了限制。
您能否告诉我从DB2获取数据并将其放入AWS-ES的最快方法。记录数约为5000万,每条记录的有效载荷约为1至3 KB。
我已经尝试过使用java模块并通过API调用将数据放到ES中,但速度非常慢。
是否有任何ETL工具或任何可用的服务可以读取JSON或csv并将数据放入ES?
您可以使用Logstash一次或连续从DB获取数据到Elasticsearch。关注如何安装Logstash的instructions然后你只需要一个用于db和Logstash配置文件的JDBC jar。配置文件的模板:
input {
jdbc {
jdbc_driver_library => "LOCATION_OF_db2jcc4.jar"
jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
jdbc_connection_string => "jdbc:db2://_DB_CONNECTION_DETAILS"
jdbc_user => "user"
jdbc_password => "pass"
jdbc_paging_enabled => "true" #Useful for big data sets
jdbc_fetch_size => "50000"
jdbc_page_size => 100000
#schedule => "* * * * *" #Uncomment if you want to run your query continuously
statement => "SELECT * from table" #Query that selects data which you want to download
}
}
output{
elasticsearch {
index => "YOUR_INDEX_NAME-%{+YYYY.MM.dd}"
hosts => ["localhost:9200"]
}
创建配置文件后,启动Logstash,它将从DB启动数据导入。导入大数据集可能会导致一些问题,因此您应该至少为Logstash分配ram的5 GB
,这样会更好。如果出现一些问题,那么你应该调整jdbc_fetch_size
和jdbc_page_size
参数。
如果要连续从数据库下载数据,例如只读取最新数据,请阅读有关sql_last_value
参数的信息。
编辑:您还可以使用Amazon Elasticsearch Output插件将索引输出到AWS ES,这样您就不必配置端点,您可以阅读如何安装插件here。使用插件输出配置:
output {
amazon_es {
hosts => ["foo.us-east-1.es.amazonaws.com"]
region => "us-east-1"
aws_access_key_id => 'ACCESS_KEY'
aws_secret_access_key => 'SECRET_KEY'
index => "YOUR_INDEX_NAME-%{+YYYY.MM.dd}"
}
}
一个视频,解释如何使用此插件集成Logstash:https://www.oreilly.com/learning/how_do_i_integrate_logstash_with_amazons_elasticsearch_service_es
您好,您可以使用简单的节点js示例应用程序。我尝试了很多解决方案(包括logstash),但我发现最有用的是使用nodejs编写一个小代码。
我使用10K记录的JSON文件在本地计算机(低性能)上进行了测试,总计35MB。
要求:nodejs,npm
npm install fs
npm install etl
npm install JSONStream
npm install elasticsearch
index.js
文件,并将以下代码粘贴到您的数据(elasticsearch服务器,json文件)。在这种情况下,我将json数据文件放在同一个文件夹中。在index.js中粘贴以下代码
var etl = require('etl');
var fs = require('fs');
var JSONStream = require('JSONStream');
var elasticsearch = require('elasticsearch');
//change with your data
var client = new elasticsearch.Client({
host: 'localhost:9200',
log: 'trace'
});
var readStream = fs.createReadStream('file.json') //change with your filename
readStream
.pipe(JSONStream.parse('*'))
.pipe(etl.collect(100))
.pipe(etl.elastic.index(client,'testindex','testtype')) //testindex(your index)- testtype your es type
运行node index.js
这个怎么运作?
使用ETL你也可以导入csv(和其他格式)
更多信息和规格:ETL,JSONStream,Elasticsearch(nodejs)