megabouts.preprocessing#

class megabouts.preprocessing.tail_preprocessing.TailPreprocessingResult(angle, angle_baseline, angle_smooth, vigor, no_tracking)[source]#

Bases: object

Container for tail preprocessing results.

Parameters:
  • angle (np.ndarray) – Raw tail angles

  • angle_baseline (np.ndarray) – Computed baseline of tail angles

  • angle_smooth (np.ndarray) – Smoothed and baseline-subtracted tail angles

  • vigor (np.ndarray) – Computed tail vigor

  • no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking

__init__(angle, angle_baseline, angle_smooth, vigor, no_tracking)[source]#
class megabouts.preprocessing.tail_preprocessing.TailPreprocessing(config: TailPreprocessingConfig)[source]#

Bases: object

Class for preprocessing tail angle data.

__init__(config: TailPreprocessingConfig)[source]#
preprocess_tail_df(tail_df: DataFrame) → TailPreprocessingResult[source]#

Preprocess tail angle data from a DataFrame.

Parameters:

tail_df (pd.DataFrame) – DataFrame containing tail angle data

Returns:

Preprocessed tail data

Return type:

TailPreprocessingResult

Examples

>>> import numpy as np
>>> import pandas as pd
>>> from megabouts.config import TailPreprocessingConfig
>>> # Create sample tail angles (100 timepoints, 10 segments)
>>> tail_df = pd.DataFrame(
...     np.sin(np.linspace(0, 2*np.pi, 100))[:, None] * np.ones((100, 10)),
...     columns=[f'angle_{i}' for i in range(10)]
... )
>>> config = TailPreprocessingConfig(fps=100)
>>> result = TailPreprocessing(config).preprocess_tail_df(tail_df)
>>> result.angle.shape == (100, 10)
True
>>> result.angle.shape == result.angle_smooth.shape
True
>>> result.vigor.ndim == 1  # vigor is 1D time series
True
preprocess_tail_angle(angle: ndarray) → Tuple[ndarray, ndarray, ndarray][source]#

Preprocess raw tail angle data.

Parameters:

angle (np.ndarray) – Raw tail angles, shape (T, n_segments)

Returns:

  • angle (np.ndarray) – Preprocessed angles

  • angle_baseline (np.ndarray) – Computed baseline

  • no_tracking (np.ndarray) – Boolean mask for frames with no tracking

static interp_tail_nan(angle: ndarray, limit_na: int = 5) → Tuple[ndarray, ndarray][source]#

Interpolates missing values in tail angle.

Parameters:
  • angle (np.ndarray) – Tail angles with potential NaN values, shape (T, n_segments)

  • limit_na (int, optional) – Maximum number of consecutive NaN values to interpolate, by default 5

Returns:

  • angle_no_nan (np.ndarray) – Interpolated angles

  • no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking

static clean_using_pca(angle: ndarray, num_pcs=4) → ndarray[source]#

Apply PCA autoencoding to clean up a multidimensional time series.

Parameters:
  • angle (np.ndarray) – Input angles, shape (T, n_segments)

  • num_pcs (int, optional) – Number of principal components to use, by default 4

Returns:

PCA-cleaned angles

Return type:

np.ndarray

static smooth_tail_angle(angle: ndarray, savgol_window: int) → ndarray[source]#

Smooth the tail angle data using Savitzky-Golay filter.

Parameters:
  • angle (np.ndarray) – Input angles, shape (T, n_segments)

  • savgol_window (int) – Window length for Savitzky-Golay filter (must be odd)

Returns:

Smoothed angles

Return type:

np.ndarray

static compute_baseline(angle_smooth: ndarray, baseline_method: str, baseline_params: dict) → ndarray[source]#

Compute the baseline for the smoothed tail angle data.

Parameters:
  • angle_smooth (np.ndarray) – Smoothed angles, shape (T, n_segments)

  • baseline_method (str) – Method for baseline computation

  • baseline_params (dict) – Parameters for baseline computation

Returns:

Computed baseline

Return type:

np.ndarray

static compute_tail_speed(angle: ndarray, fps: int, tail_speed_filter: int, tail_speed_boxcar_filter: int) → ndarray[source]#

Compute tail speed and vigor.

Parameters:
  • angle (np.ndarray) – Input angles, shape (T, n_segments)

  • fps (int) – Frames per second

  • tail_speed_filter (int) – Filter length for speed computation

  • tail_speed_boxcar_filter (int) – Filter length for boxcar smoothing

Returns:

Computed tail vigor

Return type:

np.ndarray

class megabouts.preprocessing.traj_preprocessing.TrajPreprocessingResult(x, y, yaw, x_smooth, y_smooth, yaw_smooth, axial_speed, lateral_speed, yaw_speed, vigor, no_tracking)[source]#

Bases: object

Container for trajectory preprocessing results.

Parameters:
  • x (np.ndarray) – Raw x position coordinates

  • y (np.ndarray) – Raw y position coordinates

  • yaw (np.ndarray) – Raw orientation angles

  • x_smooth (np.ndarray) – Smoothed x position coordinates

  • y_smooth (np.ndarray) – Smoothed y position coordinates

  • yaw_smooth (np.ndarray) – Smoothed orientation angles

  • axial_speed (np.ndarray) – Speed along body axis

  • lateral_speed (np.ndarray) – Speed perpendicular to body axis

  • yaw_speed (np.ndarray) – Angular velocity

  • vigor (np.ndarray) – Kinematic activity measure

  • no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking

__init__(x, y, yaw, x_smooth, y_smooth, yaw_smooth, axial_speed, lateral_speed, yaw_speed, vigor, no_tracking)[source]#
class megabouts.preprocessing.traj_preprocessing.TrajPreprocessing(config: TrajPreprocessingConfig)[source]#

Bases: object

Class for preprocessing trajectory data.

__init__(config: TrajPreprocessingConfig)[source]#
preprocess_traj_df(traj_df: DataFrame) → TrajPreprocessingResult[source]#

Preprocess trajectory data from a DataFrame.

Parameters:

traj_df (pd.DataFrame) – DataFrame containing head trajectory data (x,y,yaw)

Returns:

Preprocessed trajectory data

Return type:

TrajPreprocessingResult

Examples

>>> import numpy as np
>>> import pandas as pd
>>> from megabouts.config import TrajPreprocessingConfig
>>>
>>> # Create sample data
>>> t = np.linspace(0, 1, 100)  # 1 second at 100 fps
>>> traj_df = pd.DataFrame({
...     'x': t,  # constant forward speed
...     'y': np.zeros_like(t),  # no lateral movement
...     'yaw': np.zeros_like(t)  # no rotation
... })
>>>
>>> # Preprocess
>>> config = TrajPreprocessingConfig(fps=100)
>>> preprocessor = TrajPreprocessing(config)
>>> result = preprocessor.preprocess_traj_df(traj_df)
>>>
>>> # Verify forward movement was detected
>>> np.mean(result.axial_speed[20:-20]) > 0  # positive forward speed
True
>>> np.allclose(result.lateral_speed[20:-20], 0, atol=1e-10)  # no lateral speed
True
static interp_traj_nan(x: ndarray, y: ndarray, yaw: ndarray, limit_na: int) → Tuple[ndarray, ndarray, ndarray, ndarray][source]#

Interpolates missing values in trajectory data.

Parameters:
  • x (np.ndarray) – x position coordinates

  • y (np.ndarray) – y position coordinates

  • yaw (np.ndarray) – Orientation angles

  • limit_na (int) – Maximum number of consecutive NaN values to interpolate

Returns:

  • x, y (np.ndarray) – Interpolated position coordinates

  • yaw (np.ndarray) – Interpolated orientation angles

  • no_tracking (np.ndarray) – Boolean mask indicating frames with no tracking

static one_euro_filter(x: ndarray, freq_cutoff_min: float, beta: float, rate: int) → ndarray[source]#

Apply the 1€ (One Euro) filter to the signal x.

Parameters:
  • x (np.ndarray) – Input signal

  • freq_cutoff_min (float) – Minimum cutoff frequency in Hz

  • beta (float) – Speed coefficient

  • rate (int) – Sampling rate in Hz

Returns:

Filtered signal

Return type:

np.ndarray

static compute_speed(x: ndarray, y: ndarray, yaw: ndarray, fps: int, n_diff: int) → Tuple[ndarray, ndarray, ndarray][source]#

Compute the axial, lateral, and yaw speed of a body.

Parameters:
  • x (np.ndarray) – x position coordinates

  • y (np.ndarray) – y position coordinates

  • yaw (np.ndarray) – Orientation angles

  • fps (int) – Frames per second

  • n_diff (int) – Filter length for derivative computation

Returns:

  • axial_speed (np.ndarray) – Speed along body axis

  • lateral_speed (np.ndarray) – Speed perpendicular to body axis

  • yaw_speed (np.ndarray) – Angular velocity

Examples

>>> t = np.linspace(0, 1, 100)  # 1 second at 100 fps
>>> x = t  # constant forward speed
>>> y = np.zeros_like(t)  # no lateral movement
>>> yaw = np.zeros_like(t)  # no rotation
>>> ax, lat, yaw_speed = TrajPreprocessing.compute_speed(x, y, yaw, fps=100, n_diff=11)
>>> np.mean(ax[20:-20]) > 0  # positive forward speed
True
>>> np.allclose(lat[20:-20], 0, atol=1e-10)  # no lateral speed
True
static compute_kinematic_activity(axial_speed: ndarray, lateral_speed: ndarray, yaw_speed: ndarray, lag: int, fps: int) → ndarray[source]#

Compute the kinematic activity of a body.

Parameters:
  • axial_speed (np.ndarray) – Speed along body axis

  • lateral_speed (np.ndarray) – Speed perpendicular to body axis

  • yaw_speed (np.ndarray) – Angular velocity

  • lag (int) – Number of frames to compute activity over

  • fps (int) – Frames per second

Returns:

Kinematic activity measure

Return type:

np.ndarray

Examples

>>> n_frames = 100
>>> speeds = np.zeros((n_frames, 3))  # no movement initially
>>> speeds[50:60, 0] = 1  # burst of forward movement
>>> activity = TrajPreprocessing.compute_kinematic_activity(
...     speeds[:, 0], speeds[:, 1], speeds[:, 2], lag=10, fps=100)
>>> np.any(activity > 0)  # detected activity during burst
True