这是我在这个论坛上的第一个问题。我真的是 Python nad PycURL 模块的初学者。
以下代码由该小组提供。 但我想我错过了一些东西。
import pycurl
import io
import json
import time
curl_obj = pycurl.Curl()
b_obj= io.BytesIO()
sddcMurl="https://sddc-mgmt.rainpole.dev/v1/credentials/tasks/fc8dc930-f762-45b9-840e-96bf187dca40"
headers=[
'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJqdGkiOiI1ZjlhOWM0YS03ZjYzLTRkNDQtODI4Mi1hYjQ3ODNmNzYyYTQiLCJpYXQiOjE2OTU4Mjg4NjYsInN1YiI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsImlzcyI6InZjZi1hdXRoIiwiYXVkIjoic2RkYy1zZXJ2aWNlcyIsIm5iZiI6MTY5NTgyODg2NiwiZXhwIjoxNjk1ODMyNDY2LCJ1c2VyIjoiYWRtaW5pc3RyYXRvckB2c3BoZXJlLmxvY2FsIiwibmFtZSI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsInNjb3BlIjpbIlJFU09VUkNFX0ZVTkNUSU9OQUxJVFlfV1JJVEUiLCJMSUNFTlNJTkdfSU5GT19SRUFEIiwiU0REQ19GRURFUkFUSU9OX1dSSVRFIiwiQVZOX1dSSVRFIiwiU0REQ19NQU5BR0VSX1JFQUQiLCJDRVJUX1dSSVRFIiwiQ09NUE9TQUJJTElUWV9XUklURSIsIkxJQ0VOU0VfS0VZX1JFQUQiLCJDT01QT1NBQklMSVRZX1JFQUQiLCJFREdFX0NMVVNURVJfV1JJVEUiLCJVU0VSX1JFQUQiLCJDUkVERU5USUFMX1dSSVRFIiwiQkFDS1VQX0NPTkZJR19SRUFEIiwiQ0xVU1RFUl9XUklURSIsIkFWTl9SRUFEIiwiVkFTQV9QUk9WSURFUl9SRUFEIiwiRE9NQUlOX1dSSVRFIiwiQ0VJUF9SRUFEIiwiU09TX1dSSVRFIiwiU0REQ19NQU5BR0VSX1dSSVRFIiwiTlRQX1dSSVRFIiwiVEFHX1dSSVRFIiwiREVQT1RfQ09ORklHX1dSSVRFIiwiU1lTVEVNX1JFQUQiLCJERVBPVF9DT05GSUdfUkVBRCIsIkhPU1RfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1dSSVRFIiwiQkFDS1VQX1JFU1RPUkVfUkVBRCIsIkNFUlRfUkVBRCIsIlVTRVJfV1JJVEUiLCJVUEdSQURFX1JFQUQiLCJPVEhFUl9SRUFEIiwiU09TX1JFQUQiLCJTRUNVUklUWV9DT05GSUdfUkVBRCIsIkNSRURFTlRJQUxfUkVBRCIsIkhPU1RfUkVBRCIsIkNFSVBfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1JFQUQiLCJPVEhFUl9XUklURSIsIkxJQ0VOU0VfS0VZX1dSSVRFIiwiUkVTT1VSQ0VfRlVOQ1RJT05BTElUWV9SRUFEIiwiQ0FfUkVBRCIsIlRBR19SRUFEIiwiTkVUV09SS19QT09MX1dSSVRFIiwiV0NQX1JFQUQiLCJMSUNFTlNJTkdfSU5GT19XUklURSIsIkJBQ0tVUF9SRVNUT1JFX1dSSVRFIiwiTlRQX1JFQUQiLCJFREdFX0NMVVNURVJfUkVBRCIsIkJBQ0tVUF9DT05GSUdfV1JJVEUiLCJXQ1BfV1JJVEUiLCJTRVJWSUNFX0FDQ09VTlRfV1JJVEUiLCJORVRXT1JLX1BPT0xfUkVBRCIsIkNBX1dSSVRFIiwiQ0xVU1RFUl9SRUFEIiwiVkFTQV9QUk9WSURFUl9XUklURSIsIkROU19XUklURSIsIlZSU0xDTV9XUklURSIsIkROU19SRUFEIiwiU0VSVklDRV9BQ0NPVU5UX1JFQUQiLCJTRERDX0ZFREVSQVRJT05fUkVBRCIsIkRPTUFJTl9SRUFEIiwiVlJTTENNX1JFQUQiLCJVUEdSQURFX1dSSVRFIl0sInJvbGUiOlsiQURNSU4iXX0.y0VpHMF4xkFhZZTsckn9nG-QF6tZtn96JAhRjacHP40'
]
curl_obj.setopt(curl_obj.URL, sddcMurl)
curl_obj.setopt(curl_obj.WRITEDATA, b_obj)
curl_obj.setopt(curl_obj.SSL_VERIFYHOST, False)
curl_obj.setopt(curl_obj.SSL_VERIFYPEER, False)
curl_obj.setopt(curl_obj.HTTPHEADER, headers)
curl_obj.perform()
response = b_obj.getvalue()
jsonobj=json.loads(response)
apicallstatus=jsonobj['status']
print(apicallstatus)
startTime = time.time()
if apicallstatus != 'FAILED':
while apicallstatus == 'IN_PROGRESS':
response = b_obj.getvalue()
jsonobj=json.loads(response)
apicallstatus=jsonobj['status']
print(apicallstatus)
print(f"print subtask {jsonobj['subTasks'][0]['status']}")
print('sleeping for 5 seconds')
time.sleep(5)
else:
print(f"Job has failed because of {jsonobj['errors'][0]['errorCode']} and {jsonobj['errors'][0]['message']}")
curl_obj.close()
b_obj.close()
代码用途:跟踪API调用的状态。 有 API 调用来更新密码。但上面只是跟踪任务的进度。 这段代码可以工作,但我想打电话给
apicallstatus
,直到它成功为止。问题是,由于它在循环内部,因此它不会出去检查状态。我尝试将整个代码放入 While 循环中,但失败了
下面是有效的代码,但我并不为此感到自豪。还有很大的进步空间。
import pycurl
import io
import json
import time
curl_obj = pycurl.Curl()
b_obj= io.BytesIO()
sddcMurl="https://sddc-mgmt.rainpole.dev/v1/credentials/tasks/04cf03e4-370f-4c55-a3ea-cfe961b52471"
headers=[
'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJqdGkiOiJmNTc1NzhkNC0yMGRkLTQyZGYtYjA0ZC04OGZlNDQ3ZDA0ZDMiLCJpYXQiOjE2OTU4MzU1OTUsInN1YiI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsImlzcyI6InZjZi1hdXRoIiwiYXVkIjoic2RkYy1zZXJ2aWNlcyIsIm5iZiI6MTY5NTgzNTU5NSwiZXhwIjoxNjk1ODM5MTk1LCJ1c2VyIjoiYWRtaW5pc3RyYXRvckB2c3BoZXJlLmxvY2FsIiwibmFtZSI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsInNjb3BlIjpbIlJFU09VUkNFX0ZVTkNUSU9OQUxJVFlfV1JJVEUiLCJMSUNFTlNJTkdfSU5GT19SRUFEIiwiU0REQ19GRURFUkFUSU9OX1dSSVRFIiwiQVZOX1dSSVRFIiwiU0REQ19NQU5BR0VSX1JFQUQiLCJDRVJUX1dSSVRFIiwiQ09NUE9TQUJJTElUWV9XUklURSIsIkxJQ0VOU0VfS0VZX1JFQUQiLCJDT01QT1NBQklMSVRZX1JFQUQiLCJFREdFX0NMVVNURVJfV1JJVEUiLCJVU0VSX1JFQUQiLCJDUkVERU5USUFMX1dSSVRFIiwiQkFDS1VQX0NPTkZJR19SRUFEIiwiQ0xVU1RFUl9XUklURSIsIkFWTl9SRUFEIiwiVkFTQV9QUk9WSURFUl9SRUFEIiwiRE9NQUlOX1dSSVRFIiwiQ0VJUF9SRUFEIiwiU09TX1dSSVRFIiwiU0REQ19NQU5BR0VSX1dSSVRFIiwiTlRQX1dSSVRFIiwiVEFHX1dSSVRFIiwiREVQT1RfQ09ORklHX1dSSVRFIiwiU1lTVEVNX1JFQUQiLCJERVBPVF9DT05GSUdfUkVBRCIsIkhPU1RfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1dSSVRFIiwiQkFDS1VQX1JFU1RPUkVfUkVBRCIsIkNFUlRfUkVBRCIsIlVTRVJfV1JJVEUiLCJVUEdSQURFX1JFQUQiLCJPVEhFUl9SRUFEIiwiU09TX1JFQUQiLCJTRUNVUklUWV9DT05GSUdfUkVBRCIsIkNSRURFTlRJQUxfUkVBRCIsIkhPU1RfUkVBRCIsIkNFSVBfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1JFQUQiLCJPVEhFUl9XUklURSIsIkxJQ0VOU0VfS0VZX1dSSVRFIiwiUkVTT1VSQ0VfRlVOQ1RJT05BTElUWV9SRUFEIiwiQ0FfUkVBRCIsIlRBR19SRUFEIiwiTkVUV09SS19QT09MX1dSSVRFIiwiV0NQX1JFQUQiLCJMSUNFTlNJTkdfSU5GT19XUklURSIsIkJBQ0tVUF9SRVNUT1JFX1dSSVRFIiwiTlRQX1JFQUQiLCJFREdFX0NMVVNURVJfUkVBRCIsIkJBQ0tVUF9DT05GSUdfV1JJVEUiLCJXQ1BfV1JJVEUiLCJTRVJWSUNFX0FDQ09VTlRfV1JJVEUiLCJORVRXT1JLX1BPT0xfUkVBRCIsIkNBX1dSSVRFIiwiQ0xVU1RFUl9SRUFEIiwiVkFTQV9QUk9WSURFUl9XUklURSIsIkROU19XUklURSIsIlZSU0xDTV9XUklURSIsIkROU19SRUFEIiwiU0VSVklDRV9BQ0NPVU5UX1JFQUQiLCJTRERDX0ZFREVSQVRJT05fUkVBRCIsIkRPTUFJTl9SRUFEIiwiVlJTTENNX1JFQUQiLCJVUEdSQURFX1dSSVRFIl0sInJvbGUiOlsiQURNSU4iXX0.30E8LYFJw0zwTv6PWjjVEoLTudH06vGNCyqs4ItcoHI'
]
curl_obj.setopt(curl_obj.URL, sddcMurl)
curl_obj.setopt(curl_obj.WRITEDATA, b_obj)
curl_obj.setopt(curl_obj.SSL_VERIFYHOST, False)
curl_obj.setopt(curl_obj.SSL_VERIFYPEER, False)
curl_obj.setopt(curl_obj.HTTPHEADER, headers)
curl_obj.perform()
response = b_obj.getvalue()
jsonobj=json.loads(response)
apicallstatus=jsonobj['status']
print(apicallstatus)
startTime = time.time()
if apicallstatus != 'FAILED':
while apicallstatus == 'IN_PROGRESS':
curl_obj = pycurl.Curl()
b_obj= io.BytesIO()
curl_obj.setopt(curl_obj.URL, sddcMurl)
curl_obj.setopt(curl_obj.WRITEDATA, b_obj)
curl_obj.setopt(curl_obj.SSL_VERIFYHOST, False)
curl_obj.setopt(curl_obj.SSL_VERIFYPEER, False)
curl_obj.setopt(curl_obj.HTTPHEADER, headers)
curl_obj.perform()
response = b_obj.getvalue()
jsonobj=json.loads(response)
apicallstatus=jsonobj['status']
print(apicallstatus)
print(f"print subtask {jsonobj['subTasks'][0]['status']}")
print('sleeping for 15 seconds')
time.sleep(15)
else:
print(f"Job has failed because of {jsonobj['errors'][0]['errorCode']} and {jsonobj['errors'][0]['message']}")
curl_obj.close()
b_obj.close()