我的代码示例如下。
from pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Spark SQL entry point (assumes an existing SparkContext `sc` is in scope,
# e.g. in a notebook / shell session).
sqlContext = SQLContext(sc)

# Explicit schema for the pipe-delimited input files.  Every column is read
# as a string; cast downstream if numeric/date types are needed.
# NOTE: the original had a bare `etc.,` line here, which is a syntax error —
# replaced with a comment placeholder for the elided columns.
customSchema = StructType([
    StructField("asset_id", StringType(), True),
    StructField("price_date", StringType(), True),
    # ... additional StructFields elided ...
    StructField("close_price", StringType(), True),
    StructField("filename", StringType(), True)])

# BUG FIX: originally named `fullpath`, but every later reference uses
# `fullPath` — the mismatched case would raise NameError at load time.
fullPath = 'path_to files_and_all_credentials'
现在,要使所有内容都在数据框中,并将文件路径添加为一列,我希望代码像这样……
df = spark.load(fullPath, withColumn("filename", input_file_name(), header='false', schema = customSchema, sep='|')
或...
# Fixed version of the second attempt.  Three syntax problems in the
# original: (1) a comma before each backslash continuation turned the chain
# into an invalid tuple expression, (2) `.option(schema=...)` is not valid —
# the schema is set with `.schema(...)` and the delimiter with
# `.option("delimiter", "|")`, and (3) the final `withColumn(...)` was
# missing its closing parenthesis.
df = spark.read.format("csv") \
    .option("header", "false") \
    .schema(customSchema) \
    .option("delimiter", "|") \
    .load(fullPath) \
    .withColumn("filename", input_file_name())
运行此示例代码时,收到“无效语法”错误消息。我认为这应该是一件非常简单的事情。我该如何进行这项工作?谢谢!
# Read every pipe-delimited file under fullPath with the explicit schema,
# then tag each row with the path of the file it came from.
df = (
    spark.read.format("csv")
    .option("header", "false")
    .option("sep", "|")
    .schema(customSchema)
    .load(fullPath)
    .withColumn("filename", input_file_name())
)
并且,要写入数据库。
import pandas as pd

# JDBC target: an Azure SQL Database instance.
# NOTE(review): credentials are hard-coded here — move them to a secret
# store / config before productionizing.
connection_url = "jdbc:sqlserver://server_name.database.windows.net:1433;databaseName=db_name"
connection_props = {"user": "usr", "password": "pwd"}

# Debugging aid only: makes pandas print all columns.  This has no effect
# on the Spark DataFrame `df` or on the JDBC write below.
pd.set_option('display.max_columns', None)

# Append the DataFrame's rows to the target table over JDBC.
df.write.mode('append').jdbc(connection_url, "dbo.table_name", properties=connection_props)