The Pipeline Stage Protocol defines a generic, type-safe interface for building composable data processing pipelines. It uses Python generics to ensure type safety across pipeline stages.

Overview

from typing import List

from myspellchecker.data_pipeline.stage_protocol import (
    PipelineStage,
    StageOutput,
    StageMetadata,
    PipelineContext,
)

# Define a custom stage
class MyStage(PipelineStage[str, List[str]]):
    def execute(self, input_data: str) -> StageOutput[List[str]]:
        words = input_data.split()
        return StageOutput(data=words, metadata=StageMetadata(stage_name="MyStage", input_hash=""))
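
Running the stage directly (no Pipeline required):
stage = MyStage()
output = stage.execute("hello world")
print(output.data)  # ['hello', 'world']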

Core Types

PipelineStage[InputT, OutputT]

Abstract base class for pipeline stages with generic input/output types:
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar

InputT = TypeVar("InputT")
OutputT = TypeVar("OutputT")

class PipelineStage(ABC, Generic[InputT, OutputT]):
    """Base class for pipeline stages."""

    @abstractmethod
    def execute(
        self,
        input_data: InputT,
    ) -> StageOutput[OutputT]:
        """Execute the pipeline stage.

        Args:
            input_data: Input data of type InputT

        Returns:
            StageOutput wrapping output data of type OutputT
        """
        raise NotImplementedError

    def can_resume(self, input_data: InputT) -> Optional[StageOutput[OutputT]]:
        """Check if stage can resume from cached output.

        Returns:
            Cached StageOutput if valid, None otherwise
        """
        return None

    def validate_input(self, input_data: InputT) -> None:
        """Validate input data before execution."""
        pass

    def cleanup(self) -> None:
        """Clean up resources after stage execution."""
        pass
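
Subclasses can override can_resume to short-circuit execution when a previous run's output is still on disk. A minimal sketch, assuming JSON checkpoints keyed by a SHA-256 hash of the input (the CachedTokenizeStage class and checkpoint layout are illustrative, not part of the protocol):
import hashlib
import json
from pathlib import Path
from typing import List, Optional

class CachedTokenizeStage(PipelineStage[str, List[str]]):
    """Hypothetical stage that checkpoints its token list to disk."""

    def __init__(self, checkpoint_dir: Path):
        self.checkpoint_dir = checkpoint_dir

    def _metadata(self, digest: str, path: Path, cached: bool) -> StageMetadata:
        return StageMetadata(
            stage_name="CachedTokenizeStage",
            input_hash=digest,
            output_paths=[path],
            is_cached=cached,
        )

    def can_resume(self, input_data: str) -> Optional[StageOutput[List[str]]]:
        digest = hashlib.sha256(input_data.encode()).hexdigest()
        path = self.checkpoint_dir / f"tokens-{digest}.json"
        if not path.exists():
            return None
        # Reuse the checkpointed tokens instead of re-executing
        tokens = json.loads(path.read_text())
        return StageOutput(data=tokens, metadata=self._metadata(digest, path, True))

    def execute(self, input_data: str) -> StageOutput[List[str]]:
        digest = hashlib.sha256(input_data.encode()).hexdigest()
        path = self.checkpoint_dir / f"tokens-{digest}.json"
        tokens = input_data.split()
        # Write the checkpoint so a later run can resume
        path.write_text(json.dumps(tokens))
        return StageOutput(data=tokens, metadata=self._metadata(digest, path, False))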

StageOutput[OutputT]

Generic wrapper for stage output with metadata:
from dataclasses import dataclass
from typing import Generic, TypeVar

OutputT = TypeVar("OutputT")

@dataclass
class StageOutput(Generic[OutputT]):
    """Wrapper for stage output with execution metadata."""

    data: OutputT                          # The actual output data
    metadata: StageMetadata                # Execution metadata

    def is_valid_cache(self) -> bool:
        """Check if this cached output is still valid."""
        return self.metadata.is_valid_cache()

StageMetadata

Dataclass for tracking stage execution:
from dataclasses import dataclass, field
from pathlib import Path
from typing import List

@dataclass
class StageMetadata:
    """Metadata about stage execution."""

    stage_name: str
    input_hash: str
    output_paths: List[Path] = field(default_factory=list)
    execution_time_seconds: float = 0.0
    record_count: int = 0
    file_size_bytes: int = 0
    is_cached: bool = False

    def is_valid_cache(self) -> bool:
        """Check if cached output is still valid.

        Returns:
            True if all output files exist and are readable
        """
        return all(p.exists() and p.is_file() for p in self.output_paths)
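
The protocol does not prescribe how input_hash is computed; one straightforward option (an assumption, not the library's scheme) is a SHA-256 digest of the input file:
import hashlib
from pathlib import Path

def hash_file(path: Path) -> str:
    """Hash a file's contents in chunks to avoid loading it into memory."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

metadata = StageMetadata(
    stage_name="ReadStage",
    input_hash=hash_file(Path("corpus.txt")),
)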

PipelineContext

Shared context for pipeline execution:
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class PipelineContext:
    """Shared context for pipeline execution."""

    work_dir: Path                          # Working directory for intermediate files
    output_dir: Path                        # Directory for final outputs
    checkpoint_dir: Path                    # Directory for resume checkpoints
    config: dict = field(default_factory=dict)   # Pipeline configuration
    enable_resume: bool = True              # Whether to attempt resume from checkpoints
    enable_cleanup: bool = False            # Whether to clean up intermediate files
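
Note that the dataclass does not create these directories itself, so create them before running a pipeline:
from pathlib import Path

context = PipelineContext(
    work_dir=Path("work"),
    output_dir=Path("output"),
    checkpoint_dir=Path("checkpoints"),
)

# Ensure all working directories exist before the first stage runs
for directory in (context.work_dir, context.output_dir, context.checkpoint_dir):
    directory.mkdir(parents=True, exist_ok=True)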

Building Pipelines

Simple Pipeline

from pathlib import Path
from typing import List
from myspellchecker.data_pipeline.stage_protocol import (
    PipelineStage,
    StageOutput,
    StageMetadata,
    PipelineContext,
    Pipeline,
)

# Define stages
class ReadStage(PipelineStage[Path, str]):
    def execute(self, input_data: Path) -> StageOutput[str]:
        text = input_data.read_text()
        return StageOutput(
            data=text,
            metadata=StageMetadata(stage_name="ReadStage", input_hash="", record_count=1)
        )

class TokenizeStage(PipelineStage[str, List[str]]):
    def execute(self, input_data: str) -> StageOutput[List[str]]:
        tokens = input_data.split()
        return StageOutput(
            data=tokens,
            metadata=StageMetadata(stage_name="TokenizeStage", input_hash="", record_count=len(tokens))
        )

class ValidateStage(PipelineStage[List[str], List[str]]):
    def execute(self, input_data: List[str]) -> StageOutput[List[str]]:
        valid = [t for t in input_data if len(t) > 1]
        return StageOutput(
            data=valid,
            metadata=StageMetadata(stage_name="ValidateStage", input_hash="", record_count=len(valid))
        )

# Build and run pipeline
context = PipelineContext(
    work_dir=Path("work"),
    output_dir=Path("output"),
    checkpoint_dir=Path("checkpoints"),
)
pipeline = Pipeline(
    context=context,
    stages=[ReadStage(), TokenizeStage(), ValidateStage()],
)

result = pipeline.run(Path("corpus.txt"))

print(f"Valid tokens: {len(result.data)}")

Pipeline Class

The Pipeline orchestrator manages stage execution:
from typing import Any, List, Optional

class Pipeline:
    """Pipeline orchestrator for composing stages."""

    def __init__(
        self,
        context: PipelineContext,
        stages: List[PipelineStage],
    ):
        """Initialize pipeline with context and stages."""
        self.context = context
        self.stages = stages

    def run(self, initial_input: Any) -> StageOutput[Any]:
        """Execute all stages in sequence and return the final StageOutput."""
        current_input = initial_input
        output: Optional[StageOutput[Any]] = None

        for stage in self.stages:
            # Try to resume from cache; a cached result becomes the
            # stage's output so the final return value stays correct
            if self.context.enable_resume:
                cached_output = stage.can_resume(current_input)
                if cached_output is not None:
                    output = cached_output
                    current_input = cached_output.data
                    continue

            # Validate, then execute with guaranteed cleanup
            stage.validate_input(current_input)
            try:
                output = stage.execute(current_input)
                current_input = output.data
            finally:
                stage.cleanup()

        if output is None:
            raise ValueError("Pipeline has no stages")
        return output  # StageOutput from the final stage

Built-in Stages

CorpusIngester

Reads an input corpus and splits it into shards:
from myspellchecker.data_pipeline import CorpusIngester

# CorpusIngester takes no constructor arguments
ingester = CorpusIngester()

# Process a corpus file into shards; process() requires input_path and
# output_dir, and returns a List[Path] of shard files
shard_paths = ingester.process(
    input_path="/path/to/corpus.txt",
    output_dir="/path/to/output",
)
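
The built-in classes expose their own call styles rather than the PipelineStage interface, but a thin adapter lets them run inside a Pipeline. A sketch, where the IngestStage class name is hypothetical:
from pathlib import Path
from typing import List

class IngestStage(PipelineStage[Path, List[Path]]):
    """Hypothetical adapter that runs CorpusIngester as a pipeline stage."""

    def __init__(self, output_dir: Path):
        self.output_dir = output_dir

    def execute(self, input_data: Path) -> StageOutput[List[Path]]:
        # Delegate to the documented process() call, which returns shard paths
        shard_paths = CorpusIngester().process(
            input_path=str(input_data),
            output_dir=str(self.output_dir),
        )
        return StageOutput(
            data=shard_paths,
            metadata=StageMetadata(
                stage_name="IngestStage",
                input_hash="",
                output_paths=shard_paths,
            ),
        )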

FrequencyBuilder

Calculates word and n-gram frequencies:
from myspellchecker.data_pipeline import FrequencyBuilder

# FrequencyBuilder is configured with individual parameters rather than a config object
builder = FrequencyBuilder(
    input_dir="/path/to/input",
    output_dir="/path/to/output",
    min_syllable_frequency=1,
    min_word_frequency=5,
    min_bigram_frequency=2,
    min_trigram_frequency=2,
)

# Load and process data through individual method calls
builder.load_data()
builder.filter_by_frequency()
builder.calculate_bigram_probabilities()

DatabasePackager

Writes processed data to a SQLite database:
from myspellchecker.data_pipeline import DatabasePackager

# DatabasePackager takes input_dir and database_path directly
packager = DatabasePackager(
    input_dir="/path/to/input",
    database_path="output.db",
)

# Individual methods for database operations
packager.connect()
packager.create_schema()
packager.load_syllables()
packager.load_words()

Error Handling

Stages signal errors by raising exceptions (e.g., ValueError, RuntimeError). The Pipeline.run() method calls each stage inside a try/finally block to ensure cleanup() is always invoked:
class MyStage(PipelineStage[str, str]):
    def execute(self, input_data: str) -> StageOutput[str]:
        result = process(input_data)  # process() stands in for your transformation logic
        if not result:
            raise ValueError("Processing failed: empty result")
        return StageOutput(
            data=result,
            metadata=StageMetadata(stage_name="MyStage", input_hash=""),
        )
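
The validate_input hook offers a second place to fail fast, before execute runs. A minimal sketch; the empty-input check and class name are illustrative:
from typing import List

class StrictTokenizeStage(PipelineStage[str, List[str]]):
    def validate_input(self, input_data: str) -> None:
        # Raised here, before execute(), so the pipeline fails fast on bad input
        if not input_data.strip():
            raise ValueError("StrictTokenizeStage: input text is empty")

    def execute(self, input_data: str) -> StageOutput[List[str]]:
        tokens = input_data.split()
        return StageOutput(
            data=tokens,
            metadata=StageMetadata(stage_name="StrictTokenizeStage", input_hash=""),
        )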

Type Safety

The generic parameters let a static type checker such as mypy verify that adjacent stages are compatible:
from typing import List

# Type-safe pipeline
class StringToInt(PipelineStage[str, int]):
    def execute(self, input_data: str) -> StageOutput[int]:
        return StageOutput(
            data=len(input_data),
            metadata=StageMetadata(stage_name="StringToInt", input_hash=""),
        )

class IntToList(PipelineStage[int, List[int]]):
    def execute(self, input_data: int) -> StageOutput[List[int]]:
        return StageOutput(
            data=list(range(input_data)),
            metadata=StageMetadata(stage_name="IntToList", input_hash=""),
        )

# Type checker validates:
# - StringToInt accepts str, returns int
# - IntToList accepts int (matches StringToInt output)
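
Chaining the stages by hand shows what the checker enforces (the mypy message below is paraphrased):
s2i = StringToInt()
i2l = IntToList()

# Types line up: str -> int -> List[int]
numbers = i2l.execute(s2i.execute("hello").data).data

# A mismatch is rejected during type checking:
# i2l.execute("hello")  # mypy: argument has type "str", expected "int"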

Metadata Tracking

Each stage returns a StageOutput containing a StageMetadata object with execution metrics. The Pipeline logs these metrics automatically after each stage completes:
context = PipelineContext(
    work_dir=Path("work"),
    output_dir=Path("output"),
    checkpoint_dir=Path("checkpoints"),
)
pipeline = Pipeline(
    context=context,
    stages=[ReadStage(), TokenizeStage(), ValidateStage()],
)

result = pipeline.run(input_data)

# The final stage output contains its metadata
print(f"Records: {result.metadata.record_count}")
print(f"Time: {result.metadata.execution_time_seconds:.2f}s")
print(f"Cached: {result.metadata.is_cached}")
