Documentation Index
Fetch the complete documentation index at: https://docs.myspellchecker.com/llms.txt
Use this file to discover all available pages before exploring further.
Long-running pipeline stages need to communicate progress without coupling to a specific output target. The ReporterInterface contract, which PipelineReporter implements for console output, lets you swap between rich console output, structured logging, or silent test mocks.
Overview
from myspellchecker.utils.console import PipelineConsole
from myspellchecker.data_pipeline.reporter import PipelineReporter
# Build the console and hand it to the reporter, which delegates output to it.
console = PipelineConsole()
reporter = PipelineReporter(console)
# Report progress: announce step 1 of 4, emit one informational message,
# then mark the same step complete together with its duration string.
reporter.report_step_start(1, 4, "Processing data")
reporter.report_info("Found 1000 records")
reporter.report_step_complete(1, 4, "Processing data", "2.5s")
ReporterInterface
Abstract base class defining the reporting contract:
from abc import ABC, abstractmethod
class ReporterInterface(ABC):
    """Contract that every pipeline reporter has to fulfil.

    All eight ``report_*`` hooks are abstract, so a subclass can only be
    instantiated once it overrides each of them. The default bodies raise
    ``NotImplementedError`` if reached through ``super()``.
    """

    @abstractmethod
    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Signal that step ``step`` of ``total``, named ``title``, is starting."""
        raise NotImplementedError

    @abstractmethod
    def report_step_complete(
        self, step: int, total: int, title: str, duration: str = ""
    ) -> None:
        """Signal that a step finished successfully, optionally with its duration."""
        raise NotImplementedError

    @abstractmethod
    def report_step_skipped(
        self, step: int, total: int, title: str, reason: str = ""
    ) -> None:
        """Signal that a step was skipped, optionally with the reason."""
        raise NotImplementedError

    @abstractmethod
    def report_progress(self, message: str) -> None:
        """Emit a progress message from inside a running step."""
        raise NotImplementedError

    @abstractmethod
    def report_info(self, message: str) -> None:
        """Emit an informational message."""
        raise NotImplementedError

    @abstractmethod
    def report_warning(self, message: str) -> None:
        """Emit a warning message."""
        raise NotImplementedError

    @abstractmethod
    def report_error(self, message: str) -> None:
        """Emit an error message."""
        raise NotImplementedError

    @abstractmethod
    def report_success(self, message: str) -> None:
        """Emit a success message."""
        raise NotImplementedError
PipelineReporter
Console-based implementation for production use:
class PipelineReporter(ReporterInterface):
    """Reporter that renders pipeline progress on a console.

    Output formatting is handed off to the wrapped PipelineConsole, and
    messages are additionally logged for debugging.
    """

    def __init__(self, console: PipelineConsole):
        # Logger is named after this module; the console is kept for output.
        self.logger = get_logger(__name__)
        self.console = console
Usage
from myspellchecker.utils.console import PipelineConsole
from myspellchecker.data_pipeline.reporter import PipelineReporter
# Create reporter: a PipelineReporter wrapping a freshly built PipelineConsole
console = PipelineConsole()
reporter = PipelineReporter(console)
# Report pipeline progress
def run_pipeline(reporter):
    """Walk through a four-step example pipeline, reporting to ``reporter``.

    Steps 1, 2 and 4 are started and completed; step 3 is only reported as
    skipped. ``reporter`` needs the ``report_*`` methods called below.
    """
    total = 4

    # Step 1: ingestion, with a progress line and an informational line.
    reporter.report_step_start(1, total, "Ingesting corpus")
    reporter.report_progress("Reading file...")
    reporter.report_info("Found 10,000 lines")
    reporter.report_step_complete(1, total, "Ingesting corpus", "1.2s")

    # Step 2: segmentation.
    reporter.report_step_start(2, total, "Segmenting text")
    reporter.report_progress("Processing...")
    reporter.report_step_complete(2, total, "Segmenting text", "5.3s")

    # Step 3: skipped, so neither a start nor a completion is reported.
    reporter.report_step_skipped(3, total, "POS Tagging", "No POS seed provided")

    # Step 4: database build, with a success message before completion.
    reporter.report_step_start(4, total, "Building database")
    reporter.report_progress("Creating tables...")
    reporter.report_success("Database created successfully")
    reporter.report_step_complete(4, total, "Building database", "2.1s")
Methods
| Method | Description |
|---|---|
| `report_step_start(step, total, title)` | Announce step starting |
| `report_step_complete(step, total, title, duration)` | Announce step completed |
| `report_step_skipped(step, total, title, reason)` | Announce step skipped |
| `report_progress(message)` | Progress within a step |
| `report_info(message)` | Informational message |
| `report_warning(message)` | Warning message |
| `report_error(message)` | Error message |
| `report_success(message)` | Success message |
| `print_raw(*args, **kwargs)` | Print raw content |
| `print_newline()` | Print blank line |
MockReporter
Testing implementation that records all messages:
from myspellchecker.data_pipeline.reporter import MockReporter
class MockReporter(ReporterInterface):
    """Test double that records reporter messages instead of printing them."""

    def __init__(self):
        # Step-level events are kept as tuples.
        self.step_starts: List[tuple] = []
        self.step_completes: List[tuple] = []
        self.step_skips: List[tuple] = []

        # Message-level events are kept as plain strings.
        self.progress_messages: List[str] = []
        self.infos: List[str] = []
        self.warnings: List[str] = []
        self.errors: List[str] = []
        self.successes: List[str] = []

    def reset(self) -> None:
        """Clear all recorded messages."""
        ...
Testing Usage
from myspellchecker.data_pipeline.reporter import MockReporter
def test_pipeline_reports_progress():
    """Check that run_pipeline reports its started, completed and skipped steps."""
    # Arrange
    reporter = MockReporter()

    # Act
    run_pipeline(reporter)

    # Assert
    # run_pipeline starts and completes steps 1, 2 and 4; step 3 is only
    # reported as skipped, so three starts are expected, not four.
    assert len(reporter.step_starts) == 3
    assert len(reporter.step_completes) == 3
    assert len(reporter.step_skips) == 1
    assert "Found 10,000 lines" in reporter.infos
    assert reporter.step_skips[0] == (3, 4, "POS Tagging", "No POS seed provided")
def test_pipeline_handles_errors():
    """Check that a failing pipeline reports an error containing "Failed"."""
    reporter = MockReporter()

    try:
        run_failing_pipeline(reporter)
    except Exception:
        # Swallowed on purpose: only the reported errors are inspected below.
        pass

    recorded = reporter.errors
    assert len(recorded) > 0
    assert "Failed" in recorded[0]
Reset Between Tests
def test_multiple_scenarios():
    """Reuse one MockReporter across two scenarios, resetting it in between."""
    reporter = MockReporter()

    # Scenario 1
    run_pipeline_scenario_1(reporter)
    assert len(reporter.infos) == 5

    # Drop everything recorded so far before the next scenario runs.
    reporter.reset()

    # Scenario 2 (fresh start)
    run_pipeline_scenario_2(reporter)
    assert len(reporter.infos) == 3
Integration with Pipeline
The Pipeline receives its reporter through PipelineConfig, not as a direct constructor parameter:
from myspellchecker.data_pipeline import Pipeline
from myspellchecker.data_pipeline.config import PipelineConfig
from myspellchecker.data_pipeline.reporter import PipelineReporter
from myspellchecker.utils.console import PipelineConsole
# Configure reporter via PipelineConfig
console = PipelineConsole()
reporter = PipelineReporter(console)
# The reporter is carried by the config object; Pipeline only receives the config.
config = PipelineConfig(reporter=reporter)
pipeline = Pipeline(config=config)
# NOTE(review): presumably a list of input corpus paths and the output
# database path; confirm against build_database's signature.
pipeline.build_database(["corpus.txt"], "output.db")
Custom Reporters
File-Based Reporter
from pathlib import Path


class FileReporter(ReporterInterface):
    """Write progress to a log file.

    Each reported event appends one line to ``log_path``. The file is opened
    and closed per event, so nothing is held open between reports.
    """

    def __init__(self, log_path: Path):
        """Remember the file that report lines are appended to."""
        self.log_path = log_path

    def _append(self, line: str) -> None:
        """Append ``line`` followed by a newline to the log file."""
        # Explicit UTF-8 keeps non-ASCII step titles writable regardless of
        # the platform's default locale encoding.
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(f"{line}\n")

    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Log a ``[START]`` line for step ``step`` of ``total``."""
        self._append(f"[START] Step {step}/{total}: {title}")

    def report_step_complete(
        self, step: int, total: int, title: str, duration: str = ""
    ) -> None:
        """Log a ``[DONE]`` line, appending the duration only when one is given."""
        # Avoid a dangling " ()" when the caller relies on the empty default.
        suffix = f" ({duration})" if duration else ""
        self._append(f"[DONE] Step {step}/{total}: {title}{suffix}")

    # ... implement other methods ...
JSON Reporter
import json
from datetime import datetime


class JSONReporter(ReporterInterface):
    """Collect progress as JSON for API consumption.

    Events are accumulated in memory as dictionaries and serialized on
    demand with ``to_json``.
    """

    def __init__(self):
        # One dict per reported event, in the order the events arrived.
        self.events = []

    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Record a ``step_start`` event stamped with the current local time."""
        self.events.append({
            "type": "step_start",
            "step": step,
            "total": total,
            "title": title,
            "timestamp": datetime.now().isoformat(),
        })

    # ... implement other methods ...

    def to_json(self) -> str:
        """Return every recorded event as an indented JSON array string."""
        return json.dumps(self.events, indent=2)
See Also