我正在尝试使用 google API 创建语音助手,但在连接到 google API 时尝试运行我的代码时,我继续收到错误请求。
我在 Mac 上的 VScode 中使用 python,过去它一直有效,直到我错误地发送了太多请求时它阻止了我的帐户。
任何人都可以帮忙吗。
from __future__ import print_function
from http import server
from lib2to3.pygram import python_grammar_no_print_statement
import pickle
import os.path
import smtplib
import webbrowser
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import os
import datetime
import time
from datetime import timedelta
import pyttsx3
import speech_recognition as sr
import pytz
import subprocess
import wikipedia
import pyjokes
import mysql.connector
import requests
import wolframalpha
mydb = mysql.connector.connect(
host = "localhost",
user = "root",
password = "6alvise2005",
database = "noradb")
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
MONTHS = ["january", "february", "march", "april", "may", "june", "july", "august", "septmeber", "october", "november", "dicember"]
DAYS = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
DAY_EXTENTIONS = ["st", "nd", "rd", "th"]
def speak(text):
engine = pyttsx3.init()
engine.say(text)
engine.runAndWait()
def get_audio(): # returns the text version of what the user says into the microphone
r = sr.Recognizer() # creating a recognisable object from the speach ricognition module
with sr.Microphone() as source: # use the microphone to listen
audio = r.listen(source)
said = ""
try:
said = r.recognize_google(audio) # use the Google API to recognise what has been said
print(said)
except Exception as e: # to avoid any error if the micrphone doesn't hear anything
print("Exception: " + str(e))
return said.lower() # everything will be turned to lower case to avoid any kind of missunderstanding and eventual problems
def authenticate_google():
creds = None
if os.path.exists('token.json'): # the file token.json stores the user's access and refresh tokens,
# and is created automatically when the authorization flow completes for the first time
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
if not creds or not creds.valid: # if there are no (valid) credentials available, let the user log in
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'/Users/alvisebaracco/Desktop/computerIA/credentials.json', SCOPES) # location of the json file
creds = flow.run_local_server(port=0)
print("error1")
with open('/Users/alvisebaracco/Desktop/computerIA/token.json', 'w') as token:
token.write(creds.to_json()) # saves the credentials for the next run in a kind of local cache
service = build('calendar', 'v3', credentials=creds)
return service # this makes theis code run at the start of the program and return the service that contains the google calendar for the user
def get_events(date, service):
date = datetime.datetime.combine(date, datetime.datetime.min.time()) # retrieving the start date from the previously apploaded event
end_date = datetime.datetime.combine(date, datetime.datetime.max.time()) # retrieving the end date from the previously apploaded event
utc = pytz.UTC
date = date.astimezone(utc) # retrieving the start time from the previously apploaded event
end_date = end_date.astimezone(utc) # retrieving the end time from the previously apploaded event
events_result = service.events().list(calendarId='primary', timeMin=date.isoformat(), timeMax=end_date.isoformat(),
singleEvents=True,
orderBy='startTime').execute()
events = events_result.get('items', [])
if not events:
speak('No upcoming events found.')
else:
speak(f"You have {len(events)} events on this day.")
for event in events:
start = event['start'].get('dateTime', event['start'].get('date'))
print(start, event['summary'])
start_time = str(start.split("T")[1].split("-")[0]) # splits the different pieces of information
if int(start_time.split("-")[0]) < 12:
start_time = start_time + "am"
else:
start_time = str(int(start_time.split("-")[0]) - 12) + start_time.split(":")[1]
start_time = start_time + "pm"
speak(event["summary"] + "at" + start_time)
def get_date(text):
text = text.lower()
today = datetime.date.today()
if text.count("today") > 0 :
return today # if we see the word 'today' the code will return the current date
day = -1
day_of_week = -1
month = -1
year = today.year
for word in text.split(): # splits every word so that it is better to analyse
if word in MONTHS: # see if there is any word in the MONTHS list
month = MONTHS.index(word) + 1 # see which number each month corrisponds to
elif word in DAYS:
day_of_week = DAYS.index(word) # same as MONTHS
elif word.isdigit():
day = int(word)
else:
for ext in DAY_EXTENTIONS:
found = word.find(ext)
if found > 0: # if we has something before that we see if it a digit and then try to turn it into a number
# and if we can't we transfort it into a day
try:
day = int(word[:found])
except:
pass
if month < today.month and month != -1:
year = year + 1 # if there is a month that is less compared to the month we are currently in have add 1 to the year
if day < today.day and month == -1 and day != -1:
month = month + 1 # same as before but for the day
if month == -1 and day == -1 and day_of_week != -1:
current_day_of_week = today.weekday() # which day of the week it is from 0 - 6
dif = day_of_week - current_day_of_week
if dif < 0:
dif += 7
if text.count("next") >= 1:
dif += 7
return today + datetime.timedelta(dif)
if month == -1 or day == -1:
return None
return datetime.date(month = month, day = day, year = year) # returns at the end the month, the date and the year of the requested event
def note(text):
date = datetime.datetime.now()
file_name = str(date).replace(":", "-") + "-note.txt"
with open(file_name, "w") as f:
f.write(text)
Notes = "C:/usr/bin/open", "-W", "-n", "-a", "/Applications/Note.app" # occhio al 'C:'
WAKE = "hey nora"
SERVICE = authenticate_google()
print("Start")
def sendEmail (to, content):
server = smtplib.SMTP('smtp.gmail.com', 587)
server.ehlo()
server. starttls()
server.login('[email protected]', 'password')
server.sendemail('[email protected]', to, content)
server.close()
while True:
print("Listening")
text = get_audio()
if text.count(WAKE) < 0:
speak("I am ready")
text = get_audio()
CALENDAR_STRS = ["what do i have", "do i have plans", "am i busy"]
for phrase in CALENDAR_STRS:
if phrase in text:
date = get_date(text)
if date:
get_events(date, SERVICE)
else:
speak("I don't understand")
NOTE_STRS = ["make a note", "write this down", "remember this"]
for phrase in NOTE_STRS:
if phrase in text:
speak("What would you like me to write down?")
note_text = get_audio()
note(note_text)
speak("I've made a note of that.")
DATABASE = ["database", "table"]
for phrase in DATABASE:
if phrase in text:
speak ("Ok, let's open MySQL")
subprocess.call(["/usr/bin/open", "-W", "-n", "-a", "/Applications/MySQLWorkbench.app"])
mycursor = mydb.cursor() # to access the database
DATABASE_ADD = ["add in database", "write in database", "take note of in the database"]
for phrase in DATABASE_ADD:
if phrase in text:
subprocess.call(["/usr/bin/open", "-W", "-n", "-a", "/Applications/MySQLWorkbench.app"]) # opens MySQL
speak ("Where would you like to add your information: appointments, notes or recipes?")
add_type = get_audio() # retrieves what the clients says and searches if a precise word was said
if add_type == 'appointments':
speak ("You want to write the title or the time of the appointment?")
appointment_type_add = get_audio()
if appointment_type_add == 'the title': # inserts in the decided table
speak ("What do you want to insert?")
add_title = get_audio()
mycursor.execute("INSERT INTO appointments (name) VALUES (%s)", (add_title)) # adds to the database what has just been answered to the question
else: # inserts the wanted data in the other row, different from 'title'
speak ("What do you want to insert?")
add_time = get_audio()
mycursor.execute("INSERT INTO appointments (dateandtime) VALUES (%s)", (add_time))
elif add_type == 'notes':
speak ("What would you like me to write?")
add_note = get_audio()
mycursor.execute("INSERT INTO notes (note) VALUES (%s)", (add_note)) # adds to the database what has just been answered to the question
elif add_type == 'recipe':
speak ("What will you cook next time?")
add_recipe = get_audio()
mycursor.execute("INSERT INTO recipe (name) VALUES (%s)", (add_recipe)) # adds to the database what has just been answered to the question
mycursor.execute(sql)
mydb.commit()
cursor = mydb.cursor() # to conclude the modifications to the database
DATABASE_DELETE = ["delete something in the database", "cancel something in the database"]
for phrase in DATABASE_DELETE:
if phrase in text:
subprocess.call(["/usr/bin/open", "-W", "-n", "-a", "/Applications/MySQLWorkbench.app"])
speak ("From where would you like to delete some data? appointments, notes or recipes")
delete_type = get_audio()
if delete_type == 'appointments':
speak ("Would you like to delete at first the tilte or the time of the latter?")
appointment_type_delete = get_audio()
if appointment_type_delete == 'title': # midifies the items in this table
speak ("Which title do you want to delete?")
delete_title = get_audio()
sql = "DELETE FROM appointments WHERE name = %s" # to delete what has been said from the client in the database
value_1 = (delete_title)
cursor.execute(sql, value_1)
mydb.commit() # to conclude the modifications to the database
else:
speak ("Which time do you want to delete?")
delete_time = get_audio()
sql = "DELETE FROM appointments WHERE dateandtime = %s"
value_2 = (delete_time)
cursor.execute(sql, value_2)
mydb.commit()
elif delete_type == 'notes':
speak ("Which note do you want to delete?")
delete_note = get_audio()
sql = "DELETE FROM notes WHERE note = %s"
value_3 = (delete_note)
cursor.execute(sql, value_3)
mydb.commit()
elif delete_type == 'recipes':
speak ("Which food you've already prepared and would like to delete?")
delete_recipe = get_audio()
sql = "DELETE FROM recipes WHERE name = %s"
value_4 = (delete_recipe)
cursor.execute(sql, value_4)
mydb.commit()
if 'wikipedia' in text:
speak('Searching Wikipedia...')
query = text.replace("wikipedia", "")
results = wikipedia.summary(query, sentences = 3)
speak("According to Wikipedia")
print(results)
speak(results)
elif 'email to maria' in text:
try:
speak ("What should I send him?")
content = get_audio()
to = "[email protected]"
sendEmail (to, content)
speak ("The email has been successfully sent")
except Exception as e:
print(e)
speak ("sorry I am not able to send this email")
elif "calculate" or "what is" in text:
question = get_audio()
app_id = "Mention your API key"
client = wolframalpha.Client(app_id)
res = client.query(question)
answer = next(res.resutls).text
speak("The answer is " + answer)
elif 'open whatsapp' in text:
speak ("Ok, let's open WhatsApp")
subprocess.call(["/usr/bin/open", "-W", "-n", "-a", "/Applications/WhatsApp.app"])
elif 'open facebook' in text:
speak ("Ok, let's open FaceBook")
subprocess.call(["/usr/bin/open", "-W", "-n", "-a", "/Applications/Facebook.app"])
elif 'open gmail' in text:
speak ("Ok, let's open Gmail")
subprocess.call(["/usr/bin/open", "-W", "-n", "-a", "/Applications/Gmail.app"])
elif 'open youtube' in text:
speak("Here you go to Youtube\n")
webbrowser.open("https://www.youtube.com")
elif 'open google' in text:
speak("Here you go to Google\n")
webbrowser.open("https://www.google.com/?client=safari")
elif 'news' in text:
speak ("Here are some news of the day on the BBC")
webbrowser.open("https://www.bbc.com")
elif 'play music' in text or "play song" in text:
speak("Here you go with music")
music_dir = "C:\\Users\\Alvise\\Music"
songs = os.listdir(music_dir)
print(songs)
random = os.startfile(os.path.join(music_dir, songs[1]))
elif 'the time' in text:
strTime = datetime.datetime.now().strftime("% H:% M:% S")
speak(f"Sir, the time is {strTime}")
elif 'how are you' in text:
speak("I am fine, Thank you")
speak("How are you, Sir")
elif 'fine' in text or "good" in text:
speak("It's good to know that your fine")
elif 'exit' in text:
speak("Thanks for giving me your time")
exit()
elif "who made you" in text or "who created you" in text:
speak("I have been created by Alvise.")
elif 'joke' in text:
speak(pyjokes.get_joke())
elif 'search' in text or 'play' in text:
query = text.replace("search", "")
query = text.replace("play", "")
webbrowser.open(query)
elif "who i am" in text:
speak("If you talk then definitely your human.")
elif "why you came to world" in text:
speak("Thanks to Alvise. Further it's a secret")
elif 'is love' in text:
speak("It is 7th sense that destroy all other senses")
elif "who are you" in text:
speak("I am your virtual assistant created by Alvise")
elif 'reason for you' in text:
speak("I was created as a Minor project by Mister Alvise")
elif 'shutdown system' in text:
speak("Hold On a Sec ! Your system is on its way to shut down")
subprocess.call('shutdown / p /f')
elif "restart" in text:
subprocess.call(["shutdown", "/r"])
elif "where is" in text: # mi sa che non funziona
text = text.replace("where is", "")
location = text
speak("User asked to Locate")
speak(location)
webbrowser.open("https://www.google.nl / maps / place/" + location + "")
elif "weather" in text:
# Google Open weather website
# to get API of Open weather
api_key = "Api key"
base_url = "http://api.openweathermap.org / data / 2.5 / weather?"
speak("What's the name of the city?")
city_name = get_audio()
complete_url = base_url + "appid =" + api_key + "&q =" + city_name
response = requests.get(complete_url)
x = response.json()
if x["code"] != "404":
y = x["main"]
current_temperature = y["temp"]
current_pressure = y["pressure"]
current_humidiy = y["humidity"]
z = x["weather"]
weather_description = z[0]["description"]
print(" Temperature (in kelvin unit) = "
+ str(current_temperature)
+ "\n atmospheric pressure (in hPa unit) ="
+ str(current_pressure)
+ "\n humidity (in percentage) = "
+ str(current_humidiy) +"\n description = "
+ str(weather_description)
)
else:
speak(" City Not Found ")
我已经尝试为 google API 设置一个新帐户并获取新凭据,但它仍然给我同样的错误:
google.auth.exceptions.RefreshError: ('invalid_grant: Bad Request', {'error': 'invalid_grant', 'error_description': 'Bad Request'})