社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  机器学习算法

利用深度学习进行实时人员跟踪和识别

小白玩转Python • 7 月前 • 238 次点击  

点击下方卡片,关注“小白玩转Python”公众号


在技术快速发展的时代,利用深度学习进行人员识别和追踪彻底改变了监控和安全系统。


深度学习基础知识


要理解如何使用深度学习进行人物识别和跟踪,掌握深度学习的基础知识至关重要。深度学习是机器学习的一个子集,它采用人工神经网络,可以自动从大量数据中学习模式和表示。卷积神经网络 (CNN) 和循环神经网络 (RNN) 是许多用于此目的的深度学习模型的支柱


物体检测

物体检测是识别图像或视频流中人物的关键步骤。深度学习模型(例如单次多框检测器 (SSD) 和仅看一次 (YOLO))处于实时物体检测的前沿。这些模型可以以惊人的准确度定位和分类图像或视频帧中的个人。


人脸识别

人脸识别是人脸识别的一个子领域,由于深度学习,人脸识别取得了重大进展。深度神经网络(包括 FaceNet 和 VGGFace)使得根据面部特征准确识别个人成为可能。这些系统广泛用于访问控制、执法和身份验证目的。
class FaceNet:    def __init__(        self, 


    
        detector: object,        onnx_model_path: str = "assets/models/facenet512_weights.onnx",         anchors: typing.Union[dict] = data,        force_cpu: bool = False,        threshold: float = 0.5,        color: tuple = (255, 255, 255),        thickness: int = 2,        ) -> None:        if not stow.exists(onnx_model_path):            raise Exception(f"Model doesn't exists in {onnx_model_path}")
self.detector = detector self.threshold = threshold self.color = color self.thickness = thickness
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
providers = providers if ort.get_device() == "GPU" and not force_cpu else providers[::-1]
self.ort_sess = ort.InferenceSession(onnx_model_path, providers=providers)
self.input_shape = self.ort_sess._inputs_meta[0].shape[1:3] self.anchors = self.load_anchors(anchors) if isinstance(anchors, str) else anchors
def normalize(self, img: np.ndarray) -> np.ndarray: mean, std = img.mean(), img.std() return (img - mean) / std
def l2_normalize(self, x: np.ndarray, axis: int = -1, epsilon: float = 1e-10) -> np.ndarray: output = x / np.sqrt(np.maximum(np.sum(np.square(x), axis=axis, keepdims=True), epsilon)) return output
def detect_save_faces(self, image: np.ndarray, output_dir: str = "faces"): face_crops = [image[t:b, l:r] for t, l, b, r in self.detector(image, return_tlbr=True)] # face_crops = [face for f in self.detector(image,return_tlbr=True)] if face_crops == []: return False
stow.mkdir(output_dir)
for index, crop in enumerate(face_crops): output_path = stow.join(output_dir, f"face_{str(index)}.png") cv2.imwrite(output_path, crop) print("Crop saved to:", output_path)
self.anchors = self.load_anchors(output_dir) return True
def load_anchors(self, faces_path: str): anchors = {} if not stow.exists(faces_path): return {}
for face_path in stow.ls(faces_path): anchors[stow.basename(face_path)] = self.encode(cv2.imread(face_path.path))
return anchors
def encode(self, face_image: np.ndarray) -> np.ndarray: face = self.normalize(face_image) face = cv2.resize(face, self.input_shape).astype(np.float32)
encode = self.ort_sess.run(None, {self.ort_sess._inputs_meta[0].name: np.expand_dims(face, axis=0)})[0][0] normalized_encode = self.l2_normalize(encode)
return normalized_encode def l1_distance(self, a: np.ndarray, b: typing.Union[np.ndarray, list]) -> np.ndarray: if isinstance(a, list): a = np.array(a)
if isinstance(b, list): b = np.array(b)
return np.sum(np.abs(a - b)) def cosine_distance(self, a: np.ndarray, b: typing.Union[np.ndarray, list]) -> np.ndarray: if isinstance(a, list): a = np.array(a)
if isinstance(b, list): b = np.array(b)
return np.dot(a, b.T) / (np.linalg.norm(a) * np.linalg.norm(b))
def draw(self, image: np.ndarray, face_crops: dict): for value in face_crops.values(): t, l, b, r = value["tlbr"] cv2.rectangle(image, (l, t), (r, b), self.color, self.thickness) name = stow.name(value['name']) name = name.rsplit('_')[0] cv2.putText(image, name, (l, t - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, self.color, self.thickness)
return image
def __call__(self, frame: np.ndarray,face_frame=False) -> np.ndarray: names = None if not face_frame: face_crops = {index: {"name": "", "tlbr": tlbr} for index, tlbr in enumerate(self.detector(frame, return_tlbr=True))} for key, value in face_crops.items(): t, l, b, r = value["tlbr"] face_encoding = self.encode(frame[t:b, l:r]) distances = self.cosine_distance(face_encoding, list(self.anchors.values())) if np.max(distances) > self.threshold: face_crops[key]["name"] = list(self.anchors.keys())[np.argmax(distances)] names = face_crops[key]["name"] names = names.rsplit('_')[0] print(names,np.max(distances)) else: face_encoding = self.encode(frame) distances = self.cosine_distance(face_encoding, list(self.anchors.values())) if np.max(distances) > self.threshold: names = list(self.anchors.keys())[np.argmax(distances)].rsplit('_')[0] print(names,np.max(distances))
return names
跟踪算法

人员跟踪涉及监控和跟踪个人在场景中或视频帧中移动的情况。DeepSORT(单对象跟踪深度学习)和 SORT(简单在线实时跟踪)是利用深度学习来提高准确性和稳健性的跟踪算法的示例。


方法


该系统的工作原理如下:

  • 该应用程序从相机中抓取新的帧。
  • 物体检测系统处理帧并从场景中提取人物。对于每个人,都会裁剪帧的一个子区域以进行详细处理。
  • 每个人的区域都经过面部检测算法的处理,从身体中提取出一个人的脸部。

  • 每张脸都经过人脸识别系统扫描,系统会将当前人脸与数据库中存储的人脸进行比较。如果识别出人脸,则返回姓名,否则返回“未定义”。

将识别与唯一的跟踪 ID 关联起来是人物识别和跟踪系统中一种常见且有效的方法,尤其是在个人可能进出视野或暂时遮挡脸部的情况下。此方法可确保即使在给定帧中某人的脸部暂时被遮挡或不再可见,系统仍可以根据其分配的跟踪 ID 识别他们。其工作原理如下:

  • 分配跟踪 ID:系统为检测到的人分配唯一的跟踪 ID。此 ID 与他们的面部特征和其他相关信息相关联。

  • 持续跟踪:随着视频流或帧的进展,跟踪算法会持续监控个人的动作和外观。即使一个人的脸暂时被遮挡或不再可见,系统仍会根据其唯一的跟踪 ID 跟踪他们的动作。

  • 重新识别:当人的脸部变得不可见时,系统可以通过匹配其当前跟踪 ID 来重新识别他们。即使在具有挑战性的场景中,也可以跨不同帧无缝跟踪个人。
#**name is the output of face recognition calling**if bool(name):  to_remove = []  for key, value in id_face_dictionary.items():      if value == name:          if id != key:                  to_remove.append(key)          loggers["recognition"].info(f"{name} already in dict. ID: {id}")  for k in to_remove:      id_face_dictionary.pop(k)
#once deleted, we add new key id_face_dictionary[id] = name loggers["recognition"].info(f"Added {name} to key {id}")
通过使用跟踪 ID,系统可以在整个视频或帧序列中为每个人保持一致的身份,确保即使脸部始终不可见也能保持识别。这种方法在包括视频监控在内的各种应用中都很有价值,在这些应用中,持续跟踪和识别对于安全和分析目的至关重要。

优化实时物体检测和追踪

实时物体检测和跟踪是各种应用(例如监控、自动驾驶和交互系统)中的关键组件。然而,在每帧 30 毫秒 (ms) 的严格时间限制内执行检测和跟踪(相当于每秒 30 帧 (fps) 的帧速率)会带来巨大的计算挑战。为了克服这个问题,我们提出了一种多线程架构,将处理分为三个独立线程:核心、检测器和识别器。每个线程都设计为同时运行,从而减少处理延迟和资源争用。


核心线程:应用程序管理器
核心线程充当中央协调器。其主要功能是:

  • 直接从摄像机输入获取视频帧。

  • 立即将这些帧发送到检测器线程。

  • 从检测器和识别器线程收集处理后的数据。

  • 显示检测到的物体和识别出的实体的结果帧。

此线程可确保始终处理最新的帧。如果检测器线程处理一帧的时间超过30 毫秒,则核心线程会跳过该线程,从而避免积压并确保实时性能,而无需对帧进行排队。
while vid.isOpened():  ret, frame = vid.read()  # out = None  if ret:      if queuepulls == 1:          timer2 = time.time()      # Capture frame-by-frame      # if the input queue *is* empty, give the current frame to


    
      # classify      if inputQueue.empty():          inputQueue.put(frame)      else:          loggers["general"].debug("Skipping frame from face detection")                   # if the output queue *is not* empty, grab the detections      if not outputQueue.empty():          out = outputQueue.get()      if out is not None:          queuepulls += 1          for output in out:              bbox_left = int(output[0])              bbox_top = int(output[1])              bbox_w = int(output[2])               bbox_h = int(output[3])              if output.shape[0] == 7:                  id = int(output[4])                  prev_id = id              else:                  id =prev_id              if id in id_face_dictionary:                  name = id_face_dictionary[id]              else:                  name = "undefined"              color = (255,0,0) # Use your custom color              drawPerson(frame,bbox_left,bbox_top,bbox_w,bbox_h,name,color)        cv2.imshow('frame', frame)      if cv2.waitKey(1) & 0xFF == ord('q'):          vid.release()          cv2.destroyAllWindows()          p.kill()          pRec.kill()          break
检测器线程:对象检测引擎

检测器线程在无限循环中运行,其任务是:

  • 在当前帧上执行对象检测算法。

  • 将检测结果发送回核心线程。

  • 将有关面部检测的信息转发到识别器线程。

该探测器注重速度和准确性,采用优化的算法,能够在 30 毫秒的时间内识别各种物体。
def object_detection_(model_path,confidence,inputQueue,outputQueue,recognitionQueue):    global id_face_dictionary    yolov8_detector = YOLO(model_path)    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')    yolov8_detector.to(device)    loggers['tracking'].info("Detection initialized")    while True:        if not inputQueue.empty():            frame = inputQueue.get()            result = yolov8_detector.track(frame,verbose=


    
False,conf=confidence,persist=True)[0] #Verbose False to avoid yolov8 messages            data = result.cpu().numpy().boxes.data            outputQueue.put(data)            if recognitionQueue.empty():                recognitionQueue.put((data,frame))
识别线程:识别专家

与检测器并行,识别器线程负责:

  • 对检测到的面部数据执行面部识别任务。

  • 将识别结果传回核心线程。

它还以无限循环的方式运行,检查来自探测器的新数据并立即处理以识别视频帧中的个人或特征。
def recognize_algorithm(model_path,recognitionQueue,id_face_dictionary,confidence):    detector = face_detector.FaceDetection()    recog = face_recognition.FaceNet(        detector=detector,        threshold=confidence,        onnx_model_path = model_path)    loggers['recognition'].info("Recognition initialized")    while True:        if not recognitionQueue.empty():            out = recognitionQueue.get()            frame = out[1]            boxes = out[0]            for output in boxes:                 bbox_left = int(output[0])                bbox_top = int(output[1])                bbox_w = int(output[2])                 bbox_h = int(output[3])                id = int(output[4])                if bbox_w > 0 and bbox_h > 0:                    person_frame = frame[bbox_top:bbox_h,bbox_left:bbox_w,:]                    start_time = time.time()
name = recog(frame=person_frame,face_frame=True) loggers['recognition'].debug(f"RECOGNITION - Inference time: {round(time.time()-start_time,2)}")
if bool(name): to_remove = [] for key, value in id_face_dictionary.items(): if value == name: if id != key: to_remove.append(key) loggers["recognition"].info(f"{name} already in dict. ID: {id}") for k in to_remove: id_face_dictionary.pop(k)
#once deleted, we add new key id_face_dictionary[id] = name loggers["recognition"].info(f"Added {name} to key {id}")
线程间通信

线程间通信是此架构的基石。它允许异步处理帧,其中每个线程独立检查新帧并处理它们。这种设计确保系统始终在最新的可用帧上工作,从而保持实时性能而不会出现延迟。每个线程通过Python 队列进行通信,并采用同步机制来防止竞争条件和数据损坏。

 

 inputQueue = Queue(maxsize=1)  outputQueue = Queue(maxsize=1)  recognitionQueue = Queue()  p = Process(target=object_detection_, args=(model_path,detection_confidence,inputQueue, outputQueue,recognitionQueue,))  p.daemon = True  p.start()
pRec = Process(target=recognize_algorithm, args=(recognition_model_path,recognitionQueue,id_face_dictionary,recognition_confidence,)) pRec.daemon = True pRec.start()

系统规格

该应用程序旨在无缝运行在 Python 上,让不同操作系统的广大用户都能使用它。它的跨平台兼容性确保它可以在 Windows、macOS 和各种 Linux 发行版等流行操作系统上使用。虽然该应用程序在操作系统支持方面用途广泛,但需要注意的是,为了获得最佳实时性能,强烈建议使用 GPU(图形处理单元),尤其是在使用资源密集型深度学习模型时。GPU 可以显著加速这些模型的执行,实现更快的处理速度并增强应用程序高效执行实时任务的能力。
·  END  ·


HAPPY LIFE

本文仅供学习交流使用,如有侵权请联系作者删除

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/171342
 
238 次点击