我将 Elasticsearch 6.8.10 与 Spring Boot 2.2.7 和 Spring Data Elasticsearch 结合使用。
我将统计数据和趋势数据流存储在 Kafka 主题中。这些主题使用 Spring Kafka 读取并存储到 MongoDB 和 Elasticsearch 中以进行分析和报告。我遇到的问题是,当处理队列并将数据写入 Elasticsearch 时,Elasticsearch CPU 消耗持续在 250% 左右。这会导致整个应用程序出现零星的超时错误。我知道索引是一项密集型操作,但我试图了解可以采取哪些措施来减少 CPU 使用率。
数据:
虚拟机配置详细信息是:
Docker Elasticsearch 配置详细信息:
version: '2.4'
services:
elasticsearch:
container_name: elasticsearch
image: 'docker.elastic.co/elasticsearch/elasticsearch:6.8.10'
ports:
- '9200:9200'
- '9300:9300'
mem_limit: 16GB
environment:
- discovery.type=single-node
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms8g -Xmx8g"
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- 'esdata1:/usr/share/elasticsearch/data'
restart: always
volumes:
esdata1:
driver: local
Spring Stat 文档示例:
@Document(indexName = "stats_test", type = "stat", shards = 1, replicas = 0)
public class EsStat {
@Id
@Field(type = FieldType.Keyword)
private String id;
@Field(type = FieldType.Keyword)
private String entityOrRelationshipId;
@Field(type = FieldType.Keyword)
private String articleId;
@Field(type = FieldType.Keyword)
private String status;
@Field(type = FieldType.Keyword)
private String type;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
@Field(type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
private ZonedDateTime date;
@JsonProperty("type")
@Field(type = FieldType.Keyword)
private String dataSource;
// getter and setters
}
Stats Spring 存储库:
public interface StatElasticsearchRepository extends ElasticsearchRepository<EsStat, String> {
}
统计映射:
{
"stats": {
"mappings": {
"stat": {
"properties": {
"_class": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"articleId": {
"type": "keyword"
},
"dataSource": {
"type": "keyword"
},
"date": {
"type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
},
"entityOrRelationshipId": {
"type": "keyword"
},
"id": {
"type": "keyword"
},
"status": {
"type": "keyword"
},
"type": {
"type": "keyword"
}
}
}
}
}
}
如何确定 CPU 使用率如此高的原因以及如何降低 CPU 使用率?
任何意见或建议将不胜感激。如果需要,我很乐意添加更多配置/输出。
因此,我们假设刷新调用是在每次
save()
之后完成的 -
向存储库添加一个无需调用刷新即可进行保存的方法非常容易:
首先添加自定义存储库接口 - 我在这里将其作为通用接口:
CustomRepository.java:
public interface CustomRepository<T> {
T saveNoRefresh(T entity);
}
实施将是:
CustomRepositoryImpl.java:
public class CustomRepositoryImpl<T> implements CustomRepository<T> {
private final ElasticsearchOperations operations;
public CustomRepositoryImpl(ElasticsearchOperations operations) {
this.operations = operations;
}
@Override
public T saveNoRefresh(T entity) {
IndexQuery query = new IndexQueryBuilder().withObject(entity).build();
operations.index(query, operations.getIndexCoordinatesFor(entity.getClass()));
return entity;
}
}
您的存储库将更改为:
public interface StatElasticsearchRepository
extends ElasticsearchRepository<EsStat, String>,
CustomRepository<EsStat> {
}
调用存储库的
save(T)
方法而不是 saveNoRefresh(T)
方法。
如果您可以将数据收集到一定大小的批次,然后进行
saveAll()
调用,那就更好了。