在我的流应用程序中,它每秒将接收 70k 条数据记录。每条记录都有一个密钥(FQDN)。
示例记录即将到来的数据:
{
"unixTime" : 1680064946,
"FQDN" : "sub1.example1.com",
"volume": 20
}
----
{
"Unix time" : 1680064946,
"FQDN" : "sub2.example2.com",
"volume": 5
}
我的代码:
case class DnsTapStageNMessageWithClientIPAndVolume(unixTime: Long, FQDN: String, volume: Long)
object App {
private val logger = LoggerFactory.getLogger(this.getClass)
private val gson = new Gson()
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val builder = PulsarSourceBuilder
.builder(new SimpleStringSchema())
.topic("my-topic")
.subscriptionName("my-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
val clientConfig = new ClientConfigurationData()
clientConfig.setServiceUrl("pulsar://localhost:6650")
val src = builder.pulsarAllClientConf(clientConfig).build()
val windowCounts = env.addSource(src).map(m => parser(m))
.keyBy(_.FQDN)
.window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(1)))
.sum("volume")
windowCounts.addSink(mySink)
env.execute("myJobName")
}
private final def parser(message: String): DnsTapStageNMessageWithClientIPAndVolume = {
try {
gson.fromJson(message, classOf[DnsTapStageNMessageWithClientIPAndVolume])
} catch {
case e: Exception =>
logger.error(e.getMessage)
null
}
}
}
我的虚拟机信息:
MEMORY: 5GB
vCPU: 4
我的流应用程序使用 window-size: 24hour 和 Window-slide: 1min。 每秒 70k 条记录,我的流总是 OOM。 如何解决这个问题?
谢谢, 简阮