Overview
Copy
Ask AI
from typing import List

from myspellchecker.data_pipeline.stage_protocol import (
    PipelineStage,
    StageOutput,
    StageMetadata,
    PipelineContext,
)


# Define a custom stage: splits input text into whitespace-separated words.
# Note: List must be imported from typing for the generic parameters.
class MyStage(PipelineStage[str, List[str]]):
    def execute(self, input_data: str) -> StageOutput[List[str]]:
        """Split *input_data* on whitespace and wrap the words in a StageOutput."""
        words = input_data.split()
        return StageOutput(
            data=words,
            metadata=StageMetadata(stage_name="MyStage", input_hash=""),
        )
Core Types
PipelineStage[InputT, OutputT]
Abstract base class for pipeline stages with generic input/output types:
Copy
Ask AI
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar

InputT = TypeVar("InputT")
OutputT = TypeVar("OutputT")


class PipelineStage(ABC, Generic[InputT, OutputT]):
    """Base class for pipeline stages.

    Subclasses must implement execute(); the remaining hooks
    (can_resume, validate_input, cleanup) have safe no-op defaults.
    Note: Optional must be imported from typing for can_resume()'s
    return annotation.
    """

    @abstractmethod
    def execute(
        self,
        input_data: InputT,
    ) -> StageOutput[OutputT]:
        """Execute the pipeline stage.

        Args:
            input_data: Input data of type InputT

        Returns:
            StageOutput wrapping output data of type OutputT
        """
        raise NotImplementedError

    def can_resume(self, input_data: InputT) -> Optional[StageOutput[OutputT]]:
        """Check if stage can resume from cached output.

        Returns:
            Cached StageOutput if valid, None otherwise
        """
        # Default: no cache available, always execute.
        return None

    def validate_input(self, input_data: InputT) -> None:
        """Validate input data before execution."""
        pass

    def cleanup(self) -> None:
        """Clean up resources after stage execution."""
        pass
StageOutput[OutputT]
Generic wrapper for stage output with metadata:
Copy
Ask AI
from dataclasses import dataclass
from typing import Generic, TypeVar

OutputT = TypeVar("OutputT")


@dataclass
class StageOutput(Generic[OutputT]):
    """Pairs a stage's output data with its execution metadata."""

    data: OutputT  # The actual output data produced by the stage
    metadata: StageMetadata  # Execution metadata for this run

    def is_valid_cache(self) -> bool:
        """Report whether this cached output can still be reused."""
        # Validity is delegated entirely to the metadata record.
        meta = self.metadata
        return meta.is_valid_cache()
StageMetadata
Dataclass for tracking stage execution:
Copy
Ask AI
from dataclasses import dataclass, field
from pathlib import Path
from typing import List


@dataclass
class StageMetadata:
    """Execution record for a single pipeline stage run."""

    stage_name: str  # Name of the stage that produced this metadata
    input_hash: str  # Hash of the stage's input, used for cache keys
    output_paths: List[Path] = field(default_factory=list)  # Files written
    execution_time_seconds: float = 0.0  # Wall-clock duration
    record_count: int = 0  # Number of records produced
    file_size_bytes: int = 0  # Total size of outputs on disk
    is_cached: bool = False  # True when this run was served from cache

    def is_valid_cache(self) -> bool:
        """Check if cached output is still valid.

        Returns:
            True if all output files exist and are readable
        """
        # Any missing or non-regular file invalidates the whole cache.
        for path in self.output_paths:
            if not (path.exists() and path.is_file()):
                return False
        return True
PipelineContext
Shared context for pipeline execution:
Copy
Ask AI
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class PipelineContext:
    """Shared context threaded through every stage of a pipeline run."""

    # Working directory for intermediate files.
    work_dir: Path
    # Directory where final outputs are written.
    output_dir: Path
    # Directory holding resume checkpoints.
    checkpoint_dir: Path
    # Free-form pipeline configuration (each instance gets its own dict).
    config: dict = field(default_factory=dict)
    # Attempt to resume stages from checkpoints when True.
    enable_resume: bool = True
    # Remove intermediate files after execution when True.
    enable_cleanup: bool = False
Building Pipelines
Simple Pipeline
Copy
Ask AI
from pathlib import Path
from typing import List

from myspellchecker.data_pipeline.stage_protocol import (
    PipelineStage,
    StageOutput,
    StageMetadata,
    PipelineContext,
    Pipeline,
)


# Stage 1: read the corpus file into a single string.
class ReadStage(PipelineStage[Path, str]):
    def execute(self, input_data: Path) -> StageOutput[str]:
        text = input_data.read_text()
        return StageOutput(
            data=text,
            metadata=StageMetadata(stage_name="ReadStage", input_hash="", record_count=1),
        )


# Stage 2: split the text into whitespace-separated tokens.
class TokenizeStage(PipelineStage[str, List[str]]):
    def execute(self, input_data: str) -> StageOutput[List[str]]:
        tokens = input_data.split()
        return StageOutput(
            data=tokens,
            metadata=StageMetadata(stage_name="TokenizeStage", input_hash="", record_count=len(tokens)),
        )


# Stage 3: keep only tokens longer than one character.
class ValidateStage(PipelineStage[List[str], List[str]]):
    def execute(self, input_data: List[str]) -> StageOutput[List[str]]:
        valid = [t for t in input_data if len(t) > 1]
        return StageOutput(
            data=valid,
            metadata=StageMetadata(stage_name="ValidateStage", input_hash="", record_count=len(valid)),
        )


# Build and run pipeline
context = PipelineContext(
    work_dir=Path("work"),
    output_dir=Path("output"),
    checkpoint_dir=Path("checkpoints"),
)
pipeline = Pipeline(
    context=context,
    stages=[ReadStage(), TokenizeStage(), ValidateStage()],
)
result = pipeline.run(Path("corpus.txt"))
print(f"Valid tokens: {len(result.data)}")
Pipeline Class
The Pipeline orchestrator manages stage execution:
Copy
Ask AI
class Pipeline:
    """Pipeline orchestrator for composing stages.

    Stages run in order; each stage's output data becomes the next
    stage's input. cleanup() is always invoked for an executed stage,
    even when execute() raises.
    """

    def __init__(
        self,
        context: PipelineContext,
        stages: List[PipelineStage],
    ):
        """Initialize pipeline with context and stages."""
        self.context = context
        self.stages = stages

    def run(self, initial_input: InputT) -> StageOutput[OutputT]:
        """Execute all stages in sequence.

        Args:
            initial_input: Input fed to the first stage.

        Returns:
            The final stage's StageOutput (cached or freshly computed).

        Raises:
            ValueError: If the pipeline has no stages.
        """
        # Guard: the original code hit UnboundLocalError on `output`
        # when `stages` was empty; fail fast with a clear message.
        if not self.stages:
            raise ValueError("Pipeline has no stages to run")
        current_input = initial_input
        output = None
        for stage in self.stages:
            # Try to resume from cache
            if self.context.enable_resume:
                cached_output = stage.can_resume(current_input)
                # BUG FIX: the original `continue` on a cache hit never
                # assigned `output`, so a cache hit on the FINAL stage
                # returned the previous stage's output (or was unbound).
                # Record the cached output before continuing.
                if cached_output is not None:
                    output = cached_output
                    current_input = cached_output.data
                    continue
            # Execute stage; cleanup() is guaranteed even on failure.
            stage.validate_input(current_input)
            try:
                output = stage.execute(current_input)
                current_input = output.data
            finally:
                stage.cleanup()
        return output  # Final stage output (never None: stages is non-empty)
Built-in Stages
CorpusIngester
Reads and ingests input corpus:
Copy
Ask AI
from myspellchecker.data_pipeline import CorpusIngester

# The ingester is constructed without any arguments.
ingester = CorpusIngester()

# Shard the corpus via process() — there is no ingest() method.
# process() requires input_path and output_dir and returns the
# resulting shard paths as a List[Path].
shard_paths = ingester.process(
    input_path="/path/to/corpus.txt",
    output_dir="/path/to/output",
)
FrequencyBuilder
Calculates word and n-gram frequencies:
Copy
Ask AI
from myspellchecker.data_pipeline import FrequencyBuilder

# The builder is configured with individual keyword parameters
# rather than a single config object.
builder = FrequencyBuilder(
    input_dir="/path/to/input",
    output_dir="/path/to/output",
    min_syllable_frequency=1,
    min_word_frequency=5,
    min_bigram_frequency=2,
    min_trigram_frequency=2,
)

# Processing happens in explicit steps — there is no single build():
builder.load_data()
builder.filter_by_frequency()
builder.calculate_bigram_probabilities()
DatabasePackager
Writes data to SQLite database:
Copy
Ask AI
from myspellchecker.data_pipeline import DatabasePackager

# input_dir and database_path are passed straight to the constructor.
packager = DatabasePackager(
    input_dir="/path/to/input",
    database_path="output.db",
)

# Each database operation is an individual method call:
packager.connect()
packager.create_schema()
packager.load_syllables()
packager.load_words()
Error Handling
Stages signal errors by raising exceptions (e.g., ValueError, RuntimeError). The Pipeline.run() method calls each stage inside a try/finally block to ensure cleanup() is always invoked:
Copy
Ask AI
class MyStage(PipelineStage[str, str]):
    def execute(self, input_data: str) -> StageOutput[str]:
        """Process the input, raising ValueError on an empty result."""
        result = process(input_data)
        # Success path first; an empty/falsy result is the error case.
        if result:
            return StageOutput(
                data=result,
                metadata=StageMetadata(stage_name="MyStage", input_hash=""),
            )
        raise ValueError("Processing failed: empty result")
Type Safety
The generic types ensure type safety at compile time:
Copy
Ask AI
from typing import List

# Type-safe pipeline: the generic parameters document each stage's
# input and output types for the static type checker.
# Note: List must be imported from typing for IntToList's parameters.
class StringToInt(PipelineStage[str, int]):
    def execute(self, input_data: str) -> StageOutput[int]:
        return StageOutput(
            data=len(input_data),
            metadata=StageMetadata(stage_name="StringToInt", input_hash=""),
        )


class IntToList(PipelineStage[int, List[int]]):
    def execute(self, input_data: int) -> StageOutput[List[int]]:
        return StageOutput(
            data=list(range(input_data)),
            metadata=StageMetadata(stage_name="IntToList", input_hash=""),
        )


# Type checker validates:
# - StringToInt accepts str, returns int
# - IntToList accepts int (matches StringToInt output)
Metadata Tracking
Each stage returns a StageOutput containing a StageMetadata object with execution metrics. The Pipeline logs these metrics automatically after each stage completes:
Copy
Ask AI
# Assemble the same three-stage pipeline and run it.
context = PipelineContext(
    work_dir=Path("work"),
    output_dir=Path("output"),
    checkpoint_dir=Path("checkpoints"),
)
pipeline = Pipeline(
    context=context,
    stages=[ReadStage(), TokenizeStage(), ValidateStage()],
)
result = pipeline.run(input_data)

# The returned StageOutput exposes the final stage's metrics.
meta = result.metadata
print(f"Records: {meta.record_count}")
print(f"Time: {meta.execution_time_seconds:.2f}s")
print(f"Cached: {meta.is_cached}")
See Also
- Worker Context - Multiprocessing context
- Schema Management - Database schemas
- Data Pipeline - Full pipeline guide