The following code is taken from the documentation:
package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

        DataStream<Alert> alerts = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

        alerts
            .addSink(new AlertSink())
            .name("send-alerts");

        env.execute("Fraud Detection");
    }
}
1) The .keyBy(Transaction::getAccountId) call is applied before the alerts operator. Does Flink partition the incoming data by account ID at that operator, as shown in the figure below?
2) If so, how do I write a mapper to process the data streamed from each partition? Do I use .keyBy(Transaction::getAccountId) for that?
keyBy is applied to the data stream transactions. After applying keyBy, records in transactions that share the same account ID end up in the same partition, and you can then use the operations available on KeyedStream, such as process (the ProcessFunction overload is not recommended because it is marked deprecated; use a KeyedProcessFunction instead), window, reduce, min/max/sum, and so on.
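Conceptually, keyBy routes each record to a partition by hashing its key, so all records with the same account ID always land in the same parallel instance. The sketch below illustrates that routing invariant without any Flink dependency; the parallelism value and the plain hash-mod scheme are illustrative assumptions (Flink actually hashes keys into key groups first), but the guarantee is the same: equal keys map to equal partitions.

```java
public class KeyByRouting {
    // Illustrative routing: hash the key, then map it onto one of
    // `parallelism` downstream partitions. Equal keys always produce
    // the same partition index, which is the property keyBy relies on.
    static int partitionFor(long accountId, int parallelism) {
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        long[] accountIds = {1L, 2L, 3L, 1L, 2L, 1L};
        for (long id : accountIds) {
            System.out.println("account " + id + " -> partition "
                    + partitionFor(id, parallelism));
        }
        // Note: account 1 is printed three times and maps to the
        // same partition every time.
    }
}
```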
There are many examples of using keyBy, for instance at this link.
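Regarding question 2: you do not write a partition-aware mapper yourself. After keyBy you apply a keyed operation (reduce, window, process, ...), and Flink invokes it independently per key. What a keyed reduce does logically, folding all records that share an account ID into one value, can be sketched without Flink; the Transaction record here is a simplified stand-in for the walkthrough's entity class, assumed to carry only the two fields this sketch needs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedReduceSketch {
    // Simplified stand-in for the walkthrough's Transaction entity.
    record Transaction(long accountId, double amount) {}

    // Logical equivalent, on a bounded input, of
    //   transactions.keyBy(Transaction::getAccountId)
    //               .reduce((a, b) -> sum of amounts per account)
    static Map<Long, Double> sumPerAccount(Iterable<Transaction> txs) {
        Map<Long, Double> totals = new LinkedHashMap<>();
        for (Transaction t : txs) {
            // merge() accumulates the running per-key sum, just as a
            // keyed reduce keeps one running value per key.
            totals.merge(t.accountId(), t.amount(), Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Transaction> txs = List.of(
                new Transaction(1L, 10.0),
                new Transaction(2L, 5.0),
                new Transaction(1L, 7.5));
        System.out.println(sumPerAccount(txs)); // {1=17.5, 2=5.0}
    }
}
```

In the real job, the same grouping happens transparently: the FraudDetector (a KeyedProcessFunction) sees only records for one account ID at a time and can keep per-key state.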