How to shut down the Spring context when a fatal exception (e.g. OutOfMemoryError) occurs in a reactive chain

Inside a Spring application, we use ServiceBusReceiverAsyncClient to consume messages from Azure ServiceBus and process them in a reactive chain. For example:

public class MyApplication {

  // ...

  public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args);
  }

}

public class ApplicationLifecycleManager {

  // ...

  @EventListener(ApplicationReadyEvent.class)
  public void onAfterStartUp() {
    receiverAsyncClient
      .receiveMessages()
      .flatMap(...) // do some processing
      .subscribe();
  }

}

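We have run into the problem that when the reactive chain hits an OutOfMemoryError, we get a log entry with a stack trace

WARN [parallel-5] reactor.core.Exceptions - throwIfFatal detected a jvm fatal exception, which is thrown and logged below:

but we don't seem to have any way to act on it, and the Spring application and unrelated chains keep running.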

In our case, we need to make sure the Spring context shuts down and the application restarts, so that we can resume processing ServiceBus messages.

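We tried to catch the fatal exception manually, but nothing seems to have any effect: fatal exceptions appear to bypass all onError operators and hooks. A surrounding try/catch block does not catch the error either. We could call System.exit(), but we can't find anywhere to catch the exception in the first place.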

spring-boot reactive-programming azureservicebus project-reactor
1 Answer

In our case, we need to make sure the Spring context shuts down and the application restarts, so that we can resume processing ServiceBus messages.

  • The following addresses handling fatal exceptions (e.g. OutOfMemoryError) and checking that the Spring context shuts down and the application restarts.

With the code below, I restarted the application and continued processing ServiceBus messages.

Code:

ApplicationLifecycleManager.java:

import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class ApplicationLifecycleManager {

    private final ServiceBusReceiverAsyncClient receiverAsyncClient;
    public ApplicationLifecycleManager(ServiceBusReceiverAsyncClient receiverAsyncClient) {
        this.receiverAsyncClient = receiverAsyncClient;
    }

    @EventListener(ApplicationReadyEvent.class)
    public void onAfterStartUp() {
        receiverAsyncClient.receiveMessages()
                .flatMap(this::processMessage)
                .onErrorResume(throwable -> {
                    if (isFatal(throwable)) {
                        restartApplication();
                        return Mono.empty();
                    } else {
                        return Mono.error(throwable);
                    }
                })
                .subscribe();
    }
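    // Simulates per-message work; returns an empty Flux once the message is handled.
    private Flux<?> processMessage(ServiceBusReceiverAsyncClient.Message message) {
        consumeMemory();
        System.out.println("Processing message: " + message.getContent());
        return Flux.empty();
    }

    // Allocates and fills a 1,000,000-element int array (roughly 4 MB) to put some load on the heap.
    private void consumeMemory() {
        int arraySize = 1000000;
        int[] array = new int[arraySize];
        for (int i = 0; i < arraySize; i++) {
            array[i] = i;
        }
    }

    // Treats only OutOfMemoryError as fatal.
    private boolean isFatal(Throwable throwable) {
        return throwable instanceof OutOfMemoryError;
    }

    // Placeholder: only logs; the actual shutdown/restart logic is not shown here.
    private void restartApplication() {
        System.out.println("Restarting application...");
    }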
}
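
The isFatal() check above only matches OutOfMemoryError. As a rough sketch, it could be widened to other JVM-level errors. To my understanding, Reactor's own fatal check also covers VirtualMachineError and LinkageError, plus the deprecated ThreadDeath, which is left out here. The class name FatalErrors is hypothetical and not part of the original answer.

```java
// Hypothetical helper, not part of the original answer: a broader "is this fatal?" check.
final class FatalErrors {

    private FatalErrors() {
    }

    /** Returns true for JVM-level errors that are usually not recoverable. */
    static boolean isFatal(Throwable throwable) {
        // VirtualMachineError covers OutOfMemoryError, StackOverflowError and InternalError.
        // LinkageError covers class-loading failures such as NoClassDefFoundError.
        return throwable instanceof VirtualMachineError
                || throwable instanceof LinkageError;
    }
}
```

Ordinary exceptions such as IllegalStateException are not matched, so they would still take the non-fatal branch.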

ServiceBusReceiverAsyncClient.java:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Component
public class ServiceBusReceiverAsyncClient {

    @Value("${azure.servicebus.connection-string}")
    private String connectionString;

    @Value("${azure.servicebus.topic-name}")
    private String topicName;

    @Value("${azure.servicebus.subscription-name}")
    private String subscriptionName;

    // Stand-in for the real client: emits three fixed messages instead of reading from Service Bus.
    public Flux<Message> receiveMessages() {
        return Flux.just(new Message("Message 1"), new Message("Message 2"), new Message("Message 3"));
    }

    static class Message {
        private String content;
        public Message(String content) {
            this.content = content;
        }

        public String getContent() {
            return content;
        }
    }
}

MessageController.java:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class MessageController {

    @Autowired
    private ServiceBusReceiverAsyncClient receiverAsyncClient;
    @GetMapping("/messages")
    public ResponseEntity<Flux<ServiceBusReceiverAsyncClient.Message>> receiveMessages() {
        Flux<ServiceBusReceiverAsyncClient.Message> messages = receiverAsyncClient.receiveMessages();
        return ResponseEntity.ok().body(messages);
    }
}

application.properties:

azure.servicebus.connection-string=<connection_string>
azure.servicebus.topic-name=<topic_name>
azure.servicebus.subscription-name=<subscription_name>

pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-servicebus</artifactId>
        <version>7.16.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
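
  • The onErrorResume() operator handles errors that reach it as onError signals during processing.
  • The isFatal() method checks whether the error is a fatal one, such as OutOfMemoryError.
  • If a fatal error is detected, restartApplication() is called to trigger a restart of the application. As written above, that method only logs a message, so the actual shutdown and restart logic still has to go there.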
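
The question reports that the fatal error never reaches the onError operators. One place where it may still be observable is the uncaught-exception handler of the thread that ran the failing task. The sketch below uses only the JDK and is an illustration under assumptions, not the method used in the answer: FatalShutdownHandler, closeContext and exit are hypothetical names. In a Spring Boot application, closeContext would presumably wrap the close() call on the application context and exit would wrap System.exit. Whether Reactor's scheduler threads forward the error to this handler in your version should be verified.

```java
import java.util.function.IntConsumer;

// Hypothetical handler: closes the application context and exits when a JVM-fatal error escapes a thread.
final class FatalShutdownHandler implements Thread.UncaughtExceptionHandler {

    private final Runnable closeContext; // assumed to close the Spring context
    private final IntConsumer exit;      // assumed to terminate the JVM with the given code

    FatalShutdownHandler(Runnable closeContext, IntConsumer exit) {
        this.closeContext = closeContext;
        this.exit = exit;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        if (!(error instanceof VirtualMachineError)) {
            // Not JVM-fatal: this sketch only reports it and keeps the process alive.
            System.err.println("Uncaught exception on " + thread.getName() + ": " + error);
            return;
        }
        try {
            closeContext.run();
        } finally {
            // Exit with a non-zero code even if closing the context fails.
            exit.accept(1);
        }
    }
}
```

It could be installed at startup with Thread.setDefaultUncaughtExceptionHandler(new FatalShutdownHandler(context::close, System::exit)), where context is the application's ConfigurableApplicationContext. Exiting with a non-zero code leaves the actual restart to whatever supervises the process, such as a container orchestrator or a service manager.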

