我正在使用 plpython3u 扩展开发一个 PostgreSQL 过程,它允许您在 PostgreSQL 中使用 python 过程语言。
通过使用 plpy 的以下代码,我可以检索数据表单并将其放入 pandas 数据框中。
CREATE OR REPLACE PROCEDURE public.plpy_proc_clas_full(
)
LANGUAGE 'plpython3u'
AS $BODY$
import pandas as pd
data_lt = plpy.execute('SELECT "key", "value" FROM public."<your-table>" ORDER BY "key"'); #PLyResult --> List or Dictionary
data_df_x = pd.DataFrame.from_records(data_lt)['key'];
data_df_y = pd.DataFrame.from_records(data_lt)['value'];
df = pd.concat([data_df_x, data_df_y], axis=1).values
return df;
$BODY$;
但是如何将 pandas 数据帧写回到表中(例如在 python 中进行一些数据操作之后)?
文档在一个相当意想不到的部分显示了一个关于捕获错误的示例:
CREATE FUNCTION insert_fraction(numerator int, denominator int) RETURNS text AS $$
from plpy import spiexceptions
try:
plan = plpy.prepare("INSERT INTO fractions (frac) VALUES ($1 / $2)", ["int", "int"])
plpy.execute(plan, [numerator, denominator])
except spiexceptions.DivisionByZero:
return "denominator cannot equal zero"
except spiexceptions.UniqueViolation:
return "already have that fraction"
except plpy.SPIError as e:
return "other error, SQLSTATE %s" % e.sqlstate
else:
return "fraction inserted"
$$ LANGUAGE plpython3u;
这表明您需要将数据帧映射回记录,并将其绑定到
$1, $2
d plpy.execute
语句中的占位符 (INSERT...VALUES...
)。
与所有基本的数据库交互一样,您还可以基于该数据帧动态构造带有内联文字值的文字静态插入语句的全文,然后
plpy.execute
。虽然简单直观,但它会更慢(重写、重新格式化可以直接传递并自动处理的数据)并且不太健壮(例如,没有自动引用或卫生)。
一段时间后,我阅读了一些文章,其中使用 psycopg2 或 plpy.connect 等库来打开与目标数据库的连接。
就我个人而言,我没有找到任何理由在同一运行时/数据库中操作数据(输入和输出)时重新打开连接。
我想到了一种解决方案,可以完成这项工作,尽管不确定处理大型表时的性能,因为使用了数据帧的迭代,这种情况下有多个插入语句,更具体的是每行插入一个数据帧,这不是批量的。
CREATE OR REPLACE PROCEDURE public.plpy_proc_test(
)
LANGUAGE 'plpython3u'
AS $BODY$
import pandas as pd
data_lt = plpy.execute('SELECT "key", "value" FROM public."ml_train" ORDER BY "key"'); #PLyResult --> List or Dictionary
data_df_x = pd.DataFrame.from_records(data_lt)['key'];
data_df_y = pd.DataFrame.from_records(data_lt)['value'];
df = pd.concat([data_df_x, data_df_y], axis=1)
for index, row in df.iterrows():
plan = plpy.prepare('INSERT INTO test (y_hat, y_act) VALUES ($1, $2)', ['numeric', 'numeric'])
plpy.execute(plan, [row[0], row[1]])
$BODY$;