未使用 asyncio/aiohttp 完成响应负载

问题描述 投票:0回答:2

我编写了一个

Python 3.7
脚本,该脚本异步创建
(asyncio 3.4.3 and aiohttp 3.5.4)
批量 API
Salesforce
使用由单个
(v45.0)
语句查询的多个对象的作业/批处理,在完成后等待批次完成将结果下载(流式传输)到服务器,进行一些数据转换,然后最终将结果同步上传到
SOQL
。我对此进行了多次成功的试运行,并认为它工作完美,但是,我最近开始间歇性地收到以下错误,并且我对如何修复感到困惑,因为这方面的报告/解决方案非常少在网络上:

aiohttp.client_exceptions.ClientPayloadError:响应负载不是 已完成

示例代码片段:

SQL Server 2016 SP1 (13.0.4560.0)

回溯(错误行与上面的代码片段不匹配):

回溯(最近一次调用最后一次):

文件“C:\Code\salesforce.py”,第 252 行,位于 asyncio.run(asyncDownload())

文件“C:\Program Files\Python37\libsyncio unners.py”,第 43 行, 跑步 返回loop.run_until_complete(main)

文件“C:\Program Files\Python37\libsyncio ase_events.py”,行 584,在 run_until_complete 中 返回 future.result()

文件“C:\Code\salesforce.py”,第 241 行,asyncDownload 等待 asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for objectDictionary 中的 sfObject])

文件“C:\Code\salesforce.py”,第 183 行,位于 检索结果 块 = 等待 r.content.read(81920)

文件“C:\程序 Files\Python37\lib\site-packages iohttp\streams.py”,第 369 行,位于 读 等待 self._wait('read')

文件“C:\程序 Files\Python37\lib\site-packages iohttp\streams.py”,第 297 行,位于 _等待 等待服务员

aiohttp.client_exceptions.ClientPayloadError:响应负载不是 已完成

问题的根源似乎始于
import asyncio,aiohttp,aiofiles from simple_salesforce import Salesforce from xml.etree import ElementTree #Establish a session using the simple_salesforce module sf = Salesforce(username=username, password=password, security_token=securityToken, organizationId=organizationId) sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/' sfDataPath = 'C:/Salesforce/Data/' #Dictionary to store information for the object/job/batch while the script is executing objectDictionary = {'Account': {'job': {'batch': {'id': '8596P00000ihwpJulI','results': ['8596V00000Bo9iU'],'state': 'Completed'}, 'id': '8752R00000iUjtReqS'}, 'soql': 'select Id,Name from Account'}, 'Contact': {'job': {'batch': {'id': '9874G00000iJnBbVgg','results': ['7410t00000Ao9vp'],'state': 'Completed'}, 'id': '8800o00000POIkLlLa'}, 'soql': 'select Id,Name from Contact'}} async def retrieveResults(jobId, batchId, sfObject): headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'} async with aiohttp.ClientSession() as session: async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r: data = await r.text() batchResults = ElementTree.fromstring(data) #list of batch results for resultID in batchResults: async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r: async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile: #save in temporary file for manipulation later while True: chunk = await r.content.read(81920) if not chunk: break await outfile.write(chunk) async def asyncDownload(): await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary]) if __name__ == "__main__": asyncio.run(asyncDownload())

,它应该以 81920 字节块的形式传输数据,但这就是我所能得到的。

我不认为这对我来说是一个网络问题,因为还有其他小作业连接到该服务器上的外部源,这些作业在该作业运行时顺利完成。有人知道这是怎么回事吗?

谢谢!

-编辑:

我尝试过

r.content.read(81920)

而不是

iter_any()
,但仍然遇到相同的错误...
read()

我已经尝试过
async for data in r.content.iter_any(): await outfile.write(data)

但仍然遇到相同的错误...

readline()

此后,我在代码的错误处理部分(不包含在原始问题中)中使用了一些重试功能,这最终允许作业完成。有效负载错误仍然发生,这仍然是主要问题,但重试下载是一个成功的解决方法。如果有人能够提供更多信息,问题仍然存在。

python python-3.x asynchronous salesforce aiohttp
2个回答
0
投票

async for line in r.content.readline(): await outfile.write(line)



0
投票

await asyncio.gather(*tasks) # 像 asyncio.ensure_future() 这样的任务

解决方案,修复构建环境:

... while True: chunk = await r.content.read(81920) await asyncio.sleep(0) if not chunk: break await outfile.write(chunk) ...

FROM amazonlinux:2 AS

我猜问题是 .so 文件或较低级别的文件(由库内部使用来管理协程)与 lambda 环境不兼容。因此,构建正确的 docker 基础就可以解决问题。

© www.soinside.com 2019 - 2024. All rights reserved.