将 JMS 消息桥接到 Quarkus 中的 SSE 端点

问题描述 投票:0回答:1

在 Spring 项目中,我使用

Sinks
将事件发送到 SSE 端点,效果很好,检查:

https://github.com/hantsy/spring-graphql-sample/blob/master/dgs-subscription-sse/src/main/kotlin/com/example/demo/DataFetchers.kt#L33

但是当我尝试使用 Smallrye Mutiny

MultiEmitterProcessor
来实现相同的目的时,它失败了。

示例项目是https://github.com/hantsy/quarkus-sandbox/tree/master/jms

    MultiEmitterProcessor<Message> emitterProcessor = MultiEmitterProcessor.create();

    void receive() {
        var consumer = jmsContext.createConsumer(helloQueue);
        consumer.setMessageListener(
                msg -> {
                    try {
                        var received = jsonb.fromJson(msg.getBody(String.class), Message.class);
                        LOGGER.log(Level.INFO, "consuming message: {0}", received);
                        emitterProcessor.emit(received);
                    } catch (JMSException e) {
                        throw new RuntimeException(e);
                    }
                }
        );
    }

在资源类中,

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.APPLICATION_JSON)
    public Multi<Message> stream() {
        // see: https://github.com/quarkusio/quarkus/issues/35220
        return handler.emitterProcessor.toMulti().toHotStream();
    }
  1. 我不确定

    MultiEmitterProcessor
    当热点流好不好?

  2. 或者有类似 Reactor

    ConnectableFlux
    的东西可以手动连接到热流?

jms quarkus smallrye
1个回答
0
投票

最后,我从 Resource 类中移动以下代码

emitterProcessor.toMulti() 

进入处理程序类中的方法,然后它就可以工作了。

public Multi<Message> stream() {
    return processor.toMulti().broadcast().toAllSubscribers();
}

检查最终的工作示例:https://github.com/hantsy/quarkus-sandbox/tree/master/jms

© www.soinside.com 2019 - 2024. All rights reserved.