Pipeline¶
The composable pipeline infrastructure that orchestrates feature extraction, frame sampling, and model training. See the Pipeline Architecture guide for design details and the Pipeline Guide for practical usage.
Pipeline Orchestrator¶
Declarative multi-step feature pipeline with automatic caching, staleness detection, and dependency chaining.
Pipeline ¶
Lightweight feature pipeline orchestrator.
Wraps dataset.run_feature() with a declarative graph of named
steps. Caching is automatic (same params → same run_id → skip).
Source code in src/mosaic/core/pipeline/pipeline.py
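The "same params → same run_id → skip" behavior can be pictured as hashing a canonical serialization of a step's parameters. The sketch below is illustrative only (the function name and hashing scheme are assumptions, not the library's actual implementation):

```python
import hashlib
import json

def derive_run_id(version: str, params: dict) -> str:
    """Derive a deterministic run ID from a feature version and its params.

    Sorting keys makes the JSON serialization canonical, so the same
    parameters always hash to the same run ID regardless of dict order.
    """
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha1(canonical.encode()).hexdigest()[:8]
    return f"{version}-{digest}"

# Identical params in a different order yield the same run_id, so a
# previously cached run can be recognized and skipped.
a = derive_run_id("0.1", {"window": 30, "sigma": 2.0})
b = derive_run_id("0.1", {"sigma": 2.0, "window": 30})
assert a == b
```

Because the ID is content-derived rather than timestamped, re-running an unchanged step is a no-op, while any parameter change produces a fresh run directory.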
add ¶
Register a step and return self for chaining.
Source code in src/mosaic/core/pipeline/pipeline.py
status ¶
Show pipeline status: which steps are cached, their run_ids.
Source code in src/mosaic/core/pipeline/pipeline.py
load ¶
Populate self.results from cached runs on disk (no execution).
Source code in src/mosaic/core/pipeline/pipeline.py
run ¶
Execute the pipeline, skipping steps with cached results.
Parameters¶
force_from : str, optional
Step name from which to force overwrite (that step and all downstream steps are re-executed).
Source code in src/mosaic/core/pipeline/pipeline.py
clean ¶
Remove orphaned run directories that no longer match the pipeline.
For each FeatureStep, compares the expected run_id (from current params/inputs) against all run_ids found on disk. Runs that don't match are deleted.
Parameters¶
dry_run : bool
If True (default), only report what would be deleted. If False, actually delete directories and clean index files.
Returns¶
pd.DataFrame
Summary with columns [step, feature, run_id, status, n_files, size_mb, modified].
Source code in src/mosaic/core/pipeline/pipeline.py
dag ¶
Return adjacency dict: {step_name: [upstream_names]}.
Source code in src/mosaic/core/pipeline/pipeline.py
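Because `dag()` returns a predecessor mapping, a valid execution order can be computed directly with the standard library. A small sketch with hypothetical step names (not the library's scheduler):

```python
from graphlib import TopologicalSorter

# dag() returns {step_name: [upstream_names]} -- exactly the
# predecessor mapping that graphlib.TopologicalSorter expects.
dag = {
    "tracks-smooth": [],
    "speed-angvel": ["tracks-smooth"],
    "nearest-neighbor": ["tracks-smooth"],
    "embedding": ["speed-angvel", "nearest-neighbor"],
}
order = list(TopologicalSorter(dag).static_order())

# Every step appears after all of its upstreams.
pos = {name: i for i, name in enumerate(order)}
assert all(pos[up] < pos[step] for step, ups in dag.items() for up in ups)
```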
FeatureStep dataclass ¶
FeatureStep(name: str, feature_cls: type, params: dict | None = None, input_names: list[str] = list(), run_kwargs: dict = dict())
One node in the pipeline graph.
Parameters¶
name : str
Unique identifier for this step (used in input_names of
downstream steps).
feature_cls : type
Feature class to instantiate. Must follow the Feature protocol.
params : dict | None
Parameter overrides passed to the feature constructor.
input_names : list[str]
Names of upstream FeatureSteps whose Result objects are wired
into this feature's Inputs. Empty means the feature reads
directly from tracks.
run_kwargs : dict
Extra keyword arguments forwarded to dataset.run_feature()
(e.g. parallel_workers, filter_start_time).
CallbackStep dataclass ¶
CallbackStep(name: str, fn: Callable[['Dataset', dict[str, Result]], None], depends_on: list[str] = list())
A custom function that runs between feature layers.
Parameters¶
name : str
Unique identifier for this step.
fn : Callable
Called as fn(dataset, results_so_far).
depends_on : list[str]
Upstream step names (used for staleness tracking).
Feature Registry¶
SQLite-backed registry replacing per-feature CSV indices. Enables cross-feature queries, pending-work detection, dependency tracking, and API-friendly reads.
FeatureRegistry ¶
Queryable SQLite registry for feature runs and entries.
Parameters¶
db_path : Path
Location of the .mosaic.db file (created if absent).
Source code in src/mosaic/core/pipeline/registry.py
record_run_start ¶
record_run_start(feature: str, run_id: str, version: str, params_hash: str, params_json: str | None = None, inputs_json: str | None = None) -> None
Record that a feature run has started.
Source code in src/mosaic/core/pipeline/registry.py
record_entry ¶
record_entry(feature: str, run_id: str, group: str, sequence: str, abs_path: str | Path, n_rows: int = 0) -> None
Record a computed entry (one parquet file).
Source code in src/mosaic/core/pipeline/registry.py
record_entries ¶
Batch-record multiple entries in one transaction.
Source code in src/mosaic/core/pipeline/registry.py
mark_finished ¶
Set finished_at on a run.
Source code in src/mosaic/core/pipeline/registry.py
record_dependency ¶
Record that (feature, run_id) consumed (upstream_feature, upstream_run_id).
Source code in src/mosaic/core/pipeline/registry.py
latest_run_id ¶
Return most recent run_id for feature (prefers finished).
Source code in src/mosaic/core/pipeline/registry.py
list_runs ¶
Return all runs for feature as a DataFrame.
Source code in src/mosaic/core/pipeline/registry.py
list_features ¶
Return the names of all features that have at least one run.
Source code in src/mosaic/core/pipeline/registry.py
list_entries ¶
Return entries for feature (optionally filtered by run_id).
Source code in src/mosaic/core/pipeline/registry.py
entry_count ¶
Return the number of entries for a specific run.
Source code in src/mosaic/core/pipeline/registry.py
has_run ¶
Check whether a run exists in the registry.
Source code in src/mosaic/core/pipeline/registry.py
run_is_finished ¶
Check whether a run has been marked finished.
Source code in src/mosaic/core/pipeline/registry.py
pending_entries ¶
pending_entries(feature: str, run_id: str, all_entries: set[tuple[str, str]]) -> list[tuple[str, str]]
Return (group, sequence) pairs missing from a run.
all_entries is the full set that should exist (e.g. from tracks).
Source code in src/mosaic/core/pipeline/registry.py
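The pending-work computation is, conceptually, a set difference between the entries that should exist and those the registry has recorded. A self-contained sketch (not the registry's actual SQL):

```python
def pending(all_entries: set[tuple[str, str]],
            recorded: set[tuple[str, str]]) -> list[tuple[str, str]]:
    """Return (group, sequence) pairs still missing, in stable order."""
    return sorted(all_entries - recorded)

# Three entries should exist; only one has been computed so far.
all_entries = {("g1", "s1"), ("g1", "s2"), ("g2", "s1")}
recorded = {("g1", "s1")}
assert pending(all_entries, recorded) == [("g1", "s2"), ("g2", "s1")]
```

This is what lets a partially completed run be resumed: only the missing pairs are recomputed.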
get_dependencies ¶
Return upstream (feature, run_id) pairs for a run.
Source code in src/mosaic/core/pipeline/registry.py
lineage ¶
Return full dependency tree rooted at (feature, run_id).
Returns a nested dict: {"feature": ..., "run_id": ..., "upstream": [...]}.
Source code in src/mosaic/core/pipeline/registry.py
migrate_from_csv ¶
Import existing index.csv files into the registry.
Returns the number of entries imported. Safe to call multiple times -- uses INSERT OR IGNORE so duplicates are skipped.
Source code in src/mosaic/core/pipeline/registry.py
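The idempotency relies on SQLite's `INSERT OR IGNORE` against a uniqueness constraint: a re-run re-inserts nothing. A minimal demonstration with a toy schema (not the registry's actual tables):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE entries ("
    " feature TEXT, run_id TEXT, grp TEXT, sequence TEXT,"
    " PRIMARY KEY (feature, run_id, grp, sequence))"
)
row = ("speed", "0.1-abc", "g1", "s1")
for _ in range(2):  # calling twice is safe: the duplicate is skipped
    conn.execute("INSERT OR IGNORE INTO entries VALUES (?, ?, ?, ?)", row)
(count,) = conn.execute("SELECT COUNT(*) FROM entries").fetchone()
assert count == 1
```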
open_registry ¶
Open (or create) the feature registry for a dataset.
Parameters¶
features_root : Path
The features/ directory of the dataset.
migrate_csv : bool
If True and the DB is newly created, import existing index.csv files.
Source code in src/mosaic/core/pipeline/registry.py
Feature Protocol¶
Features implement four methods plus four class attributes:
```python
class MyFeature:
    name: str              # Unique feature name
    version: str           # Semantic version string
    parallelizable: bool   # Whether apply() can run in parallel
    scope_dependent: bool  # Whether run_id includes manifest scope

    def load_state(self, run_root, artifact_paths, dependency_lookups) -> bool: ...
    def fit(self, inputs: InputStream) -> None: ...
    def apply(self, df: pd.DataFrame) -> pd.DataFrame: ...
    def save_state(self, run_root) -> None: ...
```
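A toy feature satisfying this contract might look like the following (illustrative only; the feature name and speed computation are made up for the example, and real features come from the library's feature modules):

```python
import pandas as pd

class SpeedFeature:
    """Toy feature: per-frame speed from X/Y columns (illustrative)."""
    name = "toy-speed"
    version = "0.1"
    parallelizable = True      # apply() handles each segment independently
    scope_dependent = False    # run_id does not depend on manifest scope

    def load_state(self, run_root, artifact_paths, dependency_lookups) -> bool:
        return False  # nothing cached to restore in this toy

    def fit(self, inputs) -> None:
        pass  # stateless: no fitting pass needed

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        out = df.copy()
        out["speed"] = (df["X"].diff() ** 2 + df["Y"].diff() ** 2) ** 0.5
        return out

    def save_state(self, run_root) -> None:
        pass  # no state to persist

df = pd.DataFrame({"X": [0.0, 3.0], "Y": [0.0, 4.0]})
result = SpeedFeature().apply(df)
assert result["speed"].iloc[1] == 5.0
```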
Pipeline Types¶
types ¶
JoblibLoadSpec ¶
Bases: StrictModel
Load spec for joblib-serialized objects.
Attributes:

| Name | Type | Description |
|---|---|---|
| `kind` | `Literal['joblib']` | Discriminator literal `"joblib"`. |
| `key` | `str \| None` | Dict key to extract from the loaded object. `None` loads raw. |
NpzLoadSpec ¶
Bases: StrictModel
Load spec for numpy .npz archives.
Attributes:

| Name | Type | Description |
|---|---|---|
| `kind` | `Literal['npz']` | Discriminator literal `"npz"`. |
| `key` | `str` | Array key to extract from the `.npz` file. Required. |
| `transpose` | `bool` | Transpose the loaded array. Default `False`. |
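The npz spec boils down to extracting one named array and optionally transposing it. A sketch of that behavior (the function here is a local illustration, not the library's loader):

```python
import io
import numpy as np

def load_npz(source, key: str, transpose: bool = False) -> np.ndarray:
    """Extract `key` from an .npz archive, optionally transposed."""
    with np.load(source) as archive:
        arr = archive[key]
    return arr.T if transpose else arr

# Write a tiny archive to an in-memory buffer, then load it back.
buf = io.BytesIO()
np.savez(buf, embedding=np.arange(6).reshape(2, 3))
buf.seek(0)
arr = load_npz(buf, key="embedding", transpose=True)
assert arr.shape == (3, 2)
```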
ParquetLoadSpec ¶
Bases: StrictModel
Load spec for parquet files.
Attributes:

| Name | Type | Description |
|---|---|---|
| `kind` | `Literal['parquet']` | Discriminator literal `"parquet"`. |
| `transpose` | `bool` | Transpose the loaded array. Default `False`. |
| `columns` | `list[str] \| None` | Explicit column list. `None` uses the `numeric_only` filter. |
| `drop_columns` | `list[str] \| None` | Columns to drop before loading. |
| `numeric_only` | `bool` | Keep only numeric columns. Default `True`. |
| `frame_column` | `str \| None` | Column to extract as frame indices. |
ArtifactSpec ¶
Bases: Result[str], Generic[L, R]
Reference to a feature artifact with load specification.
Class Type Parameters:

| Name | Bound or Constraints | Description | Default |
|---|---|---|---|
| `L` | | Load spec type (`NpzLoadSpec`, `ParquetLoadSpec`, `JoblibLoadSpec`). | required |
| `R` | | Return type of `from_path()`. Defaults to `object`. | required |

Attributes:

| Name | Type | Description |
|---|---|---|
| `load` | `L` | How to load the matched files. |
| `pattern` | `str` | Glob pattern. Auto-derived from `load.kind` when empty. |
from_result classmethod ¶
Create from a Result, validating feature match.
Typed artifact subclasses (with a default feature) validate that result.feature matches. Base ArtifactSpec passes through.
Source code in src/mosaic/core/pipeline/types/artifacts.py
from_path ¶
Load artifact from a resolved file path.
Dispatches on load-spec type via load_from_spec(). Return type is determined by the R type parameter.
Source code in src/mosaic/core/pipeline/types/artifacts.py
FeatureLabelsSource ¶
Columns ¶
Bases: StrictModel
Dataset column name conventions.
Single source of truth for all standard column names. Override by monkey-patching COLUMNS before importing features:

```python
from mosaic.behavior.feature_library import params
params.COLUMNS = params.Columns(id_col="animal")
```
Attributes:

| Name | Type | Description |
|---|---|---|
| `id_col` | `str` | Animal/subject identifier column. Default `"id"`. |
| `seq_col` | `str` | Sequence identifier column. Default `"sequence"`. |
| `group_col` | `str` | Group/session identifier column. Default `"group"`. |
| `frame_col` | `str` | Frame number column name. Default `"frame"`. |
| `time_col` | `str` | Timestamp column name. Default `"time"`. |
| `order_by` | `Literal['frames', 'time']` | Preferred temporal ordering column. Default `"frames"`. |
| `x_col` | `str` | X-coordinate column name. Default `"X"`. |
| `y_col` | `str` | Y-coordinate column name. Default `"Y"`. |
| `orientation_col` | `str` | Body-orientation angle column name. Default `"ANGLE"`. |
meta_set ¶
The five metadata column names as a set.
Useful for set intersection (passthrough) or set difference (exclusion)
against df.columns. Spatial columns (x, y, orientation) are
intentionally excluded -- they are data, not metadata.
Source code in src/mosaic/core/pipeline/types/data_config.py
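The set-operation idioms described above look like this in practice (the `META` literal mirrors the five documented default names; in real code it would come from `COLUMNS.meta_set`):

```python
import pandas as pd

# Hypothetical metadata set mirroring the five documented defaults.
META = {"id", "sequence", "group", "frame", "time"}

df = pd.DataFrame(columns=["id", "frame", "X", "Y", "speed"])

passthrough = META & set(df.columns)   # metadata columns present in df
data_cols = set(df.columns) - META     # everything else is data
assert passthrough == {"id", "frame"}
assert data_cols == {"X", "Y", "speed"}
```

Note that the spatial columns (`X`, `Y`) land on the data side, matching the docstring's point that they are data, not metadata.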
PoseConfig ¶
Bases: StrictModel
Pose keypoint column naming and selection.
Attributes:

| Name | Type | Description |
|---|---|---|
| `pose_n` | `int` | Total number of pose keypoints in the data. Default `7`. |
| `pose_indices` | `list[int] \| None` | Subset of keypoint indices to use. `None` uses all. |
| `x_prefix` | `str` | Column name prefix for X coordinates. Default `"poseX"`. |
| `y_prefix` | `str` | Column name prefix for Y coordinates. Default `"poseY"`. |
| `confidence_prefix` | `str` | Column prefix for confidence scores. Default `"poseP"`. |
| `keypoint_names` | `list[str] \| None` | Human-readable names for each keypoint. Default `None` (auto-generated as `["kp0", "kp1", ...]` by features that need names). |
Feature ¶
Bases: Protocol
Feature protocol -- 4 attributes, 4 methods.
InputStream ¶
Factory for fit() input iterators, with entry count metadata.
Wraps a callable that produces (entry_key, DataFrame) iterators.
Each call creates a fresh iterator over the manifest entries.
n_entries exposes the total number of entries so features can
make exact allocation decisions (e.g. train/test split counts)
without an extra data pass.
Source code in src/mosaic/core/pipeline/types/feature.py
Inputs ¶
Bases: RootModel[tuple[InputItem, ...]], Generic[InputItem]
Base class for feature input collections. Mirrors Params.
Each Feature subclasses it to narrow the allowed input types, just as each feature subclasses `Params` to declare its parameters.
Examples:

```python
Inputs(("tracks",))
Inputs((Result(feature="speed-angvel"),))
Inputs(("tracks", Result(feature="nn", run_id="0.1-abc")))
```

Per-feature narrowing:

```python
class Inputs(Inputs[TrackInput]): pass
```

Features that take no pipeline inputs:

```python
class Inputs(Inputs[Result]):
    _require: ClassVar[InputRequire] = "empty"
```

Self-loading features that optionally accept inputs (e.g. fit + assign):

```python
class Inputs(Inputs[Result]):
    _require: ClassVar[InputRequire] = "any"
```
InputsLike ¶
Bases: Protocol
Read-only interface satisfied by any Inputs[InputItem].
GroundTruthLabelsSource ¶
LabelsSource ¶
Bases: StrictModel, Generic[K]
Base class for dataset label dependencies.
Resolved to concrete file paths by _resolve_dependency_paths (Task 9).
GlobalModelParams ¶
Bases: Params, Generic[M]
Base params for global features that fit on a templates artifact or load a pre-fitted model.
Type parameter M is the model artifact type (must extend JoblibArtifact).
Exactly one of templates or model must be provided.
Both fields use default_factory so that from_overrides() merges partial dicts correctly. The _exclusive_source validator checks model_fields_set and nulls out the field that was not provided.
Attributes:

| Name | Type | Description |
|---|---|---|
| `templates` | `ParquetArtifact \| None` | Templates artifact to fit from. Mutually exclusive with `model`. |
| `model` | `M \| None` | Pre-fitted model artifact. Mutually exclusive with `templates`. |
Params ¶
Bases: StrictModel
Base for all feature parameter models.
Provides from_overrides() constructor for user-config dicts. Subclasses declare feature-specific fields.
from_overrides classmethod ¶
Construct from user dict; missing keys get field defaults.
For BaseModel-typed fields with default_factory, partial dict overrides are merged on top of the default before validation (1-level deep merge). This replaces the per-feature deep-merge hacks in global_ward.
Source code in src/mosaic/core/pipeline/types/params.py
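The one-level-deep merge can be sketched as follows (a local illustration of the described behavior, not the library's validator-based implementation):

```python
def merge_one_level(defaults: dict, overrides: dict) -> dict:
    """Merge `overrides` on top of `defaults`, one level deep.

    Nested dicts are merged key-by-key; everything else is replaced.
    """
    out = dict(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = {**out[key], **value}
        else:
            out[key] = value
    return out

defaults = {"window": 30, "pose": {"pose_n": 7, "x_prefix": "poseX"}}
merged = merge_one_level(defaults, {"pose": {"pose_n": 5}})
# The partial nested dict keeps the unspecified defaults.
assert merged == {"window": 30, "pose": {"pose_n": 5, "x_prefix": "poseX"}}
```

This is why a user config can override just `{"pose": {"pose_n": 5}}` without restating the other pose fields.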
BodyScaleResult ¶
Bases: Result[str]
Result for a body-scale-family feature.
Accepts any feature name (default "body-scale") so that auto-derived
names or upstream variants can be referenced. Use from_result() to
copy feature+run_id from an existing run.
from_result ¶
Return a copy with feature and run_id set from another Result.
NNResult ¶
Bases: Result[str]
Result for a nearest-neighbor-family feature.
Accepts any feature name (default "nearest-neighbor") so that
auto-derived names like nearest-neighbor__from__tracks or variants
computed from different upstream data (e.g. smoothed tracks) can be
referenced. Use from_result() to copy feature+run_id from an
existing run.
from_result ¶
Return a copy with feature and run_id set from another Result.
Result ¶
Bases: StrictModel, Generic[F]
Reference to a prior feature's output as pipeline input.
Attributes:

| Name | Type | Description |
|---|---|---|
| `feature` | `F` | Feature name whose output to consume. |
| `run_id` | `str \| None` | Specific run ID, or `None` for the latest finished run. |
ResultColumn ¶
Bases: Result[str]
Reference to a column in a feature's standard parquet output.
Attributes:

| Name | Type | Description |
|---|---|---|
| `feature` | `str` | Source feature name. |
| `column` | `str` | Column name to extract from the parquet output. |
| `run_id` | `str \| None` | Specific run ID, or `None` for latest. |
from_result ¶
Return a copy with feature and run_id set from another Result.
TracksColumn ¶
Bases: StrictModel
Reference to a column in the tracks data.
Attributes:

| Name | Type | Description |
|---|---|---|
| `column` | `str` | Column name to extract from tracks. |
load_from_spec ¶
Load artifact from a file path using a typed load specification.
Parameters¶
path
Resolved path to the artifact file.
spec
Typed load specification (NpzLoadSpec, ParquetLoadSpec, or JoblibLoadSpec).
Returns¶
object
Loaded data: np.ndarray (npz), pd.DataFrame (parquet), or an arbitrary object (joblib).
Source code in src/mosaic/core/pipeline/_loaders.py
resolve_order_col ¶
Pick the best ordering column present in df.
Uses COLUMNS.order_by preference, then falls back to the other option. Raises ValueError when neither column exists.
Source code in src/mosaic/core/pipeline/types/data_config.py
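The preference-then-fallback logic can be sketched as below (column names mirror the documented defaults; the real function reads the preference from `COLUMNS.order_by`):

```python
import pandas as pd

def resolve_order_col(df: pd.DataFrame, preferred: str = "frame",
                      fallback: str = "time") -> str:
    """Pick the preferred ordering column, falling back to the other.

    Raises ValueError when neither column exists.
    """
    for col in (preferred, fallback):
        if col in df.columns:
            return col
    raise ValueError(f"neither {preferred!r} nor {fallback!r} present")

assert resolve_order_col(pd.DataFrame(columns=["frame", "X"])) == "frame"
assert resolve_order_col(pd.DataFrame(columns=["time", "X"])) == "time"
```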
Pipeline Runner¶
run ¶
run_feature ¶
run_feature(ds: Dataset, feature: Feature, groups: Iterable[str] | None = None, sequences: Iterable[str] | None = None, overwrite: bool = False, parallel_workers: int | None = None, parallel_mode: str | None = 'thread', overlap_frames: int = 0, filter_start_frame: int | None = None, filter_end_frame: int | None = None, filter_start_time: float | None = None, filter_end_time: float | None = None, registry: FeatureRegistry | None = None) -> Result
Apply a Feature over a chosen scope (default: whole dataset).
Input routing is determined by feature.inputs: tracks (default),
a single upstream feature, or a multi-input set.
Parameters¶
feature : Feature
The feature object implementing the Feature protocol. Its inputs
attribute controls where data is read from.
groups, sequences : optional iterables
Scope filter (applies to whichever input source is used).
overwrite : bool
Overwrite existing outputs for this run_id.
parallel_workers : int | None
When >1 and the feature declares itself parallelizable, run the apply
phase in parallel. Defaults to sequential execution.
parallel_mode : {'thread','process'}
Execution backend when parallel_workers > 1. 'thread' (default) uses
ThreadPoolExecutor; 'process' uses ProcessPoolExecutor.
overlap_frames : int, default 0
Load this many frames from adjacent segments to handle edge effects.
Mutually exclusive with frame/time filters.
filter_start_frame : int | None
If set, only include frames >= this value.
filter_end_frame : int | None
If set, only include frames < this value.
filter_start_time : float | None
If set, converted to start frame via fps_default from dataset metadata.
filter_end_time : float | None
If set, converted to end frame via fps_default from dataset metadata.
Source code in src/mosaic/core/pipeline/run.py
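The time-based filters reduce to a frame conversion via the dataset's fps. A sketch of that conversion (the exact rounding rule is an assumption of this example, not taken from the source):

```python
import math

def time_to_frame(seconds: float, fps: float) -> int:
    """Convert a time filter (seconds) to a frame index via fps.

    Flooring keeps the filter inclusive of the frame that contains
    `seconds`; the library may round differently.
    """
    return math.floor(seconds * fps)

# With fps_default = 30, filter_start_time=2.5 maps to frame 75.
assert time_to_frame(2.5, fps=30) == 75
```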
Model Training¶
train_model ¶
train_model(ds, model, config: str | Path | dict[str, object] | None = None, overwrite: bool = False, *, progress_callback: TrainingProgressCallback | None = None) -> str
Train a registered model using a JSON (or dict) configuration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ds` | | Dataset instance providing model storage roots. | required |
| `model` | | Model/trainer instance implementing the expected training interface. | required |
| `config` | `str \| Path \| dict[str, object] \| None` | Path to a JSON config file or an in-memory dict of hyperparameters. | `None` |
| `overwrite` | `bool` | Reserved for future use (run_ids are hash-based). | `False` |
| `progress_callback` | `TrainingProgressCallback \| None` | Callback for reporting training progress (epoch, phase, etc.). | `None` |
Returns:

| Type | Description |
|---|---|
| `str` | The `run_id` of the completed training run. |
Source code in src/mosaic/core/pipeline/models.py
Training Progress¶
Callback protocol and SQLite implementation for monitoring training progress in real time.
TrainingProgressCallback ¶
Bases: Protocol
Minimal callback contract for training progress reporting.
Implement this protocol to create custom progress backends (e.g. MLflow, W&B). The three methods correspond to different granularities of training progress.
on_epoch_end ¶
Called after each training epoch.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `epoch` | `int` | Zero-based epoch index. | required |
| `total_epochs` | `int` | Total number of epochs planned. | required |
| `metrics` | `dict[str, float]` | Metric name-value pairs. | required |
Source code in src/mosaic/core/pipeline/progress.py
on_class_start ¶
Called when a one-vs-rest class training begins.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `class_idx` | `int` | Zero-based class index. | required |
| `total_classes` | `int` | Total number of classes. | required |
| `class_name` | `str` | Human-readable class identifier. | required |
Source code in src/mosaic/core/pipeline/progress.py
on_phase ¶
Called for coarse-grained phase transitions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `phase` | `str` | Phase identifier. | required |
| `message` | `str` | Free-text description. | required |
SQLiteProgressCallback ¶
Writes progress rows into the training_progress table.
Opens its own connection (WAL mode) so that the training thread and any reader (API, notebook) do not block each other.
Parameters¶
db_path : Path
Path to the .mosaic.db file.
job_id : str
The training job this progress belongs to.
Source code in src/mosaic/core/pipeline/progress.py
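The concurrent-reader design rests on SQLite's WAL journal mode: the training thread appends rows while readers query without blocking. A toy version of the pattern (the `training_progress` table name matches the docs, but the column layout here is an assumption):

```python
import sqlite3

# WAL mode lets one writer (the training thread) and many readers
# (API, notebook) coexist without blocking each other.
conn = sqlite3.connect(":memory:")  # real code would open the .mosaic.db path
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(
    "CREATE TABLE training_progress ("
    " job_id TEXT, kind TEXT, payload TEXT,"
    " ts DATETIME DEFAULT CURRENT_TIMESTAMP)"
)

def on_epoch_end(job_id: str, epoch: int, total: int) -> None:
    """Append one progress row; committing makes it visible to readers."""
    conn.execute(
        "INSERT INTO training_progress (job_id, kind, payload) VALUES (?, ?, ?)",
        (job_id, "epoch", f"{epoch + 1}/{total}"),
    )
    conn.commit()

on_epoch_end("job-1", 0, 10)
rows = conn.execute(
    "SELECT kind, payload FROM training_progress WHERE job_id = ?", ("job-1",)
).fetchall()
assert rows == [("epoch", "1/10")]
```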
on_epoch_end ¶
Record an epoch completion with associated metrics.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `epoch` | `int` | Zero-based epoch index. | required |
| `total_epochs` | `int` | Total epochs planned. | required |
| `metrics` | `dict[str, float]` | Metric name-value pairs. | required |
Source code in src/mosaic/core/pipeline/progress.py
on_class_start ¶
Record the start of a one-vs-rest class training iteration.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `class_idx` | `int` | Zero-based class index. | required |
| `total_classes` | `int` | Total number of classes. | required |
| `class_name` | `str` | Human-readable class identifier. | required |
Source code in src/mosaic/core/pipeline/progress.py
on_phase ¶
Record a coarse-grained phase transition.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `phase` | `str` | Phase identifier. | required |
| `message` | `str` | Free-text description. | required |
Source code in src/mosaic/core/pipeline/progress.py
get_progress ¶
Return all progress rows for this job, ordered chronologically.
Source code in src/mosaic/core/pipeline/progress.py
CompositeProgressCallback ¶
read_progress ¶
Read progress for a job without creating a full callback instance.
Opens a read-only connection, queries, and closes immediately. Suitable for one-shot reads from an API endpoint or notebook.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `db_path` | `Path` | Path to the `.mosaic.db` file. | required |
| `job_id` | `str` | The training job to query. | required |

Returns:

| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of progress dicts, one per recorded event, ordered chronologically. |