How to create a table in Snowflake from a python pandas dataframe (without using sqlalchemy)


Is there a way to create a Snowflake table from a pandas dataframe in Python using only the Snowflake connector and the pandas library? The main goal here is to take just a pandas dataframe and, using its schema, create a new table in a specific warehouse/database/schema in Snowflake. I have seen examples of how to do this with sqlalchemy, which I am trying to avoid, but worst case I will just use it.

I have tried other approaches, including sqlalchemy and Snowflake's upload method using the PUT command, but wanted to ask whether anyone has an alternative that uses only the Snowflake connector and pandas, without requiring me to save data to a local drive or use sqlalchemy.

Any comments or feedback on how to write better questions are also appreciated.

*Note:

write_pandas - the Snowflake connector function can only append to a table that already exists.

df.to_sql - only works with sqlalchemy or sqlite3 connections, so I don't think a Snowflake conn will work, but I may be wrong?

I have used the Snowflake connector functions write_pandas() and pd_writer() as well as the pandas function to_sql(). The problem here is that the documentation for pandas' to_sql() states the connection can only be a "sqlalchemy.engine.(Engine or Connection) or sqlite3.Connection". I would prefer to keep using the Snowflake connector for Python. Without using sqlalchemy, I know I can do the following:
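As a quick sanity check of the note above, to_sql does work out of the box with a plain sqlite3 connection and no sqlalchemy at all. A minimal sketch (the table and column names here are made up for illustration):

```python
import sqlite3
import pandas as pd

# Hypothetical dataframe, just to exercise to_sql with a sqlite3.Connection.
df = pd.DataFrame({'field1': [1, 2], 'field2': ['a', 'b']})

with sqlite3.connect(':memory:') as conn:
    # to_sql accepts a raw sqlite3 connection directly, per the pandas docs.
    df.to_sql('demo_table', conn, index=False)
    roundtrip = pd.read_sql('select * from demo_table', conn)

print(len(roundtrip))  # 2 rows come back, matching the original dataframe
```

This confirms the sqlite3 path works, but it does not help with a Snowflake connector connection, which is exactly the limitation the question is about.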

.ini config file used to connect to the database (named db.ini)

[database_name]
user = user7
pass = s$cret
acc = jhn675f
wh = 22jb7tyo5
db = dev_env546
db_schema = hubspot

Python module used to connect to the Snowflake database and execute the code

import configparser
import pandas as pd
import snowflake.connector

config = configparser.ConfigParser()
config.read('db.ini')

sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema= config['database_name']['db_schema']

ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
    )

cs = ctx.cursor()

query_extract = '''
    select table1.field1,
    table1.field2,
    table1.field3,
    table1.field4,
    table1.field5,
    table1.field6,
    table1.field7,
    table2.field2,
    table2.field5,
    table2.field7,
    table2.field9,
    table3.field1,
    table3.field6
    from database.schema.table1
    left join database.schema.table2
    on table1.field3 = table2.field1
    left join database.schema.table3
    on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except Exception:
    raise  # replace with real error handling/logging

# clean data in the dataframe and perform some calcs
# store results in new dataframe called df_final
# would like to just use df_final to create a table in snowflake based on df_final schema and datatypes
# right now I am not sure how to do that
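The cleaning step is elided in the question; as a purely hypothetical illustration (the column names and calculation are invented, not from the original code), it might look something like:

```python
import pandas as pd

# Hypothetical stand-in for the dataframe fetched from Snowflake.
df = pd.DataFrame({'field1': [1, 2, None], 'field2': [10.0, 20.0, 30.0]})

# Example cleaning: drop rows with a missing key, then add a derived column.
df_final = df.dropna(subset=['field1']).copy()
df_final['field2_pct'] = df_final['field2'] / df_final['field2'].sum()
```

Whatever the real transformations are, the point is that df_final ends up with a schema that does not yet exist as a table in Snowflake.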

Current approach and an alternative

import configparser
import pandas as pd
import snowflake.connector

config = configparser.ConfigParser()
config.read('db.ini')

sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema= config['database_name']['db_schema']

ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
    )

cs = ctx.cursor()

query_extract = '''
    select table1.field1,
    table1.field2,
    table1.field3,
    table1.field4,
    table1.field5,
    table1.field6,
    table1.field7,
    table2.field2,
    table2.field5,
    table2.field7,
    table2.field9,
    table3.field1,
    table3.field6
    from database.schema.table1
    left join database.schema.table2
    on table1.field3 = table2.field1
    left join database.schema.table3
    on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except Exception:
    raise  # replace with real error handling/logging

df_final.to_csv('data/processed_data.csv')

create_stage = '''create stage processed_data_stage
  copy_options = (on_error='skip_file');'''

create_file_format = '''create or replace file format processed_data_format
type = 'csv' field_delimiter = ',';'''

upload_file = '''put file://data/processed_data.csv @processed_data_stage;'''
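One piece missing from the stage-based route above is the final COPY INTO statement that loads the staged file into a table. A hedged sketch of building it (the table name is an assumption; the stage and file format names follow the question's examples):

```python
# Hypothetical target table name; the stage/format names mirror the
# question's create_stage / create_file_format statements.
table_name = 'PROCESSED_DATA'

copy_into = f'''copy into {table_name}
from @processed_data_stage
file_format = (format_name = 'processed_data_format')
on_error = 'skip_file';'''

# With a live connection, the statements would then run in order, e.g.:
# for stmt in (create_stage, create_file_format, upload_file, copy_into):
#     cs.execute(stmt)
```

Note that COPY INTO still requires the target table to exist, so this route has the same create-table problem the question is asking about, plus the local CSV the asker wants to avoid.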

The other option would be to use sqlalchemy and the pandas to_sql function:

from snowflake.connector.pandas_tools import pd_writer
import pandas as pd
from sqlalchemy import create_engine

account_identifier = '<account_identifier>'
user = '<user_login_name>'
password = '<password>'
database_name = '<database_name>'
schema_name = '<schema_name>'

conn_string = f"snowflake://{user}:{password}@{account_identifier}/{database_name}/{schema_name}"
engine = create_engine(conn_string)

#Create your DataFrame

table_name = 'cities'
df = pd.DataFrame(data=[['Stephen','Oslo'],['Jane','Stockholm']],columns=['Name','City'])

#What to do if the table exists? replace, append, or fail?

if_exists = 'replace'

#Write the data to Snowflake, using pd_writer to speed up loading

with engine.connect() as con:
    df.to_sql(name=table_name.lower(), con=con, if_exists=if_exists, method=pd_writer)
1 Answer

So, for anyone doing something similar, I eventually found a pandas function that is not well documented but can extract a schema from a dataframe.

The pandas library has a function called pd.io.sql.get_schema() that returns a string formatted as a SQL CREATE TABLE query, based on a dataframe and a table name. So you can do the following:
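To see what get_schema produces, here is a small self-contained example (the table and column names are made up):

```python
import pandas as pd

df = pd.DataFrame({'name': ['Stephen', 'Jane'], 'age': [34, 29]})

# With no connection passed, get_schema emits a generic DDL string.
ddl = pd.io.sql.get_schema(df, 'demo_table')
print(ddl)
# Roughly:
# CREATE TABLE "demo_table" (
#   "name" TEXT,
#   "age" INTEGER
# )
```

The generic types (TEXT, INTEGER, etc.) are why the answer below rewrites TEXT to a Snowflake-friendly VARCHAR before executing the statement.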

import configparser
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

config = configparser.ConfigParser()
config.read('db.ini')

sn_user = config['database_name']['user']
sn_password = config['database_name']['pass']
sn_account = config['database_name']['acc']
sn_warehouse = config['database_name']['wh']
sn_database = config['database_name']['db']
sn_schema= config['database_name']['db_schema']

ctx = snowflake.connector.connect(
    user=sn_user,
    password=sn_password,
    account=sn_account,
    warehouse=sn_warehouse,
    database=sn_database,
    schema=sn_schema
    )

cs = ctx.cursor()

query_extract = '''
    select table1.field1,
    table1.field2,
    table1.field3,
    table1.field4,
    table1.field5,
    table1.field6,
    table1.field7,
    table2.field2,
    table2.field5,
    table2.field7,
    table2.field9,
    table3.field1,
    table3.field6
    from database.schema.table1
    left join database.schema.table2
    on table1.field3 = table2.field1
    left join database.schema.table3
    on table1.field5 = table3.field1
'''
try:
    cs.execute(query_extract)
    df = cs.fetch_pandas_all()
except Exception:
    raise  # replace with real error handling/logging

# clean data in the dataframe and perform some calcs
# store results in new dataframe called df_final

df_final.columns = map(lambda x: str(x).upper(), df_final.columns)

tb_name = 'NEW_TABLE'
df_schema = pd.io.sql.get_schema(df_final, tb_name)
df_schema = str(df_schema).replace('TEXT', 'VARCHAR(256)')

#use replace to remove characters and quotes that might give you syntax errors

cs.execute(df_schema)

success, nchunks, nrows, _ = write_pandas(ctx, df_final, tb_name)

cs.close()
ctx.close()

I skipped over parts of the content, but basically you can:

  1. Establish a connection to Snowflake
  2. Extract tables from Snowflake into a pandas dataframe
  3. Clean the dataframe, perform transformations, and save the results to a new dataframe
  4. Use pd.io.sql.get_schema to get a SQL query string based on the pandas dataframe you want to load into Snowflake
  5. Execute a create table command based on the df schema, using that query string with your connection and database cursor
  6. Use the Snowflake write_pandas command to write your df to the newly created Snowflake table
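Steps 4-5 can be factored into a small helper. A sketch, assuming the VARCHAR(256) width from the answer (an arbitrary choice); executing the DDL and write_pandas still require a live Snowflake connection, so those lines are shown only as comments:

```python
import pandas as pd

def build_create_table_sql(df_final: pd.DataFrame, table_name: str) -> str:
    """Derive a CREATE TABLE statement from a dataframe's schema,
    swapping pandas' generic TEXT type for a Snowflake-friendly VARCHAR."""
    ddl = pd.io.sql.get_schema(df_final, table_name)
    return ddl.replace('TEXT', 'VARCHAR(256)')

# With a live connection, the remaining steps would be (not run here):
# cs.execute(build_create_table_sql(df_final, 'NEW_TABLE'))
# success, nchunks, nrows, _ = write_pandas(ctx, df_final, 'NEW_TABLE')

ddl = build_create_table_sql(pd.DataFrame({'A': ['x'], 'B': [1]}), 'NEW_TABLE')
```

Keeping the DDL generation separate also makes it easy to inspect or log the statement before running it against the warehouse.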