我有一个字段代表我的架构中的IP地址。我想使用Binary Type来存储数据。
我想象的方式是,如果我的ip是:50.100.150.200
我将它保存为字节数组中的[50,100,150,200]
(序列肯定很重要,但我们可以将它排除在这个问题的讨论之外)。
我的问题是如何在查询时按此列过滤? (字符串不适合目的)
例如,我想运行以下查询:
SELECT * from table1 WHERE sourceip='50.100.150.200'
这是一段代码来演示这个问题:
Bean定义(用于模式创建):
public static class MyBean1 implements Serializable {
private static final long serialVersionUID = 1L;
private int id;
private String name;
private byte[] description;
public MyBean1(int id, String name, String description) {
this.id = id;
this.name = name;
this.description = description.getBytes();
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public byte[] getDescription() {
return description;
}
public void setDescription(byte[] description) {
this.description = description;
}
}
演示代码(我想按说明过滤):
List<MyBean1> newDebugData = new ArrayList<MyBean1>();
newDebugData.add(new MyBean1(1, "Arnold", "10.150.15.10"));
newDebugData.add(new MyBean1(1, "Bob", "10.150.15.11"));
newDebugData.add(new MyBean1(3, "Bob", "10.150.15.12"));
newDebugData.add(new MyBean1(3, "Bob", "10.150.15.13"));
newDebugData.add(new MyBean1(1, "Alice", "10.150.15.14"));
Dataset<Row> df2 = sqlContext.createDataFrame(newDebugData, MyBean1.class);
df2.createTempView("table1");
sqlContext.sql("select * from table1 where description='10.150.15.14'").show();
我收到错误:
differing types in '(table1.`description` = CAST('10.150.15.14' AS DOUBLE))'
这不是你问题的100%答案,但我希望指针有所帮助。
以下问题不是关于过滤,而是从数组中选择数据。 selecting a range of elements in an array spark sql
它看起来像很多信息,包括UDF使用Spark SQL查询数组的一些指导。
希望这可以帮助。
SPARK-21344在版本2.0.3,2.1.2,2.2.1中修复了以下问题:BinaryType比较执行了有符号的字节数组比较。所以二进制比较应该适用于那些版本。
JIRA具有以下scala测试代码:
case class TestRecord(col0: Array[Byte])
def convertToBytes(i: Long): Array[Byte] = {
val bb = java.nio.ByteBuffer.allocate(8)
bb.putLong(i)
bb.array
}
val timestamp = 1498772083037L
val data = (timestamp to timestamp + 1000L).map(i => TestRecord(convertToBytes(i)))
val testDF = sc.parallelize(data).toDF
val filter1 = testDF.filter(col("col0") >= convertToBytes(timestamp)
&& col("col0") < convertToBytes(timestamp + 50L))
assert(filter1.count == 50)
我不知道相同的Java代码是什么,但这应该让你开始。
我在上面的评论中提到我们使用LongType存储IPv4地址的问题。我们有一个包装脚本将点分十进制转换为长整数,而Spark UDF则转换为另一种方式:长整数到点分十进制。我假设LongType比BinaryType更快。