# Generated from: pose_detector.ipynb
# Converted at: 2026-06-18T16:52:52.930Z
# Next step (optional): refactor into modules & generate tests with RunCell
# Quick start: pip install runcell

# NOTE: the notebook cell here was the IPython shell escape `!pip3 install mediapipe`.
# That syntax is not valid Python, so it is kept as a comment to let this file be
# parsed and run as a plain script. Install the dependency from a shell first:
#     pip3 install mediapipe

import os, math, warnings
from collections import Counter

import numpy  as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

import mediapipe as mp
from mediapipe.tasks                         import python as mp_tasks
from mediapipe.tasks.python                  import vision as mp_vision
from mediapipe.tasks.python.core             import base_options as bo
from mediapipe.tasks.python.vision           import PoseLandmarker, PoseLandmarkerOptions, RunningMode
from mediapipe import Image, ImageFormat

# Suppress all Python warnings from this point on.
warnings.filterwarnings("ignore")

# Report the versions of the main libraries in use (one line each).
print(
    f"MediaPipe  : {mp.__version__}",
    f"OpenCV     : {cv2.__version__}",
    f"NumPy      : {np.__version__}",
    f"Pandas     : {pd.__version__}",
    sep="\n",
)


# Pose-landmarker model file, resolved relative to the current working directory.
MODEL_PATH = "pose_landmarker_lite.task"

# Informational only: a missing model is reported here but does not stop execution.
print(
    "path exists"
    if os.path.exists(MODEL_PATH)
    else "Model either in wrong location, or model not yet downloaded"
)

# ── Configuration ─────────────────────────────────────────────────────────────
# Dataset root folder: each sub-folder is one class.
TRAIN_DIR = "datasets"

# File extensions (lower-case, leading dot included) accepted as images.
IMG_EXTS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".bmp",
    ".webp",
}

# Destination file for the extracted landmark features.
OUTPUT_CSV = "posture_detection.csv"
# ──────────────────────────────────────────────────────────────────────────────

def collect_images(root: str, exts=None) -> list:
    """
    Walk `root` and return [{"path": ..., "label": ...}, ...].

    Only the immediate sub-folders of `root` are scanned (no recursion); the
    sub-folder name is used as the class label. Both class folders and file
    names are visited in sorted order, so the result is deterministic.

    Args:
        root: Dataset root directory containing one sub-folder per class.
        exts: Optional iterable of accepted extensions (with the leading dot,
            e.g. ".jpg"). Matching is case-insensitive. Defaults to the
            module-level IMG_EXTS when omitted.

    Returns:
        A list of dicts with keys "path" (full file path) and "label"
        (class folder name).

    Raises:
        OSError: propagated from os.listdir if `root` does not exist or
            cannot be read.
    """
    allowed = IMG_EXTS if exts is None else {ext.lower() for ext in exts}

    records = []
    for label in sorted(os.listdir(root)):
        class_dir = os.path.join(root, label)
        if not os.path.isdir(class_dir):
            # Stray files directly under the root are not classes.
            continue
        for fname in sorted(os.listdir(class_dir)):
            if os.path.splitext(fname)[1].lower() not in allowed:
                continue
            path = os.path.join(class_dir, fname)
            if not os.path.isfile(path):
                # A directory that merely *looks* like an image (e.g. "x.jpg/")
                # must not be recorded as one.
                continue
            records.append({"path": path, "label": label})
    return records


# Gather every dataset image as a {"path", "label"} record.
image_records = collect_images(TRAIN_DIR)

# Tally the number of images per class label.
counts = Counter(record["label"] for record in image_records)

# Summary: overall total first, then one line per class
# (label left-aligned in a 15-character column).
print(f"Total images found: {len(image_records)}")
for label, n in counts.items():
    print(f"  {label:15s}: {n} images")

def load_rgb(path: str):
    """
    Load an image file with OpenCV and return it in RGB channel order.

    cv2.imread yields a BGR array; the rest of this file hands images to
    MediaPipe (ImageFormat.SRGB) and matplotlib, so the channels are swapped
    to RGB before returning.

    Args:
        path: Location of the image file on disk.

    Returns:
        The image as an RGB NumPy array, or None when OpenCV could not read
        the file.
    """
    bgr = cv2.imread(path)
    # A None result from cv2.imread is the failure signal; pass it through.
    return None if bgr is None else cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)


# Class names in alphabetical order (one subplot per class).
classes = sorted(counts)

if classes:
    # One row, one column per class; the figure width grows with the class count.
    # squeeze=False makes plt.subplots always return a 2-D array of Axes — without
    # it a single class yields a bare Axes object and the zip() below fails.
    fig, axes = plt.subplots(
        1, len(classes), figsize=(5 * len(classes), 5), squeeze=False
    )
    axes = axes.ravel()  # flatten the 1×N grid to a 1-D sequence of Axes

    for ax, cls in zip(axes, classes):
        # Use the first record of this class as its sample image.
        record = next(r for r in image_records if r["label"] == cls)
        img = load_rgb(record["path"])
        if img is None:
            # Surface the load failure in the subplot title instead of crashing.
            ax.set_title(f"{cls}\n⚠ LOAD FAILED")
        else:
            ax.imshow(img)
            # Title: class name plus width×height in pixels.
            ax.set_title(f"{cls}\n{img.shape[1]}×{img.shape[0]} px")
        # Hide ticks and frame for a cleaner display.
        ax.axis("off")

    plt.suptitle("Sample image per class — RGB load check", fontsize=13)
    plt.tight_layout()
    plt.show()
    print("✅ If images appear above, loading is working correctly.")
else:
    # plt.subplots rejects a zero-column grid, so skip the preview entirely.
    print("⚠ No classes found — nothing to preview.")

# Pose-landmarker configuration, reused by every detector created in this file.
options = PoseLandmarkerOptions(
    # Model bundle to load.
    base_options=bo.BaseOptions(model_asset_path=MODEL_PATH),
    # Each call processes one independent still image
    # (the other RunningMode values are VIDEO and LIVE_STREAM).
    running_mode=RunningMode.IMAGE,
    # Report at most one pose per image.
    num_poses=1,
    # Confidence thresholds for pose detection and pose presence.
    min_pose_detection_confidence=0.5,
    min_pose_presence_confidence=0.5,
    # Tracking threshold — presumably only relevant to VIDEO / LIVE_STREAM
    # modes; TODO confirm against the MediaPipe documentation.
    min_tracking_confidence=0.5,
    # Only landmarks are consumed here, so segmentation masks are not requested.
    output_segmentation_masks=False,
)

# No landmarker is created at this point: each use below opens one through a
# `with` block so that it is closed on exit, and the batch loop later in this
# file opens a single instance for the whole dataset.
print("✅ PoseLandmarkerOptions configured.")

import cv2
import numpy as np

# MediaPipe pose-landmark indices used for training
SELECTED_LANDMARKS = [
    0,       # nose
    7, 8,    # left / right ear
    11, 12,  # left / right shoulder
    23, 24,  # left / right hip
]

# Connections used for visualization.
# Every endpoint is one of SELECTED_LANDMARKS, so each drawn line ends on a drawn dot.
SELECTED_CONNECTIONS = [
    (11, 12),   # left shoulder -> right shoulder

    (11, 23),   # left shoulder -> left hip (left side of torso)
    (12, 24),   # right shoulder -> right hip (previously (12, 14), i.e. the
                # right elbow, which is not a selected landmark)

    (0, 11),    # nose -> left shoulder
    (0, 12),    # nose -> right shoulder

    (7, 0),     # left ear -> nose
    (8, 0),     # right ear -> nose
]

def draw_selected_landmarks(rgb_image, detection_result):
    """
    Return a copy of `rgb_image` with the selected pose landmarks drawn on it.

    For every pose in `detection_result.pose_landmarks` this draws:
      * a 2-px line for each pair in SELECTED_CONNECTIONS, colour (0, 255, 0);
      * a filled radius-5 dot for each index in SELECTED_LANDMARKS,
        colour (255, 0, 0);
      * the landmark index as a small text label next to each dot,
        colour (0, 255, 255).
    Colour tuples are in the image's own channel order (RGB for an RGB input).

    Landmark x/y values are multiplied by the image width/height to obtain
    pixel positions. The input image is not modified.

    Args:
        rgb_image: Image array whose shape unpacks as (height, width, channels).
        detection_result: Object exposing `pose_landmarks`, an iterable of
            per-pose landmark sequences whose items have `.x` and `.y`.

    Returns:
        The annotated copy of the image.
    """
    canvas = np.copy(rgb_image)
    height, width, _ = canvas.shape

    def to_pixel(landmark):
        # Scale landmark x/y by the image size and truncate to integer pixels.
        return int(landmark.x * width), int(landmark.y * height)

    for pose in detection_result.pose_landmarks:
        # Skeleton lines first, so the dots drawn afterwards sit on top.
        for first, second in SELECTED_CONNECTIONS:
            cv2.line(canvas, to_pixel(pose[first]), to_pixel(pose[second]), (0, 255, 0), 2)

        # Landmark dots with their index printed just up and to the right.
        for idx in SELECTED_LANDMARKS:
            px, py = to_pixel(pose[idx])
            cv2.circle(canvas, (px, py), 5, (255, 0, 0), -1)
            cv2.putText(
                canvas,
                str(idx),
                (px + 5, py - 5),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.4,
                (0, 255, 255),
                1,
            )

    return canvas

# Run on the first image as a visual sanity check.
# Fail early with a clear message instead of an IndexError on an empty dataset.
if not image_records:
    raise RuntimeError(f"No images found under '{TRAIN_DIR}' — cannot run the sanity check.")
sample = image_records[0]

# Load image in RGB format using helper function.
# load_rgb returns None for unreadable files; passing None on to MediaPipe would
# only fail later with a much less helpful error.
img = load_rgb(sample["path"])
if img is None:
    raise RuntimeError(f"Could not load sample image: {sample['path']}")

# Wrap the numpy array into MediaPipe Image object — required by the Tasks API
mp_img = Image(image_format=ImageFormat.SRGB, data=img)

# Create pose detector using earlier configuration; the `with` block closes it on exit
with PoseLandmarker.create_from_options(options) as landmarker:
    result = landmarker.detect(mp_img)  # Run pose detection on the image

# result.pose_landmarks is a list-of-lists (one entry per detected person)
if result.pose_landmarks:
    # The whole result is passed on; the helper draws every pose it contains.
    annotated = draw_selected_landmarks(img, result)
    status = f"✅ Showing {len(SELECTED_LANDMARKS)} selected landmarks"
else:
    annotated = img.copy()
    status = "⚠ No pose detected — try a different image"

# Create figure with 2 side-by-side plots: original on the left, annotated on the right
fig, axes = plt.subplots(1, 2, figsize=(12, 6))
axes[0].imshow(img); axes[0].set_title("Original"); axes[0].axis("off")
axes[1].imshow(annotated); axes[1].set_title(f"Landmarks\n{status}"); axes[1].axis("off")
plt.suptitle(f"{sample['label']} — {os.path.basename(sample['path'])}", fontsize=13)
plt.tight_layout()
plt.show()

import math

# Landmark indices exported as features (elbows are deliberately not included).
SELECTED_LANDMARKS = [
    0,
    7, 8,
    11, 12,
    23, 24,
]

def calculate_angle(p1, p2):
    """
    Return the direction of the vector p1 → p2 in degrees.

    The angle is measured from the positive x-axis with math.atan2, so the
    result lies in [-180, 180]. Only the first two components (x, y) of each
    point are used.

    Args:
        p1: Start point, indexable as (x, y).
        p2: End point, indexable as (x, y).

    Returns:
        The angle in degrees as a float.
    """
    return math.degrees(math.atan2(p2[1] - p1[1], p2[0] - p1[0]))

def extract_features(result, label):
    """
    Convert one pose-detection result into a flat feature row.

    Only the first pose in `result.pose_landmarks` is used. Coordinates are
    re-expressed in a body-relative frame: the origin is (hip-midpoint x,
    nose y) and both axes are divided by the nose-to-hip vertical distance
    (plus 1e-6), so the nose has y = 0 and the hip midpoint sits at (0, ≈1).

    The row contains, in this order:
      * lm{idx}_x / lm{idx}_y (normalised) and lm{idx}_z (raw) for every
        index in SELECTED_LANDMARKS;
      * head_angle, neck_angle, torso_angle, shoulder_angle (degrees, from
        calculate_angle on normalised points);
      * forward_lean, spine_curve, lateral_shift_norm, shoulder_height_diff;
      * label.

    Args:
        result: Detection result exposing `pose_landmarks`.
        label: Class label stored under the "label" key.

    Returns:
        The feature dict, or None when no pose was detected.
    """
    poses = result.pose_landmarks
    if not poses:
        return None

    lms = poses[0]
    nose = lms[0]
    left_shoulder, right_shoulder = lms[11], lms[12]

    def midpoint(i, j):
        # Raw (un-normalised) midpoint of two landmarks.
        return (lms[i].x + lms[j].x) / 2, (lms[i].y + lms[j].y) / 2

    ear_mid = midpoint(7, 8)
    shoulder_mid = midpoint(11, 12)
    hip_x, hip_y = midpoint(23, 24)

    # Normalisation frame: nose is the top (y = 0), hip midpoint the bottom
    # (y ≈ 1); x is measured from the hip midpoint. The epsilon keeps the
    # divisor away from zero when nose and hips share the same y.
    top_y = nose.y
    scale = (hip_y - top_y) + 1e-6

    def normalise(x, y):
        return (x - hip_x) / scale, (y - top_y) / scale

    row = {}

    # Per-landmark coordinates: x and y normalised, z passed through unchanged.
    for idx in SELECTED_LANDMARKS:
        point = lms[idx]
        row[f"lm{idx}_x"], row[f"lm{idx}_y"] = normalise(point.x, point.y)
        row[f"lm{idx}_z"] = point.z

    # Normalised reference points used by the engineered features.
    nose_n = normalise(nose.x, nose.y)          # y component is 0.0
    ear_n = normalise(*ear_mid)
    shoulder_n = normalise(*shoulder_mid)
    hip_n = normalise(hip_x, hip_y)             # (0.0, ≈1.0)
    left_shoulder_n = normalise(left_shoulder.x, left_shoulder.y)
    right_shoulder_n = normalise(right_shoulder.x, right_shoulder.y)

    # Four segment directions.
    row["head_angle"] = calculate_angle(ear_n, nose_n)
    row["neck_angle"] = calculate_angle(shoulder_n, ear_n)
    row["torso_angle"] = calculate_angle(hip_n, shoulder_n)
    row["shoulder_angle"] = calculate_angle(left_shoulder_n, right_shoulder_n)

    # Four offset features (all in normalised units).
    nose_ahead_of_shoulders = nose_n[0] - shoulder_n[0]
    shoulders_ahead_of_hips = shoulder_n[0] - hip_n[0]
    row["forward_lean"] = nose_ahead_of_shoulders
    row["spine_curve"] = nose_ahead_of_shoulders - shoulders_ahead_of_hips
    row["lateral_shift_norm"] = shoulders_ahead_of_hips
    row["shoulder_height_diff"] = left_shoulder_n[1] - right_shoulder_n[1]

    row["label"] = label
    return row

# Column-count summary, derived from SELECTED_LANDMARKS so it cannot drift
# from the landmark list actually used by extract_features.
n_landmarks = len(SELECTED_LANDMARKS)
n_coord_features = n_landmarks * 3   # x, y, z per landmark
n_engineered = 8                     # 4 angles + 4 offset features in extract_features

print("Feature extraction ready")
print(f"{n_landmarks} landmarks × 3 = {n_coord_features} coordinate features")
print(f"{n_engineered} engineered features")
print("1 label")
print(f"Total columns = {n_coord_features + n_engineered + 1}")  # 30 with 7 landmarks

rows   = []       # One feature dictionary per successfully processed image
failed = []       # Paths of images that could not be loaded or had no detectable pose

total = len(image_records)

# Open the landmarker ONCE outside the loop: creating it per image would
# re-create the detector for every file. The `with` block closes it on exit.
with PoseLandmarker.create_from_options(options) as landmarker:

    for i, record in enumerate(image_records, start=1):
        path, label = record["path"], record["label"]

        # 1. Load image
        img = load_rgb(path)
        if img is None:
            print(f"  [{i:4d}] ⚠ Could not load: {path}")
            failed.append(path)
        else:
            # 2. Convert numpy array to MediaPipe Image object
            mp_img = Image(image_format=ImageFormat.SRGB, data=img)

            # 3. Run pose detection
            result = landmarker.detect(mp_img)

            # 4. Extract features (None means no pose was detected)
            row = extract_features(result, label)
            if row is None:
                print(f"  [{i:4d}] ⚠ No pose detected: {path}")
                failed.append(path)
            else:
                rows.append(row)

        # Every 20 images (or on the last image), print progress. This sits
        # outside the success path on purpose, so the checkpoint is reported
        # even when that particular image failed.
        if i % 20 == 0 or i == total:
            print(f"  [{i:4d}/{total}] ✓  rows so far: {len(rows)}")

print(f"\n✅ Done.  Successful: {len(rows)}  |  Failed: {len(failed)}")

# With no rows the DataFrame would have no "label" column and the summary
# below would die with an opaque KeyError — stop with a clear message instead.
if not rows:
    raise RuntimeError(
        "No feature rows were extracted (every image failed to load or had "
        "no detectable pose) — nothing to build a dataset from."
    )

# Convert list of dictionaries (rows) into a pandas DataFrame
df = pd.DataFrame(rows)

# Shuffle the dataset so that the classes are not grouped together;
# the fixed random_state makes the order reproducible.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

print("Shape:", df.shape)  # (number_of_rows, number_of_columns)
print("\nSamples per class:")
print(df["label"].value_counts().to_string())
df.head()  # displayed only as the last expression of a notebook cell; no effect in a script

# Persist the feature table; index=False keeps the row numbers out of the file.
df.to_csv(OUTPUT_CSV, index=False)

# Confirmation with the saved dimensions.
print(
    f"✅ Saved '{OUTPUT_CSV}'",
    f"   Rows    : {len(df)}",
    f"   Columns : {len(df.columns)}",
    sep="\n",
)


# Round-trip check: read the file back and make sure the shape survived.
df_check = pd.read_csv(OUTPUT_CSV)
assert df_check.shape == df.shape, "Shape mismatch after reload!"
print("✅ CSV round-trip check passed.")