错误包arrowR:read_parquet/open_dataset“无法反序列化thrift:TProtocolException:超出大小限制”

问题描述 投票:0回答:1

我的机构正在慢慢从SAS过渡到R,大部分代码都是用arrow/dplyr或data.table编写,使用.parquet格式作为其主要存储格式。 在我自己的工作中,我通常要处理存储和分析 100 万到 1000 万行以及最多 150-200 列的数据。 Parquet 格式非常适合这种用法,但是最近出现了一个不寻常的错误,我在互联网上找不到任何资源:

library(arrow)
library(tidyverse)

open_dataset(data_error)
Error in `open_dataset()`:
! IOError: Error creating dataset. Could not read schema from 'path/example.parquet'. 
Is this a 'parquet' file?: 
Could not open Parquet input source 'path/example.parquet':
Couldn't deserialize thrift: TProtocolException: Exceeded size limit

函数

read_parquet
也会发生同样的情况。

什么是
data_error

data_error
只是一个典型的data.frame,通过几个
data_clean
进程从更大的数据源(我们称之为
data.table
)中提取,并由
write_parquet
未分区保存。 请注意,如果 parquet 文件已分区,则不会发生此错误

这个错误首先发生在data.table上的一个程序上,这个程序不是我写的,而且我对data.table不够熟悉,无法理解根本问题。

代表:

library(arrow)
library(data.table)
# Seed
set.seed(1L)
# Big enough data.table 
dt = data.table(x = sample(1e5L, 1e7L, TRUE), y = runif(100L)) 
# Save in parquet format
write_parquet(dt, "example_ok.parquet")
# Readable
dt_ok <- open_dataset("example_ok.parquet")
# Simple filter 
dt[x == 989L]
# Save in parquet format
write_parquet(dt, "example_error.parquet")
# Error
dt_error <- open_dataset("example_error.parquet")

谢谢大家的帮助!

r data.table apache-arrow
1个回答
0
投票

罪魁祸首是,一旦你调用

dt[x == 989L]
,就会在
data.table
中创建一个索引。

set.seed(1L)
dt = data.table(x = sample(1e5L, 1e7L, TRUE), y = runif(100L))
attr1 <- attributes(dt)
dt[x == 989L]
attr2 <- attributes(dt)
str(attr1)
# List of 4
#  $ names            : chr [1:2] "x" "y"
#  $ row.names        : int [1:10000000] 1 2 3 4 5 6 7 8 9 10 ...
#  $ class            : chr [1:2] "data.table" "data.frame"
#  $ .internal.selfref:<externalptr> 
str(attr2)
# List of 5
#  $ names            : chr [1:2] "x" "y"
#  $ row.names        : int [1:10000000] 1 2 3 4 5 6 7 8 9 10 ...
#  $ class            : chr [1:2] "data.table" "data.frame"
#  $ .internal.selfref:<externalptr> 
#  $ index            : int(0) 
#   ..- attr(*, "__x")= int [1:10000000] 17660 25871 28519 270694 275019 419020 437190 615578 628622 739696 ...

注意添加了

index
属性。

arrow
的默认动作是存储属性;这样做的一个很好的副作用是
dt_ok
实际上属于
data.table
类:

head(dt_ok) |> collect() # assuming dplyr::collect is visible?
#        x         y
#    <int>     <num>
# 1: 24388 0.4023457
# 2: 59521 0.9142361
# 3: 43307 0.2847435
# 4: 69586 0.3440578
# 5: 11571 0.1822614
# 6: 25173 0.8130521

文件大小也会受到不利影响(不确定您是否意识到这一点):

file.info(list.files(pattern = "*parquet"))
#                            size isdir mode               mtime               ctime               atime  uid  gid uname grname
# example_error.parquet 209818297 FALSE  664 2024-02-12 11:34:05 2024-02-12 11:34:05 2024-02-12 11:34:05 1000 1000    r2     r2
# example_ok.parquet     25744071 FALSE  664 2024-02-12 11:33:58 2024-02-12 11:33:58 2024-02-12 11:33:58 1000 1000    r2     r2

显然

_error
文件还有更多内容。 R 属性无法提供
parquet
文件中二进制数据存储的正常效率,因此存储效率较低的向量中的 10Mi 值会占用该空间是有道理的。

如果我们删除

index
,问题就会消失。删除索引的一种方法是手动设置顺序:

write_parquet(dt, "example_error.parquet")
setorder(dt)
names(attributes(dt))
# [1] "names"             "row.names"         "class"             ".internal.selfref"
write_parquet(dt, "example_ok2.parquet")
open_dataset("example_error.parquet")
# Error in open_dataset("example_error.parquet") : 
#   IOError: Error creating dataset. Could not read schema from '.../example_error.parquet'. Is this a 'parquet' file?: Could not open Parquet input source '.../example_error.parquet': Couldn't deserialize thrift: TProtocolException: Exceeded size limit
open_dataset("example_ok2.parquet")
# FileSystemDataset with 1 Parquet file
# x: int32
# y: double
# See $metadata for additional Schema metadata

我的第一反应是这是一个错误,可能是由于属性的大小所致。为了演示,如果我们用 100 行重复此操作,就没有问题。

set.seed(1L)
dt = data.table(x = sample(1e5L, 1e2L, TRUE), y = runif(100L))
write_parquet(dt, "small1.parquet")
open_dataset("small1.parquet")
# FileSystemDataset with 1 Parquet file
# x: int32
# y: double
# See $metadata for additional Schema metadata
dt[x == 8229L]
#        x       y
#    <int>   <num>
# 1:  8229 0.62041
str(attributes(dt))
# List of 5
#  $ names            : chr [1:2] "x" "y"
#  $ row.names        : int [1:100] 1 2 3 4 5 6 7 8 9 10 ...
#  $ class            : chr [1:2] "data.table" "data.frame"
#  $ .internal.selfref:<externalptr> 
#  $ index            : int(0) 
#   ..- attr(*, "__x")= int [1:100] 35 50 21 78 98 33 9 77 88 94 ...
write_parquet(dt, "small2.parquet")
open_dataset("small1.parquet")
# FileSystemDataset with 1 Parquet file
# x: int32
# y: double
# See $metadata for additional Schema metadata

我建议(甚至请求)您提交错误报告

© www.soinside.com 2019 - 2024. All rights reserved.