megabouts.preprocessing
- class megabouts.preprocessing.tail_preprocessing.TailPreprocessingResult(angle, angle_baseline, angle_smooth, vigor, no_tracking)
Bases: object
Container for tail preprocessing results.
- Parameters:
angle (np.ndarray) – Raw tail angles
angle_baseline (np.ndarray) – Computed baseline of tail angles
angle_smooth (np.ndarray) – Smoothed and baseline-subtracted tail angles
vigor (np.ndarray) – Computed tail vigor
no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking
- class megabouts.preprocessing.tail_preprocessing.TailPreprocessing(config: TailPreprocessingConfig)
Bases: object
Class for preprocessing tail angle data.
- preprocess_tail_df(tail_df: DataFrame) → TailPreprocessingResult
Preprocess tail angle data from a DataFrame.
- Parameters:
tail_df (pd.DataFrame) – DataFrame containing tail angle data
- Returns:
Preprocessed tail data
- Return type:
TailPreprocessingResult
Examples
>>> import numpy as np
>>> import pandas as pd
>>> from megabouts.config import TailPreprocessingConfig
>>> from megabouts.preprocessing.tail_preprocessing import TailPreprocessing
>>> # Create sample tail angles (100 timepoints, 10 segments)
>>> tail_df = pd.DataFrame(
...     np.sin(np.linspace(0, 2*np.pi, 100))[:, None] * np.ones((100, 10)),
...     columns=[f'angle_{i}' for i in range(10)]
... )
>>> config = TailPreprocessingConfig(fps=100)
>>> result = TailPreprocessing(config).preprocess_tail_df(tail_df)
>>> result.angle.shape == (100, 10)
True
>>> result.angle.shape == result.angle_smooth.shape
True
>>> result.vigor.ndim == 1  # vigor is a 1D time series
True
- preprocess_tail_angle(angle: ndarray) → Tuple[ndarray, ndarray]
Preprocess raw tail angle data.
- Parameters:
angle (np.ndarray) – Raw tail angles, shape (T, n_segments)
- Returns:
angle (np.ndarray) – Preprocessed angles
angle_baseline (np.ndarray) – Computed baseline
no_tracking (np.ndarray) – Boolean mask for frames with no tracking
- static interp_tail_nan(angle: ndarray, limit_na: int = 5) → Tuple[ndarray, ndarray]
Interpolates missing values in tail angle.
- Parameters:
angle (np.ndarray) – Tail angles with potential NaN values, shape (T, n_segments)
limit_na (int, optional) – Maximum number of consecutive NaN values to interpolate, by default 5
- Returns:
angle_no_nan (np.ndarray) – Interpolated angles
no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking
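The gap-filling rule described above can be sketched with NumPy alone. This is a minimal illustration, not the megabouts implementation. The helper name `interp_short_gaps` is hypothetical, and it assumes that a frame counts as missing when any segment is NaN, that only gaps of at most `limit_na` frames bounded by valid frames are filled, and that longer gaps stay NaN and are flagged in `no_tracking`.

```python
import numpy as np


def interp_short_gaps(angle: np.ndarray, limit_na: int = 5):
    """Linearly interpolate NaN runs of at most `limit_na` frames (illustrative only)."""
    angle = np.asarray(angle, dtype=float)
    n_frames = angle.shape[0]
    # Assumption: a frame is missing if any of its segments is NaN.
    missing = np.isnan(angle).any(axis=1)

    # Locate the start and (exclusive) stop of every run of missing frames.
    padded = np.concatenate(([False], missing, [False]))
    edges = np.flatnonzero(np.diff(padded.astype(int)))
    starts, stops = edges[::2], edges[1::2]

    # Only short runs with valid frames on both sides get filled.
    fill = np.zeros(n_frames, dtype=bool)
    for start, stop in zip(starts, stops):
        if stop - start <= limit_na and start > 0 and stop < n_frames:
            fill[start:stop] = True

    angle_no_nan = angle.copy()
    valid = ~missing
    frames = np.arange(n_frames)
    if fill.any():
        for seg in range(angle.shape[1]):
            angle_no_nan[fill, seg] = np.interp(
                frames[fill], frames[valid], angle[valid, seg]
            )

    # Frames that were missing and not filled are reported as untracked.
    no_tracking = missing & ~fill
    return angle_no_nan, no_tracking


# A ramp with one short gap (2 frames) and one long gap (8 frames).
angle = np.tile(np.arange(20, dtype=float)[:, None], (1, 3))
angle[3:5] = np.nan
angle[8:16] = np.nan
angle_no_nan, no_tracking = interp_short_gaps(angle, limit_na=5)
```

In this sketch the short gap is filled by linear interpolation, while the long gap is left as NaN and marked in the mask so that downstream steps can ignore those frames.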
- static clean_using_pca(angle: ndarray, num_pcs=4) → ndarray
Apply PCA autoencoding to clean up a multidimensional time series.
- Parameters:
angle (np.ndarray) – Input angles, shape (T, n_segments)
num_pcs (int, optional) – Number of principal components to use, by default 4
- Returns:
PCA-cleaned angles
- Return type:
np.ndarray
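"PCA autoencoding" here can be read as projecting each frame onto the leading principal components and reconstructing it from that low-dimensional code. The sketch below shows the idea with a plain SVD. The function name `pca_denoise` and the synthetic data are illustrative assumptions, and the library's own routine may differ in details such as centering or the PCA backend.

```python
import numpy as np


def pca_denoise(angle: np.ndarray, num_pcs: int = 4) -> np.ndarray:
    """Reconstruct `angle` from its first `num_pcs` principal components."""
    angle = np.asarray(angle, dtype=float)
    mean = angle.mean(axis=0, keepdims=True)
    centered = angle - mean
    # Rows of vt are the principal axes, ordered by explained variance.
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    components = vt[:num_pcs]
    scores = centered @ components.T    # encode: (T, num_pcs)
    return scores @ components + mean   # decode: (T, n_segments)


# Synthetic rank-2 signal over 10 segments plus measurement noise.
rng = np.random.default_rng(0)
t = np.linspace(0, 1, 200)
weights = rng.normal(size=(2, 10))
clean = (np.outer(np.sin(2 * np.pi * 3 * t), weights[0])
         + np.outer(np.cos(2 * np.pi * 5 * t), weights[1]))
noisy = clean + 0.05 * rng.normal(size=clean.shape)
denoised = pca_denoise(noisy, num_pcs=2)
```

Keeping fewer components than segments discards the directions that mostly carry noise. Keeping all of them returns the input unchanged.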
- static smooth_tail_angle(angle: ndarray, savgol_window: int) → ndarray
Smooth the tail angle data using Savitzky-Golay filter.
- Parameters:
angle (np.ndarray) – Input angles, shape (T, n_segments)
savgol_window (int) – Window length for Savitzky-Golay filter (must be odd)
- Returns:
Smoothed angles
- Return type:
np.ndarray
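A Savitzky-Golay filter fits a low-order polynomial to each odd-length window by least squares and keeps the fitted value at the window center, which is why the window length must be odd. The NumPy sketch below derives the filter weights from that definition. The names `savgol_coefficients` and `savgol_smooth`, the polynomial order of 2, and the edge padding are assumptions made for illustration, since the signature above exposes only `savgol_window`. In practice a library routine such as `scipy.signal.savgol_filter` would normally be used.

```python
import numpy as np


def savgol_coefficients(window: int, polyorder: int) -> np.ndarray:
    """Weights that give the least-squares polynomial fit at the window center."""
    if window % 2 == 0:
        raise ValueError("window must be odd")
    half = window // 2
    offsets = np.arange(-half, half + 1, dtype=float)
    # Design matrix of the polynomial fit: columns are offsets**0, offsets**1, ...
    design = np.vander(offsets, polyorder + 1, increasing=True)
    # Row 0 of the pseudo-inverse maps window samples to the fitted value at offset 0.
    return np.linalg.pinv(design)[0]


def savgol_smooth(angle: np.ndarray, window: int, polyorder: int = 2) -> np.ndarray:
    """Smooth each column of a (T, n_segments) array along time."""
    angle = np.asarray(angle, dtype=float)
    coeffs = savgol_coefficients(window, polyorder)
    half = window // 2
    # Repeat the first and last frames so the output keeps T rows.
    padded = np.pad(angle, ((half, half), (0, 0)), mode="edge")
    smoothed = np.empty_like(angle)
    for seg in range(angle.shape[1]):
        # Reversing the kernel turns np.convolve into a sliding dot product.
        smoothed[:, seg] = np.convolve(padded[:, seg], coeffs[::-1], mode="valid")
    return smoothed


# Polynomials up to the fitted order pass through unchanged away from the edges.
t = np.arange(50.0)
angle = np.stack([t ** 2, 3 * t - 1], axis=1)
smoothed = savgol_smooth(angle, window=7)
```

Because the fit is exact for polynomials of degree up to `polyorder`, slow curvature in the tail angle is preserved while frame-to-frame jitter is averaged out.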
- static compute_baseline(angle_smooth: ndarray, baseline_method: str, baseline_params: dict) → ndarray
Compute the baseline for the smoothed tail angle data.
- Parameters:
angle_smooth (np.ndarray) – Smoothed angles, shape (T, n_segments)
baseline_method (str) – Method for baseline computation
baseline_params (dict) – Parameters for baseline computation
- Returns:
Computed baseline
- Return type:
np.ndarray
- static compute_tail_speed(angle: ndarray, fps: int, tail_speed_filter: int, tail_speed_boxcar_filter: int) → ndarray
Compute tail speed and vigor.
- Parameters:
angle (np.ndarray) – Input angles, shape (T, n_segments)
fps (int) – Frames per second
tail_speed_filter (int) – Filter length for speed computation
tail_speed_boxcar_filter (int) – Filter length for boxcar smoothing
- Returns:
Computed tail vigor
- Return type:
np.ndarray
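The exact definition of vigor is not spelled out above, so the following is only a rough sketch of how a 1D activity trace could be derived from segment angles: differentiate in time, collapse the segments, and smooth with a boxcar window. The function name `tail_vigor_sketch`, the use of the mean absolute angular speed across segments, and the omission of a separate speed filter (`tail_speed_filter`) are all assumptions.

```python
import numpy as np


def tail_vigor_sketch(angle: np.ndarray, fps: int, boxcar_len: int) -> np.ndarray:
    """Boxcar-smoothed mean absolute angular speed (illustrative definition)."""
    angle = np.asarray(angle, dtype=float)
    # Angular speed of every segment, in angle units per second.
    speed = np.gradient(angle, 1.0 / fps, axis=0)
    # Collapse the segment axis into a single non-negative trace.
    speed_1d = np.abs(speed).mean(axis=1)
    # Moving average over `boxcar_len` frames, output length equal to T.
    kernel = np.ones(boxcar_len) / boxcar_len
    return np.convolve(speed_1d, kernel, mode="same")


# One second of rest followed by one second of 20 Hz tail beating at 100 fps.
fps = 100
t = np.arange(200) / fps
beat = np.sin(2 * np.pi * 20 * t) * (t >= 1.0)
angle = beat[:, None] * np.linspace(0.1, 1.0, 10)
vigor = tail_vigor_sketch(angle, fps=fps, boxcar_len=10)
```

With this definition the trace stays at zero while the tail is still and rises during the beating period, which is the property a bout detector would threshold on.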
- class megabouts.preprocessing.traj_preprocessing.TrajPreprocessingResult(x, y, yaw, x_smooth, y_smooth, yaw_smooth, axial_speed, lateral_speed, yaw_speed, vigor, no_tracking)
Bases: object
Container for trajectory preprocessing results.
- Parameters:
x (np.ndarray) – Raw x position coordinates
y (np.ndarray) – Raw y position coordinates
yaw (np.ndarray) – Raw orientation angles
x_smooth (np.ndarray) – Smoothed x position coordinates
y_smooth (np.ndarray) – Smoothed y position coordinates
yaw_smooth (np.ndarray) – Smoothed orientation angles
axial_speed (np.ndarray) – Speed along body axis
lateral_speed (np.ndarray) – Speed perpendicular to body axis
yaw_speed (np.ndarray) – Angular velocity
vigor (np.ndarray) – Kinematic activity measure
no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking
- class megabouts.preprocessing.traj_preprocessing.TrajPreprocessing(config: TrajPreprocessingConfig)
Bases: object
Class for preprocessing trajectory data.
- preprocess_traj_df(traj_df: DataFrame) → DataFrame
Preprocess trajectory data from a DataFrame.
- Parameters:
traj_df (pd.DataFrame) – DataFrame containing head trajectory data in columns x, y, and yaw
- Returns:
Preprocessed trajectory data
- Return type:
Examples
>>> import numpy as np
>>> import pandas as pd
>>> from megabouts.config import TrajPreprocessingConfig
>>> from megabouts.preprocessing.traj_preprocessing import TrajPreprocessing
>>> # Create sample data
>>> t = np.linspace(0, 1, 100)  # 1 second at 100 fps
>>> traj_df = pd.DataFrame({
...     'x': t,  # constant forward speed
...     'y': np.zeros_like(t),  # no lateral movement
...     'yaw': np.zeros_like(t)  # no rotation
... })
>>> # Preprocess
>>> config = TrajPreprocessingConfig(fps=100)
>>> preprocessor = TrajPreprocessing(config)
>>> result = preprocessor.preprocess_traj_df(traj_df)
>>> # Verify forward movement was detected
>>> np.mean(result.axial_speed[20:-20]) > 0  # positive forward speed
True
>>> np.allclose(result.lateral_speed[20:-20], 0, atol=1e-10)  # no lateral speed
True
- static interp_traj_nan(x: ndarray, y: ndarray, yaw: ndarray, limit_na: int) → Tuple[ndarray, ndarray, ndarray, ndarray]
Interpolates missing values in trajectory data.
- Parameters:
x (np.ndarray) – x position coordinates
y (np.ndarray) – y position coordinates
yaw (np.ndarray) – Orientation angles
limit_na (int) – Maximum number of consecutive NaN values to interpolate
- Returns:
x, y (np.ndarray) – Interpolated position coordinates
yaw (np.ndarray) – Interpolated orientation angles
no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking
- static one_euro_filter(x: ndarray, freq_cutoff_min: float, beta: float, rate: int) → ndarray
Apply the 1€ (One Euro) filter to x.
- Parameters:
x (np.ndarray) – Input signal
freq_cutoff_min (float) – Minimum cutoff frequency in Hz
beta (float) – Speed coefficient
rate (int) – Sampling rate in Hz
- Returns:
Filtered signal
- Return type:
np.ndarray
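The 1€ filter is an exponential low-pass whose cutoff frequency rises with the estimated speed of the signal: slow movements are smoothed heavily to suppress jitter, fast movements are smoothed lightly to limit lag. The sketch below follows the published algorithm under a few stated assumptions: the function names are illustrative, the derivative cutoff `d_cutoff` (not part of the signature above) defaults to 1 Hz, and the derivative is estimated from the previous filtered sample.

```python
import numpy as np


def smoothing_factor(cutoff_hz: float, rate: float) -> float:
    """Exponential-smoothing weight of a first-order low-pass at `cutoff_hz`."""
    tau = 1.0 / (2.0 * np.pi * cutoff_hz)
    return 1.0 / (1.0 + tau * rate)


def one_euro(x, freq_cutoff_min: float, beta: float, rate: float,
             d_cutoff: float = 1.0) -> np.ndarray:
    """Speed-adaptive low-pass filter over a 1D signal."""
    x = np.asarray(x, dtype=float)
    out = np.empty_like(x)
    out[0] = x[0]
    dx_hat = 0.0
    alpha_d = smoothing_factor(d_cutoff, rate)
    for i in range(1, len(x)):
        # Low-pass estimate of the signal speed, in units per second.
        dx = (x[i] - out[i - 1]) * rate
        dx_hat = alpha_d * dx + (1.0 - alpha_d) * dx_hat
        # Faster motion raises the cutoff, which reduces lag.
        cutoff = freq_cutoff_min + beta * abs(dx_hat)
        alpha = smoothing_factor(cutoff, rate)
        out[i] = alpha * x[i] + (1.0 - alpha) * out[i - 1]
    return out


# A stationary position corrupted by tracking jitter, sampled at 100 Hz.
rng = np.random.default_rng(0)
noisy = 1.0 + 0.1 * rng.standard_normal(500)
filtered = one_euro(noisy, freq_cutoff_min=1.0, beta=0.01, rate=100)
```

Lowering `freq_cutoff_min` removes more jitter at rest, and raising `beta` reduces lag during fast movement. The two are typically tuned in that order.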
- static compute_speed(x: ndarray, y: ndarray, yaw: ndarray, fps: int, n_diff: int) → Tuple[ndarray, ndarray, ndarray]
Compute the axial, lateral, and yaw speed of a body.
- Parameters:
x (np.ndarray) – x position coordinates
y (np.ndarray) – y position coordinates
yaw (np.ndarray) – Orientation angles
fps (int) – Frames per second
n_diff (int) – Filter length for derivative computation
- Returns:
axial_speed (np.ndarray) – Speed along body axis
lateral_speed (np.ndarray) – Speed perpendicular to body axis
yaw_speed (np.ndarray) – Angular velocity
Examples
>>> import numpy as np
>>> from megabouts.preprocessing.traj_preprocessing import TrajPreprocessing
>>> t = np.linspace(0, 1, 100)  # 1 second at 100 fps
>>> x = t  # constant forward speed
>>> y = np.zeros_like(t)  # no lateral movement
>>> yaw = np.zeros_like(t)  # no rotation
>>> ax, lat, yaw_speed = TrajPreprocessing.compute_speed(x, y, yaw, fps=100, n_diff=11)
>>> np.mean(ax[20:-20]) > 0  # positive forward speed
True
>>> np.allclose(lat[20:-20], 0, atol=1e-10)  # no lateral speed
True
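Axial and lateral speed are the components of the velocity vector expressed in the animal's body frame, which amounts to rotating the lab-frame velocity by the yaw angle. The sketch below shows that projection with plain finite differences. The name `body_frame_speed`, the sign convention for the lateral axis, and the use of `np.gradient` in place of a derivative filter of length `n_diff` are assumptions for illustration.

```python
import numpy as np


def body_frame_speed(x, y, yaw, fps: int):
    """Project the lab-frame velocity onto the body axis given by `yaw` (radians)."""
    x, y, yaw = (np.asarray(a, dtype=float) for a in (x, y, yaw))
    dt = 1.0 / fps
    vx = np.gradient(x, dt)
    vy = np.gradient(y, dt)
    # Unwrap first so a jump from +pi to -pi is not read as a fast turn.
    yaw_speed = np.gradient(np.unwrap(yaw), dt)
    # Rotate the velocity into the body frame.
    axial_speed = vx * np.cos(yaw) + vy * np.sin(yaw)
    lateral_speed = -vx * np.sin(yaw) + vy * np.cos(yaw)
    return axial_speed, lateral_speed, yaw_speed


# Straight swim at 2 units/s along a heading of 45 degrees, sampled at 100 fps.
fps = 100
t = np.arange(100) / fps
heading = np.pi / 4
x = 2.0 * t * np.cos(heading)
y = 2.0 * t * np.sin(heading)
yaw = np.full_like(t, heading)
axial_speed, lateral_speed, yaw_speed = body_frame_speed(x, y, yaw, fps)
```

Although the animal moves diagonally in the lab frame, all of its speed appears in the axial component, because the motion is aligned with its heading.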
- static compute_kinematic_activity(axial_speed: ndarray, lateral_speed: ndarray, yaw_speed: ndarray, lag: int, fps: int) → ndarray
Compute the kinematic activity of a body.
- Parameters:
axial_speed (np.ndarray) – Speed along body axis
lateral_speed (np.ndarray) – Speed perpendicular to body axis
yaw_speed (np.ndarray) – Angular velocity
lag (int) – Number of frames to compute activity over
fps (int) – Frames per second
- Returns:
Kinematic activity measure
- Return type:
np.ndarray
Examples
>>> import numpy as np
>>> from megabouts.preprocessing.traj_preprocessing import TrajPreprocessing
>>> n_frames = 100
>>> speeds = np.zeros((n_frames, 3))  # no movement initially
>>> speeds[50:60, 0] = 1  # burst of forward movement
>>> activity = TrajPreprocessing.compute_kinematic_activity(
...     speeds[:, 0], speeds[:, 1], speeds[:, 2], lag=10, fps=100)
>>> np.any(activity > 0)  # detected activity during burst
True