在我的web应用程序的代码中,把调度器放在哪里?

问题描述 投票:0回答:1

我有一个web应用,它首先获得用户对API令牌的认证,然后我想使用APScheduler模块每隔一小时运行后一部分代码。 我不想从一开始就运行整个应用程序,因为第一部分需要用户交互来再次授权应用程序,这在第一次运行后是不必要的,因为我们已经有了令牌,再加上我显然不可能每小时都在那里点击授权按钮。 我把代码的 sched.start() 部分放在哪里? 我得到的错误是 RuntimeError: 工作在请求上下文之外。

import requests
import json
from flask import Flask, render_template, request, redirect, session, url_for
from flask.json import jsonify
import os
from requests_oauthlib import OAuth2Session
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
from datetime import datetime

app = Flask(__name__)

client_id = "x"
client_secret = "x"
scope = 'read_station'
password = 'x'
#grant_type = 'authorization_code'
grant_type = 'password'
username='x'
authurl = 'https://api.netatmo.com/oauth2/authorize?'
token_url = 'https://api.netatmo.com/oauth2/token'
redirect_uri = 'x'
response_type = 'code'
code = None
payload= {'grant_type':grant_type,'client_id':client_id,'client_secret':client_secret,
'username':username,'password':password,'scope':scope}

rip={}
CITIES = {'bolzano' : 'lat_ne=46.30&lon_ne=11.23&lat_sw=46.28&lon_sw=11.14',
'florence' : 'lat_ne=43.51&lon_ne=11.21&lat_sw=43.44&lon_sw=11.02',
'manchester' : 'lat_ne=53.35&lon_ne=-2.0011.21&lat_sw=53.21&lon_sw=-2.36',
}
dicty = {}
def dooby(CITIES, Header):
    for city in CITIES.keys():
        i = requests.get('https://api.netatmo.com/api/getpublicdata?'+CITIES[city]+'&filter=false', headers = Header).json()
        dicty[str(city)]=i
    return dicty

@app.route('/')
def auth():
    redirect_uri = url_for('.redir', _external = True)
    oauth = OAuth2Session(client_id, redirect_uri = redirect_uri,
                          scope = scope)
    authorization_url, state = oauth.authorization_url(authurl)
    session['oauth_state'] = state
    return redirect(authorization_url)

@app.route('/redir', methods = ["GET","POST"])
def redir():
    code = request.args.get('code')
    payload['code']=code
    rip = requests.post(token_url, data=payload)
    rs = rip.content.decode()
    response = json.loads(rs)
    session['oauth_token'] = response['access_token']
    session['expiry'] = response['expires_in']
    session['refresh_token'] = response['refresh_token']

    return redirect(url_for('.profile'))

@app.route('/profile', methods = ["GET","POST"])

def profile():
    Header = {'Authorization':'Bearer '+session['oauth_token']}
    def repeat():
        return dooby(CITIES, Header)
    i = repeat()
    job = json.dumps(i)
    dt = datetime.now().strftime("%Y_%m_%d %H_%M_%S")
    f = open(r'C:\Users\freak\OneDrive\Documents\UHIpaper\{}.json'.format(dt),"w")
    f.write(job)
    f.close()
    sched = BackgroundScheduler(daemon=True)
    sched.add_job(func = profile,trigger='interval',minutes=2)
    sched.start()
    return jsonify(i)

if __name__ == "__main__":
    os.environ['DEBUG'] = "1"
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = "1"
    app.secret_key = os.urandom(24)
    app.run(debug=True)
flask web-applications apscheduler
1个回答
0
投票

调用 jsonify 在你 profile() func 引起了上下文外的错误,因为你在调用一个没有 Flask 应用上下文的 Flask 函数。

参考这个关于如何添加上下文的答案 或不用 jsonify 在你 profile() func,而不是标准的json lib。

© www.soinside.com 2019 - 2024. All rights reserved.