"Malformed data. Length is negative" when trying Spark Structured Streaming with Kafka and an Avro data source

Problem description (votes: 0 · answers: 1)

So, I have been trying out Angel Conde's Structured-Streaming Avro example, using Kafka with Avro data.

However, it seems my data is a bit more complex, with some nested data inside it. Here is my code,

private static Injection<GenericRecord, byte[]> recordInjection;
private static StructType type;
private static final String SNOQTT_SCHEMA = "{"
        +"\"type\": \"record\","
        +"\"name\": \"snoqttv2\","
        +"\"fields\": ["
        +"    { \"name\": \"src_ip\", \"type\": \"string\" },"
        +"    { \"name\": \"classification\", \"type\": \"long\" },"
        +"    { \"name\": \"device_id\", \"type\": \"string\" },"
        +"    { \"name\": \"alert_msg\", \"type\": \"string\" },"
        +"    { \"name\": \"src_mac\", \"type\": \"string\" },"
        +"    { \"name\": \"sig_rev\", \"type\": \"long\" },"
        +"    { \"name\": \"sig_gen\", \"type\": \"long\" },"
        +"    { \"name\": \"dest_mac\", \"type\": \"string\" },"
        +"    { \"name\": \"packet_info\", \"type\": {"
        +"        \"type\": \"record\","
        +"        \"name\": \"packet_info\","
        +"        \"fields\": ["
        +"              { \"name\": \"DF\", \"type\": \"boolean\" },"
        +"              { \"name\": \"MF\", \"type\": \"boolean\" },"
        +"              { \"name\": \"ttl\", \"type\": \"long\" },"
        +"              { \"name\": \"len\", \"type\": \"long\" },"
        +"              { \"name\": \"offset\", \"type\": \"long\" }"
        +"          ],"
        +"        \"connect.name\": \"packet_info\" }},"
        +"    { \"name\": \"timestamp\", \"type\": \"string\" },"
        +"    { \"name\": \"sig_id\", \"type\": \"long\" },"
        +"    { \"name\": \"ip_type\", \"type\": \"string\" },"
        +"    { \"name\": \"dest_ip\", \"type\": \"string\" },"
        +"    { \"name\": \"priority\", \"type\": \"long\" }"
        +"],"
        +"\"connect.name\": \"snoqttv2\" }";

private static Schema.Parser parser = new Schema.Parser();
private static Schema schema = parser.parse(SNOQTT_SCHEMA);

static {
    recordInjection = GenericAvroCodecs.toBinary(schema);
    type = (StructType) SchemaConverters.toSqlType(schema).dataType();
}

public static void main(String[] args) throws StreamingQueryException{
    // Set log4j untuk development langsung dari java
    LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
    LogManager.getLogger("akka").setLevel(Level.ERROR);

    // Set configuration for the streaming context and Spark context
    SparkConf conf = new SparkConf()
            .setAppName("Snoqtt-Avro-Structured")
            .setMaster("local[*]");

    // Initialize the Spark session
    SparkSession sparkSession = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();

    // Reduce task number
    sparkSession.sqlContext().setConf("spark.sql.shuffle.partitions", "3");

    // Start the data stream from Kafka
    Dataset<Row> ds1 = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "snoqttv2")
            .option("startingOffsets", "latest")
            .load();

    // Start the streaming query

    sparkSession.udf().register("deserialize", (byte[] data) -> {
        GenericRecord record = recordInjection.invert(data).get();
        // The Row's field order and types (including the nested packet_info
        // record) must match the StructType derived from the Avro schema.
        GenericRecord packet = (GenericRecord) record.get("packet_info");
        return RowFactory.create(
                record.get("src_ip").toString(),
                record.get("classification"),
                record.get("device_id").toString(),
                record.get("alert_msg").toString(),
                record.get("src_mac").toString(),
                record.get("sig_rev"),
                record.get("sig_gen"),
                record.get("dest_mac").toString(),
                RowFactory.create(
                        packet.get("DF"),
                        packet.get("MF"),
                        packet.get("ttl"),
                        packet.get("len"),
                        packet.get("offset")),
                record.get("timestamp").toString(),
                record.get("sig_id"),
                record.get("ip_type").toString(),
                record.get("dest_ip").toString(),
                record.get("priority"));
    }, DataTypes.createStructType(type.fields()));

    ds1.printSchema();
    Dataset<Row> ds2 = ds1
            .select("value").as(Encoders.BINARY())
            .selectExpr("deserialize(value) as rows")
            .select("rows.*");

    ds2.printSchema();

    StreamingQuery query1 = ds2
            .groupBy("sig_id")
            .count()
            .writeStream()
            .queryName("Signature ID Count Query")
            .outputMode("complete")
            .format("console")
            .start();

    query1.awaitTermination();
}

It was all fun and games until I received my first batch of messages and the error came up:

18/01/22 14:29:00 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 8)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (binary) => struct&lt;src_ip:string,classification:bigint,device_id:string,alert_msg:string,src_mac:string,sig_rev:bigint,sig_gen:bigint,dest_mac:string,packet_info:struct&lt;DF:boolean,MF:boolean,ttl:bigint,len:bigint,offset:bigint&gt;,timestamp:string,sig_id:bigint,ip_type:string,dest_ip:string,priority:bigint&gt;)
at ...
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@232f8415
at ...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
at ...
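From what I can tell, "Malformed data. Length is negative" usually means the bytes handed to the Avro decoder are not a raw Avro body at all. For example, records produced with Confluent's KafkaAvroSerializer carry a 5-byte prefix (magic byte 0x00 plus a 4-byte schema-registry ID) that a plain binary decoder misreads, and a writer/reader schema mismatch has the same effect. Avro stores string and bytes lengths as zigzag-encoded varints, so a stray byte easily decodes to a negative length. A minimal, dependency-free sketch (plain Java; `decodeZigZagVarInt` is my own re-implementation for illustration, not an Avro API):

```java
import java.nio.ByteBuffer;

public class MalformedAvroDemo {

    // Re-implementation of Avro's zigzag varint decoding, which is how the
    // binary decoder reads string/bytes lengths. For illustration only.
    public static long decodeZigZagVarInt(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag: back to signed
    }

    public static void main(String[] args) {
        // A well-formed Avro string: zigzag length 3, then the UTF-8 bytes.
        ByteBuffer good = ByteBuffer.wrap(new byte[]{0x06, 'f', 'o', 'o'});
        System.out.println(decodeZigZagVarInt(good));       // 3

        // A decoder that lands on a byte of framing or garbage instead of
        // a length, e.g. 0x31 (ASCII '1'), reads a negative length:
        ByteBuffer misaligned = ByteBuffer.wrap(new byte[]{0x31});
        System.out.println(decodeZigZagVarInt(misaligned)); // -25
    }
}
```

Incidentally, 0x31 decodes to -25, the same value as in the error above, but the actual offending byte could be anything; the point is that producer and consumer must agree on the exact byte layout (e.g. strip the Confluent header before `invert`, or use a Confluent-aware deserializer).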

Am I doing something wrong, or is my nested schema the root of all evil in my code? Thanks for any help, guys.


java apache-spark apache-kafka avro spark-structured-streaming
1 Answer

0 votes

I just updated the repository with an example that uses a nested schema and the new Avro data source.
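For anyone landing here on a newer Spark: the built-in spark-avro module provides `from_avro`, which replaces the bijection-based UDF and handles nested records natively. A sketch against the question's topic and schema (assumes the spark-avro package is on the classpath; `org.apache.spark.sql.avro.functions` is the Spark 3 Java entry point, while Spark 2.4 exposes `from_avro` via the Scala package object `org.apache.spark.sql.avro`; like the UDF, `from_avro` expects a raw Avro body, so Confluent-framed messages still need the 5-byte header stripped first):

```java
import static org.apache.spark.sql.avro.functions.from_avro;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FromAvroSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Snoqtt-Avro-Structured")
                .master("local[*]")
                .getOrCreate();

        String snoqttSchema = "..."; // the same Avro JSON schema as in the question

        Dataset<Row> parsed = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "snoqttv2")
                .option("startingOffsets", "latest")
                .load()
                // from_avro decodes the binary value column straight into a
                // struct, including the nested packet_info record.
                .select(from_avro(col("value"), snoqttSchema).as("rows"))
                .select("rows.*");

        parsed.printSchema();
    }
}
```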

© www.soinside.com 2019 - 2024. All rights reserved.