Source code for megabouts.preprocessing.traj_preprocessing

import numpy as np
import pandas as pd
from scipy.ndimage import shift
from ..utils.math_utils import robust_diff
from typing import Tuple
from ..config.preprocessing_config import TrajPreprocessingConfig


[docs] class TrajPreprocessingResult: """Container for trajectory preprocessing results. Parameters ---------- x, y : np.ndarray Raw position coordinates yaw : np.ndarray Raw orientation angles x_smooth, y_smooth : np.ndarray Smoothed position coordinates yaw_smooth : np.ndarray Smoothed orientation angles axial_speed : np.ndarray Speed along body axis lateral_speed : np.ndarray Speed perpendicular to body axis yaw_speed : np.ndarray Angular velocity vigor : np.ndarray Kinematic activity measure no_tracking : np.ndarray Boolean mask indicating frames with no tracking """
[docs] def __init__( self, x, y, yaw, x_smooth, y_smooth, yaw_smooth, axial_speed, lateral_speed, yaw_speed, vigor, no_tracking, ): self.x = x self.y = y self.yaw = yaw self.x_smooth = x_smooth self.y_smooth = y_smooth self.yaw_smooth = yaw_smooth self.axial_speed = axial_speed self.lateral_speed = lateral_speed self.yaw_speed = yaw_speed self.vigor = vigor self.no_tracking = no_tracking self.df = self._to_dataframe()
def _to_dataframe(self): columns = [ "x", "y", "yaw", "x_smooth", "y_smooth", "yaw_smooth", "axial_speed", "lateral_speed", "yaw_speed", "vigor", "no_tracking", ] data = np.vstack( ( self.x, self.y, self.yaw, self.x_smooth, self.y_smooth, self.yaw_smooth, self.axial_speed, self.lateral_speed, self.yaw_speed, self.vigor, self.no_tracking, ) ).T return pd.DataFrame(data, index=range(data.shape[0]), columns=columns)
[docs] class TrajPreprocessing: """Class for preprocessing trajectory data."""
[docs] def __init__(self, config: TrajPreprocessingConfig): self.config = config
[docs] def preprocess_traj_df(self, traj_df: pd.DataFrame) -> pd.DataFrame: """Preprocess trajectory data from a DataFrame. Parameters ---------- traj_df : pd.DataFrame DataFrame containing head trajectory data (x,y,yaw) Returns ------- TrajPreprocessingResult Preprocessed trajectory data Examples -------- >>> import numpy as np >>> import pandas as pd >>> from megabouts.config import TrajPreprocessingConfig >>> >>> # Create sample data >>> t = np.linspace(0, 1, 100) # 1 second at 100 fps >>> traj_df = pd.DataFrame({ ... 'x': t, # constant forward speed ... 'y': np.zeros_like(t), # no lateral movement ... 'yaw': np.zeros_like(t) # no rotation ... }) >>> >>> # Preprocess >>> config = TrajPreprocessingConfig(fps=100) >>> preprocessor = TrajPreprocessing(config) >>> result = preprocessor.preprocess_traj_df(traj_df) >>> >>> # Verify forward movement was detected >>> np.mean(result.axial_speed[20:-20]) > 0 # positive forward speed True >>> np.allclose(result.lateral_speed[20:-20], 0, atol=1e-10) # no lateral speed True """ x_input, y_input, yaw_input = ( traj_df["x"].values, traj_df["y"].values, traj_df["yaw"].values, ) x, y, yaw, no_tracking = TrajPreprocessing.interp_traj_nan( x_input, y_input, yaw_input, limit_na=self.config.limit_na ) def smooth_func(x): return TrajPreprocessing.one_euro_filter( x, self.config.freq_cutoff_min, self.config.beta, self.config.fps ) x, y, yaw = map(smooth_func, [x, y, yaw]) axial_speed, lateral_speed, yaw_speed = TrajPreprocessing.compute_speed( x, y, yaw, self.config.fps, n_diff=self.config.robust_diff ) vigor = TrajPreprocessing.compute_kinematic_activity( axial_speed, lateral_speed, yaw_speed, lag=self.config.lag_kinematic_activity, fps=self.config.fps, ) return TrajPreprocessingResult( x_input, y_input, yaw_input, x, y, yaw, axial_speed, lateral_speed, yaw_speed, vigor, no_tracking, )
[docs] @staticmethod def interp_traj_nan( x: np.ndarray, y: np.ndarray, yaw: np.ndarray, limit_na: int ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Interpolates missing values in trajectory data. Parameters ---------- x, y : np.ndarray Position coordinates yaw : np.ndarray Orientation angles limit_na : int Maximum number of consecutive NaN values to interpolate Returns ------- x, y : np.ndarray Interpolated position coordinates yaw : np.ndarray Interpolated orientation angles no_tracking : np.ndarray Boolean mask indicating frames with no tracking """ # Interpolate within limit na: yaw = np.arctan2(np.sin(yaw), np.cos(yaw)) yaw[~np.isnan(yaw)] = np.unwrap(yaw[~np.isnan(yaw)]) def linear_fill_na(x): return pd.Series(x).interpolate(method="linear", limit=limit_na).values x, y, yaw = map(linear_fill_na, [x, y, yaw]) no_tracking = np.logical_or.reduce((np.isnan(x), np.isnan(y), np.isnan(yaw))) # Interpolate remaining values: def linear_fill_na_no_limit(x): return ( pd.Series(x).interpolate(method="linear", axis=0).ffill().bfill().values ) x, y, yaw = map(linear_fill_na_no_limit, [x, y, yaw]) return x, y, yaw, no_tracking
[docs] @staticmethod def one_euro_filter( x: np.ndarray, freq_cutoff_min: float, beta: float, rate: int ) -> np.ndarray: """Apply 1€ filter over x. Parameters ---------- x : np.ndarray Input signal freq_cutoff_min : float Minimum cutoff frequency in Hz beta : float Speed coefficient rate : int Sampling rate in Hz Returns ------- np.ndarray Filtered signal """ n_frames = len(x) x_smooth = np.zeros_like(x) x_smooth[0] = x[0] fc = freq_cutoff_min tau = 1 / (2 * np.pi * fc) te = 1 / rate alpha = 1 / (1 + tau / te) for i in range(1, n_frames): x_smooth[i] = alpha * x[i] + (1 - alpha) * x_smooth[i - 1] x_dot = (x_smooth[i] - x_smooth[i - 1]) * rate fc = freq_cutoff_min + beta * np.abs(x_dot) tau = 1 / (2 * np.pi * fc) te = 1 / rate alpha = 1 / (1 + tau / te) return x_smooth
[docs] @staticmethod def compute_speed( x: np.ndarray, y: np.ndarray, yaw: np.ndarray, fps: int, n_diff: int ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Compute the axial, lateral, and yaw speed of a body. Parameters ---------- x, y : np.ndarray Position coordinates yaw : np.ndarray Orientation angles fps : int Frames per second n_diff : int Filter length for derivative computation Returns ------- axial_speed : np.ndarray Speed along body axis lateral_speed : np.ndarray Speed perpendicular to body axis yaw_speed : np.ndarray Angular velocity Examples -------- >>> t = np.linspace(0, 1, 100) # 1 second at 100 fps >>> x = t # constant forward speed >>> y = np.zeros_like(t) # no lateral movement >>> yaw = np.zeros_like(t) # no rotation >>> ax, lat, yaw_speed = TrajPreprocessing.compute_speed(x, y, yaw, fps=100, n_diff=11) >>> np.mean(ax[20:-20]) > 0 # positive forward speed True >>> np.allclose(lat[20:-20], 0, atol=1e-10) # no lateral speed True """ body_vector = np.array([np.cos(yaw), np.sin(yaw)])[:, :-1] position_change = np.zeros_like(body_vector) position_change[0, :] = robust_diff(x, dt=1 / fps, filter_length=n_diff)[:-1] position_change[1, :] = robust_diff(y, dt=1 / fps, filter_length=n_diff)[:-1] angle = np.pi / 2 rotMat = np.array( [[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]] ) body_vector_orth = np.dot(rotMat, body_vector) axial_speed = np.einsum("ij,ij->j", body_vector, position_change) lateral_speed = np.einsum("ij,ij->j", body_vector_orth, position_change) yaw_speed = robust_diff(np.unwrap(yaw), dt=1 / fps, filter_length=n_diff)[:-1] axial_speed = np.concatenate((np.array([np.nan]), axial_speed)) lateral_speed = np.concatenate((np.array([np.nan]), lateral_speed)) yaw_speed = np.concatenate((np.array([np.nan]), yaw_speed)) return axial_speed, lateral_speed, yaw_speed
[docs] @staticmethod def compute_kinematic_activity( axial_speed: np.ndarray, lateral_speed: np.ndarray, yaw_speed: np.ndarray, lag: int, fps: int, ) -> np.ndarray: """Compute the kinematic activity of a body. Parameters ---------- axial_speed : np.ndarray Speed along body axis lateral_speed : np.ndarray Speed perpendicular to body axis yaw_speed : np.ndarray Angular velocity lag : int Number of frames to compute activity over fps : int Frames per second Returns ------- np.ndarray Kinematic activity measure Examples -------- >>> n_frames = 100 >>> speeds = np.zeros((n_frames, 3)) # no movement initially >>> speeds[50:60, 0] = 1 # burst of forward movement >>> activity = TrajPreprocessing.compute_kinematic_activity( ... speeds[:, 0], speeds[:, 1], speeds[:, 2], lag=10, fps=100) >>> np.any(activity > 0) # detected activity during burst True """ traj_speed = np.vstack((axial_speed, lateral_speed, yaw_speed)).T traj_speed[np.isnan(traj_speed)] = 0 traj_cumul = np.cumsum(np.abs(traj_speed), axis=0) dt = 1000 / fps * lag displacement = np.zeros_like(traj_cumul) for i in range(3): tmp = np.copy(traj_cumul[:, i]) tmp_future_lag = shift(tmp, -lag, cval=0) displacement[:, i] = (tmp_future_lag - tmp) / dt displacement[:lag, :] = 0 displacement[-lag:, :] = 0 kinematic_activity = np.linalg.norm(displacement, axis=1) return kinematic_activity