我正在尝试将数据加载器添加到名为 detector_livestream() 的 YOLO 代码中,它在运行推理之前使用数据加载器。我尝试了多种方法将其实现到我的代码中,但我的代码要么无法打开我的相机,要么最终会出现随机错误。我的代码哪里出了问题?
def draw_border_boxes_Live(classifiedItems, image):
for classifiedItem in classifiedItems:
xyxy, label, color, conf = classifiedItem
# Convert the bounding box format from xyxy to xywh
x1, y1, x2, y2 = map(int, xyxy)
cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
cv2.putText(image, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
# Transformation function
class CustomImageDataset(Dataset):
def __init__(self, transform=None, device=None):
self.transform = transform
self.device = device
self.frame_generator = self.run_baslercamera()
def __getitem__(self, idx):
try:
image = next(self.generator)
except StopIteration:
self.generator = self.run_baslercamera()
image = next(self.generator)
if self.transform is not None:
image = self.transform(image)
return image.to(self.device)
def __len__(self):
return len(self.image_list)
def run_baslercamera(self):
logger = logging.getLogger()
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if len(devices) == 0:
logger.info("No cameras found!")
exit()
top_camera = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
top_camera.Open()
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
top_camera.OutputQueueSize = 2
top_camera.StartGrabbing(pylon.GrabStrategy_LatestImages)
try:
while (True):
top_grabResult = top_camera.RetrieveResult(5000, pylon.TimeoutHandling_Return)
while not top_grabResult.GrabSucceeded():
logger.info("top grab failed, trying again")
top_grabResult = top_camera.RetrieveResult(5000, pylon.TimeoutHandling_Return)
if top_grabResult.GrabSucceeded():
image = converter.Convert(top_grabResult)
img = image.Array
img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
original_h = img.shape[0]
original_w = img.shape[1]
new_h = (1088 / 720) * (original_w)
offset = int(original_h - new_h)
img = img[offset:original_h, 0:original_w]
img = cv2.resize(img, (720, 1088))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img / 255.0
yield img
else:
logger.info(f"Top camera failed to grab an image")
yield np.zeros((1088, 720, 3))
finally:
top_camera.Close()
# Define transformation
transform = transforms.Compose([
transforms.Resize((640, 640)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Initialize dataset
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dataset = CustomImageDataset(transform=transform, device=device)
# Initialize dataloader
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
#camera code to open
def main(run=None, detection_args=None):
if detection_args.source == "1":
run_baslercamera(detection_args)
else:
run(**vars(detection_args))
def run_baslercamera(detection_args, device): # added device parameter here
logger = logging.getLogger()
tl_factory = pylon.TlFactory.GetInstance()
devices = tl_factory.EnumerateDevices()
if len(devices) == 0:
logger.info("No cameras found!")
exit()
top_camera = pylon.InstantCamera(tl_factory.CreateDevice(devices[0]))
top_camera.Open()
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
top_camera.OutputQueueSize = 2
top_camera.StartGrabbing(pylon.GrabStrategy_LatestImages)
try:
counter = 0
while (True):
top_grabResult = top_camera.RetrieveResult(5000, pylon.TimeoutHandling_Return)
while not top_grabResult.GrabSucceeded():
logger.info("top grab failed, trying again")
top_grabResult = top_camera.RetrieveResult(5000, pylon.TimeoutHandling_Return)
if top_grabResult.GrabSucceeded():
image = converter.Convert(top_grabResult)
img = image.Array
img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
original_h = img.shape[0]
original_w = img.shape[1]
new_h = (1088 / 720) * (original_w) # x/2160 = 1088/720 solve for x
offset = int(original_h - new_h)
img = img[offset:original_h, 0:original_w]
top_img = cv2.resize(img, (720, 1088))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img / 255.0
img = torch.from_numpy(img).permute(2, 0, 1).float().unsqueeze(0).to(device) # use device here
yield top_img # yield the image tensor
counter += 1
else:
logger.info(f"Top camera failed to grab an image")
top_img = np.zeros((1088, 720, 3))
top_img = cv2.resize(top_img, (0, 0), fx=0.5, fy=0.5)
yield top_img
finally:
top_camera.Close()
def detect_livestream(weights, source='1', img_size=640, conf_thres=0.5, iou_thres=0.5, device='', augment=False,
only_right_face=None):
set_logging()
device = select_device(device)
half = device.type != 'cpu'
# Make sure img_size is an even number
img_size = img_size if img_size % 2 == 0 else img_size + 1
# Load model
model = attempt_load(weights, map_location=device)
imgsz = check_img_size(img_size, s=model.stride.max())
if half:
model.half()
# Get colors
names = model.module.names if hasattr(model, 'module') else model.names
colors = [[random.randint(0, 255) for _ in range(3)] for _ in range(len(names))]
# Dataloader
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
if source == '1':
for frame in run_baslercamera(detection_args, device):
im0s = frame.copy()
# Resize, convert to RGB, normalize, and convert to tensor for inference
img = cv2.resize(frame, (img_size, img_size))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img / 255.0
img = np.ascontiguousarray(img.transpose((2, 0, 1))) # HWC to CHW
img = torch.from_numpy(img).float().unsqueeze(0).to(device)
print(f"Input shape: {img.shape}")
# Inference (call model by passing image object, not path folder)
t1 = time_synchronized()
pred = model(img, augment=augment)[0] # Pass the image tensor directly to the model
# NMS
pred = non_max_suppression(pred, conf_thres, iou_thres)
t2 = time_synchronized()
# Process detections
for i, det in enumerate(pred): # detections per image
if det is not None and len(det):
# Rescale boxes from img_size to im0 size
det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0s.shape).round()
# Write results
classifiedItems = [] # Create an empty list to store items with bounding boxes
for *xyxy, conf, cls in reversed(det):
label = f'{names[int(cls)]} {conf:.2f}'
# Update label to include package type
label = f'{names[int(cls)]} {conf:.2f}'
classifiedItems.append((xyxy, label, colors[int(cls)], conf)) # Add item details to the list
draw_border_boxes_Live(classifiedItems, im0s)
# Stream results
cv2.imshow('Live Stream', im0s) # display the window with the name 'Live Stream'
if cv2.waitKey(1) == ord('q'): # q to quit
break
else:
print('Invalid source. Only source "1" is supported.')
if __name__ == '__main__':
weights = str(Path('../weights/best.pt'))
source = '1'
img_size = 640 # inference size (pixels)
conf_thres = 0.5 # object confidence threshold
iou_thres = 0.5
device = ''
augment = False
# Creating the detection_args namespace
detection_args = argparse.Namespace(weights=weights, source=source, img_size=img_size,
conf_thres=conf_thres, iou_thres=iou_thres,
device=device, augment=augment)
detect_livestream(weights, source, img_size, conf_thres, iou_thres, device, augment)