在Spark sql中按二进制类型过滤

问题描述 投票:0回答:2

我有一个字段代表我的架构中的IP地址。我想使用Binary Type来存储数据。

我想象的方式是,如果我的ip是:50.100.150.200我将它保存为字节数组中的[50,100,150,200](序列肯定很重要,但我们可以将它排除在这个问题的讨论之外)。

我的问题是如何在查询时按此列过滤? (字符串不适合目的)

例如,我想运行以下查询:

SELECT * from table1 WHERE sourceip='50.100.150.200'

这是一段代码来演示这个问题:

Bean定义(用于模式创建):

    public static class MyBean1 implements Serializable {
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;
    private byte[] description;

    public MyBean1(int id, String name, String description) {
        this.id = id;
        this.name = name;
        this.description = description.getBytes();
    }

    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public byte[] getDescription() {
        return description;
    }
    public void setDescription(byte[] description) {
        this.description = description;
    }
}

演示代码(我想按说明过滤):

    List<MyBean1> newDebugData = new ArrayList<MyBean1>();
    newDebugData.add(new MyBean1(1, "Arnold", "10.150.15.10"));
    newDebugData.add(new MyBean1(1, "Bob", "10.150.15.11"));
    newDebugData.add(new MyBean1(3, "Bob", "10.150.15.12"));
    newDebugData.add(new MyBean1(3, "Bob", "10.150.15.13"));
    newDebugData.add(new MyBean1(1, "Alice", "10.150.15.14"));

    Dataset<Row> df2 = sqlContext.createDataFrame(newDebugData, MyBean1.class);
    df2.createTempView("table1");
    sqlContext.sql("select * from table1 where description='10.150.15.14'").show();

我收到错误:

differing types in '(table1.`description` = CAST('10.150.15.14' AS DOUBLE))'
java apache-spark apache-spark-sql spark-dataframe
2个回答
0
投票

这不是你问题的100%答案,但我希望指针有所帮助。

以下问题不是关于过滤,而是从数组中选择数据。 selecting a range of elements in an array spark sql

它看起来像很多信息,包括UDF使用Spark SQL查询数组的一些指导。

希望这可以帮助。


0
投票

SPARK-21344在版本2.0.3,2.1.2,2.2.1中修复了以下问题:BinaryType比较执行了有符号的字节数组比较。所以二进制比较应该适用于那些版本。

JIRA具有以下scala测试代码:

case class TestRecord(col0: Array[Byte])
def convertToBytes(i: Long): Array[Byte] = {
   val bb = java.nio.ByteBuffer.allocate(8)
   bb.putLong(i)
   bb.array
}
val timestamp = 1498772083037L
val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
val testDF = sc.parallelize(data).toDF
val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp) 
              && col("col0") < convertToBytes(timestamp + 50L))
assert(filter1.count == 50)

我不知道相同的Java代码是什么,但这应该让你开始。


我在上面的评论中提到我们使用LongType存储IPv4地址的问题。我们有一个包装脚本将点分十进制转换为长整数,而Spark UDF则转换为另一种方式:长整数到点分十进制。我假设LongType比BinaryType更快。

© www.soinside.com 2019 - 2024. All rights reserved.