Using Spark, how do I extract the file name when loading everything into a DataFrame?


My sample code is below.

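from pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType

# `sc` is assumed to be the SparkContext already provided by the session
sqlContext = SQLContext(sc)

customSchema = StructType([
    StructField("asset_id", StringType(), True),
    StructField("price_date", StringType(), True),
    # etc. (remaining fields elided in the original post)
    StructField("close_price", StringType(), True),
    StructField("filename", StringType(), True),
])

# Named fullPath to match the name used in the read calls below
fullPath = 'path_to files_and_all_credentials'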

Now, to get everything into a DataFrame with the file path added as a column, I expected the code to look something like this...

df = spark.load(fullPath, withColumn("filename", input_file_name(), header='false', schema = customSchema, sep='|')

Or...

df = spark.read.format("csv"), \
   .option("header", "false"), \
   .option(schema = customSchema), \
   .option(delimiter = "|"), \
   .load(fullPath), \
   .withColumn("filename", input_file_name()

When I run this sample code, I get an "invalid syntax" error. I thought this would be a very simple thing to do. How can I get this working? Thanks!
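The "invalid syntax" error can be reproduced without Spark at all, because Python rejects both attempts at parse time. The sketch below is only a diagnostic aid: `is_valid_python` is a hypothetical helper introduced here, and the snippets are copied or shortened from the two attempts above. It shows that a comma before a `\` line continuation and an unclosed parenthesis each fail to parse, while the same chain without commas parses fine.

```python
import ast


def is_valid_python(source: str) -> bool:
    """Return True if `source` parses as Python, False on SyntaxError."""
    try:
        ast.parse(source)  # parsing only; names such as `spark` are never resolved
        return True
    except SyntaxError:
        return False


# Comma before each line continuation, as in the second attempt.
with_commas = r"""
df = spark.read.format("csv"), \
   .option("header", "false")
"""

# The same chain with the commas removed.
without_commas = r"""
df = spark.read.format("csv") \
   .option("header", "false")
"""

# The first attempt, copied as posted: one closing parenthesis is missing.
unclosed_paren = r"""df = spark.load(fullPath, withColumn("filename", input_file_name(), header='false', schema = customSchema, sep='|')"""

print(is_valid_python(with_commas))     # False
print(is_valid_python(without_commas))  # True
print(is_valid_python(unclosed_paren))  # False
```

Separately from the syntax, `DataFrameReader.option` takes a key and a value as two arguments, so calls like `.option(schema = customSchema)` would still fail at run time once the syntax is fixed.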

python dataframe apache-spark pyspark pyspark-dataframes
1 Answer
df = spark.read.format("csv") \
    .option("header", "false") \
    .option("sep", "|") \
    .schema(customSchema) \
    .load(fullPath) \
    .withColumn("filename", input_file_name())
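Note that `input_file_name()` yields the full path of the source file (typically a URI such as `file:///...`), not just the file name. If only the base name is wanted, the path can be trimmed afterwards. The following is a minimal plain-Python sketch of that trimming, not part of the original answer: `base_name` is a hypothetical helper, and the sample paths are made up for illustration.

```python
import posixpath
from urllib.parse import unquote, urlparse


def base_name(file_uri: str) -> str:
    """Return the last path component of a file URI or a plain path."""
    path = urlparse(file_uri).path            # drops the scheme and host, if any
    return posixpath.basename(unquote(path))  # decodes %20-style escapes, keeps the last segment


# Hypothetical sample values, shaped like what a "filename" column might hold.
print(base_name("file:///data/prices/prices_2019-01-02.csv"))  # prices_2019-01-02.csv
print(base_name("/tmp/in/prices.csv"))                         # prices.csv
```

In a Spark job the same logic could be applied per row, for example by wrapping the helper in a UDF, at some cost in performance compared with built-in column functions.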

And, to write the result to a database:

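# pandas is not required for the Spark JDBC write below;
# pd.set_option only changes how pandas DataFrames are displayed
import pandas as pd

url = "jdbc:sqlserver://server_name.database.windows.net:1433;databaseName=db_name"
props = {"user": "usr", "password": "pwd"}

pd.set_option('display.max_columns', None)
# df.printSchema()
# df.show()

# Append the DataFrame rows to the target table over JDBC
df.write.mode('append').jdbc(url, "dbo.table_name", properties=props)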