使用 FastAPI 在 Databricks 中创建表 - Python 代码

问题描述 投票:0回答:1

我正在开发一个 FastApi,它应该根据 JSON 格式的请求进行一些计算,然后发送响应并将其存储在多个

Databrick
目录表中。

因此,在 API 中,我转换响应并创建表格

我正在努力解决的是我应该连接到的正确的

databrick
s API 端点是什么?

从下面的代码中可以看到,我定义了:

url = f"{self.databricks_host}/api/2.0/sql/createTable"

但它不起作用。

def send_to_dtb_catalog(self, df, table_name):
        # doing some stuff here ....

        # Prepare data payload for Databricks API
        data = {
            "tableName": f"my_database.my_schema.{table_name}",
            "data": df_json
        }   

        # Make HTTP request to Databricks REST API
        # suppose databricks_host and databricks_token are pre-defined 
        url = f"{self.databricks_host}/api/2.0/sql/createTable"

        headers = {
            "Authorization": f"Bearer {self.databricks_token}",
            "Content-Type": "application/json"
        }

        response = requests.post(url, headers=headers, json=data)

然后我将使用

send_to_dtb_catalog
将创建的表发送到 Databricks 目录表,类似这样

self.send_to_dtb_catalog(table1_df, "table1_databricks")
self.send_to_dtb_catalog(table2_df, "table2_databricks")

感谢任何帮助,因为我对 Databricks 和 API 开发都是新手。

python databricks fastapi azure-databricks databricks-unity-catalog
1个回答
0
投票

您可以使用以下API来执行SQL语句。

执行SQL语句

像下面这样改变你的功能。

代码:

import requests

def send_to_dtb_catalog(df, table_name):
    url = f"{databricks_host}/api/2.0/sql/statements/"

    headers = {
        "Authorization": f"Bearer {databricks_token}",
        "Content-Type": "application/json"
    }
    sql_q = f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
        id INT,
        name STRING
        )
    '''

    body = {
        "warehouse_id": "a415c87c62c279a5",
        "statement": sql_q,
        "wait_timeout": "30s",
        "on_wait_timeout": "CANCEL"
    }
    response = requests.post(url, headers=headers, json=body)
    if response.json()['status']['state'] == 'SUCCEEDED':
        print("Inserting values....")
    
        t = df.rdd.map(lambda row: tuple(row)).collect()
        insert_query = f'''
        INSERT INTO {table_name}
        VALUES
        {','.join(map(str, t))}
        '''
    
        body['statement'] = insert_query

        res2 = requests.post(url, headers=headers, json=body)
    return res2

接下来,调用您的函数。

输出:

enter image description here

API请求的输出:

enter image description here

另一种方法是使用驱动程序连接到 Databricks。

请参阅 this 了解如何连接到服务器并执行查询。

© www.soinside.com 2019 - 2024. All rights reserved.