假设我有两个kafka主题,request_topic
用于我的Post请求,response_topic
用于我的回复。
这是型号:
case class Request(requestId: String, body: String)
case class Response(responseId: String, body: String, requestId: String)
这是我的套接字处理程序
def socket = WebSocket.accept[String, String] { req =>
val requestId = ??? // Generate a unique requestId
val in: Sink[String, Future[Done]] = Sink.foreach[String]{ msg =>
val record = new ProducerRecord[String, Request]("request_topic", "key", Request(requestId, msg))
val producer: KafkaProducer[String, Request] = ???
Future { producer.send(record).get }
}
// Once produced, some stream processing apps will manage to process request and publish the reponse to response_topic
// The Request and Response object are linked by the requestId field.
val consumerSettings = ???
val out: Source[ConsumerRecord[String, Response], _] = Consumer
.plainSource(consumerSettings, Subscriptions.topics("response_topic"))
.filter(cr => cr.value.requestId == requestId)
.map(cr => someResponseString(cr.value))
Flow.formSinkAndSource(in, out)
}
def someResponseString(res: Response): String = ???
[基本上,对于每个传入消息,我将请求对象发布到Kafka,然后由某个流处理应用程序(此处未显示)处理该请求,并希望将响应发布回Kafka。
我在这里有一些担忧:
1-Alpakka Kafka连接器将为每个传入消息创建连接器的新实例,还是只要Play正在运行,它将使用相同的实例吗?
2-是基于单个requestId过滤响应的好主意,还是应该将整个流发送回每个Client,并让他们根据他们感兴趣的requestId过滤响应。
3-我在所有事情上都错了吗? (我是Websocket中真正的新手)
提前感谢。
1)取决于您的配置方式。例如,在in: Sink
正文中,您为每个消息创建一个新的KafkaProducer
。相反,您应该为整个应用程序指定一个生产者。
我不确定Akka / Play的线程模型如何工作,但是大多数Web服务器都会为每个传入的连接启动一个新线程,直到线程池中的线程数量达到固定数量为止。
2)我想尽快过滤是可取的,并且尽可能在服务器端进行过滤。这样可以节省返回客户端的带宽。
此外,如果您只想将Web服务器上的Kafka数据沿一个方向推送到客户端,则可能需要SSE, not Websocket