import time
import threading
import qi
import balldetection
import common
import cv2
import vision_definitions
import math
lock = threading.Lock()
ball_detected = False
ip = "10.42.0.98"
port = 9559
session = qi.Session()
session.connect("tcp://" + ip + ":" + str(port))
motion_service = session.service("ALMotion")
posture_service = session.service("ALRobotPosture")
video_service = session.service("ALVideoDevice")
speak_service = session.service("ALTextToSpeech")
def detect_ball_and_act():
global ball_detected
print("In detect_ball_and_act")
videoClient = video_service.subscribeCamera("nao_opencv", 0, vision_definitions.kQVGA, vision_definitions.kBGRColorSpace, 5)
while not ball_detected:
print("Not ball detected")
try:
print("In try")
frame = common.readImage(video_service, videoClient)
if frame is not None:
print("in none")
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
img, (ret, x, y, radius, distance) = balldetection.detect_ball(hsv)
if ret:
print("Ball in sight at ({}, {}) with radius {} and distance {}".format(x, y, radius, distance))
print("Performing action...")
speak_service.say("Ball is in sight!")
with lock:
ball_detected = True
else:
time.sleep(1.0)
except Exception as e:
print("An error occurred:", e)
def keep_walking_and_looking():
while not ball_detected:
print("Walking and looking around...")
walk_and_look()
def walk_and_look():
print("In walk_and_look")
motion_service.moveToward(0.5, 0, 0)
time.sleep(3.0)
motion_service.moveToward(0, 0, 0)
time.sleep(1.0)
motion_service.angleInterpolationWithSpeed(["HeadYaw", "HeadPitch"], [0.5, 1.0], 0.2)
time.sleep(1.0)
motion_service.angleInterpolationWithSpeed(["HeadYaw", "HeadPitch"], [-0.5, 1.0], 0.2)
time.sleep(1.0)
motion_service.angleInterpolationWithSpeed(["HeadYaw", "HeadPitch"], [0.0, 0.0], 0.2)
walking_thread = threading.Thread(target=keep_walking_and_looking)
ball_detection_thread = threading.Thread(target=detect_ball_and_act)
motion_service.setStiffnesses("Body", 1.0)
posture_service.goToPosture("StandInit", 1.0)
walking_thread.start()
ball_detection_thread.start()
ball_detection_thread.join()
walking_thread.join()
代码做了我想要的,但我想知道是否有更好的方法来处理这个问题。这是 Nao 机器人的任务,他现在的任务是在房间里走动并寻找球。截至目前,他确实在寻找球并在房间里走来走去,但在他看到球后,他继续走动,这是我想阻止的事情。我知道这是因为线程仍在运行并在他完成步行后停止,但我希望他在看到球后完全停止,因为下一步是他走向球。它应该像一个状态机。
naoqi API 的目的是简化多线程。 因此,要移动和查看,您可以例如执行 Motion_service.post.angleInterpolationWithSpeed 头部会移动,但你的线程不会被阻塞,所以你可以同时继续做某事。
据说,当机器人看到球时,您可以使用运动命令来停止您的移动,例如:motion_service.stopWalk 或motion_service.stopMove。 另请查看 post 方法返回的 id。
idToKillTaskLater = motion_service.post.angleInterpolationWithSpeed
不要犹豫,查看 NAOqi 的文档以及您可以在其中找到的一些示例...