如何编写一个airflow函数来读取csv文件,将数据转换为avro格式并将数据放入临时文件中?
@task
def source_data(**kwargs) -> Dict[str, any]:
execution_date = kwargs["execution_date"]
# Construct the file path based on the execution date
file_date = execution_date.strftime('%Y-%m-%d')
file_path = os.path.join(data_dir, f"transactions_{file_date}.csv")
avro_file_path = os.path.join(data_dir, f"transactions_{file_date}.avro")
# Read CSV and convert to Avro
csv_data = []
with open(file_path, 'r') as csv_file:
for line in csv_file:
key, value = line.strip().split(',')
csv_data.append({"key": key, "value": int(value)})
schema = {
"type": "record",
"name": "Transaction",
"fields": [
{"name": "key", "type": "string"},
{"name": "value", "type": "int"}
]
}
with open(avro_file_path, 'wb') as avro_file:
writer = DataFileWriter(avro_file, DatumWriter(), schema)
for row in csv_data:
writer.append(row)
writer.close()
return {"avro_file_path": avro_file_path}
我已经尝试了很多东西,但我无法正确处理 avro 部分。在这种情况下,我有一个错误:
AttributeError: 'dict' object has no attribute 'parse'
我还能尝试什么?
schema = avro.schema.parse({...})
# or
schema = avro.schema.parse(json.dumps({...})