用于 Elasticsearch 扫描和滚动的 spring-batch ItemReader

问题描述 投票:0回答:3

在 spring-batch 中,是否有支持 Elasticsearch 扫描和滚动(scan and scroll)功能的 ItemReader?我确实看到了这个扩展,但它是基于普通的 spring-data 搜索查询实现的。最好能有一个基于扫描和滚动功能的实现,因为批处理作业大多需要处理大量数据。谢谢。

spring-batch spring-data-elasticsearch
3个回答
1
投票

虽然 ElasticSearch 没有“原生”的 `ItemReader` 实现,但 Spring Batch 确实提供了一个 `RepositoryItemReader`,用来包装 Spring Data 的 `PagingAndSortingRepository`。这样,您就可以使用 Spring Data ElasticSearch 项目提供的 ElasticSearch 存储库定义。

您可以在 Spring Batch 文档中阅读有关 `RepositoryItemReader` 的更多信息:http://docs.spring.io/spring-batch/trunk/apidocs/org/springframework/batch/item/data/RepositoryItemReader.html

您可以在此处阅读有关 Spring Data ElasticSearch 项目的更多信息:http://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/


1
投票
import java.util.Iterator;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;

/**
 * Spring Batch reader that pulls items from Elasticsearch through the scan/scroll calls of
 * {@link ElasticsearchOperations}.
 *
 * <p>{@link #doOpen()} starts the scan and remembers the returned scroll id; every
 * {@link #doPageRead()} then asks for the next batch of that scroll and hands its iterator to
 * the paging base class.
 *
 * <p>NOTE(review): a scroll cannot jump to an arbitrary page, so restarting a failed step from
 * a saved item count is presumably not supported by this reader — confirm before relying on
 * restartability.
 *
 * @param <T> type each hit is mapped to
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> {
    private final ElasticsearchOperations elasticsearchOperations;
    private final SearchQuery searchQuery;
    private final Class<T> type;
    private final int scrollTimeinMillis = 60000;
    private String scrollId;

    /**
     * @param elasticsearchOperations template used to run the scan and the scroll requests
     * @param searchQuery query whose hits are read
     * @param type class the hits are converted to
     */
    public ElasticsearchItemReader(
        final ElasticsearchOperations elasticsearchOperations,
        final SearchQuery searchQuery,
        final Class<T> type
    ) {
        // The counting base class derives its execution-context keys from the reader name;
        // without one, saving state (the default) is rejected with an IllegalArgumentException.
        setName(ElasticsearchItemReader.class.getSimpleName());
        this.elasticsearchOperations = elasticsearchOperations;
        this.searchQuery = searchQuery;
        this.type = type;
    }

    /**
     * Starts the scan and stores the scroll id used by all following page reads.
     */
    @Override
    protected void doOpen() throws Exception {
        // The trailing 'false' is presumably the "noFields" flag of scan(...) — confirm against
        // the Spring Data Elasticsearch version in use.
        scrollId = elasticsearchOperations.scan(searchQuery, scrollTimeinMillis, false);
    }

    /**
     * Fetches the next batch of the open scroll.
     *
     * @return iterator over the items of the batch returned by the scroll call
     */
    @Override
    protected Iterator<T> doPageRead() {
        return elasticsearchOperations.scroll(scrollId, scrollTimeinMillis, type).iterator();
    }
}

0
投票

下面给出在更高版本中的写法。

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-batch</artifactId>
   <version>2.4.5</version>
</dependency>

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
   <version>2.4.5</version>
</dependency>

/**
 * Spring Batch reader that walks an Elasticsearch index with the scroll calls of
 * {@link ElasticsearchRestTemplate}.
 *
 * <p>The first page read starts the scroll, later page reads continue it, and closing the
 * reader clears the scroll so the server-side context is released without waiting for the
 * timeout.
 *
 * <p>NOTE(review): the iterator handed to the base class is the iterator of the
 * {@code SearchHit<T>} list, force-cast to {@code Iterator<T>}. The objects returned by
 * {@code read()} are therefore {@code SearchHit<T>} wrappers, not {@code T}; existing callers
 * may rely on that, so it is kept as is. Unwrap with {@code SearchHit#getContent()} downstream.
 *
 * @param <T> document type the hits are mapped to
 */
public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final ElasticsearchRestTemplate elasticsearchRestTemplate;

    private final Query query;

    private final int scrollTimeinMillis = 60000;

    private final Class<T> type;

    private final String index;

    /** Id of the scroll currently open, or {@code null} when none is open. */
    private String scrollId;

    /** {@code true} until the scroll has been started by the first page read. */
    private boolean isFirstCall = true;

    /**
     * @param elasticsearchRestTemplate template used for the scroll requests
     * @param query query whose hits are read
     * @param type class the hits are converted to
     * @param index name (or pattern) of the index to search
     */
    public ElasticsearchItemReader(ElasticsearchRestTemplate elasticsearchRestTemplate,
                                   Query query,
                                   Class<T> type,
                                   String index) {
        setName(getShortName(getClass()));
        this.elasticsearchRestTemplate = elasticsearchRestTemplate;
        this.query = query;
        this.type = type;
        this.index = index;
    }

    /**
     * Resets the scroll bookkeeping so that a reader instance that is opened again starts a
     * fresh scroll instead of continuing a stale one.
     */
    @Override
    protected void doOpen() throws Exception {
        super.doOpen();
        scrollId = null;
        isFirstCall = true;
    }

    /**
     * Starts the scroll on the first call and continues it on every later call.
     *
     * @return iterator over the hits of the fetched batch (see the class-level note about the
     *         element type)
     */
    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        final IndexCoordinates coordinates = IndexCoordinates.of(index);
        final SearchScrollHits<T> batch;
        if (isFirstCall) {
            batch = elasticsearchRestTemplate.searchScrollStart(scrollTimeinMillis, query, type, coordinates);
            isFirstCall = false;
        } else {
            // Work against the SearchScrollHits interface; no cast to SearchHitsImpl is needed.
            batch = elasticsearchRestTemplate.searchScrollContinue(scrollId, scrollTimeinMillis, type, coordinates);
        }

        // Always continue with the most recently returned scroll id.
        final String latestScrollId = batch.getScrollId();
        if (latestScrollId != null) {
            scrollId = latestScrollId;
        }

        return (Iterator<T>) batch.getSearchHits().iterator();
    }

    /**
     * Clears the open scroll, if any, and resets the scroll bookkeeping.
     */
    @Override
    protected void doClose() throws Exception {
        try {
            if (scrollId != null) {
                elasticsearchRestTemplate.searchScrollClear(scrollId);
            }
        } finally {
            scrollId = null;
            isFirstCall = true;
            super.doClose();
        }
    }

    /**
     * Verifies that all mandatory collaborators were supplied.
     *
     * @throws IllegalStateException if the template, query, type or index is missing
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        state(elasticsearchRestTemplate != null, "An ElasticsearchOperations implementation is required.");
        state(query != null, "A query is required.");
        state(type != null, "A target type to convert the input into is required.");
        state(index != null, "An index name is required.");
    }
}

以及创建 ItemReader 的代码:

/**
 * Creates the item reader: wraps a bool query in a {@code NativeSearchQuery} and passes it,
 * together with the index pattern {@code "*"}, to {@code ElasticsearchItemReader}.
 *
 * @return the reader, declared as producing {@code SearchHit<T>} items
 */
public ItemReader<SearchHit<T>> reader() {

     // No clauses are added to the bool query in this snippet.
     BoolQueryBuilder boolQueryBuilder = QueryBuilders
                .boolQuery();

     NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                .build();


     // NOTE(review): a class literal cannot be taken from a type variable, so T.class only
     // compiles if T is a concrete class here — presumably a placeholder for the document
     // class; replace it. The reader is constructed as a raw type, which is what lets it be
     // returned as ItemReader<SearchHit<T>>.
     return new ElasticsearchItemReader(elasticsearchRestTemplate, query, T.class, "*");

    }
© www.soinside.com 2019 - 2024. All rights reserved.