How do I configure ActiveMQ, e.g. the queue time to live, or keep a message in the queue until a period of time passes or an action occurs?


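I am building a notification system with Spring Boot and WebSocket. I use ActiveMQ to hold per-user queues, and it works fine.

I need to change some configuration, such as the queue time to live, so that a message stays in the queue until the user reads it, but I don't know how to configure this.

Here is the implementation: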

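import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        /*config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");*/

        // Relay /topic, /queue and /user destinations to the external STOMP broker (ActiveMQ).
        config
            .setApplicationDestinationPrefixes("/app")
            .setUserDestinationPrefix("/user")
            .enableStompBrokerRelay("/topic", "/queue", "/user")
            .setRelayHost("localhost")
            .setRelayPort(61613)
            .setClientLogin("guest")
            .setClientPasscode("guest");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }
}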

And:

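import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class NotificationWebSocketService {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    // Sends the notification to the user-specific destination "/user/{userID}/reply".
    public void initiateNotification(WebSocketNotification notificationData) throws InterruptedException {
        messagingTemplate.convertAndSendToUser(notificationData.getUserID(), "/reply", notificationData.getMessage());
    }
}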

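After NotificationWebSocketService is invoked, it creates the queue "/user/Johon/reply" in ActiveMQ containing the message; when the user subscribes to this queue, the message is received.

How do I configure the queue time to live so that the message stays in the queue until the user reads it?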

spring-boot activemq stomp spring-websocket server-push
2 Answers
0
votes

This unit test shows how to set an expiration time on messages in a user queue. Required: embedded Tomcat, spring-messaging, and ActiveMQ.

import org.apache.catalina.Context;
import org.apache.catalina.Wrapper;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.startup.Tomcat;
import org.apache.coyote.http11.Http11NioProtocol;
import org.apache.tomcat.util.descriptor.web.ApplicationListener;
import org.apache.tomcat.websocket.server.WsContextListener;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.*;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.web.SpringServletContainerInitializer;
import org.springframework.web.WebApplicationInitializer;
import org.springframework.web.servlet.support.AbstractAnnotationConfigDispatcherServletInitializer;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static java.util.concurrent.TimeUnit.SECONDS;

public class Test48402361 {

    private static final Logger logger = LoggerFactory.getLogger(Test48402361.class);

    private static TomcatWebSocketTestServer server = new TomcatWebSocketTestServer(33333);

    @BeforeClass
    public static void beforeClass() throws Exception {
        server.deployConfig(Config.class);
        server.start();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        server.stop();
    }

    @Test
    public void testUser() throws Exception {

        WebSocketStompClient stompClient = new WebSocketStompClient(new SockJsClient(Collections.singletonList(new WebSocketTransport(new StandardWebSocketClient()))));

        BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
        StompSession session = stompClient
                .connect("ws://localhost:" + server.getPort() + "/test", new WebSocketHttpHeaders(), new StompSessionHandlerAdapter() {
                })
                .get();
        // Wait until message "2" has expired before subscribing.
        Thread.sleep(3000);
        session.subscribe("/user/john/reply", new StompFrameHandler() {
            @Override
            public Type getPayloadType(StompHeaders headers) {
                return byte[].class;
            }

            @Override
            public void handleFrame(StompHeaders headers, Object payload) {
                String message = new String((byte[]) payload);
                logger.debug("message: {}, headers: {}", message, headers);
                blockingQueue.add(message);
            }
        });
        String message = blockingQueue.poll(1, SECONDS);
        Assert.assertEquals("1", message);
        message = blockingQueue.poll(1, SECONDS);
        Assert.assertEquals("3", message);

    }

    public static class Config extends AbstractAnnotationConfigDispatcherServletInitializer {

        @Override
        protected Class<?>[] getRootConfigClasses() {
            return new Class[] { };
        }

        @Override
        protected Class<?>[] getServletConfigClasses() {
            return new Class[] { Mvc.class };
        }

        @Override
        protected String[] getServletMappings() {
            return new String[] { "/" };
        }
    }

    @Configuration
    @EnableWebSocketMessageBroker
    public static class Mvc extends AbstractWebSocketMessageBrokerConfigurer {

        @Override
        public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {

            stompEndpointRegistry.addEndpoint("/test")
                    .withSockJS()
                    .setWebSocketEnabled(true);
        }

        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableStompBrokerRelay("/user").setRelayHost("localhost").setRelayPort(61614);
        }

        @Autowired
        private SimpMessagingTemplate template;

        @Override
        public void configureClientInboundChannel(ChannelRegistration registration) {
            registration.setInterceptors(new ChannelInterceptorAdapter() {
                @Override
                public Message<?> preSend(Message<?> message, MessageChannel channel) {

                    StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);
                    // getCommand() may return null (e.g. for heartbeats), so avoid switching on it directly.
                    if (StompCommand.CONNECT.equals(sha.getCommand())) {
                        // After CONNECT, send 3 messages to user john; message "2" is purged after 2 seconds.
                        template.convertAndSendToUser("john", "/reply", "1");
                        Map<String, Object> headers = new HashMap<>();
                        headers.put("expires", System.currentTimeMillis() + 2000);
                        template.convertAndSendToUser("john", "/reply", "2", headers);
                        template.convertAndSendToUser("john", "/reply", "3");
                    }
                    return super.preSend(message, channel);
                }
            });
        }
    }

    public static class TomcatWebSocketTestServer {

        private static final ApplicationListener WS_APPLICATION_LISTENER =
                new ApplicationListener(WsContextListener.class.getName(), false);

        private final Tomcat tomcatServer;

        private final int port;

        private Context context;


        public TomcatWebSocketTestServer(int port) {

            this.port = port;

            Connector connector = new Connector(Http11NioProtocol.class.getName());
            connector.setPort(this.port);

            File baseDir = createTempDir("tomcat");
            String baseDirPath = baseDir.getAbsolutePath();

            this.tomcatServer = new Tomcat();
            this.tomcatServer.setBaseDir(baseDirPath);
            this.tomcatServer.setPort(this.port);
            this.tomcatServer.getService().addConnector(connector);
            this.tomcatServer.setConnector(connector);
        }

        private File createTempDir(String prefix) {
            try {
                File tempFolder = File.createTempFile(prefix + '.', "." + getPort());
                tempFolder.delete();
                tempFolder.mkdir();
                tempFolder.deleteOnExit();
                return tempFolder;
            } catch (IOException ex) {
                throw new RuntimeException("Unable to create temp directory", ex);
            }
        }

        public int getPort() {
            return this.port;
        }


        @SafeVarargs
        public final void deployConfig(Class<? extends WebApplicationInitializer>... initializers) {

            this.context = this.tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));

            // Add Tomcat's DefaultServlet
            Wrapper defaultServlet = this.context.createWrapper();
            defaultServlet.setName("default");
            defaultServlet.setServletClass("org.apache.catalina.servlets.DefaultServlet");
            this.context.addChild(defaultServlet);

            // Ensure WebSocket support
            this.context.addApplicationListener(WS_APPLICATION_LISTENER);

            this.context.addServletContainerInitializer(
                    new SpringServletContainerInitializer(), new HashSet<>(Arrays.asList(initializers)));
        }

        public void start() throws Exception {
            this.tomcatServer.start();
        }

        public void stop() throws Exception {
            this.tomcatServer.stop();
        }

    }

}

0
votes

In stompClient.subscribe('/user/Johon/reply', ...), '/user/Johon/reply' is a topic, not a queue.

If your STOMP client is not connected to the topic '/user/Johon/reply', it will lose every message sent to that topic.
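
The difference can be pictured with a toy in-memory model. This is only an illustrative sketch in plain Java, not ActiveMQ or Spring code; the names ToyDestinations, ToyQueue and ToyTopic are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model only: illustrates queue vs. topic delivery semantics, not the ActiveMQ API.
final class ToyDestinations {

    // A queue stores each message until some consumer takes it.
    static final class ToyQueue {
        private final Queue<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.add(message);
        }

        // Returns the oldest stored message, or null if the queue is empty.
        String receive() {
            return pending.poll();
        }
    }

    // A plain (non-durable) topic delivers only to subscribers present at publish time.
    static final class ToyTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("sent while user offline");
        System.out.println("queue delivers later: " + queue.receive());

        ToyTopic topic = new ToyTopic();
        List<String> received = new ArrayList<>();
        topic.publish("sent while user offline"); // nobody is subscribed, so this is lost
        topic.subscribe(received::add);
        topic.publish("sent while user online");
        System.out.println("topic delivered: " + received);
    }
}
```

In this model the queue hands over the message whenever the consumer eventually arrives, while the topic only reaches subscribers that were already attached when the message was published.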

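So your options are:

  1. Convert your topic '/user/Johon/reply' to a queue, so that messages remain in the queue indefinitely, or until the server has finished processing them.
  2. Use retroactive consumers and a subscription recovery policy

A retroactive consumer is just a regular JMS topic consumer which indicates that, at the start of its subscription, every attempt should be made to go back in time and send any old messages (or the last message sent on that topic) that the consumer may have missed. http://activemq.apache.org/retroactive-consumer.html

The subscription recovery policy lets you go back in time when you subscribe to a topic. http://activemq.apache.org/subscription-recovery-policy.html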
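
To make the idea concrete, here is a toy sketch of a topic that remembers the last N messages and replays them to retroactive subscribers. It only mimics the behaviour described above; it is not ActiveMQ code, and ToyRetroactiveTopic and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: mimics a "keep the last N messages" recovery policy, not ActiveMQ code.
final class ToyRetroactiveTopic {
    private final int recoverySize;
    private final Deque<String> recent = new ArrayDeque<>();
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    ToyRetroactiveTopic(int recoverySize) {
        this.recoverySize = recoverySize;
    }

    void publish(String message) {
        // Remember the message for future retroactive subscribers.
        recent.addLast(message);
        if (recent.size() > recoverySize) {
            recent.removeFirst(); // drop the oldest once the buffer is full
        }
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // A retroactive subscriber first receives the buffered messages, then live ones.
    void subscribe(Consumer<String> subscriber, boolean retroactive) {
        if (retroactive) {
            for (String old : recent) {
                subscriber.accept(old);
            }
        }
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        ToyRetroactiveTopic topic = new ToyRetroactiveTopic(2);
        topic.publish("1");
        topic.publish("2");
        topic.publish("3");

        List<String> late = new ArrayList<>();
        topic.subscribe(late::add, true); // replays only what the buffer still holds
        System.out.println("late subscriber saw: " + late);
    }
}
```

The buffer size is the trade-off: a larger buffer recovers more missed messages but costs more memory, and anything older than the buffer is still lost.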

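  3. Use durable subscribers

Durable topic subscribers that stay offline for a long time are usually not desirable in a system. The reason is that the broker has to keep every message sent to those topics for those subscribers. Over time this message pile-up can exhaust broker store limits, for example, and slow down the whole system. http://activemq.apache.org/manage-durable-subscribers.html

Durable subscribers with STOMP: http://activemq.apache.org/stomp.html#Stomp-ActiveMQExtensionstoSTOMP

On the CONNECT frame, the client-id header (a string) specifies the JMS clientID, which is used in combination with activemq.subcriptionName to denote a durable subscriber.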
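
As a rough illustration of where those headers go, the sketch below builds the raw STOMP frames as text in plain Java. The client ID, subscription name, destination, host and subscription id are placeholder values chosen for the example, and the helper class DurableStompFrames is hypothetical. The header name is spelled as quoted above; check the ActiveMQ STOMP documentation for the spelling your broker version expects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: builds raw STOMP frames as text to show where the durable-subscription headers go.
// It does not escape header values and does not open a network connection.
final class DurableStompFrames {

    // Serializes a body-less frame: COMMAND, header lines, a blank line, then a NUL terminator.
    static String frame(String command, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        return sb.append('\n').append('\0').toString();
    }

    static String connect(String clientId) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("accept-version", "1.2");
        headers.put("host", "localhost");    // assumed broker host
        headers.put("client-id", clientId);  // used as the JMS clientID
        return frame("CONNECT", headers);
    }

    static String subscribe(String destination, String subscriptionName) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", "sub-0");           // placeholder subscription id
        headers.put("destination", destination);
        headers.put("ack", "auto");
        headers.put("activemq.subcriptionName", subscriptionName); // spelling as in the text above
        return frame("SUBSCRIBE", headers);
    }

    public static void main(String[] args) {
        // Replace the NUL terminator so the frames print cleanly.
        System.out.print(connect("john-client").replace('\0', ' '));
        System.out.print(subscribe("/topic/notifications.john", "john-sub").replace('\0', ' '));
    }
}
```

The same client-id and subscription name have to be reused on every reconnect, since that pair is what lets the broker match the client to its stored messages.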

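Some explanation about TTL

A client can specify a time-to-live value, in milliseconds, for each message it sends. This value defines the message expiration time, which is the sum of the message's time to live and the GMT at which it is sent (for transacted sends, this is the time the client sends the message, not the time the transaction is committed).

The default time to live is 0, so the message remains in the queue indefinitely, or until the server has finished processing it.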
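
The arithmetic can be sketched in a few lines of plain Java. This is an illustration of the rule described above, not broker code; the class MessageExpiry and its method names are hypothetical. The "expires" header in the first answer's test carries this kind of absolute timestamp (current time plus 2000 ms).

```java
// Sketch of the TTL rule: expiration = send time + time to live, with 0 meaning "never expires".
final class MessageExpiry {
    static final long NEVER_EXPIRES = 0L;

    // Computes the absolute expiration timestamp in milliseconds since the epoch.
    static long expirationTime(long sendTimeMillis, long ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("time to live must not be negative");
        }
        return ttlMillis == 0 ? NEVER_EXPIRES : sendTimeMillis + ttlMillis;
    }

    // A message with expiration 0 never expires; otherwise it expires once the time has passed.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != NEVER_EXPIRES && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiresAt = expirationTime(sentAt, 2000); // expires 2 seconds after sending
        System.out.println("expired right away? " + isExpired(expiresAt, sentAt));
        System.out.println("expired 3 s later? " + isExpired(expiresAt, sentAt + 3000));
    }
}
```

With the default TTL of 0 the expiration stays at 0, so a message for an offline user is kept until it is consumed, which is the behaviour the question asks for.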

UPDATE

If you want to use an external ActiveMQ broker

Remove @EnableWebSocketMessageBroker, add the connector below to activemq.xml, and restart the broker.

 <transportConnector name="stomp" uri="stomp://localhost:61613"/>

If you want to embed the ActiveMQ broker, add this bean to WebSocketConfig:

    @Bean(initMethod = "start", destroyMethod = "stop")
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();
        broker.addConnector("stomp://localhost:61613");
        return broker;
    }

And the required dependencies:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-stomp</artifactId>
    </dependency>

A complete example: Spring Boot WebSocket with embedded ActiveMQ Broker

http://www.devglan.com/spring-boot/spring-boot-websocket-integration-example
