How to use an aggregation query with MongoItemReader in Spring Batch


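The requirements have changed a bit, and I now have to use an aggregation query in setQuery() instead of a basic query. Is that possible? Please advise on how to do it. My aggregation query is ready, but I am not sure how to use it in Spring Batch.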

public ItemReader<MyCollection> searchMongoItemReader() throws Exception {

        MongoItemReader<MyCollection> mongoItemReader = new MongoItemReader<>();
        mongoItemReader.setTemplate(myMongoTemplate);
        mongoItemReader.setCollection(myMongoCollection);

        mongoItemReader.setQuery(" Some Simple Query - Basic");

        mongoItemReader.setTargetType(MyCollection.class);
        Map<String, Sort.Direction> sort = new HashMap<>();
        sort.put("field4", Sort.Direction.ASC);
        mongoItemReader.setSort(sort);
        return mongoItemReader;

    }
java mongodb spring-batch
2 Answers
5 votes

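Extend MongoItemReader and provide your own implementation of the doPageRead() method. That way you get full pagination support, and reading the documents becomes part of the step.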

// Assumes static imports of newAggregation, skip and limit from
// org.springframework.data.mongodb.core.aggregation.Aggregation.
public class CustomMongoItemReader<T, O> extends MongoItemReader<T> {
    private MongoTemplate template;
    private Class<? extends T> inputType;
    private Class<O> outputType;
    private MatchOperation match;
    private ProjectionOperation projection;
    private String collection;

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        // page and pageSize are inherited from the class that MongoItemReader extends
        Pageable pageRequest = PageRequest.of(page, pageSize);
        Aggregation agg = newAggregation(match, projection,
                skip((long) pageRequest.getPageNumber() * pageRequest.getPageSize()),
                limit(pageRequest.getPageSize()));
        return (Iterator<T>) template.aggregate(agg, collection, outputType).iterator();
    }
}

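Add the other getters, setters and remaining methods as well. Just look at the MongoItemReader source code. I also removed the query support from it. You can take the same approach and simply copy the code over from MongoItemReader; the same goes for sorting.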

In the class where you define the reader, you would do something like this:

// Input and Output stand for your own document and result classes.
public MongoItemReader<Input> reader() {
    CustomMongoItemReader<Input, Output> reader = new CustomMongoItemReader<>();
    reader.setTemplate(mongoTemplate);
    reader.setName("abc");
    reader.setTargetType(Input.class);
    reader.setOutputType(Output.class);
    reader.setCollection(myMongoCollection);
    reader.setMatch(Aggregation.match(new Criteria()....));
    reader.setProjection(Aggregation.project("..", ".."));
    return reader;
}
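
To illustrate why a skip/limit pair inside doPageRead() is enough to get paging, here is a dependency-free sketch. It assumes the paginated base class behaves roughly like the loop in readAll: it asks for one page at a time, increments the page counter, and stops at the first empty page. The class name PagingSketch and the in-memory list standing in for the MongoDB collection are made up for the illustration and are not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: an in-memory list stands in for the MongoDB collection.
class PagingSketch {

    // Plays the role of doPageRead(): returns the items of one page, the same
    // window that skip(page * pageSize) followed by limit(pageSize) selects.
    static <T> Iterator<T> doPageRead(List<T> source, int page, int pageSize) {
        long skip = (long) page * pageSize;
        if (skip >= source.size()) {
            return Collections.emptyIterator();
        }
        int from = (int) skip;
        int to = Math.min(from + pageSize, source.size());
        return source.subList(from, to).iterator();
    }

    // Plays the role of the paginated base class: request page after page
    // until an empty page signals that the input is exhausted.
    static <T> List<T> readAll(List<T> source, int pageSize) {
        List<T> items = new ArrayList<>();
        int page = 0;
        while (true) {
            Iterator<T> results = doPageRead(source, page, pageSize);
            if (!results.hasNext()) {
                break;
            }
            results.forEachRemaining(items::add);
            page++;
        }
        return items;
    }
}
```

With five items and a page size of two, the sketch reads three pages of sizes 2, 2 and 1, and the fourth (empty) page ends the read.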

4 votes

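To use aggregation in a job and still benefit from everything Spring Batch offers, you have to create a custom ItemReader. By extending AbstractPaginatedDataItemReader we can use all the elements of pageable operations.

Here is an example of such a custom class: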

// Assumes Spring Data MongoDB 2.x or later (Sort.by, cursorBatchSize).
import static org.springframework.data.mongodb.core.aggregation.Aggregation.limit;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregationOptions;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.skip;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

public class CustomAggreagationPaginatedItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private MongoOperations template;
    private Class<? extends T> type;
    private Sort sort;
    private String collection;

    public CustomAggreagationPaginatedItemReader() {
        super();
        setName(ClassUtils.getShortName(CustomAggreagationPaginatedItemReader.class));
    }

    public void setTemplate(MongoOperations template) {
        this.template = template;
    }

    public void setTargetType(Class<? extends T> type) {
        this.type = type;
    }

    public void setSort(Map<String, Sort.Direction> sorts) {
        this.sort = convertToSort(sorts);
    }

    public void setCollection(String collection) {
        this.collection = collection;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        List<AggregationOperation> operations = new ArrayList<>();
        // Add all your own aggregation operations (match, group, project, ...) here.
        if (sort != null) {
            // Sorting before skip/limit keeps pages from overlapping between reads.
            operations.add(Aggregation.sort(sort));
        }
        // page and pageSize are inherited from AbstractPaginatedDataItemReader.
        operations.add(skip((long) page * pageSize));
        operations.add(limit(pageSize));

        Aggregation aggregation = newAggregation(operations)
                .withOptions(newAggregationOptions().cursorBatchSize(100).build());

        return (Iterator<T>) template.aggregate(aggregation, collection, type).iterator();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(template != null, "An implementation of MongoOperations is required.");
        Assert.state(type != null, "A type to convert the input into is required.");
        Assert.state(collection != null, "A collection is required.");
    }

    private Sort convertToSort(Map<String, Sort.Direction> sorts) {
        List<Sort.Order> sortValues = new ArrayList<>();

        for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
            sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
        }

        return Sort.by(sortValues);
    }
}

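If you look closely, you will see that it was modeled on the MongoItemReader that ships with Spring Batch; you can find that class at org.springframework.batch.item.data.MongoItemReader. You have to create a completely new class that extends AbstractPaginatedDataItemReader, because if you look at the doPageRead method of MongoItemReader you will see that it only uses the find operation of MongoTemplate, so an aggregate operation cannot be used there.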
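
One detail of doPageRead worth spelling out is that the skip offset (page number times page size) is computed as a long. The following sketch, with the made-up class name SkipOffset, shows what can go wrong if the multiplication is done in int arithmetic instead: the product wraps around silently once it exceeds Integer.MAX_VALUE. The page and page-size values are deliberately large and purely illustrative.

```java
// Hypothetical helper, only to contrast int and long arithmetic for the skip offset.
class SkipOffset {

    // The multiplication happens in int and overflows before being widened to long.
    static long intOffset(int page, int pageSize) {
        return page * pageSize;
    }

    // Casting one operand first makes the whole multiplication happen in long.
    static long longOffset(int page, int pageSize) {
        return (long) page * pageSize;
    }
}
```

For page 300_000 with a page size of 10_000, longOffset returns 3_000_000_000 while intOffset returns -1_294_967_296, which would not be a valid skip value.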

Here is how to wire up the custom reader, whose overridden main method runs the aggregation:

@Bean
public ItemReader<YourDataClass> reader(MongoTemplate mongoTemplate) {
    CustomAggreagationPaginatedItemReader<YourDataClass> customAggreagationPaginatedItemReader = new CustomAggreagationPaginatedItemReader<>();

    Map<String, Direction> sort = new HashMap<String, Direction>();
    sort.put("id", Direction.ASC);
        
    customAggreagationPaginatedItemReader.setTemplate(mongoTemplate);
    customAggreagationPaginatedItemReader.setCollection("collectionName");
    customAggreagationPaginatedItemReader.setTargetType(YourDataClass.class);
    customAggreagationPaginatedItemReader.setSort(sort);
        
    return customAggreagationPaginatedItemReader;
}
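
A side note on the sort map: with a single key, as above, a HashMap is fine. If you sort on several fields, keep in mind that convertToSort builds the orders in the map's iteration order, which HashMap does not guarantee. The sketch below, using the made-up class SortOrderSketch, made-up field names, and plain strings in place of Sort.Direction, shows that a LinkedHashMap keeps the insertion order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: strings stand in for Sort.Direction so it runs without Spring.
class SortOrderSketch {

    // Mirrors how convertToSort walks the map: one order per entry, in iteration order.
    static List<String> toOrders(Map<String, String> sorts) {
        List<String> orders = new ArrayList<>();
        for (Map.Entry<String, String> entry : sorts.entrySet()) {
            orders.add(entry.getKey() + " " + entry.getValue());
        }
        return orders;
    }

    // LinkedHashMap iterates in insertion order, so the sort priority is predictable.
    static Map<String, String> exampleSorts() {
        Map<String, String> sorts = new LinkedHashMap<>();
        sorts.put("lastName", "ASC");   // illustrative field names
        sorts.put("createdAt", "DESC");
        sorts.put("id", "ASC");
        return sorts;
    }
}
```

In the reader bean this would amount to declaring the map as a LinkedHashMap instead of a HashMap whenever more than one sort key is used.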

As you may notice, you also need a MongoTemplate instance, which should look like this:

@Bean
public MongoTemplate mongoTemplate(MongoDbFactory mongoDbFactory) {
    return new MongoTemplate(mongoDbFactory);
}

Here MongoDbFactory is an object autowired by the Spring framework.

I hope this is enough to help you.
