我正在使用以下代码轮询发布/订阅服务器
import requests
polling2.poll(lambda: requests.get('http://google.com').status_code == 200,
step=60,
poll_forever=True)
如果返回true,是否有一种方法可以访问响应主体以获取特定字段?在我的情况下,它正在返回json响应
您可以将poll()
结果赋给变量,并且其值将由lambda
返回
result = polling2.poll(lambda: requests.get('http://google.com').status_code == 200,
step=60,
poll_forever=True)
但是在当前代码中,lambda
返回True
,result
将为True
,并且将无法访问requests
但是您可以使用check_success=
测试状态以不同的方式编写它>
import requests import polling def test(response): return response.status_code == 200 result = polling.poll(lambda: requests.get('http://google.com'), step=60, poll_forever=True, check_success=test) print(result.text)
现在
lambda
返回response
,poll
将此response
发送到功能test
,该功能检查status_code
。如果test
返回True
,则它将response
设置为result
,您可以使用.text
,.content
或.json()
我在polling.py的源代码中找到它
EDIT:
显示如何使用collect_value=
。但是queue
获取除result
中的最后一个值以外的所有值。因此,它收集状态不同于200
的响应结果(如果错误消息出现错误,则收集错误消息)。import requests
import polling
import queue
def test(response):
return response.status_code == 200
my_queue = queue.Queue()
result = polling.poll(lambda: requests.get('http://google.com'),
step=60,
poll_forever=True,
check_success=test,
collect_value=my_queue)
if my_queue.empty():
print('empty')
else:
while not my_queue.empty():
print(my_queue.get())
#print(result.text)