我正在开发一个 FastApi,它应该根据 JSON 格式的请求进行一些计算,然后发送响应并将其存储在多个
Databrick
目录表中。
因此,在 API 中,我转换响应并创建表格
我正在努力解决的是我应该连接到的正确的
databrick
s API 端点是什么?
从下面的代码中可以看到,我定义了:
url = f"{self.databricks_host}/api/2.0/sql/createTable"
但它不起作用。
def send_to_dtb_catalog(self, df, table_name):
# doing some stuff here ....
# Prepare data payload for Databricks API
data = {
"tableName": f"my_database.my_schema.{table_name}",
"data": df_json
}
# Make HTTP request to Databricks REST API
# suppose databricks_host and databricks_token are pre-defined
url = f"{self.databricks_host}/api/2.0/sql/createTable"
headers = {
"Authorization": f"Bearer {self.databricks_token}",
"Content-Type": "application/json"
}
response = requests.post(url, headers=headers, json=data)
然后我将使用
send_to_dtb_catalog
将创建的表发送到 Databricks 目录表,类似这样
self.send_to_dtb_catalog(table1_df, "table1_databricks")
self.send_to_dtb_catalog(table2_df, "table2_databricks")
感谢任何帮助,因为我对 Databricks 和 API 开发都是新手。
您可以使用以下API来执行SQL语句。
像下面这样改变你的功能。
代码:
import requests
def send_to_dtb_catalog(df, table_name):
url = f"{databricks_host}/api/2.0/sql/statements/"
headers = {
"Authorization": f"Bearer {databricks_token}",
"Content-Type": "application/json"
}
sql_q = f'''
CREATE TABLE IF NOT EXISTS {table_name} (
id INT,
name STRING
)
'''
body = {
"warehouse_id": "a415c87c62c279a5",
"statement": sql_q,
"wait_timeout": "30s",
"on_wait_timeout": "CANCEL"
}
response = requests.post(url, headers=headers, json=body)
if response.json()['status']['state'] == 'SUCCEEDED':
print("Inserting values....")
t = df.rdd.map(lambda row: tuple(row)).collect()
insert_query = f'''
INSERT INTO {table_name}
VALUES
{','.join(map(str, t))}
'''
body['statement'] = insert_query
res2 = requests.post(url, headers=headers, json=body)
return res2
接下来,调用您的函数。
输出:
API请求的输出:
另一种方法是使用驱动程序连接到 Databricks。
请参阅 this 了解如何连接到服务器并执行查询。