OSError: Couldn't deserialize thrift: No more data to read. Deserializing page header failed


I am receiving data from an Event Hub and uploading it to a blob with blob_type AppendBlob. The appends succeed, but when I download the resulting parquet file and try to read it, I get this error:

OSError: Couldn't deserialize thrift: No more data to read. Deserializing page header failed.

and sometimes this one:

Unexpected end of stream: Page was smaller (4) than expected (13)

Can anyone help me understand these two errors and fix the first one?

import asyncio
from datetime import datetime
import time
import pandas as pd
from io import BytesIO
from azure.storage.blob import BlobServiceClient
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (BlobCheckpointStore)

EVENT_HUB_CONNECTION_STR = ""
EVENT_HUB_NAME = ""
BLOB_STORAGE_CONNECTION_STRING = ""
BLOB_CONTAINER_NAME = ""

async def on_event(partition_context, event):
    global finalDF
    try:
       
        data = event.body_as_json(encoding='UTF-8')
        df=pd.DataFrame(data,index=[0])
        finalDF=pd.concat([finalDF,df])
       
        if finalDF.shape[0]>100:
            uniqueBPIds=(finalDF['batteryserialnumber'].unique()).tolist()
            parquet = BytesIO()
            for i in uniqueBPIds:
                tempdf=finalDF[finalDF['batteryserialnumber']==i]
                tempdf.to_parquet(parquet)
                parquet.seek(0)
                blob_service_client = BlobServiceClient.from_connection_string(BLOB_STORAGE_CONNECTION_STRING)
                blob_path = f'new8_{year}/{month}/{i}/{i}_{year}_{month}_{day}.parquet'
                blob_client = blob_service_client.get_blob_client(container=BLOB_CONTAINER_NAME, blob=blob_path)
                blob_client.upload_blob(data = parquet,overwrite=False,blob_type='AppendBlob')
            finalDF=pd.DataFrame()
            print('done')
    except Exception as e:
        print('ERROR',e)

    await partition_context.update_checkpoint(event)
    
async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
    )

    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,
        eventhub_name=EVENT_HUB_NAME,
    )
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")

if __name__ == "__main__":
    k=0
    finalDF = pd.DataFrame()
    current_datetime = datetime.now()
    year, month, day = current_datetime.year, current_datetime.month, current_datetime.day

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
python azure azure-blob-storage parquet
1 Answer

I tried the code below to store each event from the Event Hubs namespace as an individual parquet file in Azure Blob Storage.

Code:

import datetime
import io
import json
import uuid

import pandas as pd
from azure.eventhub import EventHubConsumerClient
from azure.storage.blob import BlobServiceClient


def on_event(partition_context, event):
    event_data = json.loads(event.body_as_str())
    print(f"Event Received from Partition {partition_context.partition_id}: {event_data}")
    save_event_to_blob(event_data, partition_context.partition_id)
    partition_context.update_checkpoint()

def on_error(partition_context, error):
    print(f"Error Received: {error}")

def save_event_to_blob(event_data, partition_id):
    blob_service_client = BlobServiceClient.from_connection_string(
        "<storage_conne_string>"
    )
    container_name = "<container_name>"
    blob_name = f"{datetime.datetime.utcnow().strftime('%Y-%m-%d-%H-%M-%S')}-{uuid.uuid4()}.parquet"

    blob_container_client = blob_service_client.get_container_client(container_name)
    if not blob_container_client.exists():
        blob_container_client.create_container()

    blob_client = blob_container_client.get_blob_client(blob_name)

    # Write the event as a one-row DataFrame in Parquet format, so the blob
    # contents actually match the .parquet extension.
    buffer = io.BytesIO()
    pd.DataFrame([event_data]).to_parquet(buffer)
    buffer.seek(0)
    blob_client.upload_blob(buffer, overwrite=True)

    print(f"Event stored in blob: {blob_name}")

if __name__ == '__main__':
    eventhub_namespace = "<eventhub_namespace>.servicebus.windows.net"
    eventhub_name = "<eventhub_name>"
    connection_str = f"Endpoint=sb://{eventhub_namespace}/;SharedAccessKeyName=kamc;SharedAccessKey=<event_key>;EntityPath={eventhub_name}"

    consumer_client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group="$Default",
    )

    consumer_client.receive(
        on_event=on_event,
        on_error=on_error,
        starting_position="-1",  
    )
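
To check that a stored blob really is readable Parquet, you can download it and parse it back with pandas. This is a minimal sketch, assuming the same placeholder connection string and container as above and a Parquet engine (pyarrow or fastparquet) installed locally; the blob name is illustrative:

import io

import pandas as pd
from azure.storage.blob import BlobServiceClient

# Placeholder values; substitute your own connection string, container and blob name.
blob_service_client = BlobServiceClient.from_connection_string("<storage_conne_string>")
blob_client = blob_service_client.get_blob_client(
    container="<container_name>",
    blob="<blob_name>.parquet",
)

# Download the blob into memory and read it as Parquet; a valid file parses cleanly.
buffer = io.BytesIO(blob_client.download_blob().readall())
print(pd.read_parquet(buffer))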

Output:

The code ran successfully, as shown in the console output below:

C:\Users\xxxxx\Documents\xxxxx>python -u "c:\Users\xxxxx\Documents\xxxxx\sample.py"

Event Received from Partition 0: {'EventId': 0, 'Message': 'This is event 0'}
Event stored in blob: 2024-01-20-xxxxxxxx.parquet
Event Received from Partition 0: {'EventId': 1, 'Message': 'This is event 1'}
Event stored in blob: 2024-01-20-xxxxxxxxxx.parquet
Event Received from Partition 0: {'EventId': 2, 'Message': 'This is event 2'}
Event stored in blob: 2024-01-20-xxxxxxxxxx.parquet
Event Received from Partition 0: {'EventId': 3, 'Message': 'This is event 3'}
Event stored in blob: 2024-01-20-xxxxxxxx.parquet
Event Received from Partition 0: {'EventId': 4, 'Message': 'This is event 4'}
Event stored in blob: 2024-01-20-xxxxxxxxxx.parquet
Event Received from Partition 0: {'EventId': 5, 'Message': 'This is event 5'}
Event stored in blob: 2024-01-20-xxxxxxxxxx.parquet
Event Received from Partition 0: {'EventId': 6, 'Message': 'This is event 6'}
Event stored in blob: 2024-01-20-xxxxxxx.parquet
Event Received from Partition 0: {'EventId': 7, 'Message': 'This is event 7'}
Event stored in blob: 2024-01-20-xxxxxxx.parquet
Event Received from Partition 0: {'EventId': 8, 'Message': 'This is event 8'}
Event stored in blob: 2024-01-20-xxxxxxx.parquet
Event Received from Partition 0: {'EventId': 9, 'Message': 'This is event 9'}
Event stored in blob: 2024-01-20-xxxxxxx.parquet


Azure portal:

All the events are stored in their own parquet files, as shown below:

[Screenshot: container listing in the Azure portal showing one parquet blob per event]

Each event's data is saved to its corresponding parquet file, as shown in the screenshot below.

[Screenshot: contents of an individual parquet file]
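
Regarding the original errors: a Parquet file is only readable as one complete unit (it ends with a footer and magic bytes), so appending the bytes of several separately serialized Parquet files to a single AppendBlob yields a blob that Parquet readers cannot parse, which typically surfaces as the "Couldn't deserialize thrift" and "Unexpected end of stream" errors quoted above. Below is a minimal sketch of the alternative: write each batch to its own fresh buffer and its own block blob. The helper name upload_parquet_batch and the timestamped path are illustrative, not part of the original code:

import io
from datetime import datetime

import pandas as pd
from azure.storage.blob import BlobServiceClient


def upload_parquet_batch(df: pd.DataFrame, connection_string: str, container: str, serial: str) -> None:
    """Write one batch as a self-contained Parquet blob instead of appending."""
    buffer = io.BytesIO()  # fresh buffer per batch, never reused across uploads
    df.to_parquet(buffer)
    buffer.seek(0)

    now = datetime.now()
    # Timestamped path so every batch lands in its own block blob (the default blob type).
    blob_path = f"{now.year}/{now.month:02d}/{serial}/{serial}_{now:%Y_%m_%d_%H%M%S}.parquet"

    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    blob_client = blob_service_client.get_blob_client(container=container, blob=blob_path)
    blob_client.upload_blob(buffer, overwrite=True)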
