有没有办法让存储库调用并行运行?

问题描述 投票:0回答:2

在我的项目中,我同时运行多个查询。这是我目前正在做的事情的例子

在我的存储库中,我正在做这样的事情 “从表名中选择计数(列名)”

columnName 和 tableName 是我从服务传递的变量。假设我有 100 多个列需要计算,那么我必须调用 select 查询 100 多次,这很耗时。是否可以并行运行或至少批量运行?

已经尝试过反应堆堆芯并使用助焊剂,但看起来它仍在按顺序运行。我读过 jdbc 正在阻塞,这就是为什么它需要等待另一个查询完成才能执行另一个查询。但它总是如此还是有其他方式。

为此我正在使用 spring boot 和 jdbc。

编辑: 到目前为止,这是我目前的实施

List<Mono<Count>> countMonoList = new ArrayList<>();
countList.forEach(c -> {
  Mono<Count> blockingWrapper = Mono.fromCallable(() -> countRepo.selectQuery(c));
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic());
blockingWrapper.subscribe();
countMonoList.add(blockingWrapper);
});

然后我需要将

List<Mono<Count>>
转换为
List<Count>
以进行下一个过程。现在我在运行一些查询后得到数据库连接失败。

spring-boot project-reactor
2个回答
0
投票

这将并行运行到池中的连接数。

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;

import com.example.demo.count.CountingService;

@SpringBootApplication
public class Springboot3Application {

    public static void main(String[] args) {
        SpringApplication.run(Springboot3Application.class, args);
    }

    @Autowired
    CountingService countingService;

    @EventListener
    public void onApplicationEvent(ContextRefreshedEvent event) {
        List<String[]> tableInfo = Arrays.asList(new String[][] {new String[] {"countries", "id"}, new String[] {"countries", "name"}});

        List<String[]> results = tableInfo.parallelStream().map(entry -> {
            int count = countingService.count(entry[0], entry[1]);
            return new String[] { entry[0], entry[1], count + "" };
        }).collect(Collectors.toList());

    }

}

计数服务

public interface CountingService {

    int count(String tableName, String tableColumn);

}

CountingServiceImpl

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class CountingServiceImpl implements CountingService {
    
    @Autowired 
    JdbcTemplate jdbcTemplate;
    
    @Override
    public int count(String tableName, String columnName) {
        return jdbcTemplate.queryForObject(
                "select count(" + columnName + ") from " + tableName, Integer.class);
    }
    

}

0
投票

Reactor 提供了 2 个 API,通过 Mono.fromCallableMono.fromRunnable

如果多个调用需要并行发生,那么您可以将其卸载到一个 Schedular(可以为基于 IO 的工作选择 bounded elastic)

可以查看这里这里更多细节。下面是一个破解示例供参考

private static void computeFromCallable() {
    for (int i = 0; i<200; i++){
        Mono<Integer> integerMono = Mono
                .fromCallable(MTSample::compute)
                .subscribeOn(Schedulers.boundedElastic());
        integerMono
                .subscribe((x) -> System.out.println("return value for thread: " + Thread.currentThread().getName() + " is: " + x));
    }
}

static Random rand1 = new Random();
private static int compute() throws InterruptedException {
    int i = rand1.nextInt(2000);
    int j = rand1.nextInt(10);
    System.out.println("sleep delay for thread: " + Thread.currentThread().getName() + " is: " + i);
    Thread.sleep(i);
    return j;
}
© www.soinside.com 2019 - 2024. All rights reserved.