如何在 kotlin 中测试 SseEmitter 发送事件

问题描述 投票:0回答:0

我有以下设置

import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter

@RestController
@RequestMapping("/api/{orderToken}")
class TestingController {

    companion object {
        private val store = HashMap<String, SseEmitter>()
    }

    @GetMapping("/add", produces = ["text/event-stream"])
    fun getOrderStatus(
        @PathVariable orderToken: String,
    ): SseEmitter {
        val sseEmitter = SseEmitter()
        store[orderToken] = sseEmitter
        return sseEmitter
    }

    @GetMapping("/notification", produces = ["text/event-stream"])
    fun simulateKafkaEvent(
        @PathVariable orderToken: String,
    ) {
        val sseEmitter = store[orderToken]
        sseEmitter!!.send("order token: $orderToken")
    }
}

这里的思路如下

  • 客户端调用
    /api/{orderToken}/add
    端点来设置 SSE 连接。
  • 此连接存储在地图中。
  • 其他东西调用
    /api/{orderToken}/notification
    端点使用 SseEmitter 向客户端发送消息。

这有效!运行应用程序并使用两个不同的订单令牌从两个不同的终端调用

/add
端点,然后从第三个终端调用
/notification
端点将预期的消息发送到预期的终端。

但是我想对其进行自动化测试。最终,我需要一种机制来订阅/收听返回的 SseEmitter,但我还没有找到一种方法。

这就是测试的样子。它通过了……但是出于错误的原因。它正在做的是消耗第一个 SseEmitter 而不是后续消息。

    @BeforeEach
    fun setup() {
        webTestClient = WebTestClient.bindToController(TestingController()).build()
    }

    @Test
    fun `test 2`() {
        val addEndpoint = webTestClient
            .get()
            .uri("/api/1234/add")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk
            .expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
            .returnResult(SseEmitter::class.java)
            .responseBody  //It results FluxExchangeResult<ServerSentEvent<*>>

        val notification = webTestClient.get()
            .uri("/api/1234/kafka")
            .accept()
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .returnResult(String::class.java)
            .responseBody

        StepVerifier.create(addEndpoint).consumeNextWith { println("notification: " + s)}.thenCancel().verify()
    }
spring kotlin automated-tests spring-webflux server-sent-events
© www.soinside.com 2019 - 2024. All rights reserved.