具有非阻塞嵌入式资源的 REST API

问题描述 投票:0回答:1

我正在研究 REST API,对于收集端点,希望

_embedded
数组异步填充。不幸的是,当also具有
_links
元素时,我似乎无法弄清楚如何做到这一点。在响应的顶层放置一个
Publisher
可以正确地异步返回值;在嵌入对象中放置
Publisher
会导致序列化时出错。

这是当前(不工作)的代码。我正在使用 Project Reactor 和 Micronaut。

    @Get
    HttpResponse<Publisher<Response>> get() {
        Response response = new Response();
        response.embedded("item", new FluxEmbed(Flux.just(1, 2, 3, 4, 5)));
        response.link("index", "example.org");

        return HttpResponse.ok(Mono.just(response));
    }

    @Serdeable.Serializable
    class Response extends AbstractResource<Response> {
    }

    @Serdeable.Serializable
    class FluxEmbed extends AbstractResource<FluxEmbed> {
        private final Publisher<Integer> publisher;

        public FluxEmbed(Publisher<Integer> publisher) {
            this.publisher = publisher;
        }

        public Publisher<Integer> getPublisher() {
            return publisher;
        }
    }

预期的行为是让

_embedded
数组的输出逐渐填充。现在,我可以通过执行阻塞操作来实现这一点,但我更愿意留在异步模型中。

JSON输出;

_embedded
数组应随时间填充。如果我返回一个 Flux 作为
HttpResponse
的主体就可以做到这一点,但这不符合 HAL 标准。

{
  "_links": {
    "index": [
      {
        "href": "example.org",
        "templated": false
      }
    ]
  },
  "_embedded": {
    "item": [
      {
        "value": 1
      }
    ]
  }
}
rest project-reactor micronaut hateoas
1个回答
0
投票

你在这里可以做的,就是按照

这里
的描述使用Flux.create

和一座桥

在您的情况下,它将涉及控制器、Flux 桥和生成数据的侦听器。

下面的示例将每 5 秒输出数字 1..10。很抱歉在此示例中使用了 Kotlin,但它应该说明如何执行此操作。如有必要,我可以重写为 Java。

import io.micronaut.context.annotation.Prototype
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.annotation.Scheduled
import jakarta.annotation.PostConstruct
import jakarta.inject.Singleton
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.concurrent.ConcurrentHashMap


@Controller("/flux-test")
class TestController(private val fluxBridge: FluxBridge) {

    @Get
    fun get(): Flux<Int> = fluxBridge.getFlux()
}


/** anonymous implementation inside FluxBridge */
interface MyEventListener<T> {
    fun onDataChunk(chunk: List<T>)
    fun processComplete()
}

/** class that produces 1..10 every 5 seconds */
@Singleton
class MyEventProcessor {

    /** map threadId -> MyEventListener */
    /** NOTE: Not for production use */
    private val listeners = ConcurrentHashMap<Long, MyEventListener<Int>>()

    fun register(
        threadId: Long,
        incomingListener: MyEventListener<Int>
    ) {
        listeners[threadId] = incomingListener
    }

    @Scheduled(cron = "*/5 * * * * *")
    fun scheduled() {
        listeners.values.forEach {
            it.onDataChunk((1..10).toList())
        }
    }
}

@Prototype /** class is not thread-safe, hence create a new instance for every access */
class FluxBridge(private val myEventProcessor: MyEventProcessor) {

    private lateinit var bridge: Flux<Int>

    @PostConstruct
    fun init() {
        /** initialize the bridge that is used by method produceData */
        bridge = Flux.create { sink: FluxSink<Int> ->
            myEventProcessor.register(
                Thread.currentThread().id,
                /** anonymous implementation of MyEventListener */
                object : MyEventListener<Int> {
                    override fun onDataChunk(chunk: List<Int>) {
                        for (s in chunk) {
                            sink.next(s)
                        }
                    }

                    /** not used in this example */
                    override fun processComplete() {
                        sink.complete()
                    }
                })
        }
    }

    /** just return the bridge which is a Flux<Int> */
    fun getFlux(): Flux<Int> = bridge
}
© www.soinside.com 2019 - 2024. All rights reserved.