如何使用Kafka,Alpakka Kafka,Play框架和Websocket处理POST请求?

问题描述 投票:0回答:1

假设我有两个kafka主题,request_topic用于我的Post请求,response_topic用于我的回复。

这是型号:

case class Request(requestId: String, body: String)
case class Response(responseId: String, body: String, requestId: String)

这是我的套接字处理程序

def socket = WebSocket.accept[String, String] { req =>
  val requestId = ??? // Generate a unique requestId

  val in: Sink[String, Future[Done]] = Sink.foreach[String]{ msg =>
    val record = new ProducerRecord[String, Request]("request_topic", "key", Request(requestId, msg))
    val producer: KafkaProducer[String, Request] = ???
    Future { producer.send(record).get }
  }

  // Once produced, some stream processing apps will manage to process request and publish the reponse to response_topic
  // The Request and Response object are linked by the requestId field.

  val consumerSettings = ???
  val out: Source[ConsumerRecord[String, Response], _] = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("response_topic"))
    .filter(cr => cr.value.requestId == requestId)
    .map(cr => someResponseString(cr.value))

  Flow.formSinkAndSource(in, out)
}

def someResponseString(res: Response): String = ???

[基本上,对于每个传入消息,我将请求对象发布到Kafka,然后由某个流处理应用程序(此处未显示)处理该请求,并希望将响应发布回Kafka。

我在这里有一些担忧:

1-Alpakka Kafka连接器将为每个传入消息创建连接器的新实例,还是只要Play正在运行,它将使用相同的实例吗?

2-是基于单个requestId过滤响应的好主意,还是应该将整个流发送回每个Client,并让他们根据他们感兴趣的requestId过滤响应。

3-我在所有事情上都错了吗? (我是Websocket中真正的新手)

提前感谢。

scala websocket apache-kafka playframework alpakka
1个回答
1
投票

1)取决于您的配置方式。例如,在in: Sink正文中,您为每个消息创建一个新的KafkaProducer。相反,您应该为整个应用程序指定一个生产者。

我不确定Akka / Play的线程模型如何工作,但是大多数Web服务器都会为每个传入的连接启动一个新线程,直到线程池中的线程数量达到固定数量为止。

2)我想尽快过滤是可取的,并且尽可能在服务器端进行过滤。这样可以节省返回客户端的带宽。

此外,如果您只想将Web服务器上的Kafka数据沿一个方向推送到客户端,则可能需要SSE, not Websocket

© www.soinside.com 2019 - 2024. All rights reserved.