有没有办法从 yaml 文件中指定的字典创建动态表?我在 yaml 文件中定义了很多 ETL 配置,所以我很好奇是否也可以向其中添加表创建方面,这样我就不必在单独的目录中修改单独的 .sql 文件。
database:
table: 'schema.fact_stuff'
create_columns: [
{}
] #not sure how this section should be
我在 stackoverflow 上找到了一个解决方案,它将一些列表压缩在一起,这与类似的东西类似,但我更愿意明确定义每一列。
{'column_name': 'id', 'column_type': Integer, 'primary_key': False, 'nullable': True}
我最终让它与这个一起工作:
from sqlalchemy.types import (Integer, NUMERIC, TEXT, BOOLEAN, TIMESTAMP, DATE)
sql_types = {'integer': Integer,
'numeric': NUMERIC,
'text': TEXT,
'date': DATE,
'timestamp': TIMESTAMP(timezone=False),
'timestamptz': TIMESTAMP(timezone=True)}
exclude_list = ['original_name']
table_dict = [{k: v for k, v in d.items() if k not in exclude_list} for d in c[variable]['load']['columns']]
for column in table_dict:
for key, val in column.copy().items():
if key == 'type_':
column[key] = sql_types[val]
elif key == 'default':
column[key] = dt.datetime.utcnow
metadata = sa.MetaData(schema=c[variable]['load']['schema'])
metadata.reflect(bind=engine, autoload=True)
fact = sa.Table(c[variable]['load']['table'], metadata, extend_existing=True,
*(sa.Column(**kwargs) for kwargs in table_dict))
fact.create_all(engine, checkfirst=True)
但后来我转而让 pandas 确定数据类型,而不是在 yaml 文件中定义它们。这将使用 jinja2 模板创建 sql,然后循环遍历所有数据源以创建 DDL。
def pandas_to_postgres(df):
dtype_dict = {
'i': 'integer',
'O': 'text',
'f': 'real',
'b': 'boolean',
'datetime64[ns]': 'timestamp',
'datetime64[ns, UTC]': 'timestampz',
}
column_list = []
column_dict = {}
for k, v in df.dtypes.items():
column_dict['name'] = k
column_dict['dtype'] = dtype_dict.get(v.kind, 'text')
column_list.append(column_dict.copy())
return column_list
def generate_create_table(df, schema, table, table_type, columns, constraint, unique_columns):
""" Returns a dictionary of coefs from training """
query = Template(
template
).render(
schema_name=schema,
table_name=table,
table_type=table_type,
columns=columns,
constraint=constraint,
constraint_columns=unique_columns
)
print(query)
今天发布的 SQLAthanor (v.0.3.0) 正是支持这一点。使用 SQLAthanor,您可以使用以下代码以编程方式生成 SQLAlchemy
Table
对象(假设 metadata
包含您的 SQLAlchemy MetaData
对象):
from sqlathanor import Table
my_table = Table.from_yaml('yaml_file.yaml',
'my_table_name',
metadata,
primary_key = 'id')
预计到达时间: 请注意,您还可以使用
Table
、Table.from_json()
和 Table.from_dict()
创建 Table.from_csv()
对象。
这里是有关其工作原理的文档(通常):https://sqlathanor.readthedocs.io/en/latest/using.html#generate-sqlalchemy-tables-from-serialized-data
这里是特定
Table.from_yaml()
方法的文档链接:https://sqlathanor.readthedocs.io/en/latest/api.html#sqlathanor.schema.Table.from_yaml
(我建议查看方法文档 - 它涉及以编程方式从序列化数据构造
Table
对象的一些“陷阱”)
预计到达时间:
基本上,编程式
Table
生成的工作方式是 SQLAthanor:
首先将序列化字符串(或文件)转换为Python
dict
。对于 YAML,默认的反序列化器是 PyYAML。对于 JSON,默认的反序列化器是 simplejson (两个默认的反序列化器都可以使用 deserialize_function
参数覆盖)。生成 Python
dict
后,SQLAthanor 会读取该 dict
中的每个键来确定列名称。它读取每个键的值,并根据值的数据类型尝试“猜测”SQLAlchemy 数据类型。根据第 2 步中的发现,它会创建一个带有
Table
对象的 Column
对象,其中每个 Column
对象对应于反序列化 dict
中的一个键。如果您需要更精确地控制每个
Column
,您可以:
type_mapping
参数覆盖其 SQLAlchemy 数据类型(type_mapping
接收 dict
,其中顶级键对应于列名称,每个值都是应用于 Column
的数据类型)Column
参数将其他关键字参数传递给 column_kwargs
构造函数(column_kwargs
接收一个 dict
,其中顶级键对应于列名称,每个值都是带有关键字参数的 dict
)提供给该列的构造函数。默认情况下,
Table.from_<format>()
不支持嵌套数据结构。默认情况下, skip_nested
设置为 True
,这意味着反序列化 dict
中包含嵌套对象(可迭代对象或 dict
)的键将被跳过(即不会接收相应的 Column
)。如果您的Table
需要存储嵌套数据,您可以将skip_nested
设置为False
并将default_to_str
激活为True
。这会将嵌套数据(可迭代或 dict
对象)转换为字符串,从而将它们保留在 Text
列中(除非被 type_mapping
覆盖)。
Table.from_dict()
示例
以下是一个示例
dict
,可以提供给Table.from_dict()
:
sample_dict = {
'id': 123,
'some_column_name': 'Some Column Value',
'created_datetime': datetime.utcnow()
}
my_table = Table.from_dict(sample_dict,
'my_table',
metadata,
primary_key = 'id')
当提供给
Table.from_dict()
时,此 dict
将生成一个 Table
对象,其数据库表名称为 my_table
,其中包含三列:
id
其类型 Integer
设置为表的主键some_column_name
其类型为 Text
created_datetime
其类型为 DateTime
Table.from_yaml()
示例
以下是相同的示例,但使用 YAML 字符串/文档来代替,可以提供给
Table.from_yaml()
:
sample_yaml = """
id: 123
some_column_name: Test Value
created_timestamp: 2018-01-01T01:23:45.67890
"""
my_table = Table.from_yaml(sample_yaml,
'my_table',
metadata,
primary_key = 'id')
当提供给
Table.from_yaml()
时,这将首先将 sample_yaml
反序列化为 dict
,就像前面的示例一样,然后生成一个包含三列的数据库表名称为 Table
的 my_table
对象:
id
其类型 Integer
设置为表的主键some_column_name
其类型为 Text
created_datetime
其类型为 DateTime
希望这有帮助!
如果您像我一样来到这里,因为您只想从 JSON 或 dict 创建多个表结构,而不一定向其附加任何数据。
我使用sqlalchemy实现了它。
from sqlalchemy import create_engine, MetaData, Table, Column, String
# Example 'diff_tables' dictionary structure
diff_tables = {
"table_name1": ["column1", "column2"],
"table_name2": ["column1", "column2", "column3"],
}
conn = create_engine("postgresql://{username}:{password}@{host}:{port}/{dbname}")
metadata = MetaData(schema='schema_name')
for table_name, columns in diff_tables.items():
table_columns = [Column(column_name, String) for column_name in columns]
table = Table(table_name, metadata, *table_columns)
table.create(conn)
print("Tables created successfully!")