我是 Apache Flink 的新手,正在尝试了解一些关于将 Flink 流处理作业与 Kafka 配合使用并进行扩展的最佳实践。以下是我一直没能找到合适答案的几个问题:
如果这些问题显得有些基础,还请见谅;提前感谢任何帮助。我正在努力更好地掌握这项技术。我已经阅读了大部分文档,但由于缺乏相关经验,可能还没能把一些概念串联起来。谢谢你的帮助!
// Closure (booking-completed) events: source → domain events.
val stream1: DataStream[UserBookingEvent] = BookingClosure.getSource(runmode).getSource(env)
  .map(new ClosureMapFunction)

// Cancel events. CancelMapFunction is a MapFunction[String, Option[UserBookingEvent]],
// so a plain .map produces DataStream[Option[UserBookingEvent]], which does not
// type-check against the declared DataStream[UserBookingEvent]. Flatten the
// Options here so unparseable records (None) are dropped from the stream.
val stream2: DataStream[UserBookingEvent] = BookingCancel.getSource(runmode).getSource(env)
  .map(new CancelMapFunction)
  .flatMap(_.toList)

// Merge both event streams into a single stream of user booking events.
val unionStream: DataStream[UserBookingEvent] = stream1.union(stream2)
---
import org.apache.flink.api.common.functions.MapFunction
import org.json4s.jackson.JsonMethods.{parse => _, parseOpt => _}
import org.json4s.native.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}

import scala.util.control.NonFatal
/**
 * Parses a raw cancel-event JSON string into a [[UserBookingEvent]].
 *
 * Returns `None` (and logs the offending payload) when the input cannot be
 * parsed, so downstream operators can drop bad records instead of failing.
 */
class CancelMapFunction extends MapFunction[String, Option[UserBookingEvent]] {

  // org.slf4j.Logger is not serializable and a Flink MapFunction must be;
  // @transient lazy creates the logger once per task instance after
  // deserialization instead of on every record (the original built it
  // inside map(), i.e. once per element).
  @transient private lazy val log: Logger =
    LoggerFactory.getLogger(classOf[CancelMapFunction])

  override def map(in: String): Option[UserBookingEvent] = {
    implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
    try {
      val json = parse(in)
      // NOTE(review): the original body was elided ("...."); presumably it
      // extracted a UserBookingEvent from the parsed JSON — confirm against
      // the full source. extractOpt yields None on a non-matching shape.
      json.extractOpt[UserBookingEvent]
    } catch {
      // NonFatal (not bare Exception) so fatal errors such as OutOfMemoryError
      // still propagate; pass the Throwable to the logger to keep the stack trace.
      case NonFatal(e) =>
        log.error(s"Could not parse Cancel Event= $in", e)
        None
    }
  }
}