如何使用 big keyed 在 Flink Apache 上工作?

问题描述 投票:0回答:0

在我的流应用程序中,它每秒将接收 70k 条数据记录。每条记录都有一个密钥(FQDN)。

示例记录即将到来的数据:

{
  "unixTime" : 1680064946,
  "FQDN" : "sub1.example1.com",
  "volume": 20

}
----
{
  "Unix time" : 1680064946,
  "FQDN" : "sub2.example2.com",
  "volume": 5
}

我的代码:

case class DnsTapStageNMessageWithClientIPAndVolume(unixTime: Long, FQDN: String, volume: Long)

object App {
  private val logger = LoggerFactory.getLogger(this.getClass)
  private val gson = new Gson()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val builder = PulsarSourceBuilder
      .builder(new SimpleStringSchema())
      .topic("my-topic")
      .subscriptionName("my-sub")
      .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)

    val clientConfig = new ClientConfigurationData()
    clientConfig.setServiceUrl("pulsar://localhost:6650")
    val src = builder.pulsarAllClientConf(clientConfig).build()

    val windowCounts = env.addSource(src).map(m => parser(m))
      .keyBy(_.FQDN)
      .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(1)))
      .sum("volume")
    windowCounts.addSink(mySink)
    env.execute("myJobName")
  }

  private final def parser(message: String): DnsTapStageNMessageWithClientIPAndVolume = {
    try {
      gson.fromJson(message, classOf[DnsTapStageNMessageWithClientIPAndVolume])
    } catch {
      case e: Exception =>
        logger.error(e.getMessage)
        null
    }
  }
}

我的虚拟机信息:

MEMORY: 5GB
vCPU: 4

我的流应用程序使用 window-size: 24hour 和 Window-slide: 1min。 每秒 70k 条记录,我的流总是 OOM。 如何解决这个问题?

谢谢, 简阮

apache-flink flink-streaming
© www.soinside.com 2019 - 2024. All rights reserved.