Unable to parse the data

Problem description

val patterns = ctx.getBroadcastState(patternStateDescriptor)

My imports:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

Here is the code:

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties()
  properties.setProperty("bootstrap.servers","localhost:9092")

  val patternStream = new FlinkKafkaConsumer010("patterns", new SimpleStringSchema, properties)

  val patterns = env.addSource(patternStream)

  // parse each comma-separated pattern record into a PatternStream
  val patternData = patterns.map {
    str =>
      val splitted_str = str.split(",")
      PatternStream(splitted_str(0).trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val logsStream = new FlinkKafkaConsumer010("logs", new SimpleStringSchema, properties)

//  logsStream.setStartFromEarliest()

  val logs = env.addSource(logsStream)

  // parse each comma-separated log record into a LogsTest
  val data = logs.map {
    str =>
      val splitted_str = str.split(",")
      LogsTest(splitted_str.head.trim, splitted_str(1).trim, splitted_str(2).trim)
  }

  val keyedData: KeyedStream[LogsTest, String] = data.keyBy(_.metric)

  val bcStateDescriptor = new MapStateDescriptor[Unit, PatternStream]("patterns", Types.UNIT, Types.of[PatternStream]) // first type defined is for the key and second data type defined is for the value

  val broadcastPatterns: BroadcastStream[PatternStream]  = patternData.broadcast(bcStateDescriptor)

  val alerts = keyedData
      .connect(broadcastPatterns)
      .process(new PatternEvaluator())

  alerts.print()

//   println(alerts.getClass)
//  val sinkProducer = new FlinkKafkaProducer010("output",  new SimpleStringSchema(), properties)



  env.execute("Flink Broadcast State Job")
}

class PatternEvaluator()
  extends KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)] {

  private lazy val patternStateDescriptor = new MapStateDescriptor("patterns", classOf[String], classOf[String])

  private var lastMetricState: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    val lastMetricDescriptor = new ValueStateDescriptor("last-metric", classOf[String])

    lastMetricState = getRuntimeContext.getState(lastMetricDescriptor)
  }

  override def processElement(reading: LogsTest,
                              readOnlyCtx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
                              out: Collector[(String, String, String)]): Unit = {

    val metrics = readOnlyCtx.getBroadcastState(patternStateDescriptor)
    if (metrics.contains(reading.metric)) {
      val metricPattern: String = metrics.get(reading.metric)
      val metricPatternValue: String = metrics.get(reading.value)
      val lastMetric = lastMetricState.value()

      val logsMetric = reading.metric
      val logsValue = reading.value


      if (logsMetric == metricPattern) {
        if (metricPatternValue == logsValue) {
          out.collect((reading.timestamp, reading.value, reading.metric))
        }
      }
    }
  }


  override def processBroadcastElement(
                                        update: PatternStream,
                                        ctx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#Context,
                                        out: Collector[(String, String, String)]
                                      ): Unit = {
    val patterns = ctx.getBroadcastState(patternStateDescriptor)

    if (update.metric == "IP") {
      patterns.put(update.metric /*,update.operator*/ , update.value)
    }
    //    else if (update.metric == "username"){
    //      patterns.put(update.metric, update.value)
    //    }
    //    else {
    //      println("No required data found")
    //    }
    //  }

  }
}
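For completeness, the two case classes the job relies on are not shown in the question. Judging from how the comma-separated records are split in the map functions and which fields PatternEvaluator reads, they presumably look something like this (the field names are an assumption inferred from the sample data below):

  // assumed shapes, inferred from the map functions and the sample records
  case class LogsTest(timestamp: String, metric: String, value: String)
  case class PatternStream(metric: String, operator: String, value: String) // operator only appears in the commented-out put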

Sample data: logs stream

"21/09/98","IP", "5.5.5.5"

Patterns stream

"IP","==","5.5.5.5"

I am not able to parse the data and get the expected result, i.e. 21/09/98,IP,5.5.5.5

There are no errors so far; the data just isn't being parsed.

The code is reading both streams (already verified).
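A quick way to double-check the parsing itself, independent of the broadcast wiring, is to print the mapped streams before connecting them; DataStream.print() writes every record to the task's stdout:

  patternData.print()
  data.print()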

Tags: scala sbt apache-flink
1 Answer

The problem is that in the processElement method, readOnlyCtx has the wrong type. It should be:

readOnlyCtx: KeyedBroadcastProcessFunction[String, 
  LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext
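In context, the corrected override would look roughly like this (a minimal sketch mirroring the question's class, with the matching logic elided):

  override def processElement(
      reading: LogsTest,
      readOnlyCtx: KeyedBroadcastProcessFunction[String, LogsTest, PatternStream, (String, String, String)]#ReadOnlyContext,
      out: Collector[(String, String, String)]): Unit = {
    // the read-only context gives read access to the broadcast pattern state
    val metrics = readOnlyCtx.getBroadcastState(patternStateDescriptor)
    // ... evaluate the record against the stored patterns as in the question
  }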