Skip to main content
Long-running pipeline stages need to communicate progress without coupling to a specific output target. The ReporterInterface contract lets you swap between rich console output (PipelineReporter), structured logging, or silent test mocks (MockReporter) without changing pipeline code.

Overview

from myspellchecker.utils.console import PipelineConsole
from myspellchecker.data_pipeline.reporter import PipelineReporter

# The reporter wraps a console; pipeline code then talks only to the reporter.
console = PipelineConsole()
reporter = PipelineReporter(console)

# Report progress: step calls take (step, total, title); completion also
# accepts an optional duration string.
reporter.report_step_start(1, 4, "Processing data")
reporter.report_info("Found 1000 records")
reporter.report_step_complete(1, 4, "Processing data", "2.5s")

ReporterInterface

Abstract base class defining the reporting contract:
from abc import ABC, abstractmethod

class ReporterInterface(ABC):
    """Contract that every pipeline reporter must fulfil.

    All eight ``report_*`` methods are abstract, so a subclass has to
    override each of them before it can be instantiated. None of the
    methods returns a value.
    """

    @abstractmethod
    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Announce that a pipeline step is beginning.

        Args:
            step: Number of the step being started.
            total: Number of steps in the whole run.
            title: Human-readable name of the step.
        """
        raise NotImplementedError

    @abstractmethod
    def report_step_complete(
        self,
        step: int,
        total: int,
        title: str,
        duration: str = "",
    ) -> None:
        """Announce that a pipeline step finished successfully.

        Args:
            step: Number of the step that finished.
            total: Number of steps in the whole run.
            title: Human-readable name of the step.
            duration: Optional elapsed-time text (for example ``"2.5s"``).
        """
        raise NotImplementedError

    @abstractmethod
    def report_step_skipped(
        self,
        step: int,
        total: int,
        title: str,
        reason: str = "",
    ) -> None:
        """Announce that a pipeline step was not executed.

        Args:
            step: Number of the step that was skipped.
            total: Number of steps in the whole run.
            title: Human-readable name of the step.
            reason: Optional explanation of why the step was skipped.
        """
        raise NotImplementedError

    @abstractmethod
    def report_progress(self, message: str) -> None:
        """Emit a progress message from inside a running step."""
        raise NotImplementedError

    @abstractmethod
    def report_info(self, message: str) -> None:
        """Emit an informational message."""
        raise NotImplementedError

    @abstractmethod
    def report_warning(self, message: str) -> None:
        """Emit a warning message."""
        raise NotImplementedError

    @abstractmethod
    def report_error(self, message: str) -> None:
        """Emit an error message."""
        raise NotImplementedError

    @abstractmethod
    def report_success(self, message: str) -> None:
        """Emit a success message."""
        raise NotImplementedError

PipelineReporter

Console-based implementation for production use:
class PipelineReporter(ReporterInterface):
    """Console-based pipeline reporter.

    Delegates to PipelineConsole for output formatting
    while also logging messages for debugging.

    NOTE: abridged listing — only the constructor is shown here; the
    ``report_*`` methods declared by ReporterInterface are not reproduced.
    """

    def __init__(self, console: PipelineConsole):
        """Keep the console to write to and create a logger for this module.

        Args:
            console: The PipelineConsole that output is delegated to.
        """
        self.console = console
        # get_logger is not imported in this excerpt — presumably a project
        # logging helper; confirm against the real module.
        self.logger = get_logger(__name__)

Usage

from myspellchecker.utils.console import PipelineConsole
from myspellchecker.data_pipeline.reporter import PipelineReporter

# Create reporter: PipelineReporter delegates its output to the console passed in
console = PipelineConsole()
reporter = PipelineReporter(console)

# Report pipeline progress
def run_pipeline(reporter):
    """Walk through a four-step example run, narrating it via *reporter*.

    Steps 1, 2 and 4 are started and completed. Step 3 is only reported as
    skipped, so it is never started or completed.

    Args:
        reporter: Any object providing the ``report_*`` reporting methods.
    """
    step_count = 4

    # Step 1: ingest
    ingest_title = "Ingesting corpus"
    reporter.report_step_start(1, step_count, ingest_title)
    reporter.report_progress("Reading file...")
    reporter.report_info("Found 10,000 lines")
    reporter.report_step_complete(1, step_count, ingest_title, "1.2s")

    # Step 2: segment
    segment_title = "Segmenting text"
    reporter.report_step_start(2, step_count, segment_title)
    reporter.report_progress("Processing...")
    reporter.report_step_complete(2, step_count, segment_title, "5.3s")

    # Step 3: skipped, so there is no start/complete pair for it
    skip_reason = "No POS seed provided"
    reporter.report_step_skipped(3, step_count, "POS Tagging", skip_reason)

    # Step 4: build
    build_title = "Building database"
    reporter.report_step_start(4, step_count, build_title)
    reporter.report_progress("Creating tables...")
    reporter.report_success("Database created successfully")
    reporter.report_step_complete(4, step_count, build_title, "2.1s")

Methods

| Method | Description |
| --- | --- |
| `report_step_start(step, total, title)` | Announce step starting |
| `report_step_complete(step, total, title, duration)` | Announce step completed |
| `report_step_skipped(step, total, title, reason)` | Announce step skipped |
| `report_progress(message)` | Progress within a step |
| `report_info(message)` | Informational message |
| `report_warning(message)` | Warning message |
| `report_error(message)` | Error message |
| `report_success(message)` | Success message |
| `print_raw(*args, **kwargs)` | Print raw content |
| `print_newline()` | Print blank line |

MockReporter

Testing implementation that records all messages:
from myspellchecker.data_pipeline.reporter import MockReporter

class MockReporter(ReporterInterface):
    """Mock reporter for testing pipeline logic.

    Holds one list per reporting category so tests can assert on what a
    pipeline reported. NOTE: abridged listing — only the constructor and
    ``reset`` are shown.
    """

    def __init__(self):
        # Step-level records. The testing example compares a step_skips entry
        # to a (step, total, title, reason) tuple; presumably step_starts and
        # step_completes hold the matching argument tuples — confirm.
        self.step_starts: List[tuple] = []
        self.step_completes: List[tuple] = []
        self.step_skips: List[tuple] = []
        # Message-level records, one list of strings per report_* category.
        self.progress_messages: List[str] = []
        self.infos: List[str] = []
        self.warnings: List[str] = []
        self.errors: List[str] = []
        self.successes: List[str] = []

    def reset(self) -> None:
        """Clear all recorded messages."""
        # Body elided in this listing.
        ...

Testing Usage

from myspellchecker.data_pipeline.reporter import MockReporter

def test_pipeline_reports_progress():
    """run_pipeline reports three started/completed steps and one skipped step."""
    # Arrange
    reporter = MockReporter()

    # Act
    run_pipeline(reporter)

    # Assert
    # run_pipeline starts and completes steps 1, 2 and 4 only; step 3 is
    # reported solely through report_step_skipped, so it must not be counted
    # as a start (assumes MockReporter records a skip only in step_skips).
    assert len(reporter.step_starts) == 3
    assert len(reporter.step_completes) == 3
    assert len(reporter.step_skips) == 1
    assert "Found 10,000 lines" in reporter.infos
    assert reporter.step_skips[0] == (3, 4, "POS Tagging", "No POS seed provided")

def test_pipeline_handles_errors():
    """A failing run leaves at least one error whose first entry mentions "Failed"."""
    reporter = MockReporter()

    # The exception itself is not under test: swallow whatever the run raises
    # and inspect only what was reported before it failed.
    try:
        run_failing_pipeline(reporter)
    except Exception:
        pass

    error_count = len(reporter.errors)
    assert error_count > 0
    first_error = reporter.errors[0]
    assert "Failed" in first_error

Reset Between Tests

def test_multiple_scenarios():
    """Reuse a single MockReporter for two scenarios, resetting in between."""
    reporter = MockReporter()

    # First scenario: expect five info messages.
    run_pipeline_scenario_1(reporter)
    infos_after_first = len(reporter.infos)
    assert infos_after_first == 5

    # Wipe the recorded messages so the second scenario is counted from zero.
    reporter.reset()

    # Second scenario: only its own three info messages should be present.
    run_pipeline_scenario_2(reporter)
    infos_after_second = len(reporter.infos)
    assert infos_after_second == 3

Integration with Pipeline

The Pipeline receives its reporter through PipelineConfig, not as a direct constructor parameter:
from myspellchecker.data_pipeline import Pipeline
from myspellchecker.data_pipeline.config import PipelineConfig
from myspellchecker.data_pipeline.reporter import PipelineReporter
from myspellchecker.utils.console import PipelineConsole

# Configure reporter via PipelineConfig
console = PipelineConsole()
reporter = PipelineReporter(console)
# The reporter travels inside the config object rather than being passed to Pipeline directly.
config = PipelineConfig(reporter=reporter)

# Pipeline is constructed from the config only.
pipeline = Pipeline(config=config)
# presumably (list of input corpus paths, output database path) — confirm against the Pipeline API
pipeline.build_database(["corpus.txt"], "output.db")

Custom Reporters

File-Based Reporter

class FileReporter(ReporterInterface):
    """Write progress to a log file.

    Each reported event is appended to ``log_path`` as one text line. The
    file is opened per event in append mode, so existing content is kept.
    Only the step start/complete methods are shown; the remaining
    ReporterInterface methods still have to be implemented.
    """

    def __init__(self, log_path: Path):
        """Remember where to write.

        Args:
            log_path: Path of the log file that events are appended to.
        """
        self.log_path = log_path

    def _append(self, line: str) -> None:
        """Append *line* plus a newline to the log file."""
        # Explicit UTF-8 so non-ASCII step titles are written identically on
        # every platform instead of depending on the locale's default encoding.
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(f"{line}\n")

    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Log a ``[START]`` line for the given step."""
        self._append(f"[START] Step {step}/{total}: {title}")

    def report_step_complete(self, step: int, total: int, title: str, duration: str = "") -> None:
        """Log a ``[DONE]`` line for the given step.

        The duration is appended in parentheses only when one was supplied,
        so the default empty string no longer yields a dangling ``()``.
        """
        suffix = f" ({duration})" if duration else ""
        self._append(f"[DONE] Step {step}/{total}: {title}{suffix}")

    # ... implement other methods ...

JSON Reporter

import json
from datetime import datetime, timezone

class JSONReporter(ReporterInterface):
    """Collect progress as JSON for API consumption.

    Events are accumulated in memory as plain dicts, in the order they were
    reported, and serialized on demand with ``to_json``. Only
    ``report_step_start`` is shown; the remaining ReporterInterface methods
    still have to be implemented.
    """

    def __init__(self):
        # Event dicts in reporting order (oldest first).
        self.events = []

    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Record a ``step_start`` event stamped with the current UTC time.

        Args:
            step: Number of the step being started.
            total: Number of steps in the whole run.
            title: Human-readable name of the step.
        """
        self.events.append({
            "type": "step_start",
            "step": step,
            "total": total,
            "title": title,
            # Timezone-aware UTC timestamp: a naive local time would be
            # ambiguous for API consumers in other time zones.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    def to_json(self) -> str:
        """Return all recorded events as a JSON array, indented by two spaces."""
        return json.dumps(self.events, indent=2)

See Also