SQLAlchemy - 从 yaml 或字典创建表?

问题描述 投票:0回答:2

有没有办法从 yaml 文件中指定的字典创建动态表?我在 yaml 文件中定义了很多 ETL 配置,所以我很好奇是否也可以向其中添加表创建方面,这样我就不必在单独的目录中修改单独的 .sql 文件。

database:
  table: 'schema.fact_stuff'
  create_columns: [
    {}
  ] #not sure how this section should be

我在 stackoverflow 上找到了一个解决方案,它将一些列表压缩在一起,这与类似的东西类似,但我更愿意明确定义每一列。

{'column_name': 'id', 'column_type': Integer, 'primary_key': False, 'nullable': True}

我最终让它与这个一起工作:

from sqlalchemy.types import (Integer, NUMERIC, TEXT, BOOLEAN, TIMESTAMP, DATE)

sql_types = {'integer': Integer,
        'numeric': NUMERIC,
        'text': TEXT,
        'date': DATE,
        'timestamp': TIMESTAMP(timezone=False),
        'timestamptz': TIMESTAMP(timezone=True)}

exclude_list = ['original_name']
table_dict = [{k: v for k, v in d.items() if k not in exclude_list} for d in c[variable]['load']['columns']]
for column in table_dict:
    for key, val in column.copy().items():
        if key == 'type_':
            column[key] = sql_types[val]
        elif key == 'default':
            column[key] = dt.datetime.utcnow

metadata = sa.MetaData(schema=c[variable]['load']['schema'])
metadata.reflect(bind=engine, autoload=True)
fact = sa.Table(c[variable]['load']['table'], metadata, extend_existing=True,
        *(sa.Column(**kwargs) for kwargs in table_dict))
fact.create_all(engine, checkfirst=True)

但后来我转而让 pandas 确定数据类型,而不是在 yaml 文件中定义它们。这将使用 jinja2 模板创建 sql,然后循环遍历所有数据源以创建 DDL。

def pandas_to_postgres(df):
    dtype_dict = {
      'i': 'integer',
      'O': 'text',
      'f': 'real',
      'b': 'boolean',
      'datetime64[ns]': 'timestamp',
      'datetime64[ns, UTC]': 'timestampz',
    }
    column_list = []
    column_dict = {}
    for k, v in df.dtypes.items():
        column_dict['name'] = k
        column_dict['dtype'] = dtype_dict.get(v.kind, 'text')
        column_list.append(column_dict.copy())
    return column_list


def generate_create_table(df, schema, table, table_type, columns, constraint, unique_columns):
    """ Returns a dictionary of coefs from training """
    query = Template(
        template
    ).render(
        schema_name=schema,
        table_name=table,
        table_type=table_type,
        columns=columns,
        constraint=constraint,
        constraint_columns=unique_columns
    )
    print(query)
python sqlalchemy yaml
2个回答
5
投票

今天发布的 SQLAthanor (v.0.3.0) 正是支持这一点。使用 SQLAthanor,您可以使用以下代码以编程方式生成 SQLAlchemy

Table
对象(假设
metadata
包含您的 SQLAlchemy
MetaData
对象):

from sqlathanor import Table

my_table = Table.from_yaml('yaml_file.yaml', 
                           'my_table_name', 
                           metadata, 
                           primary_key = 'id')

预计到达时间: 请注意,您还可以使用

Table
Table.from_json()
Table.from_dict()
创建
Table.from_csv()
对象。

这里是有关其工作原理的文档(通常):https://sqlathanor.readthedocs.io/en/latest/using.html#generate-sqlalchemy-tables-from-serialized-data

这里是特定

Table.from_yaml()
方法的文档链接:https://sqlathanor.readthedocs.io/en/latest/api.html#sqlathanor.schema.Table.from_yaml

(我建议查看方法文档 - 它涉及以编程方式从序列化数据构造

Table
对象的一些“陷阱”)


预计到达时间:

基本上,编程式

Table
生成的工作方式是 SQLAthanor:

  1. 首先将序列化字符串(或文件)转换为Python

    dict
    。对于 YAML,默认的反序列化器是 PyYAML。对于 JSON,默认的反序列化器是 simplejson (两个默认的反序列化器都可以使用
    deserialize_function
    参数覆盖)。

  2. 生成 Python

    dict
    后,SQLAthanor 会读取该
    dict
    中的每个键来确定列名称。它读取每个键的值,并根据值的数据类型尝试“猜测”SQLAlchemy 数据类型。

  3. 根据第 2 步中的发现,它会创建一个带有

    Table
    对象的
    Column
    对象,其中每个
    Column
    对象对应于反序列化
    dict
    中的一个键。

如果您需要更精确地控制每个

Column
,您可以:

  • 使用
    type_mapping
    参数覆盖其 SQLAlchemy 数据类型(
    type_mapping
    接收
    dict
    ,其中顶级键对应于列名称,每个值都是应用于
    Column
    的数据类型)
  • 使用
    Column
    参数将其他关键字参数传递给
    column_kwargs
    构造函数(
    column_kwargs
    接收一个
    dict
    ,其中顶级键对应于列名称,每个值都是带有关键字参数的
    dict
    )提供给该列的构造函数。

默认情况下,

Table.from_<format>()
支持嵌套数据结构。默认情况下,
skip_nested
设置为
True
,这意味着反序列化
dict
中包含嵌套对象(可迭代对象或
dict
)的键将被跳过(即不会接收相应的
Column
)。如果您的
Table
需要存储嵌套数据,您可以将
skip_nested
设置为
False
并将
default_to_str
激活为
True
。这会将嵌套数据(可迭代或
dict
对象)转换为字符串,从而将它们保留在
Text
列中(除非被
type_mapping
覆盖)。


Table.from_dict()
示例

以下是一个示例

dict
,可以提供给
Table.from_dict()

sample_dict = {
    'id': 123,
    'some_column_name': 'Some Column Value',
    'created_datetime': datetime.utcnow()
}

my_table = Table.from_dict(sample_dict, 
                           'my_table', 
                           metadata, 
                           primary_key = 'id')

当提供给

Table.from_dict()
时,此
dict
将生成一个
Table
对象,其数据库表名称为
my_table
,其中包含三列:

  • id
    其类型
    Integer
    设置为表的主键
  • some_column_name
    其类型为
    Text
  • created_datetime
    其类型为
    DateTime

Table.from_yaml()
示例

以下是相同的示例,但使用 YAML 字符串/文档来代替,可以提供给

Table.from_yaml()
:

sample_yaml = """
    id: 123
    some_column_name: Test Value
    created_timestamp: 2018-01-01T01:23:45.67890
"""

my_table = Table.from_yaml(sample_yaml, 
                           'my_table', 
                           metadata, 
                           primary_key = 'id')

当提供给

Table.from_yaml()
时,这将首先将
sample_yaml
反序列化为
dict
,就像前面的示例一样,然后生成一个包含三列的数据库表名称为
Table
my_table
对象:

  • id
    其类型
    Integer
    设置为表的主键
  • some_column_name
    其类型为
    Text
  • created_datetime
    其类型为
    DateTime

希望这有帮助!


0
投票

如果您像我一样来到这里,因为您只想从 JSON 或 dict 创建多个表结构,而不一定向其附加任何数据。

我使用sqlalchemy实现了它。

from sqlalchemy import create_engine, MetaData, Table, Column, String

# Example 'diff_tables' dictionary structure
diff_tables = {
    "table_name1": ["column1", "column2"],
    "table_name2": ["column1", "column2", "column3"],
}

conn = create_engine("postgresql://{username}:{password}@{host}:{port}/{dbname}")

metadata = MetaData(schema='schema_name')

for table_name, columns in diff_tables.items():
    table_columns = [Column(column_name, String) for column_name in columns]
    table = Table(table_name, metadata, *table_columns)
    table.create(conn)

print("Tables created successfully!")
© www.soinside.com 2019 - 2024. All rights reserved.