So, I have been trying out Structured Streaming with Kafka and Avro data, following Angel Conde's Structured-Streaming Avro example.
However, it seems my data is a bit more complex, with nested data inside it. Here is my code,
private static Injection<GenericRecord, byte[]> recordInjection;
private static StructType type;
private static final String SNOQTT_SCHEMA = "{"
        +"\"type\": \"record\","
        +"\"name\": \"snoqttv2\","
        +"\"fields\": ["
        +"  { \"name\": \"src_ip\", \"type\": \"string\" },"
        +"  { \"name\": \"classification\", \"type\": \"long\" },"
        +"  { \"name\": \"device_id\", \"type\": \"string\" },"
        +"  { \"name\": \"alert_msg\", \"type\": \"string\" },"
        +"  { \"name\": \"src_mac\", \"type\": \"string\" },"
        +"  { \"name\": \"sig_rev\", \"type\": \"long\" },"
        +"  { \"name\": \"sig_gen\", \"type\": \"long\" },"
        +"  { \"name\": \"dest_mac\", \"type\": \"string\" },"
        +"  { \"name\": \"packet_info\", \"type\": {"
        +"    \"type\": \"record\","
        +"    \"name\": \"packet_info\","
        +"    \"fields\": ["
        +"      { \"name\": \"DF\", \"type\": \"boolean\" },"
        +"      { \"name\": \"MF\", \"type\": \"boolean\" },"
        +"      { \"name\": \"ttl\", \"type\": \"long\" },"
        +"      { \"name\": \"len\", \"type\": \"long\" },"
        +"      { \"name\": \"offset\", \"type\": \"long\" }"
        +"    ],"
        +"    \"connect.name\": \"packet_info\" }},"
        +"  { \"name\": \"timestamp\", \"type\": \"string\" },"
        +"  { \"name\": \"sig_id\", \"type\": \"long\" },"
        +"  { \"name\": \"ip_type\", \"type\": \"string\" },"
        +"  { \"name\": \"dest_ip\", \"type\": \"string\" },"
        +"  { \"name\": \"priority\", \"type\": \"long\" }"
        +"],"
        +"\"connect.name\": \"snoqttv2\" }";

private static Schema.Parser parser = new Schema.Parser();
private static Schema schema = parser.parse(SNOQTT_SCHEMA);

static {
    recordInjection = GenericAvroCodecs.toBinary(schema);
    type = (StructType) SchemaConverters.toSqlType(schema).dataType();
}

public static void main(String[] args) throws StreamingQueryException {
    // Set log4j for development directly from Java
    LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
    LogManager.getLogger("akka").setLevel(Level.ERROR);

    // Set the configuration for the streaming context and spark context
    SparkConf conf = new SparkConf()
            .setAppName("Snoqtt-Avro-Structured")
            .setMaster("local[*]");

    // Initialize the Spark session
    SparkSession sparkSession = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();

    // Reduce the number of shuffle tasks
    sparkSession.sqlContext().setConf("spark.sql.shuffle.partitions", "3");

    // Start the data stream from Kafka
    Dataset<Row> ds1 = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "snoqttv2")
            .option("startingOffsets", "latest")
            .load();

    // Start the streaming query
    sparkSession.udf().register("deserialize", (byte[] data) -> {
        GenericRecord record = recordInjection.invert(data).get();
        return RowFactory.create(
                record.get("timestamp").toString(),
                record.get("device_id").toString(),
                record.get("ip_type").toString(),
                record.get("src_ip").toString(),
                record.get("dest_ip").toString(),
                record.get("src_mac").toString(),
                record.get("dest_mac").toString(),
                record.get("alert_msg").toString(),
                record.get("sig_rev").toString(),
                record.get("sig_gen").toString(),
                record.get("sig_id").toString(),
                record.get("classification").toString(),
                record.get("priority").toString());
    }, DataTypes.createStructType(type.fields()));

    ds1.printSchema();

    Dataset<Row> ds2 = ds1
            .select("value").as(Encoders.BINARY())
            .selectExpr("deserialize(value) as rows")
            .select("rows.*");

    ds2.printSchema();

    StreamingQuery query1 = ds2
            .groupBy("sig_id")
            .count()
            .writeStream()
            .queryName("Signature ID Count Query")
            .outputMode("complete")
            .format("console")
            .start();

    query1.awaitTermination();
}
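For context, the deserialize UDF's invert() call assumes the bytes on the topic were written with the same binary Injection. A simplified, hypothetical sketch of that producer side (field values, the producer variable, and the nested record construction are made up for illustration):

// Build a record matching the schema above, including the nested packet_info record
GenericRecord packetInfo = new GenericData.Record(schema.getField("packet_info").schema());
packetInfo.put("DF", false);
packetInfo.put("MF", false);
packetInfo.put("ttl", 64L);
packetInfo.put("len", 60L);
packetInfo.put("offset", 0L);

GenericRecord record = new GenericData.Record(schema);
record.put("src_ip", "192.168.1.10");
record.put("classification", 1L);
record.put("device_id", "sensor-01");
record.put("alert_msg", "test alert");
record.put("src_mac", "aa:bb:cc:dd:ee:ff");
record.put("sig_rev", 1L);
record.put("sig_gen", 1L);
record.put("dest_mac", "ff:ee:dd:cc:bb:aa");
record.put("packet_info", packetInfo);
record.put("timestamp", "2018-01-22T14:28:00");
record.put("sig_id", 1000001L);
record.put("ip_type", "TCP");
record.put("dest_ip", "192.168.1.20");
record.put("priority", 2L);

// Serialize with the same Injection the consumer later inverts
byte[] payload = recordInjection.apply(record);

// producer is assumed to be a KafkaProducer<String, byte[]> using ByteArraySerializer
producer.send(new ProducerRecord<String, byte[]>("snoqttv2", payload));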
It was all fun and games until I received my first batch of messages, and it hit an error
18/01/22 14:29:00 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 8)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (binary) => struct<src_ip:string,classification:bigint,device_id:string,alert_msg:string,src_mac:string,sig_rev:bigint,sig_gen:bigint,dest_mac:string,packet_info:struct<DF:boolean,MF:boolean,ttl:bigint,len:bigint,offset:bigint>,timestamp:string,sig_id:bigint,ip_type:string,dest_ip:string,priority:bigint>) at ...
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@232f8415
at ...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at ...
Am I doing it wrong? Or is my nested schema the root of evil in my code? Thanks for any help, guys
Just updated the repository with an example using a nested schema and the new Avro data source.
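For reference, the "new Avro data source" is the spark-avro module that ships with Spark. A minimal sketch, assuming the spark-avro package (e.g. org.apache.spark:spark-avro_2.12) is on the classpath and reusing the SNOQTT_SCHEMA string and the ds1 Kafka stream from the question, of how the same topic could be parsed with from_avro instead of a hand-written UDF; this is not necessarily the repository's actual code:

import static org.apache.spark.sql.avro.functions.from_avro;
import static org.apache.spark.sql.functions.col;

// Parse the Kafka value bytes directly with the built-in Avro data source.
// The nested packet_info record simply becomes a struct column, so no
// flattening UDF is required.
Dataset<Row> parsed = ds1
        .select(from_avro(col("value"), SNOQTT_SCHEMA).as("record"))
        .select("record.*");

// packet_info shows up as struct<DF:boolean,MF:boolean,ttl:bigint,len:bigint,offset:bigint>
parsed.printSchema();

StreamingQuery query = parsed
        .groupBy("sig_id")
        .count()
        .writeStream()
        .outputMode("complete")
        .format("console")
        .start();

query.awaitTermination();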