Convert CSV data and load it into an Avro temp file


How do I write an Airflow function that reads a CSV file, converts the data to Avro format, and writes the result to a temporary file?

@task
def source_data(**kwargs) -> Dict[str, any]:

    execution_date = kwargs["execution_date"]
    # Construct the file path based on the execution date
    file_date = execution_date.strftime('%Y-%m-%d')
    file_path = os.path.join(data_dir, f"transactions_{file_date}.csv")
    avro_file_path = os.path.join(data_dir, f"transactions_{file_date}.avro")
   

    # Read CSV and convert to Avro
    csv_data = []
    with open(file_path, 'r') as csv_file:
        for line in csv_file:
            key, value = line.strip().split(',')
            csv_data.append({"key": key, "value": int(value)})

    schema = {
    "type": "record",
    "name": "Transaction",
    "fields": [
        {"name": "key", "type": "string"},
        {"name": "value", "type": "int"}
    ]
    }

    with open(avro_file_path, 'wb') as avro_file:
        writer = DataFileWriter(avro_file, DatumWriter(), schema)
        for row in csv_data:
            writer.append(row)
        writer.close()

    return {"avro_file_path": avro_file_path}

I have tried many things, but I cannot get the Avro part right. With the code above, I get this error:

AttributeError: 'dict' object has no attribute 'parse'

What else can I try?

python python-3.x csv airflow avro
1 Answer
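# avro.schema.parse expects the schema as a JSON string, not a dict
schema = avro.schema.parse('{...}')

# or, starting from a Python dict (needs "import json")

schema = avro.schema.parse(json.dumps({...}))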
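
Putting the schema fix together with the function from the question, a minimal sketch might look like the following. It assumes the Apache `avro` package (version 1.10 or later, where the function is spelled `avro.schema.parse`) and a headerless two-column CSV, as in the question. The helper names `read_rows` and `write_avro_tmp` and the constant `SCHEMA_DICT` are made up for illustration, and `tempfile.mkstemp` is just one way to get a temporary file.

```python
import csv
import json
import os
import tempfile
from typing import Any, Dict, List

# Same record layout as the schema dict in the question.
SCHEMA_DICT = {
    "type": "record",
    "name": "Transaction",
    "fields": [
        {"name": "key", "type": "string"},
        {"name": "value", "type": "int"},
    ],
}


def read_rows(csv_path: str) -> List[Dict[str, Any]]:
    """Read headerless `key,value` lines into dicts that match the schema."""
    rows = []
    with open(csv_path, newline="") as csv_file:
        for record in csv.reader(csv_file):
            if not record:  # skip blank lines
                continue
            key, value = record
            rows.append({"key": key, "value": int(value)})
    return rows


def write_avro_tmp(rows: List[Dict[str, Any]]) -> str:
    """Write the rows to a temporary .avro file and return its path."""
    # Imported inside the function so the module still loads
    # on machines where the avro package is not installed.
    import avro.schema
    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter

    # parse() takes a JSON string, so the dict is serialized first.
    schema = avro.schema.parse(json.dumps(SCHEMA_DICT))

    # mkstemp creates the file and leaves it on disk for later steps.
    fd, avro_path = tempfile.mkstemp(suffix=".avro")
    writer = DataFileWriter(os.fdopen(fd, "wb"), DatumWriter(), schema)
    try:
        for row in rows:
            writer.append(row)
    finally:
        writer.close()  # flushes the last block and closes the file
    return avro_path
```

Inside the `@task` function, the body after building `file_path` could then shrink to `return {"avro_file_path": write_avro_tmp(read_rows(file_path))}`. The temporary file is not deleted automatically, so whichever step consumes it should remove it when done.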