我有以下设置
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
@RestController
@RequestMapping("/api/{orderToken}")
class TestingController {
companion object {
private val store = HashMap<String, SseEmitter>()
}
@GetMapping("/add", produces = ["text/event-stream"])
fun getOrderStatus(
@PathVariable orderToken: String,
): SseEmitter {
val sseEmitter = SseEmitter()
store[orderToken] = sseEmitter
return sseEmitter
}
@GetMapping("/notification", produces = ["text/event-stream"])
fun simulateKafkaEvent(
@PathVariable orderToken: String,
) {
val sseEmitter = store[orderToken]
sseEmitter!!.send("order token: $orderToken")
}
}
这里的思路如下
/api/{orderToken}/add
端点来设置 SSE 连接。/api/{orderToken}/notification
端点使用 SseEmitter 向客户端发送消息。这有效!运行应用程序并使用两个不同的订单令牌从两个不同的终端调用
/add
端点,然后从第三个终端调用 /notification
端点将预期的消息发送到预期的终端。
但是我想对其进行自动化测试。最终,我需要一种机制来订阅/收听返回的 SseEmitter,但我还没有找到一种方法。
这就是测试的样子。它通过了……但是出于错误的原因。它正在做的是消耗第一个 SseEmitter 而不是后续消息。
@BeforeEach
fun setup() {
webTestClient = WebTestClient.bindToController(TestingController()).build()
}
@Test
fun `test 2`() {
val addEndpoint = webTestClient
.get()
.uri("/api/1234/add")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk
.expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
.returnResult(SseEmitter::class.java)
.responseBody //It results FluxExchangeResult<ServerSentEvent<*>>
val notification = webTestClient.get()
.uri("/api/1234/kafka")
.accept()
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.returnResult(String::class.java)
.responseBody
StepVerifier.create(addEndpoint).consumeNextWith { println("notification: " + s)}.thenCancel().verify()
}