我正在使用Kafka和Spark Structured Streaming。我收到以下格式的kafka消息。
{"deviceId":"001","sNo":1,"data":"aaaaa"}
{"deviceId":"002","sNo":1,"data":"bbbbb"}
{"deviceId":"001","sNo":2,"data":"ccccc"}
{"deviceId":"002","sNo":2,"data":"ddddd"}
我正在读下面的内容。
Dataset<String> data = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo");
ds.foreach(event ->
processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes())
);}
private void processData(String deviceId,int SNo, byte[] data)
{
//How to check previous processed Dataset???
}
在我的json消息中,“data”是byte []的String形式。我有一个要求,我需要按照“sNo”的顺序处理给定“deviceId”的二进制“数据”。因此,对于“deviceId”=“001”,我必须处理“sNo”= 1的二进制数据,然后“sNo”= 2,依此类推。如何检查结构化流中先前处理的数据集的状态?
如果您正在寻找像DStream.mapWithState这样的状态管理,那么结构流中不支持它。工作正在进行中。请检查https://issues.apache.org/jira/browse/SPARK-19067。