Retrieving data from a REST API into a Databricks data store

Question · votes: 0 · answers: 1

Newbie Spark question. I'm trying to read data from a REST API that returns its results in pages; to retrieve everything, I call the same API, say, 5 times. I want to save that data in a Databricks table. The API returns JSON. The idea is to make the API calls daily and save the data partitioned by date. All the examples I've found cover only a single API call.

Any pointers? Thanks

pyspark databricks
1 Answer

0 votes

I'm doing something similar with a Google API. The best APIs handle pagination by returning a page token: you ask for the first X records, and if there are more, the response includes a token, so you keep looping until there are no more records.
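The token loop described above can be sketched generically; `fetch_page` here is a hypothetical stand-in for whatever HTTP call your real API requires:

```python
def fetch_page(token=None):
    # Hypothetical stand-in for the real API call: returns one page of
    # records plus the token for the next page (None on the last page).
    pages = {
        None: ({"records": [1, 2]}, "t1"),
        "t1": ({"records": [3, 4]}, "t2"),
        "t2": ({"records": [5]}, None),
    }
    return pages[token]

records = []
token = None
while True:
    page, token = fetch_page(token)
    records.extend(page["records"])
    if token is None:  # no token means no more pages
        break

print(records)  # → [1, 2, 3, 4, 5]
```

The same loop shape works whether the token comes back in a response body, a header, or (as with the Google SDK below) a helper like `get_next()`.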

On the Spark side, you can keep appending the returned data to a list of `Row` objects and then finally convert that list into a DataFrame, after which you get all the usual Databricks goodness. Here is Python code using the Google API Python SDK, abbreviated for size, but you should get the idea:

from pyspark.sql import Row
from google.oauth2 import service_account
from googleapiclient.discovery import build

..
credentials = service_account.Credentials.from_service_account_info(SERVICE_ACCOUNT_FILE, scopes=SCOPES, subject=IMPERSONATED_USER)
service = build('admin', 'reports_v1', credentials=credentials)   # https://developers.google.com/api-client-library/python/start/get_started#build-the-service-object

# https://stackoverflow.com/questions/29903125/google-reporting-api-customer-usage-report
# https://developers.google.com/resources/api-libraries/documentation/admin/reports_v1/python/latest/admin_reports_v1.userUsageReport.html
print('Hitting Google Admin Reports - User Usage Api')
request = service.userUsageReport().get(userKey=keyUserFilter, date=keyDateFilter, filters=fieldFilter, maxResults=pageSize, parameters=fieldSelect)

rows = []
# keep fetching pages until get_next() returns None (no more pages)
while request is not None:
  response = request.execute()
  rows.append(Row(id=response.get("id"), item=response.get("item")))
  request = service.userUsageReport().get_next(request, response)

print("end of loop")
df1 = spark.createDataFrame(rows)

display(df1)
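To cover the partition-by-date part of the question: once `df1` exists, you can tag each run with its load date and write it out as a table partitioned on that column. A minimal sketch, assuming an active Spark session on Databricks; the table name `api_results` is illustrative:

```python
from pyspark.sql.functions import current_date

# Stamp each daily run with its load date and append to a table
# partitioned on that column ("api_results" is an illustrative name).
(df1
  .withColumn("load_date", current_date())
  .write
  .mode("append")
  .partitionBy("load_date")
  .saveAsTable("api_results"))
```

With `mode("append")`, each daily run adds a new `load_date` partition without touching earlier ones.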