Pipeline

The composable pipeline infrastructure that orchestrates feature extraction, frame sampling, and model training. See the Pipeline Architecture guide for design details and the Pipeline Guide for practical usage.

Pipeline Orchestrator

Declarative multi-step feature pipeline with automatic caching, staleness detection, and dependency chaining.

Pipeline

Pipeline(default_run_kwargs: dict | None = None)

Lightweight feature pipeline orchestrator.

Wraps dataset.run_feature() with a declarative graph of named steps. Caching is automatic (same params → same run_id → skip).

Source code in src/mosaic/core/pipeline/pipeline.py
def __init__(self, default_run_kwargs: dict | None = None) -> None:
    self.steps: list[FeatureStep | CallbackStep] = []
    self.results: dict[str, Result] = {}
    self.default_run_kwargs: dict = default_run_kwargs or {}
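A minimal usage sketch (the dataset object and the feature classes below are hypothetical placeholders, and the import path is assumed from the source layout shown above):

# Hypothetical example -- SpeedFeature, EmbedFeature and `dataset` are placeholders.
from mosaic.core.pipeline.pipeline import Pipeline, FeatureStep

pipe = (
    Pipeline(default_run_kwargs={"parallel_workers": 4})
    .add(FeatureStep(name="speed", feature_cls=SpeedFeature))
    .add(FeatureStep(name="embed", feature_cls=EmbedFeature, input_names=["speed"]))
)

results = pipe.run(dataset)    # executes missing steps, reuses cached runs
print(pipe.status(dataset))    # per-step cache/staleness overview as a DataFrame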

add

add(step: FeatureStep | CallbackStep) -> Pipeline

Register a step and return self for chaining.

Source code in src/mosaic/core/pipeline/pipeline.py
def add(self, step: FeatureStep | CallbackStep) -> Pipeline:
    """Register a step and return self for chaining."""
    existing = {s.name for s in self.steps}
    if step.name in existing:
        msg = f"Duplicate step name: {step.name!r}"
        raise ValueError(msg)

    # Validate that referenced inputs exist
    refs = (
        step.input_names
        if isinstance(step, FeatureStep)
        else step.depends_on
    )
    for ref in refs:
        if ref not in existing:
            msg = (
                f"Step {step.name!r} references unknown upstream "
                f"{ref!r}. Available: {sorted(existing)}"
            )
            raise ValueError(msg)

    self.steps.append(step)
    return self

status

status(dataset: Dataset) -> pd.DataFrame

Show pipeline status: which steps are cached, their run_ids.

Source code in src/mosaic/core/pipeline/pipeline.py
def status(self, dataset: Dataset) -> pd.DataFrame:
    """Show pipeline status: which steps are cached, their run_ids."""
    resolved = self._resolve_step_cache(dataset)
    rows = []

    for info in resolved:
        step = info["step"]

        if isinstance(step, CallbackStep):
            rows.append({
                "step": step.name,
                "feature": "(callback)",
                "run_id": "-",
                "n_seq": "-",
                "runs": "-",
                "cached": "-" if not info["stale"] else "stale",
                "modified": "-",
            })
            continue

        try:
            runs_df = list_feature_runs(dataset, info["storage_name"])
            n_runs = len(runs_df["run_id"].unique())
            if info["cached"]:
                matched = runs_df[
                    runs_df["run_id"] == info["expected_run_id"]
                ]
                n_seq = (
                    int(matched.iloc[0].get("n_entries", 0))
                    if not matched.empty
                    else 0
                )
            else:
                n_seq = 0
        except (FileNotFoundError, ValueError):
            n_runs = 0
            n_seq = 0

        cached_display = info["cached"]
        if info["stale"] and not info["cached"]:
            run_root = feature_run_root(
                dataset,
                info["storage_name"],
                info["expected_run_id"] or "",
            )
            if info["expected_run_id"] and run_root.exists():
                cached_display = "stale"

        # Modification timestamp (show even for stale runs if files exist)
        modified = None
        if info["expected_run_id"]:
            rr = feature_run_root(
                dataset, info["storage_name"], info["expected_run_id"]
            )
            if rr.exists():
                modified = _newest_mtime(rr)

        row: dict[str, object] = {
            "step": step.name,
            "feature": info.get("feature_short", info["storage_name"]),
            "run_id": (
                info["expected_run_id"] if info["cached"] else "\u2014"
            ),
            "n_seq": n_seq,
            "runs": n_runs,
            "cached": cached_display,
            "modified": modified,
        }
        if "error" in info:
            row["error"] = info["error"]
        rows.append(row)

    return pd.DataFrame(rows)

load

load(dataset: Dataset) -> dict[str, Result]

Populate self.results from cached runs on disk (no execution).

Source code in src/mosaic/core/pipeline/pipeline.py
def load(self, dataset: Dataset) -> dict[str, Result]:
    """Populate ``self.results`` from cached runs on disk (no execution)."""
    resolved = self._resolve_step_cache(dataset)
    self.results = {}
    loaded: list[str] = []
    missing: list[tuple[str, str]] = []

    for info in resolved:
        step = info["step"]
        if isinstance(step, CallbackStep):
            continue

        if info["cached"]:
            self.results[step.name] = Result(
                feature=info["storage_name"],
                run_id=info["expected_run_id"],
            )
            loaded.append(step.name)
        else:
            reason = (
                "stale (upstream changed)"
                if info["stale"]
                else "not cached"
            )
            missing.append((step.name, reason))

    total = len(loaded) + len(missing)
    print(f"Pipeline.load: {len(loaded)}/{total} steps loaded")
    if loaded:
        print(f"  Loaded: {', '.join(loaded)}")
    if missing:
        print(
            f"  Missing: {', '.join(f'{n} ({r})' for n, r in missing)}"
        )
    return self.results

run

run(dataset: Dataset, force_from: str | None = None) -> dict[str, Result]

Execute the pipeline, skipping steps with cached results.

Parameters

force_from : str, optional
    Step name from which to force overwrite (and all downstream).

Source code in src/mosaic/core/pipeline/pipeline.py
def run(
    self,
    dataset: Dataset,
    force_from: str | None = None,
) -> dict[str, Result]:
    """Execute the pipeline, skipping steps with cached results.

    Parameters
    ----------
    force_from : str, optional
        Step name from which to force overwrite (and all downstream).
    """
    resolved = self._resolve_step_cache(dataset)
    cached_map: dict[str, dict] = {
        info["step"].name: info
        for info in resolved
        if not isinstance(info["step"], CallbackStep)
    }

    # Open registry for this dataset so run_feature() can write to it
    try:
        features_root = dataset.get_root("features")
        registry = open_registry(features_root, migrate_csv=True)
    except Exception:
        registry = None

    if force_from:
        known = {s.name for s in self.steps}
        if force_from not in known:
            msg = f"force_from={force_from!r} is not a known step. Available: {sorted(known)}"
            raise ValueError(msg)

    self.results = {}
    force_set: set[str] = self._downstream_of(force_from) if force_from else set()

    try:
        for step in self.steps:
            if isinstance(step, CallbackStep):
                print(f"  [{step.name}] running callback...")
                step.fn(dataset, self.results)
                continue

            # Check if cached
            info = cached_map.get(step.name, {})
            if step.name not in force_set and info.get("cached"):
                run_id = info["expected_run_id"]
                self.results[step.name] = Result(
                    feature=info["storage_name"],
                    run_id=run_id,
                )
                print(
                    f"  [{step.name}] {step.feature_cls.__name__}"
                    f" -> {run_id} (cached)"
                )
                continue

            # Not cached — execute
            if step.input_names:
                input_items = tuple(
                    self.results[n] for n in step.input_names
                )
            else:
                input_items = ("tracks",)

            feature = step.feature_cls(
                inputs=Inputs(input_items), params=step.params
            )
            kwargs = {**self.default_run_kwargs, **step.run_kwargs}
            if step.name in force_set:
                kwargs["overwrite"] = True
            if registry is not None:
                kwargs["registry"] = registry

            print(
                f"  [{step.name}] {step.feature_cls.__name__} ...",
                end=" ",
            )
            result = dataset.run_feature(feature, **kwargs)
            self.results[step.name] = result
            print(f"-> {result.run_id}")

    finally:
        if registry is not None:
            registry.close()

    return self.results
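Forcing a rerun from a given step overwrites that step and everything downstream of it, while upstream caches are reused (a sketch; "embed" is the hypothetical step name from the earlier example):

results = pipe.run(dataset, force_from="embed")   # recompute "embed" and its dependents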

clean

clean(dataset: Dataset, *, dry_run: bool = True) -> pd.DataFrame

Remove orphaned run directories that no longer match the pipeline.

For each FeatureStep, compares the expected run_id (from current params/inputs) against all run_ids found on disk. Runs that don't match are deleted.

Parameters

dry_run : bool
    If True (default), only report what would be deleted. If False, actually delete directories and clean index files.

Returns

pd.DataFrame
    Summary with columns [step, feature, run_id, status, n_files, size_mb, modified].

Source code in src/mosaic/core/pipeline/pipeline.py
def clean(
    self,
    dataset: Dataset,
    *,
    dry_run: bool = True,
) -> pd.DataFrame:
    """Remove orphaned run directories that no longer match the pipeline.

    For each FeatureStep, compares the expected run_id (from current
    params/inputs) against all run_ids found on disk.  Runs that don't
    match are deleted.

    Parameters
    ----------
    dry_run : bool
        If True (default), only report what would be deleted.
        If False, actually delete directories and clean index files.

    Returns
    -------
    pd.DataFrame
        Summary with columns ``[step, feature, run_id, status, n_files, size_mb, modified]``.
    """
    import shutil
    from datetime import datetime, timezone

    resolved = self._resolve_step_cache(dataset)
    rows: list[dict] = []

    for info in resolved:
        step = info["step"]
        if isinstance(step, CallbackStep):
            continue

        storage = info.get("storage_name")
        expected_rid = info.get("expected_run_id")
        if not storage:
            continue

        # List all run_ids on disk for this feature
        try:
            runs_df = list_feature_runs(dataset, storage)
        except (FileNotFoundError, ValueError):
            continue

        all_rids = runs_df["run_id"].unique().tolist() if not runs_df.empty else []

        for rid in all_rids:
            run_dir = feature_run_root(dataset, storage, rid)
            if run_dir.exists():
                files = list(run_dir.glob("*"))
                n_files = len(files)
                stats = [f.stat() for f in files if f.is_file()]
                size_bytes = sum(s.st_size for s in stats)
                newest_mtime = max((s.st_mtime for s in stats), default=0)
            else:
                n_files = 0
                size_bytes = 0
                newest_mtime = 0

            is_current = rid == expected_rid
            if is_current:
                status = "current"
            elif dry_run:
                status = "would remove"
            else:
                # Delete the directory
                if run_dir.exists():
                    shutil.rmtree(run_dir)
                status = "removed"

            rows.append({
                "step": step.name,
                "feature": storage,
                "run_id": rid,
                "status": status,
                "n_files": n_files,
                "size_mb": round(size_bytes / 1_048_576, 1),
                "modified": (
                    datetime.fromtimestamp(newest_mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")
                    if newest_mtime > 0
                    else None
                ),
            })

        # Clean index.csv if not dry_run
        if not dry_run:
            idx_path = feature_index_path(dataset, storage)
            if idx_path.exists():
                idx_df = pd.read_csv(idx_path, keep_default_na=False)
                before = len(idx_df)
                idx_df = idx_df[
                    (idx_df["run_id"] == expected_rid)
                    | ~idx_df["run_id"].isin(all_rids)
                ]
                if len(idx_df) < before:
                    idx_df.to_csv(idx_path, index=False)

    summary = pd.DataFrame(rows)
    if summary.empty:
        print("Pipeline.clean: no feature runs found on disk.")
        return summary

    orphaned = summary[summary["status"] != "current"]
    if orphaned.empty:
        print("Pipeline.clean: no orphaned runs found — everything is current.")
    else:
        action = "Would remove" if dry_run else "Removed"
        total_mb = orphaned["size_mb"].sum()
        n_runs = len(orphaned)
        print(
            f"Pipeline.clean: {action} {n_runs} orphaned run(s) "
            f"({total_mb:.1f} MB)."
        )
        if dry_run:
            print("  Pass dry_run=False to delete.")

    return summary
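A typical sequence is a dry run followed by the actual deletion (a sketch using the pipeline and dataset from the earlier example):

report = pipe.clean(dataset)                 # dry_run=True: report only
report = pipe.clean(dataset, dry_run=False)  # remove orphaned run directories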

dag

dag() -> dict[str, list[str]]

Return adjacency dict: {step_name: [upstream_names]}.

Source code in src/mosaic/core/pipeline/pipeline.py
def dag(self) -> dict[str, list[str]]:
    """Return adjacency dict: ``{step_name: [upstream_names]}``."""
    adj: dict[str, list[str]] = {}
    for step in self.steps:
        if isinstance(step, FeatureStep):
            adj[step.name] = list(step.input_names)
        else:
            adj[step.name] = list(step.depends_on)
    return adj
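For the two-step pipeline sketched in the Pipeline example above (hypothetical step names), the adjacency dict would look like this:

pipe.dag()
# {'speed': [], 'embed': ['speed']}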

FeatureStep dataclass

FeatureStep(name: str, feature_cls: type, params: dict | None = None, input_names: list[str] = list(), run_kwargs: dict = dict())

One node in the pipeline graph.

Parameters

name : str
    Unique identifier for this step (used in input_names of downstream steps).
feature_cls : type
    Feature class to instantiate. Must follow the Feature protocol.
params : dict | None
    Parameter overrides passed to the feature constructor.
input_names : list[str]
    Names of upstream FeatureSteps whose Result objects are wired into this feature's Inputs. Empty means the feature reads directly from tracks.
run_kwargs : dict
    Extra keyword arguments forwarded to dataset.run_feature() (e.g. parallel_workers, filter_start_time).

CallbackStep dataclass

CallbackStep(name: str, fn: Callable[['Dataset', dict[str, Result]], None], depends_on: list[str] = list())

A custom function that runs between feature layers.

Parameters

name : str
    Unique identifier for this step.
fn : Callable
    Called as fn(dataset, results_so_far).
depends_on : list[str]
    Upstream step names (used for staleness tracking).
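A sketch combining both step types (SpeedFeature and the callback body are hypothetical placeholders):

def export_summary(dataset, results):
    # `results` maps step names to Result objects for every step run so far.
    print({name: r.run_id for name, r in results.items()})

pipe = (
    Pipeline()
    .add(FeatureStep(name="speed", feature_cls=SpeedFeature, params={"window": 5}))
    .add(CallbackStep(name="report", fn=export_summary, depends_on=["speed"]))
)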

Feature Registry

SQLite-backed registry replacing per-feature CSV indices. Enables cross-feature queries, pending-work detection, dependency tracking, and API-friendly reads.

FeatureRegistry

FeatureRegistry(db_path: Path)

Queryable SQLite registry for feature runs and entries.

Parameters

db_path : Path
    Location of the .mosaic.db file (created if absent).

Source code in src/mosaic/core/pipeline/registry.py
def __init__(self, db_path: Path) -> None:
    self.db_path = db_path
    db_path.parent.mkdir(parents=True, exist_ok=True)
    self._conn = sqlite3.connect(str(db_path), timeout=10)
    self._conn.execute("PRAGMA journal_mode=WAL")
    self._conn.execute("PRAGMA foreign_keys=ON")
    self._conn.executescript(_SCHEMA)
    self._conn.commit()

record_run_start

record_run_start(feature: str, run_id: str, version: str, params_hash: str, params_json: str | None = None, inputs_json: str | None = None) -> None

Record that a feature run has started.

Source code in src/mosaic/core/pipeline/registry.py
def record_run_start(
    self,
    feature: str,
    run_id: str,
    version: str,
    params_hash: str,
    params_json: str | None = None,
    inputs_json: str | None = None,
) -> None:
    """Record that a feature run has started."""
    self._conn.execute(
        """\
        INSERT OR IGNORE INTO feature_runs
            (feature, run_id, version, params_hash, params_json,
             inputs_json, started_at)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (feature, run_id, version, params_hash, params_json, inputs_json, now_iso()),
    )
    self._conn.commit()

record_entry

record_entry(feature: str, run_id: str, group: str, sequence: str, abs_path: str | Path, n_rows: int = 0) -> None

Record a computed entry (one parquet file).

Source code in src/mosaic/core/pipeline/registry.py
def record_entry(
    self,
    feature: str,
    run_id: str,
    group: str,
    sequence: str,
    abs_path: str | Path,
    n_rows: int = 0,
) -> None:
    """Record a computed entry (one parquet file)."""
    self._conn.execute(
        """\
        INSERT OR REPLACE INTO feature_entries
            (feature, run_id, group_, sequence, abs_path, n_rows)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        (feature, run_id, group, sequence, str(abs_path), n_rows),
    )
    self._conn.commit()

record_entries

record_entries(rows: list[tuple[str, str, str, str, str | Path, int]]) -> None

Batch-record multiple entries in one transaction.

Source code in src/mosaic/core/pipeline/registry.py
def record_entries(
    self,
    rows: list[tuple[str, str, str, str, str | Path, int]],
) -> None:
    """Batch-record multiple entries in one transaction."""
    self._conn.executemany(
        """\
        INSERT OR REPLACE INTO feature_entries
            (feature, run_id, group_, sequence, abs_path, n_rows)
        VALUES (?, ?, ?, ?, ?, ?)
        """,
        [(f, r, g, s, str(p), n) for f, r, g, s, p, n in rows],
    )
    self._conn.commit()

mark_finished

mark_finished(feature: str, run_id: str) -> None

Set finished_at on a run.

Source code in src/mosaic/core/pipeline/registry.py
def mark_finished(self, feature: str, run_id: str) -> None:
    """Set ``finished_at`` on a run."""
    self._conn.execute(
        """\
        UPDATE feature_runs SET finished_at = ?
        WHERE feature = ? AND run_id = ? AND finished_at = ''
        """,
        (now_iso(), feature, run_id),
    )
    self._conn.commit()

record_dependency

record_dependency(feature: str, run_id: str, upstream_feature: str, upstream_run_id: str) -> None

Record that (feature, run_id) consumed (upstream_feature, upstream_run_id).

Source code in src/mosaic/core/pipeline/registry.py
def record_dependency(
    self,
    feature: str,
    run_id: str,
    upstream_feature: str,
    upstream_run_id: str,
) -> None:
    """Record that *(feature, run_id)* consumed *(upstream_feature, upstream_run_id)*."""
    self._conn.execute(
        """\
        INSERT OR IGNORE INTO dependencies
            (feature, run_id, upstream_feature, upstream_run_id)
        VALUES (?, ?, ?, ?)
        """,
        (feature, run_id, upstream_feature, upstream_run_id),
    )
    self._conn.commit()

latest_run_id

latest_run_id(feature: str) -> str

Return most recent run_id for feature (prefers finished).

Source code in src/mosaic/core/pipeline/registry.py
def latest_run_id(self, feature: str) -> str:
    """Return most recent run_id for *feature* (prefers finished)."""
    row = self._conn.execute(
        """\
        SELECT run_id FROM feature_runs
        WHERE feature = ?
        ORDER BY
            CASE WHEN finished_at != '' THEN 0 ELSE 1 END,
            started_at DESC
        LIMIT 1
        """,
        (feature,),
    ).fetchone()
    if row is None:
        msg = f"No runs found for feature '{feature}'"
        raise ValueError(msg)
    return str(row[0])

list_runs

list_runs(feature: str) -> pd.DataFrame

Return all runs for feature as a DataFrame.

Source code in src/mosaic/core/pipeline/registry.py
def list_runs(self, feature: str) -> pd.DataFrame:
    """Return all runs for *feature* as a DataFrame."""
    return pd.read_sql_query(
        """\
        SELECT * FROM feature_runs
        WHERE feature = ?
        ORDER BY
            CASE WHEN finished_at != '' THEN 0 ELSE 1 END,
            started_at DESC
        """,
        self._conn,
        params=(feature,),
    )

list_features

list_features() -> list[str]

Return the names of all features that have at least one run.

Source code in src/mosaic/core/pipeline/registry.py
def list_features(self) -> list[str]:
    """Return the names of all features that have at least one run."""
    rows = self._conn.execute(
        "SELECT DISTINCT feature FROM feature_runs ORDER BY feature"
    ).fetchall()
    return [r[0] for r in rows]

list_entries

list_entries(feature: str, run_id: str | None = None) -> pd.DataFrame

Return entries for feature (optionally filtered by run_id).

Source code in src/mosaic/core/pipeline/registry.py
def list_entries(
    self,
    feature: str,
    run_id: str | None = None,
) -> pd.DataFrame:
    """Return entries for *feature* (optionally filtered by run_id)."""
    if run_id is None:
        run_id = self.latest_run_id(feature)
    return pd.read_sql_query(
        """\
        SELECT * FROM feature_entries
        WHERE feature = ? AND run_id = ?
        ORDER BY group_, sequence
        """,
        self._conn,
        params=(feature, run_id),
    )

entry_count

entry_count(feature: str, run_id: str) -> int

Return the number of entries for a specific run.

Source code in src/mosaic/core/pipeline/registry.py
def entry_count(self, feature: str, run_id: str) -> int:
    """Return the number of entries for a specific run."""
    row = self._conn.execute(
        """\
        SELECT COUNT(*) FROM feature_entries
        WHERE feature = ? AND run_id = ?
        """,
        (feature, run_id),
    ).fetchone()
    return int(row[0]) if row else 0

has_run

has_run(feature: str, run_id: str) -> bool

Check whether a run exists in the registry.

Source code in src/mosaic/core/pipeline/registry.py
def has_run(self, feature: str, run_id: str) -> bool:
    """Check whether a run exists in the registry."""
    row = self._conn.execute(
        """\
        SELECT 1 FROM feature_runs
        WHERE feature = ? AND run_id = ?
        LIMIT 1
        """,
        (feature, run_id),
    ).fetchone()
    return row is not None

run_is_finished

run_is_finished(feature: str, run_id: str) -> bool

Check whether a run has been marked finished.

Source code in src/mosaic/core/pipeline/registry.py
def run_is_finished(self, feature: str, run_id: str) -> bool:
    """Check whether a run has been marked finished."""
    row = self._conn.execute(
        """\
        SELECT finished_at FROM feature_runs
        WHERE feature = ? AND run_id = ?
        """,
        (feature, run_id),
    ).fetchone()
    return row is not None and row[0] != ""

pending_entries

pending_entries(feature: str, run_id: str, all_entries: set[tuple[str, str]]) -> list[tuple[str, str]]

Return (group, sequence) pairs missing from a run.

all_entries is the full set that should exist (e.g. from tracks).

Source code in src/mosaic/core/pipeline/registry.py
def pending_entries(
    self,
    feature: str,
    run_id: str,
    all_entries: set[tuple[str, str]],
) -> list[tuple[str, str]]:
    """Return ``(group, sequence)`` pairs missing from a run.

    *all_entries* is the full set that should exist (e.g. from tracks).
    """
    existing = self._conn.execute(
        """\
        SELECT group_, sequence FROM feature_entries
        WHERE feature = ? AND run_id = ?
        """,
        (feature, run_id),
    ).fetchall()
    existing_set = {(r[0], r[1]) for r in existing}
    return sorted(all_entries - existing_set)

get_dependencies

get_dependencies(feature: str, run_id: str) -> list[tuple[str, str]]

Return upstream (feature, run_id) pairs for a run.

Source code in src/mosaic/core/pipeline/registry.py
def get_dependencies(
    self, feature: str, run_id: str
) -> list[tuple[str, str]]:
    """Return upstream ``(feature, run_id)`` pairs for a run."""
    rows = self._conn.execute(
        """\
        SELECT upstream_feature, upstream_run_id FROM dependencies
        WHERE feature = ? AND run_id = ?
        """,
        (feature, run_id),
    ).fetchall()
    return [(r[0], r[1]) for r in rows]

lineage

lineage(feature: str, run_id: str) -> dict[str, object]

Return full dependency tree rooted at (feature, run_id).

Returns a nested dict: {"feature": ..., "run_id": ..., "upstream": [...]}.

Source code in src/mosaic/core/pipeline/registry.py
def lineage(self, feature: str, run_id: str) -> dict[str, object]:
    """Return full dependency tree rooted at *(feature, run_id)*.

    Returns a nested dict: ``{"feature": ..., "run_id": ..., "upstream": [...]}``.
    """
    upstream = self.get_dependencies(feature, run_id)
    return {
        "feature": feature,
        "run_id": run_id,
        "upstream": [
            self.lineage(uf, ur) for uf, ur in upstream
        ],
    }
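The nested structure for a run with one upstream dependency looks like this (feature names and run_ids are hypothetical):

reg.lineage("embed", "0.2-1a2b3c")
# {'feature': 'embed', 'run_id': '0.2-1a2b3c',
#  'upstream': [{'feature': 'speed', 'run_id': '0.1-4d5e6f', 'upstream': []}]}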

migrate_from_csv

migrate_from_csv(features_root: Path) -> int

Import existing index.csv files into the registry.

Returns the number of entries imported. Safe to call multiple times -- uses INSERT OR IGNORE so duplicates are skipped.

Source code in src/mosaic/core/pipeline/registry.py
def migrate_from_csv(self, features_root: Path) -> int:
    """Import existing ``index.csv`` files into the registry.

    Returns the number of entries imported. Safe to call multiple times --
    uses INSERT OR IGNORE so duplicates are skipped.
    """
    count = 0
    for csv_path in sorted(features_root.glob("*/index.csv")):
        feature_name = csv_path.parent.name
        try:
            df = pd.read_csv(csv_path, keep_default_na=False)
        except Exception:
            continue
        if df.empty:
            continue

        # Group by run_id to create feature_runs entries
        if "run_id" not in df.columns:
            continue

        for run_id, grp in df.groupby("run_id"):
            run_id_str = str(run_id)
            first = grp.iloc[0]
            version = str(first.get("version", ""))
            params_hash = str(first.get("params_hash", ""))
            started_at = str(first.get("started_at", now_iso()))
            finished_at = str(first.get("finished_at", ""))

            self._conn.execute(
                """\
                INSERT OR IGNORE INTO feature_runs
                    (feature, run_id, version, params_hash,
                     started_at, finished_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (feature_name, run_id_str, version, params_hash,
                 started_at, finished_at),
            )

            for _, row in grp.iterrows():
                group = str(row.get("group", ""))
                sequence = str(row.get("sequence", ""))
                abs_path = str(row.get("abs_path", ""))
                n_rows = int(row.get("n_rows", 0))
                self._conn.execute(
                    """\
                    INSERT OR IGNORE INTO feature_entries
                        (feature, run_id, group_, sequence, abs_path, n_rows)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (feature_name, run_id_str, group, sequence, abs_path, n_rows),
                )
                count += 1

    self._conn.commit()
    return count

open_registry

open_registry(features_root: Path, *, migrate_csv: bool = True) -> FeatureRegistry

Open (or create) the feature registry for a dataset.

Parameters

features_root : Path
    The features/ directory of the dataset.
migrate_csv : bool
    If True and the DB is newly created, import existing index.csv files.

Source code in src/mosaic/core/pipeline/registry.py
def open_registry(features_root: Path, *, migrate_csv: bool = True) -> FeatureRegistry:
    """Open (or create) the feature registry for a dataset.

    Parameters
    ----------
    features_root : Path
        The ``features/`` directory of the dataset.
    migrate_csv : bool
        If True and the DB is newly created, import existing ``index.csv`` files.
    """
    db_path = features_root / ".mosaic.db"
    is_new = not db_path.exists()
    reg = FeatureRegistry(db_path)
    if is_new and migrate_csv:
        n = reg.migrate_from_csv(features_root)
        if n > 0:
            print(f"[registry] migrated {n} entries from CSV indices into {db_path}")
    return reg
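A sketch of opening the registry and summarising what it knows about (the features path is a placeholder):

from pathlib import Path

reg = open_registry(Path("/data/my_dataset/features"))
try:
    for feat in reg.list_features():
        rid = reg.latest_run_id(feat)
        print(feat, rid, reg.entry_count(feat, rid), reg.run_is_finished(feat, rid))
finally:
    reg.close()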

Feature Protocol

Features implement four methods plus four class attributes:

class MyFeature:
    name: str                  # Unique feature name
    version: str               # Semantic version string
    parallelizable: bool       # Whether apply() can run in parallel
    scope_dependent: bool      # Whether run_id includes manifest scope

    def load_state(self, run_root, artifact_paths, dependency_lookups) -> bool: ...
    def fit(self, inputs: InputStream) -> None: ...
    def apply(self, df: pd.DataFrame) -> pd.DataFrame: ...
    def save_state(self, run_root) -> None: ...
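A minimal stateless feature sketch that satisfies this protocol. In practice inputs and params are the Inputs/Params pydantic models described under Pipeline Types; the class below is illustrative only:

import pandas as pd

class FrameCount:
    """Toy feature: emit one row per sequence with its frame count."""

    name = "frame-count"
    version = "0.1"
    parallelizable = True
    scope_dependent = False

    def __init__(self, inputs, params=None):
        self.inputs = inputs      # normally an Inputs model, e.g. Inputs(("tracks",))
        self.params = params      # normally a Params model

    def load_state(self, run_root, artifact_paths, dependency_lookups) -> bool:
        return True               # stateless: nothing to load, so fit() is skipped

    def fit(self, inputs) -> None:
        pass                      # nothing to fit

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"n_frames": [len(df)]})

    def save_state(self, run_root) -> None:
        pass                      # nothing to persist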

Pipeline Types

types

JoblibLoadSpec

Bases: StrictModel

Load spec for joblib-serialized objects.

Attributes:

kind : Literal['joblib']
    Discriminator literal "joblib".
key : str | None
    Dict key to extract from loaded object. None loads raw.

NpzLoadSpec

Bases: StrictModel

Load spec for numpy .npz archives.

Attributes:

kind : Literal['npz']
    Discriminator literal "npz".
key : str
    Array key to extract from the .npz file. Required.
transpose : bool
    Transpose the loaded array. Default False.

ParquetLoadSpec

Bases: StrictModel

Load spec for parquet files.

Attributes:

kind : Literal['parquet']
    Discriminator literal "parquet".
transpose : bool
    Transpose the loaded array. Default False.
columns : list[str] | None
    Explicit column list. None uses numeric_only filter.
drop_columns : list[str] | None
    Columns to drop before loading.
numeric_only : bool
    Keep only numeric columns. Default True.
frame_column : str | None
    Column to extract as frame indices.

ArtifactSpec

Bases: Result[str], Generic[L, R]

Reference to a feature artifact with load specification.

Class Type Parameters:

L
    Load spec type (NpzLoadSpec, ParquetLoadSpec, JoblibLoadSpec). Required.
R
    Return type of from_path(). Defaults to object. Required.

Attributes:

load : L
    How to load the matched files.
pattern : str
    Glob pattern. Auto-derived from load.kind when empty.

from_result classmethod

from_result(result: Result[str]) -> Self

Create from a Result, validating feature match.

Typed artifact subclasses (with a default feature) validate that result.feature matches. Base ArtifactSpec passes through.

Source code in src/mosaic/core/pipeline/types/artifacts.py
@classmethod
def from_result(cls, result: Result[str]) -> Self:
    """Create from a Result, validating feature match.

    Typed artifact subclasses (with a default feature) validate
    that result.feature matches. Base ArtifactSpec passes through.
    """
    from pydantic_core import PydanticUndefined

    expected = cls.model_fields["feature"].default
    if expected is not PydanticUndefined and isinstance(expected, str):
        if not (
            result.feature == expected
            or result.feature.startswith(f"{expected}__from__")
        ):
            raise ValueError(
                f"{cls.__name__} expects feature={expected!r} (or {expected}__from__...), got {result.feature!r}"
            )
    return cls.model_validate({"feature": result.feature, "run_id": result.run_id})

from_path

from_path(path: Path) -> R

Load artifact from a resolved file path.

Dispatches on load-spec type via load_from_spec(). Return type is determined by the R type parameter.

Source code in src/mosaic/core/pipeline/types/artifacts.py
def from_path(self, path: Path) -> R:
    """Load artifact from a resolved file path.

    Dispatches on load-spec type via load_from_spec().
    Return type is determined by the R type parameter.
    """
    return load_from_spec(path, self.load)  # pyright: ignore[reportReturnType]

FeatureLabelsSource

Bases: ArtifactSpec[NpzLoadSpec, ndarray]

Labels loaded from a feature's output files.

Columns

Bases: StrictModel

Dataset column name conventions.

Single source of truth for all standard column names. Override by monkey-patching COLUMNS before importing features:

from mosaic.behavior.feature_library import params
params.COLUMNS = params.Columns(id_col="animal")

Attributes:

id_col : str
    Animal/subject identifier column. Default "id".
seq_col : str
    Sequence identifier column. Default "sequence".
group_col : str
    Group/session identifier column. Default "group".
frame_col : str
    Frame number column name. Default "frame".
time_col : str
    Timestamp column name. Default "time".
order_by : Literal['frames', 'time']
    Preferred temporal ordering column. Default "frames".
x_col : str
    X-coordinate column name. Default "X".
y_col : str
    Y-coordinate column name. Default "Y".
orientation_col : str
    Body-orientation angle column name. Default "ANGLE".

meta_set

meta_set() -> set[str]

The five metadata column names as a set.

Useful for set intersection (passthrough) or set difference (exclusion) against df.columns. Spatial columns (x, y, orientation) are intentionally excluded -- they are data, not metadata.

Source code in src/mosaic/core/pipeline/types/data_config.py
def meta_set(self) -> set[str]:
    """The five metadata column names as a set.

    Useful for set intersection (passthrough) or set difference (exclusion)
    against ``df.columns``.  Spatial columns (x, y, orientation) are
    intentionally excluded -- they are data, not metadata.
    """
    return {
        self.id_col,
        self.seq_col,
        self.group_col,
        self.frame_col,
        self.time_col,
    }

PoseConfig

Bases: StrictModel

Pose keypoint column naming and selection.

Attributes:

pose_n : int
    Total number of pose keypoints in the data. Default 7.
pose_indices : list[int] | None
    Subset of keypoint indices to use. None uses all.
x_prefix : str
    Column name prefix for X coordinates. Default "poseX".
y_prefix : str
    Column name prefix for Y coordinates. Default "poseY".
confidence_prefix : str
    Column prefix for confidence scores. Default "poseP".
keypoint_names : list[str] | None
    Human-readable names for each keypoint. Default None (auto-generated as ["kp0", "kp1", ...] by features that need names).

Feature

Bases: Protocol

Feature protocol -- 4 attributes, 4 methods.

InputStream

InputStream(factory: Callable[[], Iterator[tuple[str, DataFrame]]], n_entries: int)

Factory for fit() input iterators, with entry count metadata.

Wraps a callable that produces (entry_key, DataFrame) iterators. Each call creates a fresh iterator over the manifest entries. n_entries exposes the total number of entries so features can make exact allocation decisions (e.g. train/test split counts) without an extra data pass.

Source code in src/mosaic/core/pipeline/types/feature.py
def __init__(
    self,
    factory: Callable[[], Iterator[tuple[str, pd.DataFrame]]],
    n_entries: int,
) -> None:
    self._factory = factory
    self._n_entries = n_entries

Inputs

Bases: RootModel[tuple[InputItem, ...]], Generic[InputItem]

Base class for feature input collections. Mirrors Params.

Each Feature subclasses this to narrow the allowed input types, paralleling class Params(Params).

Examples:

Inputs(("tracks",))
Inputs((Result(feature="speed-angvel"),))
Inputs(("tracks", Result(feature="nn", run_id="0.1-abc")))

Per-feature narrowing:

class Inputs(Inputs[TrackInput]): pass

Features that take no pipeline inputs:

class Inputs(Inputs[Result]): _require: ClassVar[InputRequire] = "empty"

Self-loading features that optionally accept inputs (e.g. fit + assign):

class Inputs(Inputs[Result]): _require: ClassVar[InputRequire] = "any"

InputsLike

Bases: Protocol

Read-only interface satisfied by any Inputs[InputItem].

GroundTruthLabelsSource

Bases: LabelsSource[Literal['behavior']]

Labels loaded from labels/<kind>/index.csv.

LabelsSource

Bases: StrictModel, Generic[K]

Base class for dataset label dependencies.

Resolved by _resolve_dependency_paths (Task 9) to the dataset's labels/<kind>/ directory.

GlobalModelParams

Bases: Params, Generic[M]

Base params for global features that fit on a templates artifact or load a pre-fitted model.

Type parameter M is the model artifact type (must extend JoblibArtifact). Exactly one of templates or model must be provided.

Both fields use default_factory so that from_overrides() merges partial dicts correctly. The _exclusive_source validator checks model_fields_set and nulls out the field that was not provided.

Attributes:

templates : ParquetArtifact | None
    Templates artifact to fit from. Mutually exclusive with model.
model : M | None
    Pre-fitted model artifact. Mutually exclusive with templates.

Params

Bases: StrictModel

Base for all feature parameter models.

Provides from_overrides() constructor for user-config dicts. Subclasses declare feature-specific fields.

from_overrides classmethod

from_overrides(overrides: dict[str, object] | None = None) -> Self

Construct from user dict; missing keys get field defaults.

For BaseModel-typed fields with default_factory, partial dict overrides are merged on top of the default before validation (1-level deep merge). This replaces the per-feature deep-merge hacks in global_ward.

Source code in src/mosaic/core/pipeline/types/params.py
@classmethod
def from_overrides(cls, overrides: dict[str, object] | None = None) -> Self:
    """Construct from user dict; missing keys get field defaults.

    For BaseModel-typed fields with default_factory, partial dict overrides
    are merged on top of the default before validation (1-level deep merge).
    This replaces the per-feature deep-merge hacks in global_ward.
    """
    if not overrides:
        return cls()
    merged = dict(overrides)
    for key, value in list(merged.items()):
        if not isinstance(value, dict):
            continue
        field_info = cls.model_fields.get(key)
        if field_info is None or field_info.default_factory is None:
            continue
        default_obj: object = field_info.get_default(call_default_factory=True)  # pyright: ignore[reportAny]
        if isinstance(default_obj, BaseModel):
            merged[key] = {**default_obj.model_dump(), **value}
    return cls.model_validate(merged)
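A sketch of the merge behavior (MyParams and its nested Window model are hypothetical; the Params import path is assumed from the source layout above):

from pydantic import BaseModel, Field
from mosaic.core.pipeline.types.params import Params  # assumed import path

class Window(BaseModel):
    size: int = 5
    stride: int = 1

class MyParams(Params):
    window: Window = Field(default_factory=Window)
    scale: float = 1.0

# Only "size" is overridden; "stride" keeps its default via the 1-level deep merge.
p = MyParams.from_overrides({"window": {"size": 9}})
assert p.window.stride == 1 and p.scale == 1.0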

BodyScaleResult

Bases: Result[str]

Result for a body-scale-family feature.

Accepts any feature name (default "body-scale") so that auto-derived names or upstream variants can be referenced. Use from_result() to copy feature+run_id from an existing run.

from_result

from_result(result: Result[str]) -> Self

Return a copy with feature and run_id set from another Result.

Source code in src/mosaic/core/pipeline/types/results.py
def from_result(self, result: Result[str]) -> Self:
    """Return a copy with feature and run_id set from another Result."""
    return self.model_copy(
        update={"feature": result.feature, "run_id": result.run_id}
    )

NNResult

Bases: Result[str]

Result for a nearest-neighbor-family feature.

Accepts any feature name (default "nearest-neighbor") so that auto-derived names like nearest-neighbor__from__tracks or variants computed from different upstream data (e.g. smoothed tracks) can be referenced. Use from_result() to copy feature+run_id from an existing run.

from_result

from_result(result: Result[str]) -> Self

Return a copy with feature and run_id set from another Result.

Source code in src/mosaic/core/pipeline/types/results.py
def from_result(self, result: Result[str]) -> Self:
    """Return a copy with feature and run_id set from another Result."""
    return self.model_copy(
        update={"feature": result.feature, "run_id": result.run_id}
    )

Result

Bases: StrictModel, Generic[F]

Reference to a prior feature's output as pipeline input.

Attributes:

feature : F
    Feature name whose output to consume.
run_id : str | None
    Specific run ID, or None for latest finished run.

use_latest

use_latest() -> Self

Return a copy with run_id=None (resolves to latest run).

Source code in src/mosaic/core/pipeline/types/results.py
def use_latest(self) -> Self:
    """Return a copy with run_id=None (resolves to latest run)."""
    return self.model_copy(update={"run_id": None})

ResultColumn

Bases: Result[str]

Reference to a column in a feature's standard parquet output.

Attributes:

feature : str
    Source feature name.
column : str
    Column name to extract from the parquet output.
run_id : str | None
    Specific run ID, or None for latest.

from_result

from_result(result: Result[str]) -> Self

Return a copy with feature and run_id set from another Result.

Source code in src/mosaic/core/pipeline/types/results.py
def from_result(self, result: Result[str]) -> Self:
    """Return a copy with feature and run_id set from another Result."""
    return self.model_copy(
        update={"feature": result.feature, "run_id": result.run_id}
    )

TracksColumn

Bases: StrictModel

Reference to a column in the tracks data.

Attributes:

column : str
    Column name to extract from tracks.

load_from_spec

load_from_spec(path: Path, spec: NpzLoadSpec | ParquetLoadSpec | JoblibLoadSpec) -> object

Load artifact from a file path using a typed load specification.

Parameters

path
    Resolved path to the artifact file.
spec
    Typed load specification (NpzLoadSpec, ParquetLoadSpec, or JoblibLoadSpec).

Returns

object
    Loaded data: np.ndarray (npz), pd.DataFrame (parquet), or arbitrary object (joblib).

Source code in src/mosaic/core/pipeline/_loaders.py
def load_from_spec(
    path: Path, spec: NpzLoadSpec | ParquetLoadSpec | JoblibLoadSpec
) -> object:
    """Load artifact from a file path using a typed load specification.

    Parameters
    ----------
    path
        Resolved path to the artifact file.
    spec
        Typed load specification (NpzLoadSpec, ParquetLoadSpec, or JoblibLoadSpec).

    Returns
    -------
    object
        Loaded data: np.ndarray (npz), pd.DataFrame (parquet), or arbitrary object (joblib).
    """
    match spec:
        case NpzLoadSpec(key=key, transpose=transpose):
            data = np.load(path, allow_pickle=True)
            if key not in data.files:
                msg = f"Key {key!r} not found in {path}"
                raise FileNotFoundError(msg)
            arr = np.asarray(data[key])
            if arr.ndim == 1:
                arr = arr[None, :]
            if transpose:
                arr = arr.T
            return arr.astype(np.float32, copy=False)

        case JoblibLoadSpec(key=key):
            obj: object = joblib.load(path)
            if key is not None:
                return obj[key]  # pyright: ignore[reportIndexIssue,reportUnknownVariableType]
            return obj

        case ParquetLoadSpec(
            columns=columns,
            drop_columns=drop_columns,
            numeric_only=numeric_only,
            transpose=transpose,
        ):
            df = pd.read_parquet(path, columns=columns)
            if drop_columns:
                df = df.drop(columns=set(drop_columns) & set(df.columns))
            if columns is None and numeric_only:
                df = df.select_dtypes(include=[np.number])
            if transpose:
                df = df.T
            return df
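A sketch of loading an .npz artifact through a typed spec (the path and key are placeholders):

from pathlib import Path

spec = NpzLoadSpec(kind="npz", key="embedding")
arr = load_from_spec(Path("features/embed/0.1-abc/templates.npz"), spec)
# -> float32 ndarray; 1-D arrays are promoted to shape (1, n)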

resolve_order_col

resolve_order_col(df: DataFrame) -> str

Pick the best ordering column present in df.

Uses COLUMNS.order_by preference, then falls back to the other option. Raises ValueError when neither column exists.

Source code in src/mosaic/core/pipeline/types/data_config.py
def resolve_order_col(df: pd.DataFrame) -> str:
    """Pick the best ordering column present in *df*.

    Uses COLUMNS.order_by preference, then falls back to the other option.
    Raises ValueError when neither column exists.
    """
    if COLUMNS.order_by == "frames":
        first, second = COLUMNS.frame_col, COLUMNS.time_col
    else:
        first, second = COLUMNS.time_col, COLUMNS.frame_col
    if first in df.columns:
        return first
    if second in df.columns:
        return second
    raise ValueError(
        f"Need '{COLUMNS.frame_col}' or '{COLUMNS.time_col}' column to order rows."
    )

Pipeline Runner

run

run_feature

run_feature(ds: Dataset, feature: Feature, groups: Iterable[str] | None = None, sequences: Iterable[str] | None = None, overwrite: bool = False, parallel_workers: int | None = None, parallel_mode: str | None = 'thread', overlap_frames: int = 0, filter_start_frame: int | None = None, filter_end_frame: int | None = None, filter_start_time: float | None = None, filter_end_time: float | None = None, registry: FeatureRegistry | None = None) -> Result

Apply a Feature over a chosen scope (default: whole dataset).

Input routing is determined by feature.inputs: tracks (default), a single upstream feature, or a multi-input set.

Parameters

feature : Feature
    The feature object implementing the Feature protocol. Its inputs attribute controls where data is read from.
groups, sequences : optional iterables
    Scope filter (applies to whichever input source is used).
overwrite : bool
    Overwrite existing outputs for this run_id.
parallel_workers : int | None
    When >1 and the feature declares itself parallelizable, run the apply phase in parallel. Defaults to sequential execution.
parallel_mode : {'thread', 'process'}
    Execution backend when parallel_workers > 1. 'thread' (default) uses ThreadPoolExecutor; 'process' uses ProcessPoolExecutor.
overlap_frames : int, default 0
    Load this many frames from adjacent segments to handle edge effects. Mutually exclusive with frame/time filters.
filter_start_frame : int | None
    If set, only include frames >= this value.
filter_end_frame : int | None
    If set, only include frames < this value.
filter_start_time : float | None
    If set, converted to a start frame via fps_default from dataset metadata.
filter_end_time : float | None
    If set, converted to an end frame via fps_default from dataset metadata.

Source code in src/mosaic/core/pipeline/run.py
def run_feature(
    ds: Dataset,
    feature: Feature,
    groups: Iterable[str] | None = None,
    sequences: Iterable[str] | None = None,
    overwrite: bool = False,
    parallel_workers: int | None = None,
    parallel_mode: str | None = "thread",
    overlap_frames: int = 0,
    filter_start_frame: int | None = None,
    filter_end_frame: int | None = None,
    filter_start_time: float | None = None,
    filter_end_time: float | None = None,
    registry: FeatureRegistry | None = None,
) -> Result:
    """Apply a Feature over a chosen scope (default: whole dataset).

    Input routing is determined by ``feature.inputs``: tracks (default),
    a single upstream feature, or a multi-input set.

    Parameters
    ----------
    feature : Feature
        The feature object implementing the Feature protocol.  Its ``inputs``
        attribute controls where data is read from.
    groups, sequences : optional iterables
        Scope filter (applies to whichever input source is used).
    overwrite : bool
        Overwrite existing outputs for this run_id.
    parallel_workers : int | None
        When >1 and the feature declares itself parallelizable, run the apply
        phase in parallel. Defaults to sequential execution.
    parallel_mode : {'thread','process'}
        Execution backend when parallel_workers > 1. 'thread' (default) uses
        ThreadPoolExecutor; 'process' uses ProcessPoolExecutor.
    overlap_frames : int, default 0
        Load this many frames from adjacent segments to handle edge effects.
        Mutually exclusive with frame/time filters.
    filter_start_frame : int | None
        If set, only include frames >= this value.
    filter_end_frame : int | None
        If set, only include frames < this value.
    filter_start_time : float | None
        If set, converted to start frame via fps_default from dataset metadata.
    filter_end_time : float | None
        If set, converted to end frame via fps_default from dataset metadata.
    """
    # Storage name derivation
    storage_feature_name = derive_storage_name(
        feature.name, feature.inputs.storage_suffix()
    )

    # Frame range + mutual exclusivity with overlap
    frame_start, frame_end = resolve_frame_range(
        ds.meta.get("fps_default"),
        filter_start_frame,
        filter_end_frame,
        filter_start_time,
        filter_end_time,
    )
    has_frame_filter = frame_start is not None or frame_end is not None
    if has_frame_filter and overlap_frames > 0:
        raise ValueError("Frame/time filters and overlap_frames are mutually exclusive")

    # Scope sets
    groups_set = {str(g) for g in groups} if groups is not None else None
    sequences_set = {str(s) for s in sequences} if sequences is not None else None

    # Build manifest
    if feature.inputs.is_empty:
        manifest: Manifest = {}
        scope = Scope()
    else:
        manifest, scope = build_manifest(ds, feature.inputs, groups_set, sequences_set)

    # Run ID hash
    hashable: dict[str, object] = {
        "_params": feature.params.model_dump(),
        "_inputs": feature.inputs.model_dump(),
        "_frame_range": [frame_start, frame_end],
    }
    if feature.scope_dependent:
        hashable["_scope_entries"] = sorted(scope.entries)
    params_hash = hash_params(hashable)
    run_id = f"{feature.version}-{params_hash}"

    # Run root + params.json
    run_root = feature_run_root(ds, storage_feature_name, run_id)
    run_root.mkdir(parents=True, exist_ok=True)

    params_path = run_root / "params.json"
    try:
        save_payload: dict[str, object] = {
            "_params": json_ready(feature.params),
            "_inputs": feature.inputs.model_dump(),
            "_frame_range": [frame_start, frame_end],
        }
        params_path.write_text(json.dumps(save_payload, indent=2))
    except Exception as exc:
        print(
            f"[feature:{feature.name}] failed to save params.json: {exc}",
            file=sys.stderr,
        )

    # Index CSV setup
    idx = feature_index(feature_index_path(ds, storage_feature_name))
    idx.ensure()

    # Registry: record run start
    if registry is not None:
        registry.record_run_start(
            feature=storage_feature_name,
            run_id=run_id,
            version=feature.version,
            params_hash=params_hash,
            params_json=params_path.read_text() if params_path.exists() else None,
            inputs_json=json.dumps(json_ready(feature.inputs.model_dump())),
        )

    # Bind dataset (for features that need media paths, etc.)
    if hasattr(feature, "bind_dataset"):
        feature.bind_dataset(ds)

    # Resolve dependencies
    artifact_paths, dependency_lookups = _resolve_dependencies(ds, feature.params)

    # Resolve pair filter
    pair_filter_spec = _resolve_pair_filter(feature.params)

    # Load state
    state_ready = feature.load_state(run_root, artifact_paths, dependency_lookups)

    # Build filter factory (shared by fit and apply phases)
    filter_factory = _make_filter_factory(
        ds, scope, pair_filter_spec, frame_start, frame_end
    )

    # Fit phase (if not state_ready)
    if not state_ready:

        def input_factory() -> Iterator[tuple[str, pd.DataFrame]]:
            return iter_manifest(manifest, filter_factory=filter_factory)

        feature.fit(InputStream(input_factory, n_entries=len(manifest)))
        feature.save_state(run_root)

    # Apply phase — index rows are flushed periodically for interrupt recovery
    _IDX_FLUSH_EVERY = 10
    _pending_idx_rows: list[FeatureIndexRow] = []
    _total_written = 0

    def _flush_idx() -> None:
        nonlocal _total_written
        if _pending_idx_rows:
            idx.append(list(_pending_idx_rows))
            _total_written += len(_pending_idx_rows)
            _pending_idx_rows.clear()

    def _record_row(row: FeatureIndexRow) -> None:
        _pending_idx_rows.append(row)
        if registry is not None:
            registry.record_entry(
                feature=row.feature,
                run_id=row.run_id,
                group=row.group,
                sequence=row.sequence,
                abs_path=row.abs_path,
                n_rows=row.n_rows,
            )
        if len(_pending_idx_rows) >= _IDX_FLUSH_EVERY:
            _flush_idx()

    max_workers = (
        parallel_workers if parallel_workers is not None and parallel_workers > 1 else 1
    )
    parallel_mode_str = (parallel_mode or "thread").lower()
    if parallel_mode_str not in {"thread", "process"}:
        parallel_mode_str = "thread"
    if max_workers > 1 and not feature.parallelizable:
        print(
            f"[feature:{feature.name}] parallel_workers requested but feature is not parallelizable; running sequentially.",
            file=sys.stderr,
        )
        max_workers = 1
    apply_overlap: int | None = overlap_frames if overlap_frames > 0 else None

    executor: ProcessPoolExecutor | ThreadPoolExecutor | None = None
    if max_workers > 1:
        if parallel_mode_str == "process":
            executor = ProcessPoolExecutor(
                max_workers=max_workers, mp_context=mp.get_context("spawn")
            )
        else:
            executor = ThreadPoolExecutor(max_workers=max_workers)

    pending: dict[Future[pd.DataFrame], tuple[FeatureMeta, int, int]] = {}

    def _drain_completed() -> None:
        done, _ = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            meta, core_start, core_end = pending.pop(future)
            try:
                result_df: FeatureOutput = future.result()
            except Exception as exc:
                print(
                    f"[feature:{feature.name}] apply failed for ({meta.group},{meta.sequence}): {exc}",
                    file=sys.stderr,
                )
                continue
            if apply_overlap is not None and apply_overlap > 0:
                result_df = trim_feature_output(result_df, core_start, core_end)
            n_rows = write_output(meta, result_df)
            _record_row(
                FeatureIndexRow(
                    run_id=run_id,
                    feature=storage_feature_name,
                    version=feature.version,
                    group=meta.group,
                    sequence=meta.sequence,
                    abs_path=meta.out_path,
                    n_rows=n_rows,
                    params_hash=params_hash,
                )
            )
            del result_df
            gc.collect()

    def _process_entry(
        entry_key: str,
        df: pd.DataFrame,
        core_start: int,
        core_end: int,
    ) -> None:
        group, sequence = resolve_sequence_identity(entry_key, scope.entry_map)
        meta = build_feature_meta(group, sequence, run_root)

        # Skip existing outputs when state was loaded from cache
        if state_ready and not overwrite and meta.out_path.exists():
            n_rows = int(pq.read_metadata(meta.out_path).num_rows)  # pyright: ignore[reportUnknownMemberType,reportUnknownArgumentType]
            _record_row(
                FeatureIndexRow(
                    run_id=run_id,
                    feature=storage_feature_name,
                    version=feature.version,
                    group=meta.group,
                    sequence=meta.sequence,
                    abs_path=meta.out_path,
                    n_rows=n_rows,
                    params_hash=params_hash,
                )
            )
            return

        if executor is not None:
            while len(pending) >= max_workers:
                _drain_completed()
            if parallel_mode_str == "process":
                artifact_paths_str = {k: str(v) for k, v in artifact_paths.items()}
                pending[
                    executor.submit(
                        _process_apply_worker,
                        feature.__module__,
                        type(feature).__name__,
                        feature.inputs.model_dump(),
                        feature.params.model_dump(),
                        str(run_root),
                        artifact_paths_str,
                        dependency_lookups,
                        df,
                    )
                ] = (meta, core_start, core_end)
            else:
                pending[executor.submit(feature.apply, df)] = (
                    meta,
                    core_start,
                    core_end,
                )
        else:
            try:
                result_df: FeatureOutput = feature.apply(df)
            except Exception as exc:
                print(
                    f"[feature:{feature.name}] apply failed for ({group},{sequence}): {exc}",
                    file=sys.stderr,
                )
                return
            if apply_overlap is not None and apply_overlap > 0:
                result_df = trim_feature_output(result_df, core_start, core_end)
            n_rows = write_output(meta, result_df)
            _record_row(
                FeatureIndexRow(
                    run_id=run_id,
                    feature=storage_feature_name,
                    version=feature.version,
                    group=group,
                    sequence=sequence,
                    abs_path=meta.out_path,
                    n_rows=n_rows,
                    params_hash=params_hash,
                )
            )
            del result_df
            gc.collect()

    # Iterate manifest entries
    if apply_overlap is not None:
        for entry_key, df, core_start, core_end in iter_manifest(
            manifest,
            filter_factory=filter_factory,
            overlap_frames=apply_overlap,
            progress_label=storage_feature_name,
        ):
            _process_entry(entry_key, df, core_start, core_end)
    else:
        for entry_key, df in iter_manifest(
            manifest,
            filter_factory=filter_factory,
            progress_label=storage_feature_name,
        ):
            _process_entry(entry_key, df, 0, len(df))

    # Drain remaining futures
    if executor is not None:
        while pending:
            _drain_completed()
        executor.shutdown(wait=True)

    # Global marker (for empty-input features)
    if _total_written == 0 and not _pending_idx_rows and not manifest:
        _pending_idx_rows.append(
            FeatureIndexRow(
                run_id=run_id,
                feature=storage_feature_name,
                version=feature.version,
                group="",
                sequence="__global__",
                abs_path=run_root,
                n_rows=0,
                params_hash=params_hash,
            )
        )

    # Finalize — flush any remaining rows
    _flush_idx()
    idx.mark_finished(run_id)
    if registry is not None:
        registry.mark_finished(storage_feature_name, run_id)
    print(f"[feature:{storage_feature_name}] completed run_id={run_id} -> {run_root}")
    return Result(feature=storage_feature_name, run_id=run_id)

Model Training

train_model

train_model(ds, model, config: str | Path | dict[str, object] | None = None, overwrite: bool = False, *, progress_callback: TrainingProgressCallback | None = None) -> str

Train a registered model using a JSON (or dict) configuration.

Parameters:

ds (required)
    Dataset instance providing model storage roots.
model (required)
    Model/trainer instance implementing name, version, and a train() method.
    Optionally implements bind_dataset(ds) and configure(config, run_root).
config (str | Path | dict[str, object] | None, default None)
    Path to a JSON config file or an in-memory dict of hyperparameters.
overwrite (bool, default False)
    Reserved for future use (run_ids are hash-based).
progress_callback (TrainingProgressCallback | None, default None)
    Callback for reporting training progress (epoch, phase, etc.). If the
    model's train() method accepts a callback or progress_callback keyword
    argument, it is passed through automatically.

Returns:

str
    The run_id for this training run.

Source code in src/mosaic/core/pipeline/models.py
def train_model(
    ds,
    model,
    config: str | Path | dict[str, object] | None = None,
    overwrite: bool = False,
    *,
    progress_callback: TrainingProgressCallback | None = None,
) -> str:
    """Train a registered model using a JSON (or dict) configuration.

    Args:
        ds: Dataset instance providing model storage roots.
        model: Model/trainer instance implementing ``name``, ``version``,
            and a ``train()`` method.  Optionally implements
            ``bind_dataset(ds)`` and ``configure(config, run_root)``.
        config: Path to a JSON config file or an in-memory dict of
            hyperparameters.
        overwrite: Reserved for future use (run_ids are hash-based).
        progress_callback: Callback for reporting training progress
            (epoch, phase, etc.).  If the model's ``train()`` method
            accepts a ``callback`` or ``progress_callback`` keyword
            argument, it is passed through automatically.

    Returns:
        The ``run_id`` for this training run.
    """
    if progress_callback is None:
        progress_callback = NullProgressCallback()
    storage_model_name = getattr(
        model, "storage_model_name", getattr(model, "name", None)
    )
    if not storage_model_name:
        raise ValueError("Model must define 'name' or 'storage_model_name'.")
    config_dict = load_model_config(config)
    config_hash = hash_params(config_dict or {})
    run_id = f"{model.version}-{config_hash}"
    run_root = model_run_root(ds, storage_model_name, run_id)
    run_root.mkdir(parents=True, exist_ok=True)

    idx_path = model_index_path(ds, storage_model_name)
    idx = model_index(idx_path)
    idx.ensure()

    config_path = run_root / "config.json"
    write_model_config(config_path, config_dict)

    if hasattr(model, "bind_dataset"):
        try:
            model.bind_dataset(ds)
        except Exception as exc:
            print(
                f"[model:{storage_model_name}] bind_dataset failed: {exc}",
                file=sys.stderr,
            )

    if hasattr(model, "configure"):
        model.configure(config_dict, run_root)
    else:
        setattr(model, "config", config_dict)
        setattr(model, "run_root", run_root)

    metrics = None
    metrics_path = run_root / "metrics.json"
    status = "success"
    notes = ""
    try:
        import inspect

        train_sig = inspect.signature(model.train)
        if "callback" in train_sig.parameters or "progress_callback" in train_sig.parameters:
            cb_name = "callback" if "callback" in train_sig.parameters else "progress_callback"
            metrics = model.train(**{cb_name: progress_callback})
        else:
            metrics = model.train()
    except Exception as exc:
        status = "failed"
        notes = str(exc)
        rows = [
            ModelIndexRow(
                run_id=run_id,
                abs_path=run_root,
                model=storage_model_name,
                version=model.version,
                config_path=str(config_path),
                config_hash=config_hash,
                metrics_path="",
                status=status,
                notes=notes[:500],
            )
        ]
        idx.append(rows)
        idx.mark_finished(run_id)
        raise

    if metrics:
        metrics_path.write_text(json.dumps(json_ready(metrics), indent=2))
    else:
        if metrics_path.exists():
            metrics_path.unlink()

    rows = [
        ModelIndexRow(
            run_id=run_id,
            abs_path=run_root,
            model=storage_model_name,
            version=model.version,
            config_path=str(config_path),
            config_hash=config_hash,
            metrics_path=str(metrics_path) if metrics and metrics_path.exists() else "",
            status=status,
            notes=notes[:500],
        )
    ]
    idx.append(rows)
    idx.mark_finished(run_id)
    print(f"[model:{storage_model_name}] completed run_id={run_id} -> {run_root}")
    return run_id
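
As a sketch of how this fits together: the ToyTrainer class and the ds variable below are illustrative placeholders, not part of the library, and the import path simply follows the source location shown above.

from mosaic.core.pipeline.models import train_model


class ToyTrainer:
    """Hypothetical trainer: just enough surface for train_model."""

    name = "toy_model"
    version = "v1"

    def configure(self, config, run_root):
        # train_model calls configure(config_dict, run_root) when present.
        self.config = config or {}
        self.run_root = run_root

    def train(self):
        # Returning a metrics dict lets train_model write metrics.json.
        return {"train_loss": 0.42}


ds = ...  # placeholder: an already-created Dataset instance
run_id = train_model(ds, ToyTrainer(), config={"epochs": 5})
print(run_id)  # "<version>-<config hash>", e.g. "v1-ab12cd34..."

Because the run_id combines model.version with a hash of the configuration, re-running with an identical config resolves to the same run directory, which is why overwrite is currently reserved.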

Training Progress

Callback protocol and SQLite implementation for monitoring training progress in real time.

TrainingProgressCallback

Bases: Protocol

Minimal callback contract for training progress reporting.

Implement this protocol to create custom progress backends (e.g. MLflow, W&B). The three methods correspond to different granularities of training progress.
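
For example, a minimal console backend satisfying the protocol could look like the sketch below (the class name and output formatting are illustrative); the three methods themselves are documented next.

class ConsoleProgressCallback:
    """Toy backend that prints progress to stdout."""

    def on_epoch_end(
        self, epoch: int, total_epochs: int, metrics: dict[str, float]
    ) -> None:
        summary = ", ".join(f"{k}={v:.4f}" for k, v in metrics.items())
        print(f"epoch {epoch + 1}/{total_epochs}: {summary}")

    def on_class_start(
        self, class_idx: int, total_classes: int, class_name: str
    ) -> None:
        print(f"class {class_idx + 1}/{total_classes}: {class_name}")

    def on_phase(self, phase: str, message: str) -> None:
        print(f"[{phase}] {message}")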

on_epoch_end

on_epoch_end(epoch: int, total_epochs: int, metrics: dict[str, float]) -> None

Called after each training epoch.

Parameters:

epoch (int, required)
    Zero-based epoch index.
total_epochs (int, required)
    Total number of epochs planned.
metrics (dict[str, float], required)
    Metric name-value pairs (e.g. {"train_loss": 0.45}).
Source code in src/mosaic/core/pipeline/progress.py
def on_epoch_end(
    self,
    epoch: int,
    total_epochs: int,
    metrics: dict[str, float],
) -> None:
    """Called after each training epoch.

    Args:
        epoch: Zero-based epoch index.
        total_epochs: Total number of epochs planned.
        metrics: Metric name-value pairs (e.g. ``{"train_loss": 0.45}``).
    """
    ...

on_class_start

on_class_start(class_idx: int, total_classes: int, class_name: str) -> None

Called when a one-vs-rest class training begins.

Parameters:

class_idx (int, required)
    Zero-based class index.
total_classes (int, required)
    Total number of classes.
class_name (str, required)
    Human-readable class identifier.
Source code in src/mosaic/core/pipeline/progress.py
def on_class_start(
    self,
    class_idx: int,
    total_classes: int,
    class_name: str,
) -> None:
    """Called when a one-vs-rest class training begins.

    Args:
        class_idx: Zero-based class index.
        total_classes: Total number of classes.
        class_name: Human-readable class identifier.
    """
    ...

on_phase

on_phase(phase: str, message: str) -> None

Called for coarse-grained phase transitions.

Parameters:

phase (str, required)
    Phase identifier (e.g. "data_prep", "training").
message (str, required)
    Free-text description.
Source code in src/mosaic/core/pipeline/progress.py
def on_phase(self, phase: str, message: str) -> None:
    """Called for coarse-grained phase transitions.

    Args:
        phase: Phase identifier (e.g. ``"data_prep"``, ``"training"``).
        message: Free-text description.
    """
    ...

SQLiteProgressCallback

SQLiteProgressCallback(db_path: Path, job_id: str)

Writes progress rows into the training_progress table.

Opens its own connection (WAL mode) so that the training thread and any reader (API, notebook) do not block each other.

Parameters:

db_path (Path, required)
    Path to the .mosaic.db file.
job_id (str, required)
    The training job this progress belongs to.

Source code in src/mosaic/core/pipeline/progress.py
def __init__(self, db_path: Path, job_id: str) -> None:
    self.db_path = db_path
    self.job_id = job_id
    self._conn = sqlite3.connect(str(db_path), timeout=10)
    self._conn.execute("PRAGMA journal_mode=WAL")
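
A typical wiring, assuming ds and model as in the train_model sketch above; the database path and job_id are placeholders.

from pathlib import Path

cb = SQLiteProgressCallback(Path("/data/my_dataset/.mosaic.db"), job_id="job-001")
run_id = train_model(ds, model, config={"epochs": 5}, progress_callback=cb)

# Rows accumulate as training runs and can be read back at any time:
for row in cb.get_progress():
    print(row["step_type"], row["step_index"], row["metrics"])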

on_epoch_end

on_epoch_end(epoch: int, total_epochs: int, metrics: dict[str, float]) -> None

Record an epoch completion with associated metrics.

Parameters:

epoch (int, required)
    Zero-based epoch index.
total_epochs (int, required)
    Total epochs planned.
metrics (dict[str, float], required)
    Metric name-value pairs.
Source code in src/mosaic/core/pipeline/progress.py
def on_epoch_end(
    self,
    epoch: int,
    total_epochs: int,
    metrics: dict[str, float],
) -> None:
    """Record an epoch completion with associated metrics.

    Args:
        epoch: Zero-based epoch index.
        total_epochs: Total epochs planned.
        metrics: Metric name-value pairs.
    """
    self._write("epoch", epoch, total_epochs, metrics)

on_class_start

on_class_start(class_idx: int, total_classes: int, class_name: str) -> None

Record the start of a one-vs-rest class training iteration.

Parameters:

class_idx (int, required)
    Zero-based class index.
total_classes (int, required)
    Total number of classes.
class_name (str, required)
    Human-readable class identifier.
Source code in src/mosaic/core/pipeline/progress.py
def on_class_start(
    self,
    class_idx: int,
    total_classes: int,
    class_name: str,
) -> None:
    """Record the start of a one-vs-rest class training iteration.

    Args:
        class_idx: Zero-based class index.
        total_classes: Total number of classes.
        class_name: Human-readable class identifier.
    """
    self._write("class", class_idx, total_classes, message=class_name)

on_phase

on_phase(phase: str, message: str) -> None

Record a coarse-grained phase transition.

Parameters:

phase (str, required)
    Phase identifier.
message (str, required)
    Free-text description.
Source code in src/mosaic/core/pipeline/progress.py
def on_phase(self, phase: str, message: str) -> None:
    """Record a coarse-grained phase transition.

    Args:
        phase: Phase identifier.
        message: Free-text description.
    """
    self._write("phase", 0, 0, message=f"{phase}: {message}")

get_progress

get_progress() -> list[dict[str, Any]]

Return all progress rows for this job, ordered chronologically.

Source code in src/mosaic/core/pipeline/progress.py
def get_progress(self) -> list[dict[str, Any]]:
    """Return all progress rows for this job, ordered chronologically."""
    cur = self._conn.execute(
        """\
        SELECT step_type, step_index, step_total, metric_json, message, timestamp
        FROM training_progress
        WHERE job_id = ?
        ORDER BY timestamp, step_index
        """,
        (self.job_id,),
    )
    cols = [d[0] for d in cur.description]
    rows = cur.fetchall()
    out = []
    for row in rows:
        d = dict(zip(cols, row))
        d["metrics"] = json.loads(d.pop("metric_json", "{}"))
        out.append(d)
    return out
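
One way to turn these rows into an epoch-level metrics table, assuming pandas is available and cb is the SQLiteProgressCallback instance from the earlier sketch:

import pandas as pd

rows = cb.get_progress()
epoch_rows = [r for r in rows if r["step_type"] == "epoch"]
history = pd.DataFrame(
    [{"epoch": r["step_index"], "total": r["step_total"], **r["metrics"]}
     for r in epoch_rows]
)
print(history.tail())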

CompositeProgressCallback

CompositeProgressCallback(*backends: TrainingProgressCallback)

Fans out calls to multiple callback backends.

Example:

cb = CompositeProgressCallback(
    SQLiteProgressCallback(db, job_id),
    some_mlflow_callback,
)
Source code in src/mosaic/core/pipeline/progress.py
def __init__(self, *backends: TrainingProgressCallback) -> None:
    self._backends = backends

read_progress

read_progress(db_path: Path, job_id: str) -> list[dict[str, Any]]

Read progress for a job without creating a full callback instance.

Opens a read-only connection, queries, and closes immediately. Suitable for one-shot reads from an API endpoint or notebook.

Parameters:

db_path (Path, required)
    Path to the .mosaic.db file.
job_id (str, required)
    The training job to query.

Returns:

list[dict[str, Any]]
    List of progress dicts, each with keys step_type, step_index,
    step_total, metrics, message, and timestamp.

Source code in src/mosaic/core/pipeline/progress.py
def read_progress(db_path: Path, job_id: str) -> list[dict[str, Any]]:
    """Read progress for a job without creating a full callback instance.

    Opens a read-only connection, queries, and closes immediately.
    Suitable for one-shot reads from an API endpoint or notebook.

    Args:
        db_path: Path to the ``.mosaic.db`` file.
        job_id: The training job to query.

    Returns:
        List of progress dicts, each with keys ``step_type``,
        ``step_index``, ``step_total``, ``metrics``, ``message``,
        and ``timestamp``.
    """
    conn = sqlite3.connect(str(db_path), timeout=10)
    conn.execute("PRAGMA journal_mode=WAL")
    try:
        cur = conn.execute(
            """\
            SELECT step_type, step_index, step_total, metric_json, message, timestamp
            FROM training_progress
            WHERE job_id = ?
            ORDER BY timestamp, step_index
            """,
            (job_id,),
        )
        cols = [d[0] for d in cur.description]
        rows = cur.fetchall()
        out = []
        for row in rows:
            d = dict(zip(cols, row))
            d["metrics"] = json.loads(d.pop("metric_json", "{}"))
            out.append(d)
        return out
    finally:
        conn.close()
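
For example, a one-shot poll from a notebook or API handler might look like this (the database path and job_id are placeholders):

from pathlib import Path

rows = read_progress(Path("/data/my_dataset/.mosaic.db"), job_id="job-001")
if rows:
    latest = rows[-1]
    print(latest["step_type"], latest["step_index"], latest["step_total"], latest["metrics"])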