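I'm writing a Python script that processes some data, creates a table (if it doesn't exist), and truncates that table before inserting the refreshed dataset. The role I'm using has usage, read, write and create table privileges, plus stage privileges set up as follows: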
grant usage, read, write on future stages in schema <schema> to role <role>
I'm using the write_pandas function in Python through the Snowflake connector. The documentation says this function uses the PUT and COPY INTO commands:
To write the data to the table, the function saves the data to Parquet files, uses the PUT command to upload these files to a temporary stage, and uses the COPY INTO <table> command to copy the data from the files to the table. You can use some of the function parameters to control how the PUT and COPY INTO <table> statements are executed.
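I'm still getting an error saying I can't operate on the schema, and I'm not sure what else I need to add. Does anyone have the list of privileges required to run write_pandas?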
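A likely privilege set, sketched as a hypothetical Python helper that only builds GRANT statements (it does not connect to anything). The assumptions: write_pandas creates a temporary stage, which needs CREATE STAGE on the schema (grants on future stages don't cover creating one); newer connector versions may also create a temporary file format; and the table privileges only matter if the role does not own the table. Check the list against Snowflake's access-control documentation for your version.

```python
from typing import List


def write_pandas_grants(role: str, warehouse: str, database: str,
                        schema: str, table: str) -> List[str]:
    """Hypothetical helper: GRANT statements that (by assumption) cover
    CREATE TABLE IF NOT EXISTS, TRUNCATE and write_pandas' PUT/COPY INTO."""
    fq_schema = f"{database}.{schema}"
    return [
        # COPY INTO needs a running warehouse
        f"GRANT USAGE ON WAREHOUSE {warehouse} TO ROLE {role}",
        # the role must be able to resolve the database and schema
        f"GRANT USAGE ON DATABASE {database} TO ROLE {role}",
        f"GRANT USAGE ON SCHEMA {fq_schema} TO ROLE {role}",
        # write_pandas creates a temporary stage; future-stage grants don't cover this
        f"GRANT CREATE STAGE ON SCHEMA {fq_schema} TO ROLE {role}",
        # assumption: newer connector versions also create a temporary file format
        f"GRANT CREATE FILE FORMAT ON SCHEMA {fq_schema} TO ROLE {role}",
        # needed to create the table yourself
        f"GRANT CREATE TABLE ON SCHEMA {fq_schema} TO ROLE {role}",
        # only needed if the table is owned by another role
        f"GRANT SELECT, INSERT, TRUNCATE ON TABLE {fq_schema}.{table} TO ROLE {role}",
    ]


for stmt in write_pandas_grants("MY_ROLE", "MY_WH", "MY_DB", "MY_SCHEMA", "MY_TABLE"):
    print(stmt)
```

Run the printed statements as a role that can grant these privileges (for example the objects' owner).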
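write_pandas() does not create the table automatically. If the table doesn't already exist, you need to create it yourself. Each time you run write_pandas(), it simply appends the dataframe to the table you specify.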
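For the create-if-missing, truncate, then load workflow from the question, the DDL can be run on the same connection's cursor before calling write_pandas(). A minimal sketch: refresh_statements is a hypothetical helper, and the table name is upper-cased on the assumption that write_pandas() looks it up case-sensitively. Newer connector releases also document an auto_create_table parameter on write_pandas(); check the docs for your version before relying on it.

```python
from typing import List


def refresh_statements(table: str, col_types: str) -> List[str]:
    """Hypothetical helper: statements to run before write_pandas() so each run
    loads a fresh copy of the data into an existing (or newly created) table."""
    name = table.upper()  # match the upper-case name passed to write_pandas()
    return [
        # create the table only if it does not exist yet
        f"CREATE TABLE IF NOT EXISTS {name} ({col_types})",
        # empty it, since write_pandas() only ever appends
        f"TRUNCATE TABLE IF EXISTS {name}",
    ]


# With a real snowflake.connector connection `conn` (not executed here):
#     cur = conn.cursor()
#     for stmt in refresh_statements("table_test", "TYPE VARCHAR, AGE NUMERIC"):
#         cur.execute(stmt)
#     write_pandas(conn, df, "TABLE_TEST")
print(refresh_statements("table_test", "TYPE VARCHAR, AGE NUMERIC"))
```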
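On the other hand, if you write the pandas dataframe to Snowflake with df.to_sql(..., method=pd_writer), it creates the table for you automatically, and you can use the if_exists argument of to_sql() to choose what happens when the table already exists: append, replace or fail.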
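The three if_exists behaviours can be tried locally. This sketch swaps in an in-memory sqlite3 database as a stand-in, since to_sql() also accepts a sqlite3 connection; against Snowflake you would pass a SQLAlchemy engine (from snowflake-sqlalchemy) together with method=pd_writer instead, which is not shown here.

```python
import sqlite3

import pandas as pd

df = pd.DataFrame({"type": ["cats", "dogs"], "age": [10, 20]})
conn = sqlite3.connect(":memory:")  # stand-in for a Snowflake engine

df.to_sql("pets", conn, index=False)                      # table missing: created
df.to_sql("pets", conn, index=False, if_exists="append")  # rows added to the existing table
append_count = conn.execute("SELECT COUNT(*) FROM pets").fetchone()[0]

df.to_sql("pets", conn, index=False, if_exists="replace")  # dropped and recreated
replace_count = conn.execute("SELECT COUNT(*) FROM pets").fetchone()[0]

try:
    df.to_sql("pets", conn, index=False, if_exists="fail")  # the default: refuse
    failed = False
except ValueError:
    failed = True

print(append_count, replace_count, failed)
```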
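I have a fairly inelegant solution that handles both table creation and appending, all without leaving Jupyter.
I keep this code in my SQL utilities file. The get_col_types function builds the string of column names and data types needed to create the table.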
def get_col_types(df):
    '''
    Helper function to create/modify Snowflake tables; gets the column and dtype pair for each item in the dataframe
    args:
        df: dataframe to evaluate
    '''
    import numpy as np
    # get dtypes and convert to df
    ct = df.dtypes.reset_index().rename(columns={0:'col'})
    ct = ct.apply(lambda x: x.astype(str).str.upper())  # case matching as snowflake needs it in uppers
    # only considers objects, dates, ints and floats at this point
    ct['col'] = np.where(ct['col']=='OBJECT', 'VARCHAR', ct['col'])
    ct['col'] = np.where(ct['col'].str.contains('DATE'), 'DATETIME', ct['col'])
    ct['col'] = np.where(ct['col'].str.contains('INT'), 'NUMERIC', ct['col'])
    ct['col'] = np.where(ct['col'].str.contains('FLOAT'), 'FLOAT', ct['col'])
    # get the column dtype pair
    l = []
    for index, row in ct.iterrows():
        l.append(row['index'] + ' ' + row['col'])
    string = ', '.join(l)  # convert from list to a string object
    string = string.strip()
    return string
def create_table(table, action, col_type, df):
    '''
    Function to create/replace and append to tables in Snowflake
    args:
        table: name of the table to create/modify
        action: whether to do the initial create/replace or to append; key to control logic
        col_type: string with each column name and its dtype, pairs separated by commas; comes from get_col_types() func
        df: dataframe to load
    dependencies: function get_col_types(); helper function to get the col and dtypes to create a table
    '''
    import pandas as pd
    import snowflake.connector as snow
    from snowflake.connector.pandas_tools import write_pandas
    from snowflake.connector.pandas_tools import pd_writer
    # ACCOUNT, USER, PW, ROLE, WAREHOUSE, DATABASE and SCHEMA are module-level constants
    # set up connection
    conn = snow.connect(
        account = ACCOUNT,
        user = USER,
        password = PW,
        warehouse = WAREHOUSE,
        database = DATABASE,
        schema = SCHEMA,
        role = ROLE)
    # set up cursor
    cur = conn.cursor()
    if action=='create_replace':
        # set up execute
        cur.execute(
            """ CREATE OR REPLACE TABLE
            """ + table +"""(""" + col_type + """)""")
        # prep to ensure proper case
        df.columns = [col.upper() for col in df.columns]
        # write df to table
        write_pandas(conn, df, table.upper())
    elif action=='append':
        # convert to a string list of tuples
        df = str(list(df.itertuples(index=False, name=None)))
        # get rid of the list brackets so it is a string of tuples
        df = df.replace('[','').replace(']','')
        # set up execute
        cur.execute(
            """ INSERT INTO """ + table + """
            VALUES """ + df + """
            """)
    conn.close()
Working example:
import pandas as pd
# create df
l1 = ['cats','dogs','frogs']
l2 = [10, 20, 30]
df = pd.DataFrame(zip(l1,l2), columns=['type','age'])
col_type = get_col_types(df)
create_table('table_test', 'create_replace', col_type, df)
# now that the table is created, append to it
l1 = ['cow','cricket']
l2 = [45, 20]
df2 = pd.DataFrame(zip(l1,l2), columns=['type','age'])
create_table('table_test', 'append', None, df2)
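The append branch builds the VALUES list with str(list(...)), which would likely fail on a value such as "O'Brien's cat" (Python's repr wraps it in double quotes, which Snowflake reads as an identifier) or on None (which becomes the bare word None rather than NULL). A sketch of the usual alternative, a parameterized INSERT run with executemany(); build_insert is a hypothetical helper, and sqlite3 stands in for Snowflake so the example runs locally. With the Snowflake connector the default placeholder is %s (pyformat paramstyle).

```python
import sqlite3
from typing import List, Sequence, Tuple


def build_insert(table: str, columns: Sequence[str], placeholder: str = "%s") -> str:
    """Hypothetical helper: INSERT with one placeholder per column.

    "%s" matches the Snowflake connector's default pyformat paramstyle;
    sqlite3 (used below as a stand-in) expects "?".
    """
    cols = ", ".join(columns)
    marks = ", ".join([placeholder] * len(columns))
    return f"INSERT INTO {table} ({cols}) VALUES ({marks})"


# Values that break the str(list(...)) approach: an embedded quote and a missing value
rows: List[Tuple] = [("cow", 45), ("O'Brien's cat", None)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_test (TYPE TEXT, AGE INTEGER)")
# the driver binds each value, so quotes and None (-> NULL) need no escaping
conn.executemany(build_insert("table_test", ["TYPE", "AGE"], "?"), rows)
stored = conn.execute("SELECT TYPE, AGE FROM table_test").fetchall()
print(stored)

# With snowflake.connector (not executed here):
#     cur.executemany(build_insert("TABLE_TEST", list(df.columns)),
#                     list(df.itertuples(index=False, name=None)))
```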
In the community spirit of StackOverflow, I made some changes and cleanup to the answer posted by @Cory Randolph.
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
conn = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    database=DATABASE,
    role=ROLE,
    warehouse=WAREHOUSE,
    schema=ML)
def get_table_metadata(df):
    def map_dtypes(x):
        if (x == 'object') or (x == 'category'):
            return 'VARCHAR'
        elif 'date' in x:
            return 'DATETIME'
        elif 'int' in x:
            return 'NUMERIC'
        elif 'float' in x:
            return 'FLOAT'
        else:
            # fail loudly instead of returning None, which would break the join below
            raise ValueError(f"cannot parse pandas dtype: {x}")
    sf_dtypes = [map_dtypes(str(s)) for s in df.dtypes]
    table_metadata = ", ".join([" ".join([y.upper(), x]) for x, y in zip(sf_dtypes, list(df.columns))])
    return table_metadata
def df_to_snowflake_table(table_name, operation, df, conn=conn):
    if operation=='create_replace':
        df.columns = [c.upper() for c in df.columns]
        table_metadata = get_table_metadata(df)
        conn.cursor().execute(f"CREATE OR REPLACE TABLE {table_name} ({table_metadata})")
        write_pandas(conn, df, table_name.upper())
    elif operation=='insert':
        table_rows = str(list(df.itertuples(index=False, name=None))).replace('[','').replace(']','')
        conn.cursor().execute(f"INSERT INTO {table_name} VALUES {table_rows}")
Then:
df_to_snowflake_table('table_test', 'create_replace', df)
and/or:
df_to_snowflake_table('table_test', 'insert', df)
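Windows 10, Python 3.9.4, Snowflake-Connector-Python 2.4.2, Pandas 1.1.5
I have the same problem with the write_pandas function.
I have accountadmin privileges on Snowflake. The Python code and the error traceback are below.
However, if I write a CSV file explicitly, I can upload the data from that CSV file using the following two functions:
So it is definitely something to do with the write_pandas function.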
import pandas as pd
import snowflake.connector
...
from snowflake.connector.pandas_tools import write_pandas
conn = snowflake.connector.connect(
    user=strSnowflakeUserLogin,
    password=strSnowflakeUserPassword,
    account=strSnowflakeAccount,
    role=strSnowflakeUserRole,
    warehouse=strSnoflakeWarehouse,
    database=strSnowflakeDatabase,
    schema=strSnowflakeSchema
    )
...
write_pandas(conn, df, strSnowflakeTable)
Traceback (most recent call last):
  File "myPython.py", line xxx, in <module> myPythonModule()
    write_pandas(conn, df, strSnowflakeTable)
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\pandas_tools.py", line 197, in write_pandas
    copy_results = cursor.execute(copy_into_sql, _is_internal=True).fetchall()
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\cursor.py", line 692, in execute
    Error.errorhandler_wrapper(
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\errors.py", line 258, in errorhandler_wrapper
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "C:\Users\<username>\AppData\Local\Programs\Python\Python39\lib\site-packages\snowflake\connector\errors.py", line 188, in default_errorhandler
    raise error_class(
snowflake.connector.errors.ProgrammingError: 001757 (42601): SQL compilation error:
Table 'mySnowflakeTable' does not exist
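A likely cause of "Table 'mySnowflakeTable' does not exist" (an assumption, since the CREATE TABLE step isn't shown): write_pandas() wraps the table name in double quotes (newer versions expose this as quote_identifiers=True), so the lookup is case-sensitive, while a table created with an unquoted name is stored in upper case. That would explain why the other answers pass table.upper(). The simplified rule can be sketched as below; resolve_identifier is a hypothetical helper, and it ignores account settings such as QUOTED_IDENTIFIERS_IGNORE_CASE.

```python
def resolve_identifier(name: str) -> str:
    """Hypothetical helper: the name Snowflake stores for an identifier
    (simplified: unquoted -> upper case, "quoted" -> case kept as written)."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        # quoted: strip the quotes and unescape doubled quotes
        return name[1:-1].replace('""', '"')
    # unquoted: folded to upper case
    return name.upper()


created = resolve_identifier("mySnowflakeTable")      # CREATE TABLE mySnowflakeTable ...
looked_up = resolve_identifier('"mySnowflakeTable"')  # what a quoted lookup searches for
print(created, looked_up, created == looked_up)
print(resolve_identifier('"MYSNOWFLAKETABLE"') == created)
```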
@Christopher's solution was very helpful for making this a repeatable, dynamic process.
I updated the get_col_types function slightly, but it performs the same.
def get_col_types(df) -> str:
    '''
    Helper function to create/modify Snowflake tables; gets the column and dtype pair for each item in the dataframe
    Args:
        df: dataframe to evaluate
    Returns:
        String with the formatted column name and the converted snowflake data type.
        Example: 'COL_A FLOAT, COL_B DATETIME, COL_C FLOAT, COL_D NUMERIC, COL_E VARCHAR'
    '''
    import numpy as np
    # Get dtypes and convert to df
    df_col_types = df.dtypes.reset_index()
    df_col_types = df_col_types.rename(columns={'index': 'col_name', 0:'dtype'})
    df_col_types = df_col_types.apply(lambda x: x.astype(str).str.upper())  # Case matching as snowflake needs it in uppers
    # Create the mapping from Dataframe types to Snowflake data types
    df_col_types['dtype'] = np.where(df_col_types['dtype']=='OBJECT', 'VARCHAR', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('DATE'), 'DATETIME', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('INT'), 'NUMERIC', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('FLOAT'), 'FLOAT', df_col_types['dtype'])
    df_col_types['dtype'] = np.where(df_col_types['dtype'].str.contains('CATEGORY'), 'VARCHAR', df_col_types['dtype'])
    # Get the column dtype pairs
    df_col_types['dtype_pairs'] = df_col_types.apply(lambda row: row['col_name'] + " " + row['dtype'], axis = 1)
    # Comma-separate the pairs, as CREATE TABLE expects
    col_type_pair_str = ', '.join(df_col_types['dtype_pairs'])
    return col_type_pair_str
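The same mapping can also be sketched with pandas' own dtype predicates from pandas.api.types instead of matching upper-cased dtype names; this is a swapped-in technique, not the answer's. Mapping bool columns to BOOLEAN is an assumption added here, since the string matching above doesn't map them explicitly, and get_col_types_by_predicate is a hypothetical name.

```python
import pandas as pd
from pandas.api import types as ptypes


def snowflake_type(dtype) -> str:
    """Map one pandas dtype to a Snowflake column type (sketch)."""
    if ptypes.is_bool_dtype(dtype):
        return "BOOLEAN"  # assumption: not covered by the answers' mapping
    if ptypes.is_integer_dtype(dtype):
        return "NUMERIC"
    if ptypes.is_float_dtype(dtype):
        return "FLOAT"
    if ptypes.is_datetime64_any_dtype(dtype):
        return "DATETIME"
    return "VARCHAR"  # object, category, string and anything else


def get_col_types_by_predicate(df: pd.DataFrame) -> str:
    """Comma-separated 'COL TYPE' pairs, same format as get_col_types()."""
    return ", ".join(f"{str(col).upper()} {snowflake_type(dtype)}"
                     for col, dtype in df.dtypes.items())


df = pd.DataFrame({
    "type": ["cats", "dogs"],
    "age": [10, 20],
    "weight": [4.5, 20.1],
    "vaccinated": [True, False],
    "seen": pd.to_datetime(["2021-01-01", "2021-02-01"]),
})
print(get_col_types_by_predicate(df))
```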
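If you can't write to the table itself, I suspect the problem occurs when Snowflake can't resolve the warehouse/database/schema. So you can set them explicitly before executing any command, in both the try and except blocks (these are methods of a Snowpark Session):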
session.use_warehouse('warehouse Name')
session.use_database('Database Name')
session.use_schema('schema Name')
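With snowflake.connector rather than Snowpark, the same context can be set by running USE statements on a cursor (or by passing warehouse, database and schema to connect(), as the answers above do). A sketch: set_context and RecordingCursor are hypothetical, and the recording cursor only stands in for a real one so the example runs without an account. If one of these statements fails, the role most likely lacks USAGE on that object.

```python
from typing import List, Optional


def set_context(cursor, warehouse: str, database: str, schema: str,
                role: Optional[str] = None) -> List[str]:
    """Hypothetical helper: run USE statements on a DB-API cursor (e.g. one from
    snowflake.connector) so later statements resolve unqualified names."""
    stmts = []
    if role:
        stmts.append(f"USE ROLE {role}")
    stmts += [
        f"USE WAREHOUSE {warehouse}",
        f"USE DATABASE {database}",
        f"USE SCHEMA {schema}",  # resolved inside the database selected above
    ]
    for stmt in stmts:
        cursor.execute(stmt)
    return stmts


class RecordingCursor:
    """Stand-in cursor for the demo; it only records the SQL it is given."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def execute(self, sql: str) -> "RecordingCursor":
        self.executed.append(sql)
        return self


cur = RecordingCursor()
ran = set_context(cur, "MY_WH", "MY_DB", "MY_SCHEMA")
print(cur.executed)
```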