如何在Quarkus中通过REST将传入的数据转发到SSE流。

问题描述 投票:0回答:2

在我的设置中,我想通过SSE通道(服务器发送事件)转发某些状态变化。状态变化是通过调用一个REST端点发起的。所以,我需要将传入的状态变化转发到SSE流中。

在Quarkus中,有什么最简单的方法来完成这个任务。

我能想到的一个解决方案是使用一个EventBus (https:/quarkus.ioguidesreactive-messaging。). SSE端点将订阅状态变化并通过SSE通道推送。状态变化端点会发布相应的事件。

这是一个可行的解决方案吗?还有其他(更简单)的解决方案吗?在任何情况下,我都需要使用反应式的东西来完成这个任务吗?

任何帮助都是非常感激的

server-sent-events quarkus
2个回答
1
投票

最简单的方法是使用rxjava作为流提供者。首先,你需要添加rxjava依赖。它可以从夸克斯中的反应式依赖,如kafka,或者直接使用它(如果你不需要任何流媒体库)。

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.19</version>
        </dependency>

下面是一个如何每秒发送随机双值的例子。

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType("text/plain")
    public Publisher<Double> stream() {
        return Flowable.interval(1, TimeUnit.SECONDS).map(tick -> new Random().nextDouble());
    }

我们创建一个新的Flowable,它将每秒钟启动一次,在每一个tick上我们生成下一个随机的双倍值。研究如何创建Flowable的任何其他选项,如 Flowable.fromFuture() 以适应你特定的代码逻辑。

P.S上面的代码每次查询这个端点的时候都会生成新的Flowable,我这样做是为了节省空间,在你的情况下,我假设你会有一个单一的事件源,你可以建立一次,每次查询端点的时候都使用同一个实例。


1
投票

Dmytro,谢谢你给我指出了正确的方向。我选择了Mutiny与Kotlin连接。我的代码现在看起来是这样的。

data class DeviceStatus(var status: Status = Status.OFFLINE) {
    enum class Status {OFFLINE, CONNECTED, ANALYZING, MAINTENANCE}
}

@ApplicationScoped
class DeviceStatusService {
    var deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
    var deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)

    fun pushDeviceStatus(deviceStatus: DeviceStatus) {
        deviceStatusProcessor.onNext(deviceStatus)
    }

    fun getStream(): Multi<DeviceStatus> {
        return Multi.createFrom().publisher(deviceStatusQueue)
    }
}

@Path("/deviceStatus")
class DeviceStatusResource {
    private val LOGGER: Logger = Logger.getLogger("DeviceStatusResource")

    @Inject
    @field: Default
    lateinit var deviceStatusService: DeviceStatusService

    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    fun status(status: DeviceStatus): Response {
        LOGGER.info("POST /deviceStatus " + status.status);
        deviceStatusService.pushDeviceStatus(status)
        return Response.ok().build();
    }

    @GET
    @Path("/eventStream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @SseElementType(MediaType.APPLICATION_JSON)
    fun stream(): Multi<DeviceStatus>? {
        return deviceStatusService.getStream()
    }
}

作为最小的设置,服务可以直接使用设备状态处理器作为发布者。但是,Flowable增加了缓冲功能.欢迎大家对实现方式提出意见。

© www.soinside.com 2019 - 2024. All rights reserved.