如何自定义Java 21的Structured Concurrency API的Shutdown策略?

问题描述 投票:0回答:1

我有以下用例,我想从多个数据源检索设备的多个错误状态信息。我想立即终止所有任务,如果任何状态为 true,则将错误状态返回到主任务。我怎样才能实现这个目标?这是我的查询逻辑:

try (var scope = new StructuredTaskScope<>()) {
            scope.fork(()->posRepo.findInspectionByUnitId(unitId));
            scope.fork(()->posRepo.findEventOfFireByUnitId(unitId));
            scope.fork(()->faultRepo.findGeneralErrorByUnitId(unitId));
            scope.fork(()->ardInfRepo.findArdModeByUnitId(unitId));
            scope.fork(()->ardInfRepo.findArdPowerStatusByUnitId(unitId));
            scope.fork(()->ardInfRepo.findArdModeByUnitId(unitId));
            // Once a task return true, close all remaining tasks,and return the  signal:"true" to main task;
            scope.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
public interface ElePosRepo extends CrudRepository<PositionInformation,Integer> {

    Optional<Boolean> findInspectionByUnitId(Integer unitId);
    Optional<Boolean> findEventOfFireByUnitId(Integer unitId);
}

我希望当一个子任务返回 true 时,任务组内的所有其他子任务都应该立即终止。如果所有子任务都返回 false 或为空,则主任务在完成所有子任务后应返回 false。

multithreading concurrency java-21 virtual-threads structured-concurrency
1个回答
0
投票
class ShutdownOnFirstTrue extends StructuredTaskScope<Optional<Boolean>> {
    
    private volatile boolean succeeded;
    
    @Override
    protected void handleComplete(Subtask<? extends Optional<Boolean>> subtask) {
        if (subtask.state() == State.SUCCESS) {
            subtask.get().filter((b) -> b.booleanValue()).ifPresent( (b) -> {
                succeeded = true;
                shutdown();
            });
        }
    }
    
    public boolean isSucceeded() {
        return succeeded;
    }

}

isSucceeded
应在
join
完成后访问

try (var scope = new ShutdownOnFirstTrue()) {
    scope.fork...
    scope.fork...
    scope.fork...
    scope.join();
    boolean succeeded = scope.isSucceeded();
}

但是,我建议学习

handleComplete
覆盖技术,而不是依赖于随意的解决方案来满足问题中表达的同样随意的要求(例如,您没有指定子任务引发的异常的行为)建议用于自定义行为的定义,在
StructuredTaskScope.ShutdownOnFailure
StructuredTaskScope.ShutdownOnSuccess
中引入,@Holger 在他的答案中给出了另一个很好的例子

一些技术细节:

  • fork
    之后的下一轮
    join
    课程不可重入。要实现它,要么为 
    succeeded
     字段设置一个(重新)设置器,要么像 @Holger 那样覆盖两个 
    join
    ,第二种方法更优雅,但更容易出错,因为如果下一个版本将添加第三个
    join
    ; (但希望在下一个版本中保护 
    joinImpl
     或在基类中拥有某种 
    joinCompleted
     占位符)。
  • shutdown
     没有针对并发调用进行防护,因为我相信 
    implShutdown
     已经做到了这一点,所以我发现 
    ShutdownOnSuccess.handleComplete
     中的防护过度。
  • 没有针对子任务引发的异常提供任何特殊操作。它只会向下传播。
© www.soinside.com 2019 - 2024. All rights reserved.