Trajectory Preprocessing#
The following notebook illustrate the TrajPreprocessing
class how to run the preprocessing steps.
Several preprocessing steps are available for the tail angle:
Interpolating missing values
Apply 1€ filter
The kinematic vigor is also computed from the speed and will also be useful for segmentation into bouts:
Loading dependencies
import numpy as np
import matplotlib.pyplot as plt
from megabouts.tracking_data import TrackingConfig, FullTrackingData, load_example_data
from megabouts.config import TrajPreprocessingConfig
from megabouts.preprocessing import TrajPreprocessing
Loading Data#
TrackingConfig and TrackingData similar to tutorial_Loading_Data
# Load data and set tracking configuration
df_recording, fps, mm_per_unit = load_example_data("SLEAP_fulltracking")
tracking_cfg = TrackingConfig(fps=fps, tracking="full_tracking")
# List of keypoints
keypoints = ["left_eye", "right_eye", "tail0", "tail1", "tail2", "tail3", "tail4"]
# Place NaN where the score is below a threshold
thresh_score = 0.0
for kps in keypoints:
score_below_thresh = df_recording["instance.score"] < thresh_score
score_below_thresh | (df_recording[f"{kps}.score"] < thresh_score),
[f"{kps}.x", f"{kps}.y"],
] = np.nan
# Compute head and tail coordinates and convert to mm
head_x = ((df_recording["left_eye.x"] + df_recording["right_eye.x"]) / 2) * mm_per_unit
head_y = ((df_recording["left_eye.y"] + df_recording["right_eye.y"]) / 2) * mm_per_unit
tail_x = df_recording[[f"tail{i}.x" for i in range(5)]].values * mm_per_unit
tail_y = df_recording[[f"tail{i}.y" for i in range(5)]].values * mm_per_unit
# Create FullTrackingData object
tracking_data = FullTrackingData.from_keypoints(
head_x=head_x.values, head_y=head_y.values, tail_x=tail_x, tail_y=tail_y
Run Preprocessing#
Define preprocessing config
traj_preprocessing_cfg = TrajPreprocessingConfig(fps=tracking_cfg.fps)
Apply the trajectory preprocessing
traj_df_input = tracking_data.traj_df
traj = TrajPreprocessing(traj_preprocessing_cfg).preprocess_traj_df(traj_df_input)
traj.df contains information about the trajectory, the smooth values as well as the kinematic vigor:
x | y | yaw | x_smooth | y_smooth | yaw_smooth | axial_speed | lateral_speed | yaw_speed | vigor | no_tracking | |
0 | 20.381254 | 20.996582 | 1.491111 | 20.381254 | 20.996582 | 1.491111 | NaN | NaN | NaN | 0.0 | 0.0 |
1 | 20.381340 | 20.996807 | 1.491013 | 20.381277 | 20.996642 | 1.491085 | NaN | NaN | NaN | 0.0 | 0.0 |
2 | 20.381337 | 20.997020 | 1.491086 | 20.381293 | 20.996742 | 1.491085 | NaN | NaN | NaN | 0.0 | 0.0 |
3 | 20.418551 | 20.997656 | 1.430916 | 20.391138 | 20.996983 | 1.475189 | NaN | NaN | NaN | 0.0 | 0.0 |
4 | 20.418671 | 20.997872 | 1.430749 | 20.399294 | 20.997219 | 1.461210 | 0.293531 | -1.616157 | -2.659344 | 0.0 | 0.0 |
We can visualize the result of preprocessing:
Show code cell source
t = np.arange(tracking_data.T) / tracking_cfg.fps
IdSt = np.random.randint(tracking_data.T)
Duration = 10 * tracking_cfg.fps
t_win = t[IdSt : IdSt + Duration] - t[IdSt]
fig, ax = plt.subplots(2, 1, figsize=(10, 7), sharex=True)
ax[0].plot(t_win, traj.x[IdSt : IdSt + Duration], label="raw position")
traj.x_smooth[IdSt : IdSt + Duration],
label="smooth position",
ax[1].plot(t_win, traj.vigor[IdSt : IdSt + Duration], label="trajectory vigor")
ax[0].set(ylabel="x position (mm)")
ax[1].set(ylabel="vigor (A.U.)")
ax[1].set(xlabel="time (s)")
for i in range(2):
Tuning 1€ filter#
📝 There are two configurable parameters in the filter, the minimum cutoff frequency
and the speed coefficientbeta
Decreasing the minimum cutoff frequency decreases slow speed jitter. Increasing the speed coefficient decreases speed lag.
The parameters are set using two-step procedure:
Step 1#
is set to 0 and freq_cutoff_min
to 10Hz. Focus on a part of the recording where the fish is not swimming or swimming at low speed.
is adjusted to remove jitter and preserve an acceptable lag during these slow movements.
traj_preprocessing_cfg = TrajPreprocessingConfig(
fps=tracking_cfg.fps, freq_cutoff_min=10, beta=0
Show code cell source
IdSt = 26800
Duration = 2 * tracking_cfg.fps
t_win = t[IdSt : IdSt + Duration] - t[IdSt]
x = traj_df_input.x
traj = TrajPreprocessing(traj_preprocessing_cfg).preprocess_traj_df(traj_df_input)
x_smooth = traj.df.x_smooth
fig, ax = plt.subplots(1, 1, figsize=(15, 4), sharex=True)
ax.set_title("Tuning " + r"$\text{freq_cutoff_min}$" + " during slow movement")
ax.plot(t_win, x[IdSt : IdSt + Duration], label="raw position")
t_win, x_smooth[IdSt : IdSt + Duration], label="smoothed position", color="tab:red"
ax.set(xlabel="time (s)", ylabel="x position (mm)")
✅ Here a value close to 10Hz for freq_cutoff_min
allows to smooth while keeping an acceptable lag
Step 2#
Focus on a part of the recording where the fish is moving fast. Adjust beta
with a focus on minimizing lag.
traj_preprocessing_cfg = TrajPreprocessingConfig(
fps=tracking_cfg.fps, freq_cutoff_min=10, beta=1.4
Show code cell source
IdSt = 81600 # np.random.randint(x.shape[0])
Duration = 2 * tracking_cfg.fps
t_win = t[IdSt : IdSt + Duration] - t[IdSt]
x = traj_df_input.x
traj = TrajPreprocessing(traj_preprocessing_cfg).preprocess_traj_df(traj_df_input)
x_smooth = traj.df.x_smooth
fig, ax = plt.subplots(1, 1, figsize=(15, 4), sharex=True)
ax.set_title("Tuning " + r"$\text{beta}$" + " during fast movement")
ax.plot(t_win, x[IdSt : IdSt + Duration], label="raw position")
t_win, x_smooth[IdSt : IdSt + Duration], label="smoothed position", color="tab:red"
ax.set(xlabel="time (s)", ylabel="x position (mm)")
✅ Here we selected beta=1.4
📝 Note
Smoothing the trajectory data is optional for classifying tail bouts. The transformer model was trained on raw tracking data, so it can handle unsmoothed input just as well.