使用WindowStream.apply()函数无法应用WindowFunction

问题描述 投票:2回答:1

我使用Apache Flink和Scala相对较新,我只是掌握了一些基本功能。我试图实现自定义WindowFunction

问题是,当我尝试实现自定义WindowFunction时,IDE会在“.apply()”函数上出错

Cannot resolve symbol apply

Unspecified value parameters: foldFunction: (NotInferedR, Data.Fingerprint) => NotInferedR, windowFunction: (Tuple, TimeWindow, Iterable[NotInferedR], Collector[NotInferedR]) => Unit 
Unspecified value parameters: foldFunction: FoldFunction[Data.Fingerprint, NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, TimeWindow] 
Unspecified value parameters: function: WindowFunction[Data.Fingerprint, NotInferedR, Tuple, TimeWindow] 
Unspecified value parameters: windowFunction: (Tuple, TimeWindow, Iterable[Data.Fingerprint], Collector[NotInferedR]) => Unit 

Type mismatch, expected: (Tuple, TimeWindow, Iterable[Data.Fingerprint], Collector[NotInferedR]) => Unit, actual: DataTimeWindow.DataWindow 
Type mismatch, expected: WindowFunction[Data.Fingerprint, NotInferedR, Tuple, TimeWindow], actual: DataTimeWindow.DataWindow

这是我的代码:

val test = hashMap
      .keyBy("hash")
      .timeWindow(Time.minutes(1))
      .apply(new DataWindow())

这是WindowFunction

class DataWindow extends WindowFunction[Data.Fingerprint, String, String, TimeWindow]  {

    override def apply(key: String,
                       window: TimeWindow,
                       input: Iterable[Fingerprint],
                       out: Collector[String]) {

      out.collect("helo")
    }
  }
apache-flink flink-streaming
1个回答
0
投票

我认为问题在于WindowFunction的第三个类型参数,即密钥的类型。因为您将keyBy方法中的键声明为String(keyBy("hash")),所以无法在编译时确定键的类型。有两种方法可以解决这个问题:

  1. KeySelector中使用keyBy函数来提取密钥(类似于keyBy(x: FingerPrint => x.hash))。 KeySelector函数的返回类型在编译时是已知的,因此您可以使用类型化的WindowFunction
  2. WindowFunction的第三个类型参数的类型更改为TupleTuplekeyBy提取的密钥的通用持有者。在你的情况下,它将是一个Tuple1,并且Tuple1.f0可以访问哈希字符串。
© www.soinside.com 2019 - 2024. All rights reserved.