我需要构建一个后台进程来定期处理数据库表的所有元素。 由于我无法加载内存中的所有元素,因此我需要将数据库划分为子部分。不幸的是,我无法使用 JPQL Stream 返回类型,因为查询使用基于复杂过滤器的逻辑,而这是通过编写 JPQL 查询无法实现的。
因此,我构建了以下条件查询,仅返回产品页面。处理完一页后,如何迭代下一页直到处理完所有页面?
import org.springframework.data.jpa.repository.query.QueryUtils;
@Repository
@AllArgsConstructor
public class ProductRepositoryImpl implements CustomProductRepository {
private final EntityManager entityManager;
@Override
public Page<Product> getProducts(Pageable pageable, /* fields used to filter the results */) {
CriteriaBuilder criteriaBuilder = entityManager.getCriteriaBuilder();
CriteriaQuery<Product> criteriaQuery = criteriaBuilder.createQuery(Product.class);
Root<Product> root = criteriaQuery.from(Product.class);
List<Predicate> predicates = getPredicates(/* fields used to filter the results */);
criteriaQuery.where(combinePredicatesWithAndStatement(criteriaBuilder, predicates))
.orderBy(QueryUtils.toOrders(pageable.getSort(), root, criteriaBuilder));
List<Product> result = entityManager.createQuery(criteriaQuery)
.setFirstResult((int) pageable.getOffset())
.setMaxResults(pageable.getPageSize())
.getResultList();
CriteriaBuilder criteriaBuilderCount = entityManager.getCriteriaBuilder();
CriteriaQuery<Long> countQuery = criteriaBuilderCount.createQuery(Long.class);
Root<Product> rootCount = countQuery.from(Product.class);
List<Predicate> predicatesCount = getPredicates(/* fields used to filter the results */);
countQuery.select(criteriaBuilderCount.count(rootCount))
.where(combinePredicatesWithAndStatement(criteriaBuilderCount, predicatesCount));
Long totalElements = entityManager.createQuery(countQuery).getSingleResult();
return new PageImpl<>(result, pageable, totalElements);
}
private List<Product> getPredicates(/* fields used to filter the results */) {
List<Predicate> predicates = new ArrayList<Predicate>();
// assemble predicates based on some complex conditions not replicable with JPQL
return predicates;
}
private Predicate combinePredicatesWithAndStatement(CriteriaBuilder criteriaBuilder, List<Predicate> predicates) {
return criteriaBuilder.and(predicates.stream().filter(Objects::nonNull).toArray(Predicate[]::new));
}
}
要迭代所有页面,您可以创建一个 PageStreamer 类,如下所示:
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Stream;
public class PageStreamer<T> {
private final Pageable pageable;
private final Function<Pageable, Page<T>> getPage;
public PageStreamer(Pageable pageable, Function<Pageable, Page<T>> getPage) {
this.pageable = pageable;
this.getPage = getPage;
}
public Stream<Page<T>> stream() {
val currentPageableReference = new AtomicReference<>(pageable);
return Stream.generate(currentPageableReference::get)
.takeWhile(Objects::nonNull)
.map(getPage)
.takeWhile(Slice::hasContent)
.map(page -> setNextPageable(page, currentPageableReference));
}
private Page<T> setNextPageable(Page<T> page, AtomicReference<Pageable> currentPageableReference) {
currentPageableReference.set(page.hasNext() ? page.nextPageable() : null);
return page;
}
}
然后您可以按如下方式使用它:
PageStreamer<Product> pageStreamer = new PageStreamer<>(
// starts retrieving the products from the first page,
// using a 50 elements window size
PageRequest.of(0, 50),
pageable -> productRepository.getProducts(pageable, /* fields used to filter the results */)
);
pageStreamer.stream().forEach(product -> {
// perform some processing
});