在 PySpark 中读取二进制文件

问题描述 投票:0回答:0

我有一个二进制文件,我可以使用 numpy 和 pandas 读取它:

dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])

df = pandas.from_array(
    numpy.fromfile(file, dtype=dt),
    columns=data.dtype.names)
)

我想使用 PySpark 而不是先创建一个 pandas 数据框,因为文件可能比内存大。

我看到一个推荐的方法是:

df = spark.read.format("binaryFile").load(file)
df.printSchema()
df.show()

但这不允许我指定每一列的类型。 另外,即使有了那个测试文件,我也得到了一个

java.lang.OutOfMemoryError
.

所以现在我正在尝试将它加载到 RDD:

rdd = spark.sparkContext.binaryFiles(file)

然后按照建议here使用Scala应用地图:

import java.nio.ByteBuffer

val result = YourRDD.map(x=>(ByteBuffer.wrap(x.take(4)).getInt,
             ByteBuffer.wrap(x.drop(4).take(2)).getShort,
             ByteBuffer.wrap(x.drop(6)).getLong))

但我无法让它正常工作。 例如,当我尝试

rdd.first()
时,我得到了整个文件。 这是我尝试过的:

rdd = spark.sparkContext.binaryFiles(file)

def func1(x):
    
    dt = numpy.dtype([('time', numpy.int64), ('e', numpy.float32), ('id', numpy.int32)])

    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )

    return (df.col1,df.col2,df.col3)

result = rdd.mapValues(lambda x: func1(x))

result.first()

但这给了我一个包含完整专栏的条目:

('file',
 (0          2317613314222
                 ...      
  4026940    7317606063913
  Name: col1, Length: 4026941, dtype: int64,
  0          1.551823
               ...   
  4026940    2.379845
  Name: col2, Length: 4026941, dtype: float32,
  0             556
              ...  
  4026940    131336
  Name: col3, Length: 4026941, dtype: int32))

如何加载这个文件?

编辑:文件的小摘录:

with open(file, mode="rb") as open_file:
    contents = open_file.readlines()
    
contents[0:5]

结果:

[b'\xae\xb0\x84\x9c\x1b\x02\x00\x00 \xa2\xc6?,\x02\x00\x00\x0cB\x95\x9c\x1b\x02\x00\x00\xe0a\x9a?\x19\x02\x02\x00\x0f\xf7\xa4\x9c\x1b\x02\x00\x00`\xe9\x82?0\x03\x02\x00\x96@\x03\x9d\x1b\x02\x00\x00\xd0H\x05@;\x02\x02\x00\xd5\n',
 b'\n',
 b'\x9d\x1b\x02\x00\x00\x00^\xa1?\x0f\x01\x00\x00nq,\x9d\x1b\x02\x00\x00\xe0\x89\xad?\xae\x03\x02\x00F\x8e\xb6\x9d\x1b\x02\x00\x00@U\xd1?<\x03\x02\x00\xc3_\xfa\x9d\x1b\x02\x00\x00@}\x87?)\x02\x02\x00\xac\x92K\x9e\x1b\x02\x00\x00P/\x1f@\n',
 b"\x02\x04\x00\x07Q\x9a\x9e\x1b\x02\x00\x00PI\x04@,\x01\x02\x00\x04-\xb2\x9e\x1b\x02\x00\x00\x80\xdc\xf0?\x1d\x00\x04\x00\x0cw\xbd\x9e\x1b\x02\x00\x00\xa0-\xef?\x0c\x02\x02\x00\xb0\x86\xcf\x9e\x1b\x02\x00\x00 \xc2\xb4?,\x02\x00\x00\x12\x03\x1e\x9f\x1b\x02\x00\x00\x80\xb6\x85?)\x02\x02\x00E\xc9w\x9f\x1b\x02\x00\x000\xf3\x03@\x13\x00\x04\x00P\x1b\x91\x9f\x1b\x02\x00\x00\x00\xea\x06@%\x00\x00\x00\x9b:\x9c\x9f\x1b\x02\x00\x00\xe0T\x0b@\x06\x03\x00\x00\x9b\x9f\xa4\x9f\x1b\x02\x00\x00\xc0/\xf4?\x06\x03\x00\x00Z\xcb\xb8\x9f\x1b\x02\x00\x00\x00A\xe1?!\x02\x02\x00\xbcJ\xbd\x9f\x1b\x02\x00\x00\xe0\xc9\xd2?!\x02\x04\x00\x06]\xd0\x9f\x1b\x02\x00\x00`D\xda?\x1d\x00\x04\x00hB\xde\x9f\x1b\x02\x00\x00\xe0\x10\xff?\x1d\x01\x02\x00\x9fi0\xa0\x1b\x02\x00\x00\xa0f\xec?\x86\x03\x00\x00\xf5Ws\xa0\x1b\x02\x00\x00 \xd5\xca?\x1d\x00\x04\x00L\xa0\x8d\xa0\x1b\x02\x00\x00\xc0#\xda?\x1d\x00\x04\x00|<,\xa1\x1b\x02\x00\x00 \xbc\xd1?\x1d\x00\x04\x00\x8b\xfb2\xa1\x1b\x02\x00\x00\xa0\xbf\xcb?\x08\x02\x02\x00d\xd2X\xa1\x1b\x02\x00\x00 \xc6\xb4?5\x00\x04\x00\xae\x1fc\xa1\x1b\x02\x00\x00@\x07\x90?1\x03\x02\x00\xf3\x80g\xa1\x1b\x02\x00\x00`\xbd\xde?4\x00\x04\x00g\x1dm\xa1\x1b\x02\x00\x00@7\x96?\x98\x03\x00\x00\xb8@|\xa1\x1b\x02\x00\x00PK\x11@\x06\x03\x00\x00\xedj\x83\xa1\x1b\x02\x00\x00\xc0\x11\xdd?,\x02\x00\x00\xb1\xbd\x8e\xa1\x1b\x02\x00\x00\xa0\xc7\xc5?\r\x02\x02\x00\xbd\x0f\xba\xa1\x1b\x02\x00\x00 \xe3\xc1?\x1f\x01\x02\x00\xf5\xa6\xf5\xa1\x1b\x02\x00\x00\x80\xdf\xcd?\x06\x01\x00\x00'\xb5 \xa2\x1b\x02\x00\x00\x00\x02\xb6?\x1d\x00\x04\x00\xfas/\xa2\x1b\x02\x00\x00\xc0\xb2\xbb?\x98\x03\x02\x00=\xaan\xa2\x1b\x02\x00\x00`\xaf\xe8?\x08\x02\x02\x00\xa2\x83\x8f\xa2\x1b\x02\x00\x00\x00\x02\xcd?\x1d\x00\x04\x00\xb2\xce\xcb\xa2\x1b\x02\x00\x00`\x9e\xc1?\x1a\x03\x00\x00\x95\x9f\xef\xa2\x1b\x02\x00\x00\xe0\xa4\x8c?)\x02\x02\x005\x86\xfa\xa2\x1b\x02\x00\x00 \x86\xc8?\x98\x03\x02\x00bH\x12\xa3\x1b\x02\x00\x00\xf0\x1b\x1e@\x8c\x02\x02\x00\xa6\xfa\x1b\xa3\x1b\x02\x00\x00\xe0\n",
 b'\xaf?\x1d\x00\x04\x00\xfb\xcd3\xa3\x1b\x02\x00\x00`\xd2\xde?\x84\x03\x00\x00\x81\xcaQ\xa3\x1b\x02\x00\x00 \xc0\xdb?8\x01\x02\x00t\x01\x9a\xa3\x1b\x02\x00\x00\x803\xf8?#\x01\x02\x00@\xdb\xa8\xa3\x1b\x02\x00\x00@k\x02@\x84\x03\x00\x00r\x8e&\xa4\x1b\x02\x00\x00\xe0\x96\xc8?\x1d\x01\x02\x00\xa3\x05?\xa4\x1b\x02\x00\x00\xa0v\xd2?\x1d\x00\x04\x00E\xc3\x8c\xa4\x1b\x02\x00\x000#\x0c@\x02\x01\x02\x00\xf3n\x9f\xa4\x1b\x02\x00\x00\xf0\x06\x13@8\x01\x02\x00\xac<\xc5\xa4\x1b\x02\x00\x00\x80A\x9f?\x8a\x03\x00\x00}\xfc\xe5\xa4\x1b\x02\x00\x00\x80\x00\xc4?\x19\x02\x02\x00\x126\xff\xa4\x1b\x02\x00\x00 \x1d\xac?\n']

对应于:

            col1      col2    col3
0  2317613314222  1.551823     556
1  2317614400012  1.206112  131609
2  2317615429391  1.022747  131888
3  2317621608598  2.082569  131643
4  2317622053589  1.260681     271
python pandas apache-spark pyspark binaryfiles
© www.soinside.com 2019 - 2024. All rights reserved.