无法访问getBroadcastState

问题描述 投票:0回答:1

我正试图使广播状态只是为了开始使用flink

错误:(78,24)值getBroadcastState不是org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction [String,realtimealert.LogsTest,realtimealert.PatternStream,(String,String,String)的成员]val模式= ctx.getBroadcastState(patternStateDescriptor)

我进口的商品

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

这里是代码

    val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties()
  properties.setProperty("bootstrap.servers","localhost:9092")

  val patternStream = new FlinkKafkaConsumer010("output", new SimpleStringSchema, properties)

  val patterns = env.addSource(patternStream)

  var patternData = patterns.map {
    str =>
      val splitted_str = str.split(",")
      PatternStream(splitted_str(0).trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val logsStream = new FlinkKafkaConsumer010("logs-data", new SimpleStringSchema, properties)

//  logsStream.setStartFromEarliest()

  val logs = env.addSource(logsStream)

  var data = logs.map {
    str =>
      val splitted_str = str.split(",")
      LogsTest(splitted_str.head.trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val keyedData: KeyedStream[LogsTest, String] = data.keyBy(_.metric)

  val bcStateDescriptor = new MapStateDescriptor("patterns", Types.STRING, Types.STRING ) // first type defined is for the key and second data type defined is for the value

  val broadcastPatterns: BroadcastStream[String]  = patterns.broadcast(bcStateDescriptor)

  val alerts = keyedData
      .connect(broadcastPatterns)
      .process(new PatternEvaluator())

  alerts.print()

  env.execute("Flink Broadcast State Job")
}

class PatternEvaluator()
  extends KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)] {

  private lazy val patternStateDescriptor = new MapStateDescriptor("patterns", classOf[String], classOf[String])

  private var lastMetricState: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    val lastMetricDescriptor = new ValueStateDescriptor[String]("last-metric", classOf[String])

    lastMetricState = getRuntimeContext.getState[String](lastMetricDescriptor)
  }

  override def processElement(reading: LogsTest,
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
                              out: Collector[(String, String, String)]): Unit = {

    val metrics = readOnlyCtx.getBroadcastState(patternStateDescriptor)
    if (metrics.contains(reading.metric)) {
      val metricPattern: String = metrics.get(reading.metric)
      val metricPatternValue: String = metrics.get(reading.value)
      val lastMetric = lastMetricState.value()

      val logsMetric = (reading.metric)
      val logsValue = (reading.value)



      if (logsMetric == metricPattern) {
        if(metricPatternValue == logsValue) {
          out.collect((reading.timestamp,reading.value,reading.metric))
        }
      }

    }

  }
  override def processBroadcastElement(
                                      update: PatternStream,
                                      ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#Context,
                                      out: Collector[(String, String, String)]
                                      ): Unit =  {
    val patterns = ctx.getBroadcastState(patternStateDescriptor)

    if (update.metric == "IP") {
      patterns.put(/*update.metric, */update.operator, update.value)
    }
  }
}
scala sbt apache-flink
1个回答
0
投票

问题是,在processElement方法中,readOnlyCtx是错误的类型。应该是

© www.soinside.com 2019 - 2024. All rights reserved.