# Pipeline Migration
Replaces the original feature execution infrastructure with a composable pipeline. The Feature protocol is reduced from 12+ methods to 4, global features are decomposed into independently cacheable stages, and XGBoost behavior classification is integrated as a pipeline feature.
## Motivation
The original architecture served the project well during early development but accumulated structural constraints as the feature library grew:
- **Output handling.** `GlobalTSNE`, `WardAssign`, and `GlobalKMeans` managed their own output writing during `fit()`/`save_model()`, separate from the pipeline's standard output path. This made it harder to reason about where outputs lived and to add cross-cutting concerns (overlap trimming, index updates).
- **Two data-loading paths.** `iteration.py:yield_input_data()` and `StreamingFeatureHelper.load_key_data()` evolved independently to serve different feature types. Having two paths for the same job meant changes had to be synchronized across both.
- **Large Feature protocol surface.** The protocol accumulated 12+ methods (`fit`, `transform`, `needs_fit`, `partial_fit`, `finalize_fit`, `set_run_root`, `bind_dataset`, `loads_own_data`, ...) as features with different lifecycle needs were added. This made implementing new features harder than necessary.
- **Module placement.** Protocol types (`Feature`, `Params`, `Inputs`, `Result`, `LoadSpec`) lived in `behavior/feature_library/` but were used by the pipeline in `core/`. Moving them to `core/pipeline/` establishes a clear dependency direction.
## What changed

### Feature protocol: 4 methods + 4 attributes
```
load_state(run_root, artifact_paths, dependency_lookups) -> bool
fit(inputs: InputStream) -> None
apply(df: DataFrame) -> DataFrame
save_state(run_root) -> None
```

Class attributes: `name`, `version`, `parallelizable`, `scope_dependent`.
The pipeline owns all data loading, output writing, and index management. Features own computation only.
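To make the reduced surface concrete, here is a minimal sketch of what a conforming feature could look like. The protocol names match the migration notes; the signatures, the list-of-dicts stand-in for `DataFrame`/`InputStream`, and the `MeanCenter` example are all illustrative, not the project's real types.

```python
from typing import Iterable, Protocol

# Hypothetical stand-ins so the sketch is self-contained; the real
# pipeline uses its own InputStream and pandas DataFrames.
InputStream = Iterable[list]


class Feature(Protocol):
    """Sketch of the reduced protocol: 4 methods + 4 class attributes."""

    name: str
    version: str
    parallelizable: bool
    scope_dependent: bool

    def load_state(self, run_root, artifact_paths, dependency_lookups) -> bool: ...
    def fit(self, inputs: InputStream) -> None: ...
    def apply(self, df): ...
    def save_state(self, run_root) -> None: ...


class MeanCenter:
    """Toy feature: subtracts the fitted mean from a 'value' column.
    Owns computation only -- no loading, writing, or index management."""

    name = "mean_center"
    version = "1"
    parallelizable = True
    scope_dependent = False

    def __init__(self):
        self._mean = 0.0

    def load_state(self, run_root, artifact_paths, dependency_lookups) -> bool:
        return False  # nothing cached -> the pipeline will call fit()

    def fit(self, inputs) -> None:
        values = [row["value"] for chunk in inputs for row in chunk]
        self._mean = sum(values) / len(values)

    def apply(self, df):
        return [{**row, "value": row["value"] - self._mean} for row in df]

    def save_state(self, run_root) -> None:
        pass  # would persist self._mean under run_root
```

Because the pipeline calls `load_state` first, a cached feature can skip `fit` entirely and go straight to per-sequence `apply`.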
### Pipeline package: `core/pipeline/`

Extracted from `dataset.py` and `behavior/feature_library/`:
| Module | Responsibility |
|---|---|
| `types/` | `Feature` protocol, `Params`, `Inputs`, `Result`, `ArtifactSpec`, `InputStream`, `DependencyLookup` |
| `run.py` | `run_feature()` orchestration, dependency resolution, parallel apply |
| `manifest.py` | Unified manifest builder + per-sequence iterator |
| `loading.py` | Sequence identity resolution, NN lookup construction |
| `index_csv.py` | Generic typed `IndexCSV` with dataclass rows |
| `writers.py` | Parquet output writing with overlap trimming |
| `_loaders.py` | `LoadSpec` dispatcher (NPZ, Parquet, Joblib) |
| `_utils.py` | `Scope` dataclass, param hashing |
### Unified manifest and iterator
A single code path replaces both `yield_input_data()` and `StreamingFeatureHelper`, handling tracks, single-feature, and multi-feature inputs through one manifest builder with inner-join semantics across all inputs.
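The inner-join semantics can be sketched as intersecting `(group, sequence)` keys across every input before emitting manifest entries. Function name, input shape, and entry fields are assumptions for illustration, not the real builder's API.

```python
def build_manifest(inputs: dict[str, dict[tuple[str, str], str]]) -> list[dict]:
    """Sketch of a unified manifest builder.

    `inputs` maps an input name (e.g. "tracks", "wavelets") to a mapping
    from (group, sequence) to that input's file path. Only keys present
    in every input survive -- the inner join described above.
    """
    keys = None
    for paths in inputs.values():
        keys = set(paths) if keys is None else keys & set(paths)
    manifest = []
    for group, sequence in sorted(keys or ()):
        manifest.append({
            "group": group,
            "sequence": sequence,
            # One resolved path per input for this sequence.
            "paths": {name: paths[(group, sequence)]
                      for name, paths in inputs.items()},
        })
    return manifest
```

A per-sequence iterator then only has to walk this list; sequences missing from any input never reach a feature.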
### Type system
- **Typed artifacts.** `ParquetArtifact`, `JoblibArtifact`, `NpzArtifact` with `from_path()` methods. Defined at module level per the Feature Artifact Pattern.
- **Typed index rows.** `FeatureIndexRow`, `FramesIndexRow` replace raw dicts.
- **`GlobalModelParams[M]`.** Generic base for features that fit on templates or load a pre-fitted model. Mutual exclusivity validation (templates XOR model).
- **Upstream dependency resolution** into `dict[tuple[str, str], Path]` (`DependencyLookup` type alias) for both `Result` and `LabelsSource` fields. Features receive ready-to-use `(group, sequence) -> Path` lookups.
- **`InputStream`.** Wraps the input factory and exposes `n_entries` so features can make exact allocation decisions (e.g. train/test split counts) without extra data passes.
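A minimal sketch of how a `DependencyLookup` might be assembled from an upstream feature's index, assuming (hypothetically) that index rows carry `group`, `sequence`, and a run-relative path:

```python
from pathlib import Path

# The type alias from the notes: (group, sequence) -> Path.
DependencyLookup = dict[tuple[str, str], Path]


def build_dependency_lookup(index_rows: list[dict], run_root: Path) -> DependencyLookup:
    """Sketch: resolve an upstream index into ready-to-use path lookups.
    Row field names ('relative_path' etc.) are assumptions."""
    return {
        (row["group"], row["sequence"]): run_root / row["relative_path"]
        for row in index_rows
    }
```

With this shape, a feature's `fit`/`apply` never touches index files directly; it just does `dependency_lookups["labels"][(group, sequence)]`.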
### Per-sequence output standardization

`GlobalTSNE`, `GlobalKMeans`, and `GlobalWard` migrated from NPZ to standard per-frame Parquet output. `apply()` returns a DataFrame that the pipeline writes.
### Composable global feature pipeline

`GlobalTSNE` decomposed into independent pipeline stages:

```
per-sequence features (wavelets, egocentric, etc.)
-> ExtractTemplates (subsampling, scope-dependent)
-> GlobalScaler (fit StandardScaler on templates, scale per-sequence)
-> [ExtractTemplates] (optional second extraction on scaled data)
-> GlobalTSNE (fit embedding, map per-sequence)
-> GlobalKMeans / GlobalWard (fit clustering, assign per-sequence)
```
Each stage is a separate feature with its own `run_id`, cache, and artifact. `scope_dependent` features -- `ExtractTemplates`, `ExtractLabeledTemplates`, `PairPoseDistancePCA`, and `KpmsFeature` -- include the manifest entries in their `run_id` hash, so different scopes produce separate runs. All other features are scope-independent.
### XGBoost as a feature

`BehaviorXGBoostModel` (model_library) migrated to two feature classes:
```
scaled features
-> ExtractLabeledTemplates (label alignment, train/test split, subsampling)
-> XgboostFeature (train classifier, per-sequence inference)
```

`ExtractLabeledTemplates`:
- Streams upstream features, aligning ground-truth labels from NPZ files via `dependency_lookups["labels"]`
- Exact train/test split assignment using `InputStream.n_entries` (guarantees at least one test sequence when `test_fraction > 0`)
- Per-(split, class) reservoir sampling with lazy class discovery
- `apply()` adds `label` and `split` columns to per-sequence outputs, enabling downstream filtering for held-out evaluation
- Farthest-first selection option for diverse template coverage
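The per-(split, class) reservoir with lazy class discovery can be sketched with classic Algorithm R: a reservoir is created the first time a `(split, label)` pair is seen, and each reservoir keeps a uniform sample of everything offered to it. Class and field names are illustrative.

```python
import random


class SplitClassReservoirs:
    """Sketch: one bounded uniform reservoir per (split, label) pair,
    created lazily, so classes need not be known up front."""

    def __init__(self, capacity: int, seed: int = 0):
        self.capacity = capacity
        self.rng = random.Random(seed)
        self.reservoirs: dict[tuple[str, int], list] = {}
        self.counts: dict[tuple[str, int], int] = {}

    def offer(self, split: str, label: int, item) -> None:
        key = (split, label)
        seen = self.counts.get(key, 0) + 1
        self.counts[key] = seen
        res = self.reservoirs.setdefault(key, [])  # lazy discovery
        if len(res) < self.capacity:
            res.append(item)
        else:
            # Algorithm R: replace a random slot with prob capacity/seen.
            j = self.rng.randrange(seen)
            if j < self.capacity:
                res[j] = item
```

Because memory is bounded by `capacity * n_discovered_classes * n_splits`, the whole dataset can be streamed once regardless of size.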
`XgboostFeature`:

- Multiclass (a single `multi:softprob` model) or one-vs-rest (N binary models)
- Balanced class weights via `sample_weight` / `scale_pos_weight`
- Optional multiclass-aware undersampling + SMOTE
- Decision thresholds (global, per-class, or `None`) with `default_class` fallback
- Test-split evaluation with `reports.json` + `summary.csv`
Scope reduction vs `BehaviorXGBoostModel`: the new features do not handle data loading, label loading, train/test splitting, or scaling internally -- those are handled by upstream pipeline stages. External-memory mode and parameter presets are also dropped (bounded templates and direct hyperparameter passing replace them).
### Visualization features replaced by `load_values()`

`VizTimeline` and `VizGlobalColored` removed. Their role is now served by `load_values()` -- a free function that loads and aligns columns from tracks, features, and labels into a single DataFrame for notebook-level analysis.
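The alignment step can be illustrated with an inner merge on shared identity keys; the function name, the key columns, and the list-of-frames input are assumptions, since the real `load_values()` resolves its sources from run artifacts.

```python
import pandas as pd


def load_values_sketch(frames: list[pd.DataFrame]) -> pd.DataFrame:
    """Sketch in the spirit of load_values(): merge per-source frames
    (tracks, features, labels) on (group, sequence, frame) into one
    analysis DataFrame, keeping only frames present in every source."""
    keys = ["group", "sequence", "frame"]
    out = frames[0]
    for df in frames[1:]:
        out = out.merge(df, on=keys, how="inner")
    return out
```

In a notebook this yields one wide DataFrame that plotting code can group and filter directly, which is what made the dedicated viz features redundant.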
### KPMS unification

`KpmsFit` + `KpmsApply` merged into a single `KpmsFeature` with a persistent subprocess server for the external keypoint-MoSeq process.
### Other changes

- `StreamingFeatureHelper` removed (replaced by the unified manifest)
- `loads_own_data()` removed from the protocol (replaced by `Inputs._require`)
- `group_safe`/`sequence_safe` columns removed from indexes
- `load_key_data()` replaced by manifest-based data loading
## What this enables
- **Composable pipelines.** Chain features with typed artifact references. Each stage is cached independently.
- **`load_values()` for ad-hoc analysis.** Load any combination of feature columns, track columns, and ground-truth labels into one DataFrame.
- **Train/test split as per-frame metadata.** `ExtractLabeledTemplates.apply()` writes `split` and `label` columns. Filter to test-only data for held-out evaluation in notebooks.
- **Exact split allocation.** `InputStream.n_entries` enables features to compute deterministic split assignments without extra data passes.
- **Pre-resolved dependencies.** `dependency_lookups` gives features ready-to-use `(group, sequence) -> Path` lookups.
- **Parallel apply.** `parallelizable=True` features run per-sequence apply in thread or process pools (existing capability, simplified in the rewrite).
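The exact-allocation point can be sketched as a pure function of `n_entries`: knowing the count up front lets a feature pick split sizes deterministically in one pass, including the at-least-one-test-sequence guarantee mentioned above. The rounding rule is an assumption.

```python
def split_counts(n_entries: int, test_fraction: float) -> tuple[int, int]:
    """Sketch: deterministic (n_train, n_test) from a known entry count.
    Guarantees at least one test sequence whenever test_fraction > 0."""
    if test_fraction <= 0:
        return n_entries, 0
    n_test = max(1, round(n_entries * test_fraction))
    n_test = min(n_test, n_entries)  # never exceed the available sequences
    return n_entries - n_test, n_test
```

Without `n_entries`, a streaming feature would need either a second pass over the data or a probabilistic split that cannot guarantee a non-empty test set.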
## Open issues

### Feature output column declaration
`feature_columns()` uses a hardcoded `_EXTRA_META` set to distinguish metadata from feature output. Adding `label` and `split` required updating this set manually. A more robust solution would be explicit `output_columns` declarations per feature, validated at registration time.
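The proposed declaration could look something like the following hypothetical sketch (not existing code): each feature declares its output columns, and registration rejects names that shadow metadata, so no central `_EXTRA_META` set needs manual upkeep.

```python
class ColumnRegistry:
    """Hypothetical registration-time validation for explicit
    output_columns declarations; illustrative only."""

    # Metadata columns a feature must never emit as output.
    RESERVED = {"group", "sequence", "frame", "label", "split"}

    def __init__(self):
        self.features: dict[str, tuple[str, ...]] = {}

    def register(self, name: str, output_columns: tuple[str, ...]) -> None:
        clash = set(output_columns) & self.RESERVED
        if clash:
            raise ValueError(
                f"{name}: output columns shadow metadata: {sorted(clash)}")
        self.features[name] = output_columns

    def feature_columns(self, name: str) -> tuple[str, ...]:
        # The declaration is the single source of truth -- no hardcoded
        # metadata set to keep in sync.
        return self.features[name]
```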
### `model_library/` not yet removed

`BehaviorXGBoostModel` still exists in `model_library/`. To be removed after `XgboostFeature` is validated on production datasets.
### Bout-aware sampling

`ExtractLabeledTemplates` uses proportional random sampling, which over-represents long stationary periods relative to short behavioral events. A bout-aware strategy would distribute sampling across behavioral episodes. Planned as a future enhancement (documented in a code comment).
## User-facing API changes

See `notebooks/migration-guide.md` for the full migration guide. Key changes:
- `Inputs((result,))` replaces `input_kind`/`input_feature`/`input_run_id` kwargs
- `run_feature()` returns a `Result` dataclass, not a raw `run_id` string
- Typed artifact references: `Feature.Artifact().from_result(result)`
- Frame/time filtering moved to `run_feature()` parameters
- Pair filtering moved to feature `Params`