我目前正在参考一些关于 Spring 和 SSE 的文章中的示例,以非响应式(non-reactive)的方式实现一个通知系统。
我已经实现了一个方案:当只有一个客户端消费后端发送的事件时,它工作得很好。
问题是当我打开多个浏览器并尝试向所有消费者触发事件时:只有订阅 SSE 广播端点的最后一个客户端收到通知。
如何同时向多个客户端触发事件?
如果客户端在同一个网络上,也许只有一个 SSE 连接是正常的?
这是我的控制器:
/**
 * SSE controller that keeps at most one [SseEmitter] per `eventId`.
 *
 * A second subscription for an `eventId` that is already registered is handed the
 * existing emitter instance instead of a new one (see [listenNotifications]).
 */
@RestController
@RequestMapping("/sse/servlet")
class EventController {
// Not referenced anywhere in the visible code.
private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
// eventId -> the single emitter registered for that id.
private val emitters: MutableMap<String, SseEmitter> = mutableMapOf()
// Used to serialize notifications to a JSON string before sending.
private val objectMapper: ObjectMapper = ObjectMapper()
private val log = KotlinLogging.logger {}
/**
 * Subscribes the caller to the events of [eventId].
 *
 * @param eventId key under which the emitter is stored in the map.
 * @return the emitter already registered for [eventId] if there is one, otherwise a
 * newly created emitter with a 10-minute timeout.
 */
@GetMapping("/notifications")
fun listenNotifications(@RequestParam eventId: String): SseEmitter? {
// An emitter already exists for this id: the same instance is returned to this caller too.
if (emitters[eventId] != null) {
log.info("SSE connection already exists")
return emitters[eventId]
}
val emitter = SseEmitter(TimeUnit.MINUTES.toMillis(10))
// On completion the mapping for this id is dropped.
emitter.onCompletion {
log.info("SSE connection closed")
emitters.remove(eventId)
}
// On timeout the emitter is completed explicitly.
emitter.onTimeout {
log.info("SSE connection timed out")
emitter.complete()
}
// Errors are only logged here; the mapping is not removed in this callback.
emitter.onError { throwable: Throwable? ->
log.error("Listen SSE exception", throwable)
}
emitters[eventId] = emitter
return emitter
}
/**
 * Sends [notification] to the emitter registered for [eventId].
 *
 * Logs and returns without sending when no emitter is registered for that id.
 *
 * @param eventId key of the emitter to send to.
 * @param notification payload taken from the request body.
 */
@PostMapping("/notifications")
@ResponseStatus(ACCEPTED)
fun fireNotification(
@RequestParam eventId: String,
@RequestBody notification: Notification
) {
val sseEmitter = emitters[eventId]
if (sseEmitter === null) {
log.info("SSE connection does not exist")
return
}
handleEmitter(notification, sseEmitter, eventId)
}
/**
 * Serializes [notification] to a JSON string and sends it as the data of one SSE event.
 *
 * An [IOException] is logged and the mapping for [eventId] is removed; it is not rethrown.
 */
private fun handleEmitter(
notification: Notification,
sseEmitter: SseEmitter,
eventId: String
) = try {
val data = objectMapper.writeValueAsString(notification)
val sseEventBuilder = event().data(data)
sseEmitter.send(sseEventBuilder)
} catch (ioException: IOException) {
log.error("Send SSE exception", ioException)
emitters.remove(eventId)
}
// NOTE(review): the closing brace of the class is missing from this snippet.
通知模型
/**
 * Payload of a single notification.
 *
 * @property message text carried by the notification.
 */
data class Notification(
    val message: String,
)
我的 application.yaml 配置文件非常简单:
# Serve the whole application under the /api prefix
# (the example requests use http://localhost:8080/api/...).
server:
  servlet:
    context-path: /api
我的gradle配置
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
// Build plugins: Spring Boot 3.0.4, Spring dependency management 1.1.0, Kotlin 1.7.22 (JVM + Spring plugin).
plugins {
id("org.springframework.boot") version "3.0.4"
id("io.spring.dependency-management") version "1.1.0"
kotlin("jvm") version "1.7.22"
kotlin("plugin.spring") version "1.7.22"
}
// Project coordinates.
group = "com.ggardet"
version = "0.0.1-SNAPSHOT"
// Java sources are compiled at the Java 17 language level.
java.sourceCompatibility = JavaVersion.VERSION_17
// Dependencies are resolved from Maven Central.
repositories {
mavenCentral()
}
// NOTE(review): most entries carry no version — presumably supplied by the
// dependency-management plugin; confirm.
dependencies {
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-security")
}
// Compiler settings applied to every KotlinCompile task.
tasks.withType<KotlinCompile> {
kotlinOptions {
// Passes the JSR-305 strict-mode flag to the Kotlin compiler.
freeCompilerArgs = listOf("-Xjsr305=strict")
// Target JVM 17 bytecode, matching sourceCompatibility above.
jvmTarget = "17"
}
}
最后是用于创建/消费事件的请求(使用 httpie)
# To listen for events: open 2 or 3 terminals and run the following streaming request in each one
http --stream -a user:user -v GET http://localhost:8080/api/sse/servlet/notifications\?eventId\=1
# To fire a new event: open another terminal and run this single request
http -a user:user -v POST http://localhost:8080/api/sse/servlet/notifications\?eventId\=1 message=test
注意:即使我去掉 emitters 映射,改为在类级别使用单个 SseEmitter 来发送事件,也会遇到同样的问题。
原来是我之前没有意识到:多个客户端不能共享同一个 SseEmitter。
我必须为每个订阅创建一个 SseEmitter 才能使其工作。
不确定这是最好的方法,但我在这里发布我的解决方案以防有人误解 SseEmitter 的工作原理,就像我做的一样:
/**
 * Server-Sent Events endpoint that fans a notification out to every client
 * subscribed to the same `eventId`.
 *
 * Every subscription gets its own [SseEmitter]; emitters are grouped per `eventId`.
 * When one subscription ends (completion, timeout, error or failed send) only that
 * emitter is dropped, so the other subscribers of the same id keep receiving events.
 */
@RestController
@RequestMapping("/sse/servlet")
class EventController {
    // eventId -> emitters of all currently subscribed clients.
    private val emitters = ConcurrentHashMap<String, CopyOnWriteArrayList<SseEmitter>>()
    private val objectMapper: ObjectMapper = ObjectMapper()
    private val log = KotlinLogging.logger {}

    /**
     * Opens a new SSE stream for the caller and registers it under [eventId].
     *
     * @param eventId id of the event stream the client wants to listen to.
     * @return a fresh emitter with a 10-minute timeout, dedicated to this subscription.
     */
    @GetMapping("/notifications", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun listenNotifications(@RequestParam eventId: String): SseEmitter {
        val emitter = SseEmitter(Duration.ofMinutes(10).toMillis())
        emitter.onCompletion {
            log.info("SSE connection closed")
            unregister(eventId, emitter)
        }
        emitter.onTimeout {
            log.info("SSE connection timed out")
            unregister(eventId, emitter)
            emitter.complete()
        }
        emitter.onError { throwable: Throwable? ->
            log.error("Listen SSE exception", throwable)
            unregister(eventId, emitter)
        }
        register(eventId, emitter)
        return emitter
    }

    /**
     * Sends [notification] to every client currently subscribed to [eventId].
     *
     * Does nothing (apart from logging) when nobody is subscribed or when the
     * notification cannot be serialized.
     *
     * @param eventId id of the event stream to publish to.
     * @param notification payload taken from the request body.
     */
    @PostMapping("/notifications")
    @ResponseStatus(ACCEPTED)
    fun fireNotification(
        @RequestParam eventId: String,
        @RequestBody notification: Notification,
    ) {
        val subscribers = emitters[eventId]
        if (subscribers.isNullOrEmpty()) {
            log.info("SSE connection does not exist")
            return
        }
        // Serialize once for all subscribers; a serialization failure is not the
        // fault of any emitter, so no subscription is dropped because of it.
        val data = try {
            objectMapper.writeValueAsString(notification)
        } catch (ioException: IOException) {
            log.error("Send SSE exception", ioException)
            return
        }
        subscribers.forEach { handleEmitter(data, it, eventId) }
    }

    /**
     * Sends the already serialized [data] as one SSE event through [sseEmitter].
     *
     * A failing emitter is logged and unregistered so that it neither aborts the
     * fan-out to the remaining subscribers nor is retried on the next notification.
     */
    private fun handleEmitter(
        data: String,
        sseEmitter: SseEmitter,
        eventId: String,
    ) {
        try {
            sseEmitter.send(event().data(data))
        } catch (ioException: IOException) {
            log.error("Send SSE exception", ioException)
            unregister(eventId, sseEmitter)
        } catch (illegalStateException: IllegalStateException) {
            // NOTE(review): send() is expected to throw this when the emitter has
            // already completed — confirm against the Spring version in use.
            log.error("Send SSE exception", illegalStateException)
            unregister(eventId, sseEmitter)
        }
    }

    /** Adds [emitter] to the subscriber list of [eventId], creating the list if needed. */
    private fun register(eventId: String, emitter: SseEmitter) {
        // compute() runs atomically per key, so a concurrent unregister() cannot
        // discard the list between its creation and the add.
        emitters.compute(eventId) { _, existing ->
            (existing ?: CopyOnWriteArrayList()).also { it.add(emitter) }
        }
    }

    /** Removes only [emitter] from [eventId]; the map entry is dropped once its list is empty. */
    private fun unregister(eventId: String, emitter: SseEmitter) {
        emitters.computeIfPresent(eventId) { _, subscribers ->
            subscribers.remove(emitter)
            // Returning null removes the mapping for this key.
            subscribers.takeIf { it.isNotEmpty() }
        }
    }
}