Sleeping in Spring Boot


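I have a periodically recurring task scheduled in a Spring Boot web service (@EnableScheduling). When the task fires, it calls the Runnable/run method of a registered object. In that run method I need to do some work, and I must not exit the method until the work is finished. The problem is that other threads perform additional work that the run thread depends on. So in the run thread I have something like this: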

@Component
public class DoWork implements Runnable {

    @Override
    public void run() {

        // Setup clients.

        // Call services.
        Mono<String> response1 = client1.post();
        response1.subscribe(new MyResponseCallback(), new MyErrorCallback());

        Mono<String> response2 = client2.post();
        response2.subscribe(new MyResponseCallback(), new MyErrorCallback());

        Mono<String> responseX = clientX.post();
        responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());

        // Poll once a second until the callbacks have set the flag.
        while (!callbacksWorkCompletedFlag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        // Do computation with callback responses.

        // After computation is completed, exit run method.
    }
}

public class MyResponseCallback implements Consumer<String> {
    
    @Override
    public void accept(final String response) {
    
        // Do work with response.
    }
}

public class MyErrorCallback implements Consumer<Throwable> {
    
    @Override
    public void accept(final Throwable error) {
    
        // Log error.
    }
}

Is there a better way to do this in Java/Spring Boot?

java spring spring-boot java-threads thread-synchronization
1 Answer

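Here is an example using CompletableFuture. It uses the third argument of Mono.subscribe (the completion callback) to let the future know when the call has finished.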

@Override
public void run() {
    Mono<String> response1 = client1.post();
    CompletableFuture<?> future1 = new CompletableFuture<>();
    response1.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future1.complete(null));

    Mono<String> response2 = client2.post();
    CompletableFuture<?> future2 = new CompletableFuture<>();
    response2.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future2.complete(null));

    Mono<String> responseX = clientX.post();
    CompletableFuture<?> futureX = new CompletableFuture<>();
    responseX.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> futureX.complete(null));
    
    CompletableFuture.allOf(future1, future2, futureX).join();
}
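
One caveat with the snippet above: the third argument of Mono.subscribe runs only when the Mono completes successfully. If a call fails, only the error callback runs, the matching future is never completed, and join() blocks indefinitely. Below is a minimal JDK-only sketch of one way to handle that, by completing the future from the error path as well and bounding the wait. The class FutureGate, the subscribe helper (a hypothetical stand-in for the reactive clients, not Reactor's API), and the 5-second timeout are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class FutureGate {

    // Hypothetical stand-in for a subscribe(onNext, onError, onComplete) call:
    // runs asynchronously and invokes onError INSTEAD OF onComplete on failure.
    static void subscribe(String value, boolean fail,
                          Consumer<String> onNext,
                          Consumer<Throwable> onError,
                          Runnable onComplete) {
        CompletableFuture.runAsync(() -> {
            if (fail) {
                onError.accept(new IllegalStateException("call failed: " + value));
            } else {
                onNext.accept(value);
                onComplete.run();
            }
        });
    }

    // Starts one call and returns a future that finishes on success AND on error.
    static CompletableFuture<Void> call(String value, boolean fail, List<String> results) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        subscribe(value, fail,
                results::add,                 // collect the response
                done::completeExceptionally,  // the error path also releases the waiter
                () -> done.complete(null));
        return done;
    }

    // Waits for all three calls; returns true only if every call succeeded in time.
    static boolean runAll(boolean failSecond, List<String> results) {
        CompletableFuture<Void> all = CompletableFuture.allOf(
                call("r1", false, results),
                call("r2", failSecond, results),
                call("rX", false, results));
        try {
            all.get(5, TimeUnit.SECONDS);     // bounded wait instead of join()
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> results = new CopyOnWriteArrayList<>();
        System.out.println("all succeeded: " + runAll(false, results));
        System.out.println("responses received: " + results.size());
    }
}
```

With real Mono instances the same idea would mean passing a callback that both logs the error and calls completeExceptionally on the future, rather than only MyErrorCallback.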

Here is a CountDownLatch example:

@Override
public void run() {
    CountDownLatch latch = new CountDownLatch(3);

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    
    try {
        latch.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status for callers
        throw new RuntimeException(e);
    }
}
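
Note that latch::countDown is passed here only as the completion callback, which is not invoked when a call fails, so a single error leaves the count above zero and await() never returns. A possible variation, sketched below with the JDK only, counts down on the failure path too and uses the timed await(long, TimeUnit) overload. LatchGate and callAsync are hypothetical names standing in for the reactive clients, and the 5-second limit is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchGate {

    // Hypothetical stand-in for a reactive call: asynchronously invokes
    // exactly one of the two callbacks, never both.
    static void callAsync(boolean fail, Runnable onSuccess, Runnable onFailure) {
        CompletableFuture.runAsync(fail ? onFailure : onSuccess);
    }

    // Returns the number of failed calls, or -1 on timeout or interruption.
    static int runAll(boolean... failures) {
        CountDownLatch latch = new CountDownLatch(failures.length);
        AtomicInteger failed = new AtomicInteger();
        for (boolean fail : failures) {
            callAsync(fail,
                    latch::countDown,
                    () -> {
                        failed.incrementAndGet();
                        latch.countDown();   // release the waiter on errors too
                    });
        }
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                return -1;                   // some call never reported back
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return failed.get();
    }

    public static void main(String[] args) {
        System.out.println("failed calls: " + runAll(false, true, false));
    }
}
```

Counting failures separately lets the scheduled task decide whether to run its computation with partial results or skip this cycle.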