我有一个二进制文件,我可以使用 numpy 和 pandas 读取它:
dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
df = pandas.from_array(
numpy.fromfile(file, dtype=dt),
columns=data.dtype.names)
)
我想使用 PySpark 而不是先创建一个 pandas 数据框,因为文件可能比内存大。
我看到一个推荐的方法是:
df = spark.read.format("binaryFile").load(file)
df.printSchema()
df.show()
但这不允许我指定每一列的类型。 另外,即使有了那个测试文件,我也得到了一个
java.lang.OutOfMemoryError
.
所以现在我正在尝试将它加载到 RDD:
rdd = spark.sparkContext.binaryFiles(file)
然后按照建议here使用Scala应用地图:
import java.nio.ByteBuffer
val result = YourRDD.map(x=>(ByteBuffer.wrap(x.take(4)).getInt,
ByteBuffer.wrap(x.drop(4).take(2)).getShort,
ByteBuffer.wrap(x.drop(6)).getLong))
但我无法让它正常工作。 例如,当我尝试
rdd.first()
时,我得到了整个文件。
这是我尝试过的:
rdd = spark.sparkContext.binaryFiles(file)
def func1(x):
dt = numpy.dtype([('time', numpy.int64), ('e', numpy.float32), ('id', numpy.int32)])
df = pandas.DataFrame(
numpy.frombuffer(x, dtype=dt),
columns=dt.names
)
return (df.col1,df.col2,df.col3)
result = rdd.mapValues(lambda x: func1(x))
result.first()
但这给了我一个包含完整专栏的条目:
('file',
(0 2317613314222
...
4026940 7317606063913
Name: col1, Length: 4026941, dtype: int64,
0 1.551823
...
4026940 2.379845
Name: col2, Length: 4026941, dtype: float32,
0 556
...
4026940 131336
Name: col3, Length: 4026941, dtype: int32))
如何加载这个文件?
编辑:文件的小摘录:
with open(file, mode="rb") as open_file:
contents = open_file.readlines()
contents[0:5]
结果:
[b'\xae\xb0\x84\x9c\x1b\x02\x00\x00 \xa2\xc6?,\x02\x00\x00\x0cB\x95\x9c\x1b\x02\x00\x00\xe0a\x9a?\x19\x02\x02\x00\x0f\xf7\xa4\x9c\x1b\x02\x00\x00`\xe9\x82?0\x03\x02\x00\x96@\x03\x9d\x1b\x02\x00\x00\xd0H\x05@;\x02\x02\x00\xd5\n',
b'\n',
b'\x9d\x1b\x02\x00\x00\x00^\xa1?\x0f\x01\x00\x00nq,\x9d\x1b\x02\x00\x00\xe0\x89\xad?\xae\x03\x02\x00F\x8e\xb6\x9d\x1b\x02\x00\x00@U\xd1?<\x03\x02\x00\xc3_\xfa\x9d\x1b\x02\x00\x00@}\x87?)\x02\x02\x00\xac\x92K\x9e\x1b\x02\x00\x00P/\x1f@\n',
b"\x02\x04\x00\x07Q\x9a\x9e\x1b\x02\x00\x00PI\x04@,\x01\x02\x00\x04-\xb2\x9e\x1b\x02\x00\x00\x80\xdc\xf0?\x1d\x00\x04\x00\x0cw\xbd\x9e\x1b\x02\x00\x00\xa0-\xef?\x0c\x02\x02\x00\xb0\x86\xcf\x9e\x1b\x02\x00\x00 \xc2\xb4?,\x02\x00\x00\x12\x03\x1e\x9f\x1b\x02\x00\x00\x80\xb6\x85?)\x02\x02\x00E\xc9w\x9f\x1b\x02\x00\x000\xf3\x03@\x13\x00\x04\x00P\x1b\x91\x9f\x1b\x02\x00\x00\x00\xea\x06@%\x00\x00\x00\x9b:\x9c\x9f\x1b\x02\x00\x00\xe0T\x0b@\x06\x03\x00\x00\x9b\x9f\xa4\x9f\x1b\x02\x00\x00\xc0/\xf4?\x06\x03\x00\x00Z\xcb\xb8\x9f\x1b\x02\x00\x00\x00A\xe1?!\x02\x02\x00\xbcJ\xbd\x9f\x1b\x02\x00\x00\xe0\xc9\xd2?!\x02\x04\x00\x06]\xd0\x9f\x1b\x02\x00\x00`D\xda?\x1d\x00\x04\x00hB\xde\x9f\x1b\x02\x00\x00\xe0\x10\xff?\x1d\x01\x02\x00\x9fi0\xa0\x1b\x02\x00\x00\xa0f\xec?\x86\x03\x00\x00\xf5Ws\xa0\x1b\x02\x00\x00 \xd5\xca?\x1d\x00\x04\x00L\xa0\x8d\xa0\x1b\x02\x00\x00\xc0#\xda?\x1d\x00\x04\x00|<,\xa1\x1b\x02\x00\x00 \xbc\xd1?\x1d\x00\x04\x00\x8b\xfb2\xa1\x1b\x02\x00\x00\xa0\xbf\xcb?\x08\x02\x02\x00d\xd2X\xa1\x1b\x02\x00\x00 \xc6\xb4?5\x00\x04\x00\xae\x1fc\xa1\x1b\x02\x00\x00@\x07\x90?1\x03\x02\x00\xf3\x80g\xa1\x1b\x02\x00\x00`\xbd\xde?4\x00\x04\x00g\x1dm\xa1\x1b\x02\x00\x00@7\x96?\x98\x03\x00\x00\xb8@|\xa1\x1b\x02\x00\x00PK\x11@\x06\x03\x00\x00\xedj\x83\xa1\x1b\x02\x00\x00\xc0\x11\xdd?,\x02\x00\x00\xb1\xbd\x8e\xa1\x1b\x02\x00\x00\xa0\xc7\xc5?\r\x02\x02\x00\xbd\x0f\xba\xa1\x1b\x02\x00\x00 \xe3\xc1?\x1f\x01\x02\x00\xf5\xa6\xf5\xa1\x1b\x02\x00\x00\x80\xdf\xcd?\x06\x01\x00\x00'\xb5 \xa2\x1b\x02\x00\x00\x00\x02\xb6?\x1d\x00\x04\x00\xfas/\xa2\x1b\x02\x00\x00\xc0\xb2\xbb?\x98\x03\x02\x00=\xaan\xa2\x1b\x02\x00\x00`\xaf\xe8?\x08\x02\x02\x00\xa2\x83\x8f\xa2\x1b\x02\x00\x00\x00\x02\xcd?\x1d\x00\x04\x00\xb2\xce\xcb\xa2\x1b\x02\x00\x00`\x9e\xc1?\x1a\x03\x00\x00\x95\x9f\xef\xa2\x1b\x02\x00\x00\xe0\xa4\x8c?)\x02\x02\x005\x86\xfa\xa2\x1b\x02\x00\x00 \x86\xc8?\x98\x03\x02\x00bH\x12\xa3\x1b\x02\x00\x00\xf0\x1b\x1e@\x8c\x02\x02\x00\xa6\xfa\x1b\xa3\x1b\x02\x00\x00\xe0\n",
b'\xaf?\x1d\x00\x04\x00\xfb\xcd3\xa3\x1b\x02\x00\x00`\xd2\xde?\x84\x03\x00\x00\x81\xcaQ\xa3\x1b\x02\x00\x00 \xc0\xdb?8\x01\x02\x00t\x01\x9a\xa3\x1b\x02\x00\x00\x803\xf8?#\x01\x02\x00@\xdb\xa8\xa3\x1b\x02\x00\x00@k\x02@\x84\x03\x00\x00r\x8e&\xa4\x1b\x02\x00\x00\xe0\x96\xc8?\x1d\x01\x02\x00\xa3\x05?\xa4\x1b\x02\x00\x00\xa0v\xd2?\x1d\x00\x04\x00E\xc3\x8c\xa4\x1b\x02\x00\x000#\x0c@\x02\x01\x02\x00\xf3n\x9f\xa4\x1b\x02\x00\x00\xf0\x06\x13@8\x01\x02\x00\xac<\xc5\xa4\x1b\x02\x00\x00\x80A\x9f?\x8a\x03\x00\x00}\xfc\xe5\xa4\x1b\x02\x00\x00\x80\x00\xc4?\x19\x02\x02\x00\x126\xff\xa4\x1b\x02\x00\x00 \x1d\xac?\n']
对应于:
col1 col2 col3
0 2317613314222 1.551823 556
1 2317614400012 1.206112 131609
2 2317615429391 1.022747 131888
3 2317621608598 2.082569 131643
4 2317622053589 1.260681 271