import argparse import json import math import os import time import urllib.request from pathlib import Path from typing import Dict, List, Optional, Tuple import cv2 import numpy as np import torch import torchvision from scipy.signal import savgol_filter from ultralytics import YOLO # Define COCO keypoint names KEYPOINT_NAMES = [ "nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder", "left_elbow", "right_elbow", "left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle" ] # Define skeleton connections POSE_CONNECTIONS = [ (0, 1), (0, 2), # nose to eyes (1, 3), (2, 4), # eyes to ears (5, 6), # shoulders (5, 7), (7, 9), # left arm (6, 8), (8, 10), # right arm (5, 11), (6, 12), # shoulders to hips (11, 12), # hips (11, 13), (13, 15), # left leg (12, 14), (14, 16) # right leg ] # Monkey patch torchvision NMS to handle CUDA compatibility issues original_nms = torchvision.ops.nms def patched_nms(boxes, scores, iou_threshold): """ Custom NMS implementation that handles the CUDA compatibility issue by temporarily moving tensors to CPU, running NMS, and moving back to original device """ device = boxes.device if device.type == 'cuda': try: # Try to run NMS on CUDA directly return original_nms(boxes, scores, iou_threshold) except RuntimeError as e: if "Could not run 'torchvision::nms'" in str(e): # If CUDA NMS fails, temporarily move to CPU, run NMS, then back to GPU cpu_boxes = boxes.cpu() cpu_scores = scores.cpu() keep = original_nms(cpu_boxes, cpu_scores, iou_threshold) # Move result back to original device return keep.to(device) else: raise else: # For non-CUDA devices, just run the original NMS return original_nms(boxes, scores, iou_threshold) # Apply the monkey patch torchvision.ops.nms = patched_nms def download_video(url: str, output_dir: str = "downloaded_videos") -> str: """Download a video from a URL and return the local file path""" os.makedirs(output_dir, exist_ok=True) video_name = os.path.basename(url).split("?")[0] if not video_name or "." not in video_name: video_name = f"video_{int(time.time())}.mp4" output_path = os.path.join(output_dir, video_name) print(f"⬇️ Downloading video from {url} to {output_path}...") urllib.request.urlretrieve(url, output_path) print(f"✅ Video downloaded successfully to {output_path}") return output_path def normalize_landmarks_per_person(people_landmarks: List[Dict], window_size: int = 5, poly_order: int = 4) -> List[Dict]: """Normalize landmarks over time for each person using Savitzky-Golay filter""" if not people_landmarks: return people_landmarks # Reorganize by person ID person_data = {} for frame_data in people_landmarks: frame_num = frame_data['frame'] timestamp = frame_data['timestamp'] for person in frame_data['people']: person_id = person['person_id'] if person_id not in person_data: person_data[person_id] = { 'frames': [], 'timestamps': [], 'landmarks': [] } person_data[person_id]['frames'].append(frame_num) person_data[person_id]['timestamps'].append(timestamp) person_data[person_id]['landmarks'].append(person['landmarks']) # Normalize each person's landmarks for person_id, data in person_data.items(): if len(data['landmarks']) >= window_size: data['landmarks'] = normalize_landmarks( data['landmarks'], window_size=window_size, poly_order=poly_order ) # Reconstruct the frame data structure normalized_data = [] for frame_data in people_landmarks: frame_num = frame_data['frame'] timestamp = frame_data['timestamp'] new_people = [] for person in frame_data['people']: person_id = person['person_id'] idx = person_data[person_id]['frames'].index(frame_num) new_people.append({ 'person_id': person_id, 'bbox': person['bbox'], 'landmarks': person_data[person_id]['landmarks'][idx] }) normalized_data.append({ 'frame': frame_num, 'timestamp': timestamp, 'people': new_people }) return normalized_data def normalize_landmarks(landmarks: List[List[Dict]], window_size: int = 5, poly_order: int = 4) -> List[List[Dict]]: """Normalize landmarks over time using Savitzky-Golay filter to smooth motion""" if not landmarks or len(landmarks) < window_size: return landmarks # Ensure window_size is odd if window_size % 2 == 0: window_size += 1 # Check if all frames have the same number of landmarks if not all(len(frame) == len(landmarks[0]) for frame in landmarks): # If inconsistent landmark counts, use a simpler approach (frame by frame smoothing) print("⚠️ Warning: Inconsistent landmark counts across frames. Using simplified smoothing.") return landmarks # Extract x, y values for each landmark landmark_count = len(landmarks[0]) x_values = np.zeros((len(landmarks), landmark_count)) y_values = np.zeros((len(landmarks), landmark_count)) conf_values = np.zeros((len(landmarks), landmark_count)) for i, frame_landmarks in enumerate(landmarks): for j, landmark in enumerate(frame_landmarks): x_values[i, j] = landmark['x'] y_values[i, j] = landmark['y'] conf_values[i, j] = landmark['confidence'] # Apply Savitzky-Golay filter to smooth x, y trajectories x_smooth = savgol_filter(x_values, window_size, poly_order, axis=0) y_smooth = savgol_filter(y_values, window_size, poly_order, axis=0) # Reconstruct normalized landmarks normalized_landmarks = [] for i in range(len(landmarks)): frame_landmarks = [] for j in range(landmark_count): frame_landmarks.append({ 'idx': j, 'x': float(x_smooth[i, j]), 'y': float(y_smooth[i, j]), 'confidence': float(conf_values[i, j]) }) normalized_landmarks.append(frame_landmarks) return normalized_landmarks def calculate_iou(box1, box2): """Calculate IoU (Intersection over Union) between two bounding boxes""" # Extract coordinates x1_1, y1_1, x2_1, y2_1 = box1 x1_2, y1_2, x2_2, y2_2 = box2 # Calculate intersection area x_left = max(x1_1, x1_2) y_top = max(y1_1, y1_2) x_right = min(x2_1, x2_2) y_bottom = min(y2_1, y2_2) if x_right < x_left or y_bottom < y_top: return 0.0 intersection_area = (x_right - x_left) * (y_bottom - y_top) # Calculate union area box1_area = (x2_1 - x1_1) * (y2_1 - y1_1) box2_area = (x2_2 - x1_2) * (y2_2 - y1_2) union_area = box1_area + box2_area - intersection_area return intersection_area / union_area if union_area > 0 else 0 def calculate_keypoint_distance(landmarks1, landmarks2): """Calculate average distance between corresponding keypoints""" if not landmarks1 or not landmarks2: return float('inf') # Create dictionary for fast lookup kps1 = {lm['idx']: (lm['x'], lm['y']) for lm in landmarks1} kps2 = {lm['idx']: (lm['x'], lm['y']) for lm in landmarks2} # Find common keypoints common_idx = set(kps1.keys()) & set(kps2.keys()) if not common_idx: return float('inf') # Calculate distance between corresponding keypoints total_dist = 0 for idx in common_idx: x1, y1 = kps1[idx] x2, y2 = kps2[idx] dist = math.sqrt((x1 - x2)**2 + (y1 - y2)**2) total_dist += dist return total_dist / len(common_idx) def assign_person_ids(current_people, previous_people, iou_threshold=0.3, distance_threshold=0.2): """Assign stable IDs to people across frames based on IOU and keypoint distance""" if not previous_people: # First frame, assign new IDs to everyone next_id = 0 for person in current_people: person['person_id'] = next_id next_id += 1 return current_people # Create copy of current people to modify assigned_people = [] unassigned_current = current_people.copy() # Try to match current detections with previous ones matched_prev_ids = set() # Sort previous people by ID to maintain consistency in matching sorted_prev = sorted(previous_people, key=lambda x: x['person_id']) for prev_person in sorted_prev: prev_id = prev_person['person_id'] prev_box = prev_person['bbox'] prev_landmarks = prev_person['landmarks'] best_match = None best_score = float('inf') # Lower is better for distance for curr_person in unassigned_current: curr_box = curr_person['bbox'] curr_landmarks = curr_person['landmarks'] # Calculate IoU between bounding boxes iou = calculate_iou(prev_box, curr_box) # Calculate keypoint distance kp_dist = calculate_keypoint_distance(prev_landmarks, curr_landmarks) # Combined score (lower is better) score = kp_dist * (1.5 - iou) # Favor high IoU and low distance if (iou >= iou_threshold or kp_dist <= distance_threshold) and score < best_score: best_match = curr_person best_score = score if best_match: # Assign the previous ID to this person best_match['person_id'] = prev_id matched_prev_ids.add(prev_id) assigned_people.append(best_match) unassigned_current.remove(best_match) # Find the next available ID next_id = 0 existing_ids = {p['person_id'] for p in previous_people} while next_id in existing_ids: next_id += 1 # Assign new IDs to unmatched current detections for person in unassigned_current: person['person_id'] = next_id assigned_people.append(person) next_id += 1 return assigned_people def compress_pose_data(all_frame_data, frame_sampling=1, precision=3): """Compress pose data to reduce JSON file size by reducing precision and sampling frames""" compressed_data = [] # Process only every nth frame based on sampling rate for i, frame_data in enumerate(all_frame_data): if i % frame_sampling != 0: continue # Compress frame data compressed_frame = { 'f': frame_data['frame'], # Short key name 't': round(frame_data['timestamp'], 2), # Reduce timestamp precision 'p': [] # Short key for people } # Process each person for person in frame_data['people']: # Only keep essential bbox info (we only need width/height for visualization) x1, y1, x2, y2 = person['bbox'] width = x2 - x1 height = y2 - y1 compressed_person = { 'id': person['person_id'], # Keep ID as is 'b': [round(x1, 1), round(y1, 1), round(width, 1), round(height, 1)], # Simplified bbox with less precision 'k': [] # Short key for keypoints/landmarks } # Process each landmark with reduced precision for lm in person['landmarks']: compressed_person['k'].append([ lm['idx'], # Keep index as is (small integer) round(lm['x'], precision), # Reduce coordinate precision round(lm['y'], precision), # Reduce coordinate precision round(lm['confidence'], 2) # Reduce confidence precision ]) compressed_frame['p'].append(compressed_person) compressed_data.append(compressed_frame) return compressed_data def process_frame(frame: np.ndarray, model, detection_threshold: float = 0.5, show_preview: bool = False): """Process a single frame with YOLOv11-pose, handling multiple people""" # Process with YOLO try: results = model.predict(frame, verbose=False, conf=detection_threshold) # Extract keypoints if available processed_frame = None people_data = [] # Get frame dimensions h, w = frame.shape[:2] if results and len(results[0].keypoints.data) > 0: # Get all keypoints and bounding boxes keypoints = results[0].keypoints.data # [num_people, 17, 3] - (x, y, confidence) boxes = results[0].boxes.xyxy.cpu() # [num_people, 4] - (x1, y1, x2, y2) for i, (kps, box) in enumerate(zip(keypoints, boxes)): # Extract keypoints to landmarks_data landmarks_data = [] for idx, kp in enumerate(kps): x, y, conf = kp.tolist() if conf >= detection_threshold: landmarks_data.append({ 'idx': idx, 'x': round(x / w, 4), # Normalize to 0-1 range with 4 decimal precision 'y': round(y / h, 4), # Normalize to 0-1 range with 4 decimal precision 'confidence': round(conf, 2) # Reduce confidence to 2 decimal places }) if landmarks_data: # Only add if we have valid landmarks # Add bounding box and landmarks for this person people_data.append({ 'bbox': box.tolist(), # Store unnormalized for IoU calculation 'landmarks': landmarks_data # Store normalized for consistency }) # Create visualization if preview is enabled if show_preview: processed_frame = results[0].plot() # Add person IDs to the visualization if they're already assigned for person in people_data: if 'person_id' in person: # Get center of bounding box x1, y1, x2, y2 = person['bbox'] center_x = int((x1 + x2) / 2) center_y = int(y1) # Top of the bbox # Draw ID text cv2.putText( processed_frame, f"ID: {person['person_id']}", (center_x, center_y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2 ) return processed_frame, people_data except RuntimeError as e: # Check if this is an NMS backend error if "Could not run 'torchvision::nms'" in str(e): raise RuntimeError("CUDA NMS Error") else: # Re-raise if it's a different error raise def run_pose_detection( input_source, output_file=None, normalize=True, detection_threshold=0.5, filter_window_size=7, filter_poly_order=4, model_size='n', device='auto', show_preview=True, batch_size=1, frame_sampling=1, # New parameter to control frame sampling rate precision=3 # New parameter to control coordinate precision ): """YOLOv11 pose detection with CUDA acceleration, properly handling NMS issues""" start_time = time.time() # Handle URL input if input_source and isinstance(input_source, str) and ( input_source.startswith('http://') or input_source.startswith('https://') or input_source.startswith('rtsp://') ): input_source = download_video(input_source) # Check if CUDA is available when requested if 'cuda' in device and not torch.cuda.is_available(): print(f"⚠️ CUDA requested but not available. Falling back to CPU.") device = 'cpu' # Check if MPS is available when requested if device == 'mps' and not (hasattr(torch, 'mps') and torch.backends.mps.is_available()): print(f"⚠️ MPS (Apple Silicon) requested but not available. Falling back to CPU.") device = 'cpu' # Load YOLOv11-pose model with specified device model_name = f"yolo11{model_size.lower()}-pose.pt" print(f"🔍 Loading {model_name} on {device}...") # Apply NMS patch for CUDA device if 'cuda' in device: print("💪 Applying CUDA-compatible NMS patch (keeping all processing on GPU)") try: # Load model with specified device model = YOLO(model_name) if device != 'auto': model.to(device) print(f"✅ Model loaded on {model.device}") except Exception as e: print(f"❌ Error loading model: {str(e)}") return # Initialize video capture if isinstance(input_source, int) or (isinstance(input_source, str) and input_source.isdigit()): cap = cv2.VideoCapture(int(input_source)) source_name = f"Webcam {input_source}" else: if not os.path.isfile(input_source): print(f"❌ Error: Video file '{input_source}' not found") return cap = cv2.VideoCapture(input_source) source_name = f"Video: {os.path.basename(input_source)}" if not cap.isOpened(): print(f"❌ Error: Could not open {source_name}") return # Get video properties fps = cap.get(cv2.CAP_PROP_FPS) frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if fps <= 0: fps = 30 print(f"▶️ Processing {source_name}: {frame_width}x{frame_height}@{fps:.2f}fps") # Create window if preview is enabled if show_preview: window_name = "YOLOv11 Pose" cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) # Initialize variables for tracking all_frame_data = [] processed_frames = 0 last_people_data = [] last_fps_update = time.time() current_fps = 0 total_people_detected = 0 # Main processing loop print("⏳ Processing frames...") while cap.isOpened(): success, frame = cap.read() if not success: break try: # Process the frame processed_frame, people_data = process_frame( frame, model, detection_threshold, show_preview ) # Assign stable person IDs if people_data: people_data = assign_person_ids(people_data, last_people_data) last_people_data = people_data.copy() # Store frame data with people frame_data = { 'frame': processed_frames, 'timestamp': processed_frames / fps if fps > 0 else time.time() - start_time, 'people': people_data } all_frame_data.append(frame_data) total_people_detected += len(people_data) except RuntimeError as e: if str(e) == "CUDA NMS Error": print("⚠️ CUDA NMS error detected. Switching to CPU for processing.") # Skip this frame and try again with CPU model continue else: # Re-raise if it's a different error raise # Show preview if enabled if show_preview and processed_frame is not None: # Calculate FPS if time.time() - last_fps_update > 1.0: # Update FPS every second current_fps = int(1.0 / ((time.time() - last_fps_update) / max(1, processed_frames % 30))) last_fps_update = time.time() # Add FPS and progress info cv2.putText( processed_frame, f"FPS: {current_fps} | Frame: {processed_frames}/{total_frames}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2 ) # Show CUDA status cv2.putText( processed_frame, f"Device: {model.device} | People: {len(people_data) if people_data else 0}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2 ) # Show frame cv2.imshow(window_name, processed_frame) # Exit on 'q' or ESC key = cv2.waitKey(1) & 0xFF if key == ord('q') or key == 27: break processed_frames += 1 # Print progress if processed_frames % 100 == 0: percent_done = (processed_frames / total_frames * 100) if total_frames > 0 else 0 print(f"Progress: {processed_frames} frames ({percent_done:.1f}%)") # Calculate performance metrics elapsed_time = time.time() - start_time effective_fps = processed_frames / elapsed_time if elapsed_time > 0 else 0 print(f"⏱️ Processed {processed_frames} frames in {elapsed_time:.2f}s ({effective_fps:.2f} fps)") if all_frame_data: unique_people = set() for frame in all_frame_data: for person in frame['people']: unique_people.add(person['person_id']) print(f"🧮 Detected {len(all_frame_data)} frames with poses ({len(all_frame_data)/max(1, processed_frames)*100:.1f}%)") print(f"👥 Detected {len(unique_people)} unique people with {total_people_detected} total detections") else: print(f"⚠️ No poses detected. Try adjusting detection threshold or check the video content.") # Save results if output file is specified if output_file and all_frame_data: output_dir = os.path.dirname(output_file) if output_dir: os.makedirs(output_dir, exist_ok=True) # Apply normalization if requested if normalize and len(all_frame_data) > filter_window_size: print(f"🔄 Normalizing data for each person...") all_frame_data = normalize_landmarks_per_person( all_frame_data, window_size=filter_window_size, poly_order=filter_poly_order ) # Compress data to reduce file size print(f"🗜️ Compressing data (frame sampling: {frame_sampling}, precision: {precision})...") compressed_frames = compress_pose_data(all_frame_data, frame_sampling, precision) actual_frames_saved = len(compressed_frames) # Calculate compression ratio original_frame_count = len(all_frame_data) compression_ratio = (original_frame_count - actual_frames_saved) / original_frame_count * 100 print(f"📊 Compression: {original_frame_count} frames reduced to {actual_frames_saved} ({compression_ratio:.1f}% reduction)") # Create output in compatible format with compressed frames json_data = { 'src': source_name, # Shortened key 'w': frame_width, # Shortened key 'h': frame_height, # Shortened key 'fps': fps, 'frames': processed_frames, 'keypoints': KEYPOINT_NAMES, # More descriptive key 'connections': [{'s': c[0], 'e': c[1]} for c in POSE_CONNECTIONS], # Shortened keys 'data': compressed_frames, # Use compressed data 'meta': { # Shortened key 'model': f"YOLOv11-{model_size}-pose", 'device': str(model.device), 'normalized': normalize, 'threshold': detection_threshold, 'filter_size': filter_window_size if normalize else None, 'filter_order': filter_poly_order if normalize else None, 'frame_sampling': frame_sampling, 'precision': precision, 'created': time.strftime('%Y-%m-%d %H:%M:%S') } } # Save to file with open(output_file, 'w') as f: json.dump(json_data, f) file_size_mb = os.path.getsize(output_file) / (1024 * 1024) print(f"💾 Saved tracking data to {output_file} ({file_size_mb:.2f} MB)") elif output_file: print(f"⚠️ No pose data to save. Output file was not created.") # Release resources cap.release() if show_preview: cv2.destroyAllWindows() # Restore original NMS function torchvision.ops.nms = original_nms return all_frame_data def main(): # Set up simple argument parser parser = argparse.ArgumentParser( description='YOLOv11 Pose Detection for JD-Clone with CUDA acceleration', formatter_class=argparse.ArgumentDefaultsHelpFormatter ) # Essential arguments parser.add_argument('--input', '-i', required=False, help='Input source (path to video file, URL, or camera index like "0" for webcam)') parser.add_argument('--camera', '-c', action='store_true', help='Use default webcam (camera 0) as input source') parser.add_argument('--output', '-o', required=False, help='Output JSON file to save pose data (optional for camera mode)') parser.add_argument('--model', type=str, default='n', choices=['n', 's', 'm', 'l', 'x'], help='YOLOv11 model size (n=nano, s=small, m=medium, l=large, x=xlarge)') parser.add_argument('--device', type=str, default='auto', help='Computation device (cpu, cuda:0, auto, mps)') # Additional options parser.add_argument('--no-preview', action='store_true', help='Disable video preview') parser.add_argument('--no-normalize', action='store_true', help='Disable pose normalization') parser.add_argument('--detection-threshold', type=float, default=0.5, help='Threshold for pose detection confidence (0.0-1.0)') parser.add_argument('--filter-window', type=int, default=7, help='Window size for smoothing filter (must be odd, larger = smoother)') parser.add_argument('--filter-order', type=int, default=4, help='Polynomial order for smoothing filter (1-4)') parser.add_argument('--batch-size', type=int, default=4, help='Batch size for processing (higher uses more VRAM but can be faster)') parser.add_argument('--frame-sampling', type=int, default=2, help='Save only every Nth frame (1=all frames, 2=half, 4=quarter, etc.)') parser.add_argument('--precision', type=int, default=3, choices=[2, 3, 4], help='Decimal precision for coordinates (2-4, lower=smaller file)') args = parser.parse_args() # Handle camera/input source logic if args.camera: input_source = 0 # Default webcam print("📷 Using default webcam (camera 0)") elif args.input: input_source = args.input else: parser.error("Either --input/-i or --camera/-c must be specified") # Output is optional for camera mode if not args.output and not args.camera: parser.error("--output/-o is required when not using camera mode") # Validate filter window size if args.filter_window % 2 == 0: args.filter_window += 1 # Print configuration print("\n" + "="*50) print("📹 JD-Clone YOLOv11 Pose Detector") print("="*50) print(f"• Input: {input_source if not args.camera else 'Webcam (camera 0)'}") print(f"• Output: {args.output if args.output else 'None (preview only)'}") print(f"• Model: YOLOv11-{args.model}") print(f"• Device: {args.device}") print(f"• Preview: {'Disabled' if args.no_preview else 'Enabled'}") print(f"• Normalization: {'Disabled' if args.no_normalize else 'Enabled'}") print(f"• Frame sampling: Every {args.frame_sampling} frame(s)") print(f"• Coordinate precision: {args.precision} decimal places") print("="*50 + "\n") # Run pose detection try: run_pose_detection( input_source=input_source, output_file=args.output, normalize=not args.no_normalize, detection_threshold=args.detection_threshold, filter_window_size=args.filter_window, filter_poly_order=args.filter_order, model_size=args.model, device=args.device, show_preview=not args.no_preview, batch_size=args.batch_size, frame_sampling=args.frame_sampling, precision=args.precision ) except KeyboardInterrupt: print("\n⏹️ Process interrupted by user") except Exception as e: print(f"\n❌ Error: {str(e)}") import traceback traceback.print_exc() finally: print("👋 Done!") cv2.destroyAllWindows() if __name__ == "__main__": main()