如何查询并高效存储5000万条REST API记录来构建数据湖

问题描述 投票:0回答:1

问题用例很简单。

我想通过 REST API 从 Salesforce 查询超过 50M 条记录。从多个来源收集数据并将其融合在一起是工作的一部分,这些数据可以输入到机器学习模型中。 我想将这些数据存储到数据库或镶木地板文件中,以便我可以执行后续步骤。

现在,Salesforce 对一个对象的如此大的数据大小进行 REST API 查询,将返回类似的内容

 {"totalsize":50000000, "done":false, 
"nextrecordUrl":"/services/data/v49.0/a1000xxxxxx-2000",
 "records":[here the data of query remains of the 2000 batch that was returned]}

nextRecordUrl 简单给出了查询下一批 od 2000 的 url(salesforce 可以返回的最大批量大小,但是对此数字没有保证)。 所以我简单地运行一个循环,直到不再有 nextRecordUrl 来获取整组数据。

当前的方法如下所示:

    read the data = query(api)
    while "done" is not True:
          read the "records" part of the json 
          load it into Database ( using psycopg2 )
          again read data = query( nextRecordUrl )
          read value of "done"

我的问题是,如何加快此操作? 考虑到即使每个循环需要 2 秒才能完成,这最多仍需要 16-17 小时,并且我们可能需要每两周运行一次,以保持应用程序 ( salesforce ) 中反映的最新数据,更不用说数据库了超时错误或我必须处理的内存错误。

有什么建议吗?

编辑: 目前的方法是这样的:

sf = salesforce () # simple_salesforce library connection
df = pandas dataframe
sqlalc_engine = sqlalchemy engine
# query the first batch here then this loop start
while True:
            if num_batches == 1:
                current_df = df
            else:
                current_df = pd.DataFrame(lstRecords)
                #print(current_df.columns)
                current_df = current_df.drop(['attributes'], axis=1)

            print(f"next record URL is :{nextRecordsUrl}, || batch no is : {num_batches}")
            print(f"Loading batch : {num_batches} | records count : {current_df.shape[0]}")

            #this line i am using to insert records into DB
            current_df.to_sql(table_name, con=sqlalc_engine, schema='', if_exists='append', index=False)

            print(f"{num_batches} is loaded to DB")
            num_batches+=1

            if completed:
                #no more records
                break

            # get next batch of records
            records = sf.query_more(nextRecordsUrl, identifier_is_url=True)
            lstRecords = records.get('records')
            nextRecordsUrl = records.get('nextRecordsUrl')

            # set running check for next loop
            completed = records.get('done')
python database rest salesforce psycopg2
1个回答
0
投票

在查询中添加 WHERE 子句,以免每次都获取所有内容。

SELECT Id FROM Account WHERE LastModifiedDate >= 2024-02-25T01:23:45T
(使用上次成功作业的开始时间戳。开始,而不是结束,以确保没有间隙!)。或者有特殊的 not-really-constants 让你写
WHERE LastModifiedDate >= LAST_WEEK

这应该会大大减少你的时间。数据多久更改一次?说5%?是否有任何其他过滤器可以应用,或者您确实需要一切。

另一件事是您正在使用同步 API(发送请求、等待、处理结果、发送下一个请求...)。有一个异步版本可能更适合:Bulk API 2.0。您发出请求并定期询问“完成了吗”。 SF 开始生成每个最多 10K 行的结果文件,您甚至可以在多个线程中提取它们(假设应用程序的其余部分可以很好地处理多线程,可能下一个瓶颈将是本地数据库中的锁定?)。

如果你真的想手工制作请求,有一些资源:https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough.htm但我会想要使用

simple salesforce
来完成繁重的工作吗?

另一个好处是批量 API 将允许您使用 PK 分块 进一步有效地将结果切割成文件。这意味着,如果给定的 100-250K 记录“块”中只有 5 个满足您的查询条件,则该块的结果文件将只有 5 个,而不是您指定为批量大小的 10K。但是嘿,生成速度会更快。

© www.soinside.com 2019 - 2024. All rights reserved.