我写了一个 @Aspect,用来拦截返回 Mono / Flux 的响应式方法。借助 @AfterReturning 通知(advice),我试图通过调用 Web 服务来触发 APNS 通知。
不幸的是,processNotification 立即返回 onComplete 信号,而没有执行整个调用链。下面是我的示例程序。
@Aspect
@Component
@Slf4j
public class NotifyAspect{
private final NotificationServiceHelper notificationServiceHelper;
@Autowired
public NotifyAspect(NotificationServiceHelper notificationServiceHelper) {
this.notificationServiceHelper = notificationServiceHelper;
}
@AfterReturning(pointcut = "@annotation(com.cupid9.api.common.annotations.Notify)", returning = "returnValue")
public void generateNotification(JoinPoint joinPoint, Object returnValue) throws Throwable {
log.info("AfterReturning Advice - Intercepting Method : {}", joinPoint.getSignature().getName());
//Get Intercepted method details.
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
//Get the Notification Details.
Notify myNotify = method.getAnnotation(Notify.class);
if (Mono.class.isAssignableFrom(returnValue.getClass())) {
Mono<Object> result = (Mono<Object>) returnValue;
result.doOnSubscribe(o -> {
log.debug("On Subscription...");
notificationServiceHelper.processNotification(myNotify.notificationType())
.doOnError(throwable -> {
log.error("Exception in notification processor",throwable);
});
});
}
}
}
@Slf4j
@Service
public class NotificationServiceHelper {
private ReactiveUserProfileRepository userProfileRepository;
@Value("${services.notification.url}")
private String notificationServiceUrl;
private RestWebClient restWebClient;
@Autowired
public NotificationServiceHelper(RestWebClient restWebClient,
ReactiveUserProfileRepository reactiveUserProfileRepository) {
this.restWebClient = restWebClient;
this.userProfileRepository = reactiveUserProfileRepository;
}
public Flux<Notification> processNotification(NotificationSchema.NotificationType notificationType) {
/*Get user profile details*/
return SessionHelper.getProfileId()
.switchIfEmpty( Mono.error(new BadRequest("Invalid Account 1!")))
.flatMap(profileId ->
Mono.zip(userProfileRepository.findByIdAndStatus(profileId, Status.Active), SessionHelper.getJwtToken()))
.switchIfEmpty( Mono.error(new BadRequest("Invalid Account 2!")))
.flatMapMany(tuple2 ->{
//Get user details and make sure there are some valid devices associated.
var userProfileSchema = tuple2.getT1();
log.info("Processing Notifications for User Profile : {}", userProfileSchema.getId());
if (Objects.isNull(userProfileSchema.getDevices()) || (userProfileSchema.getDevices().size() < 1)) {
return Flux.error(new InternalServerError("No Devices associate with this user. Can not send notifications."));
}
//Build Notification message from the Notification Type
var notificationsMap = new LinkedHashSet<Notification>();
userProfileSchema.getDevices().forEach(device -> {
var notificationPayload = Notification.builder()
.notificationType(notificationType)
.receiverDevice(device)
.receiverProfileRef(userProfileSchema.getId())
.build();
notificationsMap.add(notificationPayload);
});
//Get session token for authorization
var jwtToken = tuple2.getT2();
//Build the URI needed to make the rest call.
var uri = UriComponentsBuilder.fromUriString(notificationServiceUrl).build().toUri();
log.info("URI built String : {}", uri.toString());
//Build the Headers needed to make the rest call.
var headers = new HttpHeaders();
headers.add(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
headers.add(HttpHeaders.AUTHORIZATION, jwtToken);
var publishers = new ArrayList<Mono<ClientResponse>>();
notificationsMap.forEach(notification -> {
publishers.add(restWebClient.post(uri, headers, notification));
});
return Flux.merge(publishers).flatMap(clientResponse -> {
var httpStatus = clientResponse.statusCode();
log.info("NotificationService HTTP status code : {}", httpStatus.value());
if (httpStatus.is2xxSuccessful()) {
log.info("Successfully received response from Notification Service...");
return clientResponse.bodyToMono(Notification.class);
} else {
// return Flux.empty();
return clientResponse.bodyToMono(Error.class)
.flatMap(error -> {
log.error("Error calling Notification Service :{}", httpStatus.getReasonPhrase());
if (httpStatus.value() == 400) {
return Mono.error(new BadRequest(error.getMessage()));
}
return Mono.error(new InternalServerError(String.format("Error calling Notification Service : %s", error.getMessage())));
});
}
});
}).doOnError(throwable -> {
throw new InternalServerError(throwable.getMessage(), throwable);
});
}
}
我们如何在不让被拦截的方法等待的情况下异步触发此调用?现在,processNotification 总是直接返回 onComplete 信号,调用链没有按预期执行。
/**
 * Marker annotation for methods and parameters, retained at runtime so it can be read
 * reflectively.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.PARAMETER, ElementType.METHOD})
public @interface Log {

    /**
     * Optional title attached to the annotated element.
     *
     * @return the title, or an empty string when none is given
     */
    String title() default "";
}
@SuppressWarnings({"unchecked"})
@Around("@annotation(operlog)")
public Mono<Result> doAround(ProceedingJoinPoint joinPoint, Log operlog) {
Mono<Result> mono;
try {
mono = (Mono<Result>) joinPoint.proceed();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
return mono.doOnNext(result -> {
//doSomething(result);
};
}