在 Spring 应用程序内部,我们使用
ServiceBusReceiverAsyncClient
来使用来自 Azure ServiceBus 的消息并在反应链中处理它们。比如:
public class MyApplication {
// ...
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
public class ApplicationLifecycleManager {
// ...
@EventListener(ApplicationReadyEvent.class)
public void onAfterStartUp() {
receiverAsyncClient
.receiveMessages()
.flatMap(...) // do some processing
.subscribe();
}
}
我们遇到过这样的问题:当反应链遇到 OutOfMemoryException 时,我们会得到带有堆栈跟踪的日志
WARN [parallel-5] reactor.core.Exceptions - throwIfFatal detected a jvm fatal exception, which is thrown and logged below:
,但我们似乎没有任何方法对其采取行动,但 Spring 应用程序和不相关的链继续奔跑。
在我们的例子中,我们需要确保 Spring Context 关闭并重新启动应用程序,以便我们可以恢复处理 ServiceBus 消息。
我们尝试手动捕获致命异常,但似乎没有任何效果 - FatalExceptions 似乎绕过了所有 onError 运算符和钩子。周围的 try/catch 块似乎也无法捕获错误。我们可以调用 System.exit(),但我们似乎无法在任何地方捕获异常。
在我们的例子中,我们需要确保 Spring Context 关闭并重新启动应用程序,以便我们可以恢复处理 ServiceBus 消息。
在这里,我使用以下代码重新启动了应用程序并继续处理 ServiceBus 消息
代码:
ApplicationLifecycleManager.java:
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Component
public class ApplicationLifecycleManager {
private final ServiceBusReceiverAsyncClient receiverAsyncClient;
public ApplicationLifecycleManager(ServiceBusReceiverAsyncClient receiverAsyncClient) {
this.receiverAsyncClient = receiverAsyncClient;
}
@EventListener(ApplicationReadyEvent.class)
public void onAfterStartUp() {
receiverAsyncClient.receiveMessages()
.flatMap(this::processMessage)
.onErrorResume(throwable -> {
if (isFatal(throwable)) {
restartApplication();
return Mono.empty();
} else {
return Mono.error(throwable);
}
})
.subscribe();
}
private Flux<?> processMessage(ServiceBusReceiverAsyncClient.Message message) {
consumeMemory();
System.out.println("Processing message: " + message.getContent());
return Flux.empty();
}
private void consumeMemory() {
int arraySize = 1000000;
int[] array = new int[arraySize];
for (int i = 0; i < arraySize; i++) {
array[i] = i;
}
}
private boolean isFatal(Throwable throwable) {
return throwable instanceof OutOfMemoryError;
}
private void restartApplication() {
System.out.println("Restarting application...");
}
}
ServiceBusReceiverAsyncClient.java:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@Component
public class ServiceBusReceiverAsyncClient {
@Value("${azure.servicebus.connection-string}")
private String connectionString;
@Value("${azure.servicebus.topic-name}")
private String topicName;
@Value("${azure.servicebus.subscription-name}")
private String subscriptionName;
public Flux<Message> receiveMessages() {
return Flux.just(new Message("Message 1"), new Message("Message 2"), new Message("Message 3"));
}
static class Message {
private String content;
public Message(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}
}
MessageController.java:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class MessageController {
@Autowired
private ServiceBusReceiverAsyncClient receiverAsyncClient;
@GetMapping("/messages")
public ResponseEntity<Flux<ServiceBusReceiverAsyncClient.Message>> receiveMessages() {
Flux<ServiceBusReceiverAsyncClient.Message> messages = receiverAsyncClient.receiveMessages();
return ResponseEntity.ok().body(messages);
}
}
application.properties:
azure.servicebus.connection-string=<connec_string>
azure.servicebus.topic-name=<topic_name>
azure.servicebus.subscription-name=<subscription_name>
pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.16.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
onErrorResume()
运算符捕获处理过程中抛出的任何异常。isFatal()
方法检查异常是否是致命错误,例如OutOfMemoryError。restartApplication()
方法来触发应用程序的重新启动。输出: