我试图让
gpiozero
与我的 RPi 4 中的 asyncio
很好地配合,但遇到了一个我无法解决的问题。本质上,我试图获取旋转编码器的转向信号(顺时针、逆时针),并检测 RE 按钮是否已被按下。
我正在使用 asyncio 来收集协程并同时运行它们(到目前为止其他协程按预期工作)。但是当我尝试运行按钮检测和旋转检测时,只有列表中的第一个启动。
这是对我不起作用的代码摘录:
import asyncio
from gpiozero import RotaryEncoder, Button
button = Button(7)
encoder = RotaryEncoder(a=10, b=9, wrap=False, max_steps=0)
async def rotary_detection():
print("rotary_detection started")
last_rotary_value = 0
try:
while True:
current_rotary_value = encoder.steps
if last_rotary_value != current_rotary_value:
rotDiff = last_rotary_value - current_rotary_value
if rotDiff > 0:
print("turned CW")
else:
print("turned CCW")
last_rotary_value = current_rotary_value
asyncio.sleep(0.01)
except KeyboardInterrupt:
print("Program terminated")
except Exception as e:
print(e)
async def button_detection():
print("button_detection started")
try:
while True:
if button.is_pressed:
print("Button pressed")
except KeyboardInterrupt:
print("Program terminated")
except Exception as e:
print(e)
async def main():
await asyncio.gather(
rotary_detection(),
button_detection()
)
if __name__ == "__main__":
asyncio.run(main())
无论
asyncio.gather()
列表中的顺序如何,只有第一个开始。
如何使用 gpiozero 和 asyncio 实现我的用例,使它们能够很好地配合?
您的异步方法都不会调用
await
,这意味着它们永远不会将控制权交还给调度程序,以便另一个异步任务可以运行。您实际上有两个同步(即非异步)方法。
如果您记得在
rotary_detection
上await
,那么您的asyncio.sleep
方法将是正确的:
async def rotary_detection():
print("rotary_detection started")
last_rotary_value = 0
try:
while True:
...
# Don't forget to await on async tasks
await asyncio.sleep(0.01)
except KeyboardInterrupt:
print("Program terminated")
except Exception as e:
print(e)
但是,您需要将
await
引入到您的 button_pressed
方法中。可能只需添加另一个对 asyncio.sleep
的调用就能使事情正常进行,但您可能需要考虑其他选项:
gpiozero
when_held
和 when_pressed
方法来实现您的逻辑。