我正在使用 Spring 的 STOMP over WebSocket 实现和功能齐全的 ActiveMQ 代理。当用户
SUBSCRIBE
订阅某个主题时,必须经过一些权限逻辑才能成功订阅。我正在使用 ChannelInterceptor 来应用权限逻辑,配置如下:
WebSocketConfig.java:
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/stomp")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("relayhost.mydomain.com")
.setRelayPort(61613);
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(new MySubscriptionInterceptor());
}
}
WebSocketSecurityConfig.java:
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages
.simpSubscribeDestMatchers("/stomp/**").authenticated()
.simpSubscribeDestMatchers("/user/queue/errors").authenticated()
.anyMessage().denyAll();
}
}
MySubscriptionInterceptor.java:
public class MySubscriptionInterceptor extends ChannelInterceptorAdapter {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor headerAccessor= StompHeaderAccessor.wrap(message);
Principal principal = headerAccessor.getUser();
if (StompCommand.SUBSCRIBE.equals(headerAccessor.getCommand())) {
checkPermissions(principal);
}
return message;
}
private void checkPermissions(Principal principal) {
// apply permissions logic
// throw Exception permissions not sufficient
}
}
当没有足够权限的客户端尝试订阅受限主题时,他们实际上从未收到来自该主题的任何消息,但也不会收到拒绝其订阅的抛出异常的通知。相反,客户端会收到一个 ActiveMQ 代理一无所知的死订阅。 (正常的、经过充分许可的客户端与 STOMP 端点和主题的交互按预期工作。)
我尝试在成功连接后使用我的 Java 测试客户端订阅
users/{subscribingUsername}/queue/errors
和简单的 users/queue/errors
,但迄今为止我无法从服务器传递到客户端获取有关订阅异常的错误消息。这显然不太理想,因为客户永远不会收到他们被拒绝访问的通知。
您不能只从
MySubscriptionInterceptor
上的 clientInboundChannel
上抛出异常,因为最后一个是 ExecutorSubscribableChannel
,因此是 async
,并且这些线程中的任何异常都会在日志中重新抛出给呼叫者 - StompSubProtocolHandler.handleMessageFromClient
。
但是你可以做类似
clientOutboundChannel
的事情并像这样使用它:
StompHeaderAccessor headerAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
headerAccessor.setMessage(error.getMessage());
clientOutboundChannel.send(MessageBuilder.createMessage(new byte[0], headerAccessor.getMessageHeaders()));
另一个需要考虑的选项是注释映射:
@SubscribeMapping("/foo")
public void handleWithError() {
throw new IllegalArgumentException("Bad input");
}
@MessageExceptionHandler
@SendToUser("/queue/error")
public String handleException(IllegalArgumentException ex) {
return "Got error: " + ex.getMessage();
}
我面临着同样的问题,并找到了一个基于
WebSocketMessageBrokerConfigurer
类的简单解决方案。
要从 ExecutorSubscribableChannel
自动返回的异常中提取
your异常,您可以通过
registerStompEndpoints(...)
方法应用错误处理程序的自定义实现:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// ...
registry.setErrorHandler(new StompSubProtocolErrorHandler() {
@Override
public Message<byte[]> handleClientMessageProcessingError(Message<byte[]> clientMessage, Throwable ex) {
return super.handleClientMessageProcessingError(clientMessage, ex.getCause());
}
});
}
}
通过调用 ex.getCause(),您可以检索导致返回错误消息的底层 Throwable。