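In this project there is a C# API, and I need to build a simple program that reads a parquet file and returns it as JSON. I normally use Python, where reading a parquet file is basically a one-liner, but here I'm stuck with C# (I'm a beginner). Below is a snippet of the whole program: it takes an S3 URL, downloads the parquet file to a temp file, and then runs the code below on that file.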
The code fails on this line:
DataColumn column = await groupReader.ReadColumnAsync(dataFields[c]); ///ERROR
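I'm not entirely sure what the error message means. Is the data too large? Is it complaining about a specific column, a data-type mismatch, or even a column name that is too long? I'd like to understand what the error is, why it happens, and how to handle it. Reading the same parquet file in Python (pd.read_parquet(filename)) shows that all columns are float64, with 90k rows and 30 columns.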
System.ArgumentException
HResult=0x80070057
Message=Destination is too short. (Parameter 'destination')
Source=System.Private.CoreLib
StackTrace:
at System.ThrowHelper.ThrowArgumentException_DestinationTooShort()
at Parquet.Encodings.ParquetPlainEncoder.Decode(Span`1 source, Span`1 data)
at Parquet.Encodings.ParquetPlainEncoder.Decode(Array data, Int32 offset, Int32 count, SchemaElement tse, Span`1 source, Int32& elementsRead)
at Parquet.File.DataColumnReader.ReadColumn(Span`1 src, Encoding encoding, Int64 totalValuesInChunk, Int32 totalValuesInPage, PackedColumn pc)
at Parquet.File.DataColumnReader.<ReadDataPageV1Async>d__13.MoveNext()
at Parquet.File.DataColumnReader.<ReadAsync>d__8.MoveNext()
at ConvertController.<ConvertToJSON>d__2.MoveNext() in C:\Users\myuser\Desktop\repos\frontend\project\Controllers\WebAPI_ParquetController.cs:line 78
This exception was originally thrown at this call stack:
[External Code]
ConvertController.ConvertToJSON(string) in WebAPI_ParquetController.cs
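For context on the message itself: the top frame, System.ThrowHelper.ThrowArgumentException_DestinationTooShort, is what the .NET runtime throws when Span<T>.CopyTo (or ReadOnlySpan<T>.CopyTo) is asked to copy into a span shorter than the source. The minimal sketch below reproduces the same message with plain arrays and no Parquet code at all; the class name DestinationTooShortDemo is just for illustration. Read together with the Parquet.Encodings.ParquetPlainEncoder.Decode frame, this suggests that a buffer allocated inside the library while decoding a data page was smaller than the values it decoded, rather than a problem with column names. That is an interpretation of the stack trace, not a confirmed root cause. For scale, 90,000 rows × 30 float64 columns is 90,000 × 30 × 8 bytes ≈ 21.6 MB of raw values, far below .NET array limits, so total data size alone is an unlikely explanation.

```csharp
using System;

static class DestinationTooShortDemo
{
    // Copies a 4-element source into a 3-element destination and returns
    // the exception message produced by the runtime.
    public static string Reproduce()
    {
        ReadOnlySpan<double> source = new double[] { 1.0, 2.0, 3.0, 4.0 };
        Span<double> destination = new double[3]; // one slot too small
        try
        {
            source.CopyTo(destination);
            return "no exception";
        }
        catch (ArgumentException ex)
        {
            // Same exception type and message as the top of the stack trace above
            return ex.Message;
        }
    }

    static void Main() => Console.WriteLine(Reproduce());
}
```

If you want a non-throwing check in your own span code, ReadOnlySpan<T>.TryCopyTo returns false instead of throwing.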
The code, starting from the point where the file has been downloaded to the temp file:
    // Open the parquet file stream
    using (Stream fileStream = System.IO.File.OpenRead(tempFilePath))
    {
        // Open parquet file reader
        using (ParquetReader parquetReader = await ParquetReader.CreateAsync(fileStream))
        {
            // Get file schema
            DataField[] dataFields = parquetReader.Schema.GetDataFields();
            var result = new List<Dictionary<string, object>>();
            // Enumerate through row groups in this file
            for (int i = 0; i < parquetReader.RowGroupCount; i++)
            {
                // Create row group reader
                using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(i))
                {
                    var rowGroupResult = new Dictionary<string, object>();
                    // Read all columns inside each row group
                    for (int c = 0; c < dataFields.Length; c++)
                    {
                        DataColumn column = await groupReader.ReadColumnAsync(dataFields[c]); // ERROR
                        // column.Data is an untyped System.Array holding this column's values
                        var columnData = column.Data;
                        var decodedData = new object[columnData.Length];
                        // Copy (box) each value into an object[] for serialization
                        for (int idx = 0; idx < columnData.Length; idx++)
                        {
                            decodedData[idx] = columnData.GetValue(idx);
                        }
                        string columnName = dataFields[c].Name;
                        rowGroupResult[columnName] = decodedData;
                    }
                    result.Add(rowGroupResult);
                }
            }
            // Convert the result to JSON
            var jsonResult = JsonConvert.SerializeObject(result);
            return Ok(jsonResult);
        }
    }
}
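The loop above stores one dictionary per row group that maps each column name to that column's whole array of values, so the resulting JSON is column-oriented. If the API should instead return one JSON object per row, the column arrays can be pivoted after they have been read. The sketch below assumes the arrays are already available as a dictionary from column name to Array (for example column.Data for each field) and that every column in a row group has the same length. ColumnPivot and ColumnsToRows are hypothetical names, and it uses System.Text.Json only to stay self-contained; JsonConvert.SerializeObject would work the same way on the returned list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

static class ColumnPivot
{
    // Hypothetical helper: turns {column -> values[]} into [{column -> value}, ...].
    public static List<Dictionary<string, object>> ColumnsToRows(IReadOnlyDictionary<string, Array> columns)
    {
        var rows = new List<Dictionary<string, object>>();
        if (columns.Count == 0) return rows;

        // Assumes every column holds the same number of values.
        int rowCount = columns.Values.First().Length;
        for (int r = 0; r < rowCount; r++)
        {
            var row = new Dictionary<string, object>();
            foreach (var (name, values) in columns)
            {
                row[name] = values.GetValue(r);
            }
            rows.Add(row);
        }
        return rows;
    }

    static void Main()
    {
        // Two made-up float64 columns standing in for column.Data arrays
        var columns = new Dictionary<string, Array>
        {
            ["a"] = new double[] { 1.5, 2.5 },
            ["b"] = new double[] { 3.0, 4.0 },
        };
        string json = JsonSerializer.Serialize(ColumnsToRows(columns));
        Console.WriteLine(json);
    }
}
```

Inside the question's loop, you would fill one such dictionary per row group and append the pivoted rows to a single list before serializing it.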
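I also ran into errors when trying to read Parquet files with Parquet.NET through methods such as ReadColumn and ReadRow, so I switched to the ReadAsTableAsync() method, which worked for me. The following method is an example of reading the contents of a Parquet file into a Parquet table: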
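// Assumes Parquet.Net 4.x and Newtonsoft.Json:
// using Parquet; using Parquet.Rows; using Newtonsoft.Json;
// IClass stands for your own class with a settable Name property.
public List<IClass> ReadStuffFromParquetFile(string dirpath)
{
    List<IClass> results = new List<IClass>();
    try
    {
        string[] parquetFiles = System.IO.Directory.GetFiles(dirpath, "*.parquet");
        // .Result blocks the calling thread; inside an async method, use await instead
        using (ParquetReader reader = ParquetReader.CreateAsync(parquetFiles[0]).Result)
        {
            Table parquetTable = reader.ReadAsTableAsync().Result;
            for (int i = 0; i < parquetTable.Count; i++)
            {
                // Parse each row and pick out its "Name" field
                dynamic json = JsonConvert.DeserializeObject(parquetTable[i].ToString());
                IClass obj = new IClass
                {
                    Name = json["Name"]
                };
                results.Add(obj);
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }
    return results;
}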
Hope this helps.