PycURL - 在循环中轮询 API 调用的状态

问题描述 投票:0回答:1

这是我在这个论坛上的第一个问题。我是 Python 和 PycURL 模块的初学者。

以下代码由该小组提供。 但我想我错过了一些东西。

import pycurl
import io
import json
import time

# Curl handle for the request, plus an in-memory buffer that receives the
# response body (it is registered as WRITEDATA below).
curl_obj = pycurl.Curl()
b_obj= io.BytesIO()
# URL that is fetched below; the JSON it returns is read for 'status',
# 'subTasks' and 'errors'.
sddcMurl="https://sddc-mgmt.rainpole.dev/v1/credentials/tasks/fc8dc930-f762-45b9-840e-96bf187dca40"
# Request headers: a single bearer-token Authorization header.
headers=[
    'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJqdGkiOiI1ZjlhOWM0YS03ZjYzLTRkNDQtODI4Mi1hYjQ3ODNmNzYyYTQiLCJpYXQiOjE2OTU4Mjg4NjYsInN1YiI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsImlzcyI6InZjZi1hdXRoIiwiYXVkIjoic2RkYy1zZXJ2aWNlcyIsIm5iZiI6MTY5NTgyODg2NiwiZXhwIjoxNjk1ODMyNDY2LCJ1c2VyIjoiYWRtaW5pc3RyYXRvckB2c3BoZXJlLmxvY2FsIiwibmFtZSI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsInNjb3BlIjpbIlJFU09VUkNFX0ZVTkNUSU9OQUxJVFlfV1JJVEUiLCJMSUNFTlNJTkdfSU5GT19SRUFEIiwiU0REQ19GRURFUkFUSU9OX1dSSVRFIiwiQVZOX1dSSVRFIiwiU0REQ19NQU5BR0VSX1JFQUQiLCJDRVJUX1dSSVRFIiwiQ09NUE9TQUJJTElUWV9XUklURSIsIkxJQ0VOU0VfS0VZX1JFQUQiLCJDT01QT1NBQklMSVRZX1JFQUQiLCJFREdFX0NMVVNURVJfV1JJVEUiLCJVU0VSX1JFQUQiLCJDUkVERU5USUFMX1dSSVRFIiwiQkFDS1VQX0NPTkZJR19SRUFEIiwiQ0xVU1RFUl9XUklURSIsIkFWTl9SRUFEIiwiVkFTQV9QUk9WSURFUl9SRUFEIiwiRE9NQUlOX1dSSVRFIiwiQ0VJUF9SRUFEIiwiU09TX1dSSVRFIiwiU0REQ19NQU5BR0VSX1dSSVRFIiwiTlRQX1dSSVRFIiwiVEFHX1dSSVRFIiwiREVQT1RfQ09ORklHX1dSSVRFIiwiU1lTVEVNX1JFQUQiLCJERVBPVF9DT05GSUdfUkVBRCIsIkhPU1RfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1dSSVRFIiwiQkFDS1VQX1JFU1RPUkVfUkVBRCIsIkNFUlRfUkVBRCIsIlVTRVJfV1JJVEUiLCJVUEdSQURFX1JFQUQiLCJPVEhFUl9SRUFEIiwiU09TX1JFQUQiLCJTRUNVUklUWV9DT05GSUdfUkVBRCIsIkNSRURFTlRJQUxfUkVBRCIsIkhPU1RfUkVBRCIsIkNFSVBfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1JFQUQiLCJPVEhFUl9XUklURSIsIkxJQ0VOU0VfS0VZX1dSSVRFIiwiUkVTT1VSQ0VfRlVOQ1RJT05BTElUWV9SRUFEIiwiQ0FfUkVBRCIsIlRBR19SRUFEIiwiTkVUV09SS19QT09MX1dSSVRFIiwiV0NQX1JFQUQiLCJMSUNFTlNJTkdfSU5GT19XUklURSIsIkJBQ0tVUF9SRVNUT1JFX1dSSVRFIiwiTlRQX1JFQUQiLCJFREdFX0NMVVNURVJfUkVBRCIsIkJBQ0tVUF9DT05GSUdfV1JJVEUiLCJXQ1BfV1JJVEUiLCJTRVJWSUNFX0FDQ09VTlRfV1JJVEUiLCJORVRXT1JLX1BPT0xfUkVBRCIsIkNBX1dSSVRFIiwiQ0xVU1RFUl9SRUFEIiwiVkFTQV9QUk9WSURFUl9XUklURSIsIkROU19XUklURSIsIlZSU0xDTV9XUklURSIsIkROU19SRUFEIiwiU0VSVklDRV9BQ0NPVU5UX1JFQUQiLCJTRERDX0ZFREVSQVRJT05fUkVBRCIsIkRPTUFJTl9SRUFEIiwiVlJTTENNX1JFQUQiLCJVUEdSQURFX1dSSVRFIl0sInJvbGUiOlsiQURNSU4iXX0.y0VpHMF4xkFhZZTsckn9nG-QF6tZtn96JAhRjacHP40'
]

# Configure the request: target URL, response sink, and headers.
curl_obj.setopt(curl_obj.URL, sddcMurl)
curl_obj.setopt(curl_obj.WRITEDATA, b_obj)
# TLS host-name and peer-certificate verification are both switched off here.
curl_obj.setopt(curl_obj.SSL_VERIFYHOST, False)
curl_obj.setopt(curl_obj.SSL_VERIFYPEER, False)
curl_obj.setopt(curl_obj.HTTPHEADER, headers)
# The request is issued here; this is the only perform() call in the script.
curl_obj.perform()
response = b_obj.getvalue()
jsonobj=json.loads(response)
apicallstatus=jsonobj['status']
print(apicallstatus)
# Recorded but never read anywhere below.
startTime = time.time()

if apicallstatus != 'FAILED':
    # NOTE(review): nothing inside this loop calls curl_obj.perform() again,
    # so b_obj still holds the body of the first response. Every iteration
    # re-parses that same data, apicallstatus never changes, and the loop
    # cannot end once it has been entered.
    while apicallstatus == 'IN_PROGRESS':
        response = b_obj.getvalue()
        jsonobj=json.loads(response)
        apicallstatus=jsonobj['status']
        print(apicallstatus)
        print(f"print subtask {jsonobj['subTasks'][0]['status']}")
        print('sleeping for 5 seconds')
        time.sleep(5)
else:
    # Reached only when the very first response already reported FAILED.
    print(f"Job has failed because of {jsonobj['errors'][0]['errorCode']} and {jsonobj['errors'][0]['message']}")

# Release the curl handle and the response buffer.
curl_obj.close()
b_obj.close()

代码用途:跟踪 API 调用的状态。另有一个 API 调用用于更新密码,而上面的代码只负责跟踪该任务的进度。这段代码可以运行,但我想反复检查 `apicallstatus`,直到任务成功为止。问题是,程序一旦进入循环,就不会再去重新获取状态。我尝试将整个代码放入 while 循环中,但失败了。

python-3.x pycurl
1个回答
0
投票

下面是可以运行的代码,但我并不为此感到自豪,还有很大的改进空间。

import pycurl
import io
import json
import time

# Poll a credentials task until its status is no longer IN_PROGRESS, then
# report a failure if the task ended as FAILED.
#
# One Curl handle is configured once and reused for every poll, and it is
# always closed in the ``finally`` clause, even if a request or the JSON
# decoding raises.

sddcMurl = "https://sddc-mgmt.rainpole.dev/v1/credentials/tasks/04cf03e4-370f-4c55-a3ea-cfe961b52471"
# NOTE: a hard-coded bearer token is only acceptable in a throwaway example;
# load it from a secure source (environment, secret store) in real use.
headers = [
    'Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJqdGkiOiJmNTc1NzhkNC0yMGRkLTQyZGYtYjA0ZC04OGZlNDQ3ZDA0ZDMiLCJpYXQiOjE2OTU4MzU1OTUsInN1YiI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsImlzcyI6InZjZi1hdXRoIiwiYXVkIjoic2RkYy1zZXJ2aWNlcyIsIm5iZiI6MTY5NTgzNTU5NSwiZXhwIjoxNjk1ODM5MTk1LCJ1c2VyIjoiYWRtaW5pc3RyYXRvckB2c3BoZXJlLmxvY2FsIiwibmFtZSI6ImFkbWluaXN0cmF0b3JAdnNwaGVyZS5sb2NhbCIsInNjb3BlIjpbIlJFU09VUkNFX0ZVTkNUSU9OQUxJVFlfV1JJVEUiLCJMSUNFTlNJTkdfSU5GT19SRUFEIiwiU0REQ19GRURFUkFUSU9OX1dSSVRFIiwiQVZOX1dSSVRFIiwiU0REQ19NQU5BR0VSX1JFQUQiLCJDRVJUX1dSSVRFIiwiQ09NUE9TQUJJTElUWV9XUklURSIsIkxJQ0VOU0VfS0VZX1JFQUQiLCJDT01QT1NBQklMSVRZX1JFQUQiLCJFREdFX0NMVVNURVJfV1JJVEUiLCJVU0VSX1JFQUQiLCJDUkVERU5USUFMX1dSSVRFIiwiQkFDS1VQX0NPTkZJR19SRUFEIiwiQ0xVU1RFUl9XUklURSIsIkFWTl9SRUFEIiwiVkFTQV9QUk9WSURFUl9SRUFEIiwiRE9NQUlOX1dSSVRFIiwiQ0VJUF9SRUFEIiwiU09TX1dSSVRFIiwiU0REQ19NQU5BR0VSX1dSSVRFIiwiTlRQX1dSSVRFIiwiVEFHX1dSSVRFIiwiREVQT1RfQ09ORklHX1dSSVRFIiwiU1lTVEVNX1JFQUQiLCJERVBPVF9DT05GSUdfUkVBRCIsIkhPU1RfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1dSSVRFIiwiQkFDS1VQX1JFU1RPUkVfUkVBRCIsIkNFUlRfUkVBRCIsIlVTRVJfV1JJVEUiLCJVUEdSQURFX1JFQUQiLCJPVEhFUl9SRUFEIiwiU09TX1JFQUQiLCJTRUNVUklUWV9DT05GSUdfUkVBRCIsIkNSRURFTlRJQUxfUkVBRCIsIkhPU1RfUkVBRCIsIkNFSVBfV1JJVEUiLCJSRVNPVVJDRV9MT0NLX1JFQUQiLCJPVEhFUl9XUklURSIsIkxJQ0VOU0VfS0VZX1dSSVRFIiwiUkVTT1VSQ0VfRlVOQ1RJT05BTElUWV9SRUFEIiwiQ0FfUkVBRCIsIlRBR19SRUFEIiwiTkVUV09SS19QT09MX1dSSVRFIiwiV0NQX1JFQUQiLCJMSUNFTlNJTkdfSU5GT19XUklURSIsIkJBQ0tVUF9SRVNUT1JFX1dSSVRFIiwiTlRQX1JFQUQiLCJFREdFX0NMVVNURVJfUkVBRCIsIkJBQ0tVUF9DT05GSUdfV1JJVEUiLCJXQ1BfV1JJVEUiLCJTRVJWSUNFX0FDQ09VTlRfV1JJVEUiLCJORVRXT1JLX1BPT0xfUkVBRCIsIkNBX1dSSVRFIiwiQ0xVU1RFUl9SRUFEIiwiVkFTQV9QUk9WSURFUl9XUklURSIsIkROU19XUklURSIsIlZSU0xDTV9XUklURSIsIkROU19SRUFEIiwiU0VSVklDRV9BQ0NPVU5UX1JFQUQiLCJTRERDX0ZFREVSQVRJT05fUkVBRCIsIkRPTUFJTl9SRUFEIiwiVlJTTENNX1JFQUQiLCJVUEdSQURFX1dSSVRFIl0sInJvbGUiOlsiQURNSU4iXX0.30E8LYFJw0zwTv6PWjjVEoLTudH06vGNCyqs4ItcoHI'
]

# Seconds to wait between two status requests.
POLL_INTERVAL_SECONDS = 15


def fetch_task(curl):
    """Perform one request on *curl* and return the decoded JSON body.

    A fresh buffer is attached for every call so that the body of an earlier
    response can never be mixed into (or mistaken for) the current one.

    Raises:
        pycurl.error: if the transfer itself fails.
        RuntimeError: if the server answers with an HTTP status >= 400.
        json.JSONDecodeError: if the body is not valid JSON.
    """
    with io.BytesIO() as body:
        curl.setopt(curl.WRITEDATA, body)
        curl.perform()
        response = body.getvalue()

    # Surface HTTP-level errors explicitly instead of failing later with a
    # confusing KeyError on a body that has no 'status' field.
    http_code = curl.getinfo(curl.RESPONSE_CODE)
    if http_code >= 400:
        raise RuntimeError(
            f"Task status request failed with HTTP {http_code}: {response[:200]!r}"
        )
    return json.loads(response)


curl_obj = pycurl.Curl()
try:
    curl_obj.setopt(curl_obj.URL, sddcMurl)
    # WARNING: these two options disable TLS certificate verification. They
    # are kept from the original script; enable verification wherever the
    # server presents a certificate the client can validate.
    curl_obj.setopt(curl_obj.SSL_VERIFYHOST, False)
    curl_obj.setopt(curl_obj.SSL_VERIFYPEER, False)
    curl_obj.setopt(curl_obj.HTTPHEADER, headers)

    jsonobj = fetch_task(curl_obj)
    apicallstatus = jsonobj['status']
    print(apicallstatus)

    while apicallstatus == 'IN_PROGRESS':
        # Wait *before* asking again, so no time is wasted sleeping after
        # the task has already reached its final state.
        print(f'sleeping for {POLL_INTERVAL_SECONDS} seconds')
        time.sleep(POLL_INTERVAL_SECONDS)

        jsonobj = fetch_task(curl_obj)
        apicallstatus = jsonobj['status']
        print(apicallstatus)

        sub_tasks = jsonobj.get('subTasks') or []
        if sub_tasks:
            print(f"print subtask {sub_tasks[0]['status']}")

    # Checked after the loop so that a task which fails *while* being polled
    # is reported too, not only one that had already failed on the first
    # request.
    if apicallstatus == 'FAILED':
        errors = jsonobj.get('errors') or []
        if errors:
            print(f"Job has failed because of {errors[0]['errorCode']} and {errors[0]['message']}")
        else:
            print('Job has failed but the response contained no error details')
finally:
    curl_obj.close()
© www.soinside.com 2019 - 2024. All rights reserved.