from typing import Tuple
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter, convolve
from scipy.signal.windows import boxcar
from sklearn.decomposition import PCA
from ..config.preprocessing_config import TailPreprocessingConfig
from ..preprocessing.tail_baseline import compute_baseline
from ..utils.math_utils import robust_diff
from ..utils.data_utils import create_hierarchical_df
class TailPreprocessingResult:
    """Bundle of arrays produced by the tail preprocessing pipeline.

    Parameters
    ----------
    angle : np.ndarray
        Raw tail angles
    angle_baseline : np.ndarray
        Computed baseline of tail angles
    angle_smooth : np.ndarray
        Smoothed and baseline-subtracted tail angles
    vigor : np.ndarray
        Computed tail vigor
    no_tracking : np.ndarray
        Boolean mask indicating frames with no tracking

    Attributes
    ----------
    df
        Whatever ``create_hierarchical_df`` returns for the five arrays above
        (presumably a column-hierarchical ``pd.DataFrame`` -- confirm against
        ``utils.data_utils``).
    """

    def __init__(self, angle, angle_baseline, angle_smooth, vigor, no_tracking):
        self.angle = angle
        self.angle_baseline = angle_baseline
        self.angle_smooth = angle_smooth
        self.vigor = vigor
        self.no_tracking = no_tracking
        # Built once at construction time from the attributes set above.
        self.df = self._to_dataframe()

    def _to_dataframe(self):
        """Assemble the stored arrays into a single table.

        Each entry handed to ``create_hierarchical_df`` is a
        ``(name, sub_level_label, array)`` triple. The angle arrays use the
        label ``"segments"``; ``vigor`` and ``no_tracking`` use the literal
        string ``"None"``.
        """
        segment_fields = ("angle", "angle_baseline", "angle_smooth")
        frame_fields = ("vigor", "no_tracking")
        entries = [(name, "segments", getattr(self, name)) for name in segment_fields]
        entries += [(name, "None", getattr(self, name)) for name in frame_fields]
        return create_hierarchical_df(entries)
class TailPreprocessing:
    """Class for preprocessing tail angle data.

    The pipeline is: NaN interpolation -> PCA denoising -> Savitzky-Golay
    smoothing -> baseline estimation -> baseline subtraction -> vigor.
    All tunable values are read from the ``TailPreprocessingConfig`` passed
    to the constructor.
    """

    def __init__(self, config: "TailPreprocessingConfig"):
        self.config = config

    def preprocess_tail_df(self, tail_df: pd.DataFrame) -> "TailPreprocessingResult":
        """Preprocess tail angle data from a DataFrame.

        Parameters
        ----------
        tail_df : pd.DataFrame
            DataFrame containing tail angle data. The columns ``angle_0`` to
            ``angle_9`` must be present; only those ten columns are used.

        Returns
        -------
        TailPreprocessingResult
            Preprocessed tail data

        Examples
        --------
        >>> import numpy as np
        >>> import pandas as pd
        >>> from megabouts.config import TailPreprocessingConfig
        >>> # Create sample tail angles (100 timepoints, 10 segments)
        >>> tail_df = pd.DataFrame(
        ...     np.sin(np.linspace(0, 2*np.pi, 100))[:, None] * np.ones((100, 10)),
        ...     columns=[f'angle_{i}' for i in range(10)]
        ... )
        >>> config = TailPreprocessingConfig(fps=100)
        >>> result = TailPreprocessing(config).preprocess_tail_df(tail_df)
        >>> result.angle.shape == (100, 10)
        True
        >>> result.angle.shape == result.angle_smooth.shape
        True
        >>> result.vigor.ndim == 1  # vigor is 1D time series
        True
        """
        # Extract the ten tail-segment angle columns as a (T, 10) array.
        angle_input = tail_df[[f"angle_{i}" for i in range(10)]].values

        # Interpolate, denoise, smooth and estimate the baseline.
        angle, angle_baseline, no_tracking = self.preprocess_tail_angle(
            angle=angle_input
        )
        # Out-of-place subtraction: never aliases or mutates an upstream array.
        angle_smooth = angle - angle_baseline

        vigor = TailPreprocessing.compute_tail_speed(
            angle=angle_smooth,
            fps=self.config.fps,
            tail_speed_filter=self.config.tail_speed_filter,
            tail_speed_boxcar_filter=self.config.tail_speed_boxcar_filter,
        )
        return TailPreprocessingResult(
            angle_input, angle_baseline, angle_smooth, vigor, no_tracking
        )

    def preprocess_tail_angle(
        self, angle: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Preprocess raw tail angle data.

        Runs, in order: ``interp_tail_nan``, ``clean_using_pca``,
        ``smooth_tail_angle`` and ``compute_baseline``, each parameterised
        from ``self.config``. The baseline is returned separately and is NOT
        subtracted from the returned angles.

        Parameters
        ----------
        angle : np.ndarray
            Raw tail angles, shape (T, n_segments)

        Returns
        -------
        angle : np.ndarray
            Preprocessed angles
        angle_baseline : np.ndarray
            Computed baseline
        no_tracking : np.ndarray
            Boolean mask for frames with no tracking
        """
        angle, no_tracking = self.interp_tail_nan(angle, limit_na=self.config.limit_na)
        angle = self.clean_using_pca(angle, num_pcs=self.config.num_pcs)
        angle = self.smooth_tail_angle(angle, savgol_window=self.config.savgol_window)
        angle_baseline = self.compute_baseline(
            angle,
            baseline_method=self.config.baseline_method,
            baseline_params=self.config.baseline_params,
        )
        return angle, angle_baseline, no_tracking

    @staticmethod
    def interp_tail_nan(
        angle: np.ndarray, limit_na: int = 5
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Interpolates missing values in tail angle.

        Each segment (column) is filled independently with nearest-neighbour
        interpolation. NaNs that are still present afterwards (for example
        the part of a gap beyond ``limit_na``) mark the frame as untracked
        and are replaced by 0 in the output. The input array is left
        untouched.

        Parameters
        ----------
        angle : np.ndarray
            Tail angles with potential NaN values, shape (T, n_segments)
        limit_na : int, optional
            Maximum number of consecutive NaN values to interpolate, by default 5

        Returns
        -------
        angle_no_nan : np.ndarray
            Interpolated angles, with remaining NaNs set to 0
        no_tracking : np.ndarray
            Boolean mask, shape (T,), True where any segment is still NaN
            after interpolation
        """
        angle_no_nan = np.zeros_like(angle)
        for s in range(angle.shape[1]):
            # Copy the column first: a Series built on a view may share memory
            # with `angle`, and the caller's raw data must stay raw.
            column = pd.Series(angle[:, s].copy())
            filled = column.interpolate(method="nearest", limit=limit_na)
            angle_no_nan[:, s] = filled.to_numpy()

        still_nan = np.isnan(angle_no_nan)
        no_tracking = still_nan.any(axis=1)
        angle_no_nan[still_nan] = 0
        return angle_no_nan, no_tracking

    @staticmethod
    def clean_using_pca(angle: np.ndarray, num_pcs=4) -> np.ndarray:
        """Apply PCA autoencoding to clean up a multidimensional time series.

        A PCA with ``num_pcs`` components is fitted on ``angle`` itself; the
        data are projected onto those components and mapped back to the
        original space.

        Parameters
        ----------
        angle : np.ndarray
            Input angles, shape (T, n_segments)
        num_pcs : int, optional
            Number of principal components to use, by default 4

        Returns
        -------
        np.ndarray
            PCA-cleaned angles
        """
        pca = PCA(n_components=num_pcs)
        pca.fit(angle)
        low_dim = pca.transform(angle)
        return pca.inverse_transform(low_dim)

    @staticmethod
    def smooth_tail_angle(angle: np.ndarray, savgol_window: int) -> np.ndarray:
        """Smooth the tail angle data using Savitzky-Golay filter.

        A second-order filter is applied along the time axis of every
        segment. Windows of 2 or fewer samples disable smoothing and a copy
        of the input is returned.

        Parameters
        ----------
        angle : np.ndarray
            Input angles, shape (T, n_segments)
        savgol_window : int
            Window length for Savitzky-Golay filter (must be odd)

        Returns
        -------
        np.ndarray
            Smoothed angles (always a new array)
        """
        if savgol_window <= 2:
            return np.copy(angle)
        # One call along axis 0 filters every segment at once. The filter
        # output is returned directly, so integer input is not truncated.
        return savgol_filter(angle, savgol_window, 2, axis=0, mode="interp")

    @staticmethod
    def compute_baseline(
        angle_smooth: np.ndarray, baseline_method: str, baseline_params: dict
    ) -> np.ndarray:
        """Compute the baseline for the smoothed tail angle data.

        Parameters
        ----------
        angle_smooth : np.ndarray
            Smoothed angles, shape (T, n_segments)
        baseline_method : str
            Method for baseline computation
        baseline_params : dict
            Parameters for baseline computation

        Returns
        -------
        np.ndarray
            Computed baseline, same shape as ``angle_smooth``
        """
        angle_baseline = np.zeros_like(angle_smooth)
        for s in range(angle_smooth.shape[1]):
            # Inside a method body the bare name resolves to the module-level
            # `compute_baseline` imported from tail_baseline, not to this
            # static method, so there is no recursion here.
            angle_baseline[:, s] = compute_baseline(
                angle_smooth[:, s], baseline_method, baseline_params
            )
        return angle_baseline

    @staticmethod
    def compute_tail_speed(
        angle: np.ndarray,
        fps: int,
        tail_speed_filter: int,
        tail_speed_boxcar_filter: int,
    ) -> np.ndarray:
        """Compute tail speed and vigor.

        The per-segment speed is obtained from ``robust_diff`` with a time
        step of ``1 / fps``. Vigor is the sum over segments of the absolute
        speed, smoothed with a unit-area boxcar of length
        ``tail_speed_boxcar_filter``.

        Parameters
        ----------
        angle : np.ndarray
            Input angles, shape (T, n_segments)
        fps : int
            Frames per second
        tail_speed_filter : int
            Filter length for speed computation
        tail_speed_boxcar_filter : int
            Filter length for boxcar smoothing

        Returns
        -------
        np.ndarray
            Computed tail vigor, shape (T,)
        """
        angle_speed = np.zeros_like(angle)
        for i in range(angle.shape[1]):
            angle_speed[:, i] = robust_diff(
                angle[:, i], dt=1 / fps, filter_length=tail_speed_filter
            )

        cumul_filtered_speed = np.sum(np.abs(angle_speed), axis=1)
        # Dividing by the window length makes the boxcar a moving average;
        # mode="same" keeps the output aligned with the input frames.
        kernel = boxcar(tail_speed_boxcar_filter) / tail_speed_boxcar_filter
        return convolve(cumul_filtered_speed, kernel, mode="same")