Context: I want to use Elasticsearch in a fully reactive stack built on Elasticsearch and Spring WebFlux.
This is my first time using springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient and springframework.data.elasticsearch.core.ReactiveElasticsearchOperations. I have worked reactively with MongoDB before, but this is my first time with Elasticsearch.
I have successfully used ReactiveElasticsearchOperations with spring-data-elasticsearch-3.2.6 and elasticsearch-6.8.7 by following a tutorial (Elastic Tutorial),
and findAll / findById work fine with Elastic 6.8.7 and spring-data-elasticsearch-3.2.6.
MyModelService:
...
private final ReactiveElasticsearchOperations reactiveElasticsearchOperations;
private final ReactiveElasticsearchClient reactiveElasticsearchClient;

public MyModelServiceImpl(ReactiveElasticsearchOperations reactiveElasticsearchOperations,
                          ReactiveElasticsearchClient reactiveElasticsearchClient) {
    this.reactiveElasticsearchOperations = reactiveElasticsearchOperations;
    this.reactiveElasticsearchClient = reactiveElasticsearchClient;
}

@Override
public Mono<MyModel> findMyModelById(String id){
    return reactiveElasticsearchOperations.findById(
            id,
            MyModel.class,
            MYMODEL_ES_INDEX,
            DEFAULT_ES_DOC_TYPE
    ).doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
}

@Override
public Flux<MyModel> findAllMyModels(String field, String value){
    NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();
    if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(value)) {
        query.withQuery(QueryBuilders.matchQuery(field, value));
    }
    return reactiveElasticsearchOperations.find(
            query.build(),
            MyModel.class,
            MYMODEL_ES_INDEX
    ).doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
}
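I then tried to follow the same idea with newer versions (spring-data-elasticsearch 4 and Elasticsearch 7.6.2). As far as I can read, find is "deprecated since 4.0, use search(Query, ...)", and search returns a Flux that emits the matching entities wrapped in a SearchHit. At this point I got completely stuck. After searching around, I understand neither why this wrapper exists nor how to convert it (map / flatMap / etc.) into a Flux of my model that a controller method can return.
Here is my tentative attempt, which is what prompted this question:
Service: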
import com.poc.favoritos.model.Sugestao;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class SugestaoServiceImpl implements SugestaoService{

    private static final Logger logger = LoggerFactory.getLogger(SugestaoServiceImpl.class);

    private final ReactiveElasticsearchOperations reactiveElasticsearchOperations;
    private final ReactiveElasticsearchClient reactiveElasticsearchClient;

    public SugestaoServiceImpl(ReactiveElasticsearchOperations reactiveElasticsearchOperations,
                               ReactiveElasticsearchClient reactiveElasticsearchClient) {
        this.reactiveElasticsearchOperations = reactiveElasticsearchOperations;
        this.reactiveElasticsearchClient = reactiveElasticsearchClient;
    }

    @Override
    public Mono<Sugestao> findSugestaoById(String id) {
        return reactiveElasticsearchOperations.get(id, Sugestao.class)
                .doOnError(throwable -> logger.error(throwable.getMessage(), throwable));
    }

    @Override
    public Flux<Sugestao> findAllMySugestoes(String field, String value) {
        NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();
        if (!StringUtils.isEmpty(field) && !StringUtils.isEmpty(value)) {
            query.withQuery(QueryBuilders.matchQuery(field, value));
        }
        return reactiveElasticsearchOperations.search(query.build(), Sugestao.class);
    }
}
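ElasticsearchConfig was originally copied from the same tutorial mentioned above. Honestly, I am not sure why this configuration is needed or what it adds to my project. By the way, I am also studying the operations reference.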
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
@Configuration
public class ElasticsearchConfig {

    @Bean
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elassandraHostAndPort)
                .withWebClientConfigurer(webClient -> {
                    ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                })
                .build();
        return ReactiveRestClients.create(clientConfiguration);
    }

    @Bean
    public ElasticsearchConverter elasticsearchConverter() {
        return new MappingElasticsearchConverter(elasticsearchMappingContext());
    }

    @Bean
    public SimpleElasticsearchMappingContext elasticsearchMappingContext() {
        return new SimpleElasticsearchMappingContext();
    }

    @Bean
    public ReactiveElasticsearchOperations reactiveElasticsearchOperations() {
        return new ReactiveElasticsearchTemplate(reactiveElasticsearchClient(), elasticsearchConverter());
    }

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elassandraHostAndPort;
}
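About SearchHit: this class carries information that comes from the search result but does not belong to the entity itself, such as the score, sort values, and highlight entries. If you do not need any of that and only want to work with a Flux of entities: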
Flux<SearchHit<Entity>> fluxSearchHits = ...
Flux<Entity> fluxEntity = fluxSearchHits.map(SearchHit::getContent);
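To make the role of the wrapper a bit more concrete, here is a minimal, dependency-free sketch. It assumes a hypothetical stand-in class Hit (not the real SearchHit) that exposes only getContent() and getScore(), and it uses java.util.stream instead of Flux so it runs without Spring or Reactor on the classpath. The shape is the same as in the reactive case: a plain map unwraps each hit, and no flatMap is needed because unwrapping is synchronous.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for Spring Data's SearchHit<T>: the entity plus search metadata.
final class Hit<T> {
    private final T content;
    private final float score;

    Hit(T content, float score) {
        this.content = content;
        this.score = score;
    }

    T getContent() { return content; }
    float getScore() { return score; }
}

final class SearchHitDemo {
    // Drop the metadata and keep only the entities
    // (same shape as flux.map(SearchHit::getContent)).
    static <T> List<T> contents(List<Hit<T>> hits) {
        return hits.stream()
                .map(Hit::getContent)
                .collect(Collectors.toList());
    }

    // Use the metadata first, then unwrap: keep only entities whose score is at least minScore.
    static <T> List<T> contentsWithMinScore(List<Hit<T>> hits, float minScore) {
        return hits.stream()
                .filter(hit -> hit.getScore() >= minScore)
                .map(Hit::getContent)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Hit<String>> hits = Arrays.asList(new Hit<>("first", 1.5f), new Hit<>("second", 0.2f));
        System.out.println(contents(hits));                   // prints [first, second]
        System.out.println(contentsWithMinScore(hits, 1.0f)); // prints [first]
    }
}
```

In the service from the question, the corresponding change would presumably be to append .map(SearchHit::getContent) to the Flux returned by search(query.build(), Sugestao.class), so that the method returns a Flux of Sugestao again.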
About the configuration:
You need the ReactiveElasticsearchClient bean to configure Spring Data Elasticsearch. As for the other 3 beans, I don't know why they are there. Spring Data Elasticsearch 4.0 does not need them.
Edit 16.05.2020:
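Configuration: you should derive your configuration class from AbstractReactiveElasticsearchConfiguration. Then you do not need the other beans, because the base class already defines what is necessary: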
@Configuration
public class ElasticsearchConfig extends AbstractReactiveElasticsearchConfiguration{

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elassandraHostAndPort;

    @Bean
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elassandraHostAndPort)
                .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}
A customized WebClient configuration is only needed if you retrieve large result sets and the default in-memory size of the result buffer is too low.
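If you do run into that buffer limit, a configuration along these lines might work. It is only a sketch that combines the base class with the withWebClientConfigurer call from the question; the 16 MB constant is a hypothetical value picked for illustration, not a recommendation, and it replaces the unlimited -1 used in the question.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.data.elasticsearch.config.AbstractReactiveElasticsearchConfiguration;
import org.springframework.web.reactive.function.client.ExchangeStrategies;

@Configuration
public class ElasticsearchConfig extends AbstractReactiveElasticsearchConfiguration {

    // Hypothetical limit chosen for illustration; tune it to your own result sizes.
    private static final int MAX_IN_MEMORY_BYTES = 16 * 1024 * 1024;

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elassandraHostAndPort;

    @Override
    @Bean
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        // Raise the codec buffer limit instead of disabling it entirely with -1.
        ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(MAX_IN_MEMORY_BYTES))
                .build();

        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elassandraHostAndPort)
                .withWebClientConfigurer(webClient ->
                        webClient.mutate().exchangeStrategies(exchangeStrategies).build())
                .build();

        return ReactiveRestClients.create(clientConfiguration);
    }
}
```

A bounded limit keeps a single oversized response from exhausting memory, which an unlimited buffer would allow.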