Is there a way to create a Snowflake table from a pandas DataFrame in Python using only the Snowflake connector and the pandas library? The main goal is to take a pandas DataFrame and, using its schema, create a new table in a specific warehouse/database/schema in Snowflake. I have seen examples of how to do this with SQLAlchemy and I am trying to avoid them, but worst case I will just use it.
I have tried other approaches, including SQLAlchemy and Snowflake's upload method with the PUT command, but wanted to ask whether anyone has an alternative that uses only the Snowflake connector and pandas, and does not require saving the data to a local drive or using SQLAlchemy.
Any comments or feedback on how to write a better question are also appreciated.
*Note:
write_pandas - the Snowflake connector function can only append to a table that already exists.
df.to_sql - only works with SQLAlchemy or sqlite3 connections, so I don't think a Snowflake connection will work, but I may be wrong.
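As a side note, recent versions of the Snowflake connector add an `auto_create_table` parameter to `write_pandas`, which creates the table from the DataFrame's dtypes before loading. A minimal sketch (the DataFrame and table name are illustrative, and the call is commented out because it needs a live connection `ctx`):

```python
import pandas as pd

def create_and_load(ctx, df, table_name):
    # auto_create_table=True makes write_pandas create the table from the
    # DataFrame's schema first (available in recent connector versions).
    from snowflake.connector.pandas_tools import write_pandas
    success, nchunks, nrows, _ = write_pandas(
        ctx, df, table_name, auto_create_table=True
    )
    return success, nrows

# Snowflake folds unquoted identifiers to upper case, so upper-casing the
# column names first avoids quoted mixed-case names in the new table.
df = pd.DataFrame({"name": ["Stephen", "Jane"], "city": ["Oslo", "Stockholm"]})
df.columns = [c.upper() for c in df.columns]
# create_and_load(ctx, df, "NEW_TABLE")  # requires an open connection ctx
```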
I have used the Snowflake connector functions write_pandas() and pd_writer(), as well as the pandas function to_sql(). The problem is that the pandas documentation for to_sql() states that the connection can only be "sqlalchemy.engine.(Engine or Connection) or sqlite3.Connection". I would prefer to keep using the Snowflake connector for Python. Without SQLAlchemy, I know I can do the following:
.ini config file used to connect to the database (named db.ini)
[database_name]
user = user7
pass = s$cret
acc = jhn675f
wh = 22jb7tyo5
db = dev_env546
db_schema = hubspot
Python module used to connect to the Snowflake database and run the code
import configparser
import pandas as pd
import snowflake.connector
config = configparser.ConfigParser()
config.read('db.ini')
sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema = config['database_name']['db_schema']
ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
)
cs = ctx.cursor()
query_extract = '''
select table1.field1,
table1.field2,
table1.field3,
table1.field4,
table1.field5,
table1.field6,
table1.field7,
table2.field2,
table2.field5,
table2.field7,
table2.field9,
table3.field1,
table3.field6
from database.schema.table1
left join database.schema.table2
on table1.field3 = table2.field1
left join database.schema.table3
on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except snowflake.connector.errors.ProgrammingError as e:
    # handle/log the query error here
    raise

# clean data in the dataframe and perform some calcs
# store results in new dataframe called df_final
# would like to just use df_final to create a table in snowflake based on
# df_final's schema and datatypes; right now I am not sure how to do that
Current approach / alternative
import configparser
import pandas as pd
import snowflake.connector
config = configparser.ConfigParser()
config.read('db.ini')
sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema = config['database_name']['db_schema']
ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
)
cs = ctx.cursor()
query_extract = '''
select table1.field1,
table1.field2,
table1.field3,
table1.field4,
table1.field5,
table1.field6,
table1.field7,
table2.field2,
table2.field5,
table2.field7,
table2.field9,
table3.field1,
table3.field6
from database.schema.table1
left join database.schema.table2
on table1.field3 = table2.field1
left join database.schema.table3
on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except snowflake.connector.errors.ProgrammingError as e:
    # handle/log the query error here
    raise

# clean data and store the results in a new dataframe called df_final
df_final.to_csv('data/processed_data.csv', index=False)
create_stage = '''create stage processed_data_stage
    copy_options = (on_error='skip_file');'''
create_file_format = '''create or replace file format processed_data_format
    type = 'csv' field_delimiter = ',';'''
upload_file = '''put file://data/processed_data.csv @processed_data_stage;'''
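For completeness, the stage approach above stops at PUT: the staged file still has to be copied into a table, and the table has to exist first. A rough sketch of the remaining statements (table name and column definitions are assumptions, not taken from the original code):

```python
# Hypothetical continuation of the stage approach: create the target table,
# then COPY the staged CSV into it. Column names/types are illustrative.
create_table = '''create or replace table processed_data (
    field1 varchar(256),
    field2 varchar(256)
);'''
copy_into = '''copy into processed_data
    from @processed_data_stage
    file_format = (type = 'csv' field_delimiter = ',' skip_header = 1)
    on_error = 'skip_file';'''
# cs.execute(create_table); cs.execute(copy_into)  # requires an open cursor cs
```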
The other option would be to use SQLAlchemy and the pandas to_sql function
from snowflake.connector.pandas_tools import pd_writer
import pandas as pd
from sqlalchemy import create_engine
account_identifier = '<account_identifier>'
user = '<user_login_name>'
password = '<password>'
database_name = '<database_name>'
schema_name = '<schema_name>'
conn_string = f"snowflake://{user}:{password}@{account_identifier}/{database_name}/{schema_name}"
engine = create_engine(conn_string)
#Create your DataFrame
table_name = 'cities'
df = pd.DataFrame(data=[['Stephen','Oslo'],['Jane','Stockholm']],columns=['Name','City'])
#What to do if the table exists? replace, append, or fail?
if_exists = 'replace'
#Write the data to Snowflake, using pd_writer to speed up loading
with engine.connect() as con:
    df.to_sql(name=table_name.lower(), con=con, if_exists=if_exists, method=pd_writer)
So, for anyone doing something similar, I eventually found a pandas function that is not in the documented API but can extract a schema from a DataFrame.
The pandas library has a function called pd.io.sql.get_schema() which, given a DataFrame and a table name, returns a string formatted as a SQL CREATE TABLE statement. So you can do the following:
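As a quick check of what get_schema() produces (the DataFrame here is illustrative):

```python
import pandas as pd

df = pd.DataFrame({"FIELD1": [1, 2], "FIELD2": ["a", "b"]})
ddl = pd.io.sql.get_schema(df, "NEW_TABLE")
print(ddl)
# Returns a CREATE TABLE statement; with no connection supplied, integer
# columns map to INTEGER and string columns to TEXT, which is why the code
# below swaps TEXT for a Snowflake VARCHAR.
```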
import configparser
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
config = configparser.ConfigParser()
config.read('db.ini')
sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema = config['database_name']['db_schema']
ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
)
cs = ctx.cursor()
query_extract = '''
select table1.field1,
table1.field2,
table1.field3,
table1.field4,
table1.field5,
table1.field6,
table1.field7,
table2.field2,
table2.field5,
table2.field7,
table2.field9,
table3.field1,
table3.field6
from database.schema.table1
left join database.schema.table2
on table1.field3 = table2.field1
left join database.schema.table3
on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except snowflake.connector.errors.ProgrammingError as e:
    # handle/log the query error here
    raise

# clean data in the dataframe and perform some calcs
# store results in new dataframe called df_final
df_final.columns = map(lambda x: str(x).upper(), df_final.columns)
tb_name = 'NEW_TABLE'
df_schema = pd.io.sql.get_schema(df_final, tb_name)
df_schema = str(df_schema).replace('TEXT', 'VARCHAR(256)')
# use replace to remove characters and quotes that might give you syntax errors
cs.execute(df_schema)
success, nchunks, nrows, _ = write_pandas(ctx, df_final, tb_name)
cs.close()
ctx.close()
I've skipped some steps, but that's essentially the approach.
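To make the "remove characters and quotes" comment concrete, here is a minimal sketch of the schema-string cleanup on its own (the DataFrame is illustrative):

```python
import pandas as pd

df_final = pd.DataFrame({"FIELD1": [1, 2], "FIELD2": ["a", "b"]})
ddl = pd.io.sql.get_schema(df_final, "NEW_TABLE")
# Swap the SQLite-flavored TEXT type for a Snowflake VARCHAR
ddl = ddl.replace("TEXT", "VARCHAR(256)")
# Snowflake folds unquoted identifiers to upper case, so the double quotes
# that get_schema puts around names can be dropped once the column names
# have already been upper-cased.
ddl = ddl.replace('"', "")
```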