Pipeline Reporter
The Pipeline Reporter provides an abstraction layer for reporting progress during data pipeline execution, supporting console output, logging, and testing scenarios.

Overview

from myspellchecker.utils.console import PipelineConsole
from myspellchecker.data_pipeline.reporter import PipelineReporter

# Build a console and wrap it in a PipelineReporter, which delegates output
# formatting to the console.
console = PipelineConsole()
reporter = PipelineReporter(console)

# Report progress
# Announce step 1 of 4, emit an informational line, then mark the step done;
# the duration is passed as an already-formatted string.
reporter.report_step_start(1, 4, "Processing data")
reporter.report_info("Found 1000 records")
reporter.report_step_complete(1, 4, "Processing data", "2.5s")

ReporterInterface

Abstract base class defining the reporting contract:
from abc import ABC, abstractmethod

class ReporterInterface(ABC):
    """Contract every pipeline reporter must implement.

    Concrete reporters (console, mock, file, JSON, ...) subclass this ABC and
    provide each hook below. Step-level hooks identify a step by its ``step``
    index, the ``total`` number of steps, and a human-readable ``title``;
    message-level hooks take a single free-form ``message``.
    """

    # ----- step lifecycle -------------------------------------------------

    @abstractmethod
    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Announce that step ``step`` of ``total`` (named ``title``) has begun."""

    @abstractmethod
    def report_step_complete(self, step: int, total: int, title: str, duration: str = "") -> None:
        """Announce that a step finished successfully.

        ``duration`` is an already-formatted elapsed time (e.g. ``"2.5s"``);
        it defaults to an empty string.
        """

    @abstractmethod
    def report_step_skipped(self, step: int, total: int, title: str, reason: str = "") -> None:
        """Announce that a step was not run, with an optional ``reason``."""

    # ----- free-form messages ---------------------------------------------

    @abstractmethod
    def report_progress(self, message: str) -> None:
        """Emit an in-step progress note."""

    @abstractmethod
    def report_info(self, message: str) -> None:
        """Emit an informational note."""

    @abstractmethod
    def report_warning(self, message: str) -> None:
        """Emit a warning."""

    @abstractmethod
    def report_error(self, message: str) -> None:
        """Emit an error."""

    @abstractmethod
    def report_success(self, message: str) -> None:
        """Emit a success notice."""

PipelineReporter

Console-based implementation for production use:
class PipelineReporter(ReporterInterface):
    """Production reporter that renders pipeline progress on a console.

    Output formatting is handed off to the wrapped ``PipelineConsole``,
    while messages are also logged (via ``get_logger(__name__)``) for
    debugging.
    """

    def __init__(self, console: PipelineConsole):
        # Logger first, then the console that all rendering is delegated to.
        self.logger = get_logger(__name__)
        self.console = console

    # NOTE: the report_* / print_* methods are omitted from this excerpt.

Usage

from myspellchecker.utils.console import PipelineConsole
from myspellchecker.data_pipeline.reporter import PipelineReporter

# Create reporter
# A console-backed reporter; this is the kind of object run_pipeline() below
# expects as its argument.
console = PipelineConsole()
reporter = PipelineReporter(console)

# Drive any ReporterInterface implementation through a sample pipeline run.
def run_pipeline(reporter):
    """Emit the progress events of a sample four-step pipeline run.

    Step 3 is reported as skipped, so it produces neither a start nor a
    completion event. Only ``reporter`` methods are called; nothing is
    returned.
    """
    total_steps = 4
    ingest, segment, tag, build = (
        "Ingesting corpus",
        "Segmenting text",
        "POS Tagging",
        "Building database",
    )

    # Step 1 - read the raw corpus.
    reporter.report_step_start(1, total_steps, ingest)
    reporter.report_progress("Reading file...")
    reporter.report_info("Found 10,000 lines")
    reporter.report_step_complete(1, total_steps, ingest, "1.2s")

    # Step 2 - segment it.
    reporter.report_step_start(2, total_steps, segment)
    reporter.report_progress("Processing...")
    reporter.report_step_complete(2, total_steps, segment, "5.3s")

    # Step 3 - never started, only reported as skipped.
    reporter.report_step_skipped(3, total_steps, tag, "No POS seed provided")

    # Step 4 - write the output database.
    reporter.report_step_start(4, total_steps, build)
    reporter.report_progress("Creating tables...")
    reporter.report_success("Database created successfully")
    reporter.report_step_complete(4, total_steps, build, "2.1s")

Methods

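| Method | Description |
| --- | --- |
| `report_step_start(step, total, title)` | Announce that a step is starting |
| `report_step_complete(step, total, title, duration="")` | Announce that a step completed |
| `report_step_skipped(step, total, title, reason="")` | Announce that a step was skipped |
| `report_progress(message)` | Progress message within a step |
| `report_info(message)` | Informational message |
| `report_warning(message)` | Warning message |
| `report_error(message)` | Error message |
| `report_success(message)` | Success message |
| `print_table(table)` | Print a Rich table |
| `print_raw(*args, **kwargs)` | Print raw content |
| `print_newline()` | Print a blank line |

The three `print_*` helpers are `PipelineReporter` conveniences; they are not part of `ReporterInterface`.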

MockReporter

Testing implementation that records all messages:
from myspellchecker.data_pipeline.reporter import MockReporter

from typing import Dict, List


class MockReporter(ReporterInterface):
    """Mock reporter for testing pipeline logic.

    Instead of printing anything, every call is appended to a list named
    after the kind of event, so tests can assert on exactly what the
    pipeline reported. Step events are stored as tuples of the call
    arguments; for example ``step_skips`` holds ``(step, total, title, reason)``.
    All abstract methods of ReporterInterface are implemented, so the class
    can be instantiated directly.
    """

    # Attribute names of the per-event lists; shared by reset() and
    # all_messages so the two can never disagree.
    _RECORD_NAMES = (
        "step_starts",
        "step_completes",
        "step_skips",
        "progress_messages",
        "infos",
        "warnings",
        "errors",
        "successes",
    )

    def __init__(self):
        self.step_starts: List[tuple] = []     # (step, total, title)
        self.step_completes: List[tuple] = []  # (step, total, title, duration)
        self.step_skips: List[tuple] = []      # (step, total, title, reason)
        self.progress_messages: List[str] = []
        self.infos: List[str] = []
        self.warnings: List[str] = []
        self.errors: List[str] = []
        self.successes: List[str] = []

    # ----- ReporterInterface implementation --------------------------------

    def report_step_start(self, step: int, total: int, title: str) -> None:
        """Record a step start as ``(step, total, title)``."""
        self.step_starts.append((step, total, title))

    def report_step_complete(self, step: int, total: int, title: str, duration: str = "") -> None:
        """Record a step completion as ``(step, total, title, duration)``."""
        self.step_completes.append((step, total, title, duration))

    def report_step_skipped(self, step: int, total: int, title: str, reason: str = "") -> None:
        """Record a skipped step as ``(step, total, title, reason)``."""
        self.step_skips.append((step, total, title, reason))

    def report_progress(self, message: str) -> None:
        """Record an in-step progress message."""
        self.progress_messages.append(message)

    def report_info(self, message: str) -> None:
        """Record an informational message."""
        self.infos.append(message)

    def report_warning(self, message: str) -> None:
        """Record a warning message."""
        self.warnings.append(message)

    def report_error(self, message: str) -> None:
        """Record an error message."""
        self.errors.append(message)

    def report_success(self, message: str) -> None:
        """Record a success message."""
        self.successes.append(message)

    # ----- test helpers -----------------------------------------------------

    def reset(self) -> None:
        """Clear all recorded messages.

        The lists are emptied in place, so references obtained earlier
        (e.g. ``errors = reporter.errors``) also become empty.
        """
        for name in self._RECORD_NAMES:
            getattr(self, name).clear()

    @property
    def all_messages(self) -> Dict[str, List]:
        """Get all recorded messages as a dictionary.

        Keys are the recording attribute names (``"step_starts"``,
        ``"infos"``, ...). Values are shallow copies, so mutating them does
        not affect the reporter.
        """
        return {name: list(getattr(self, name)) for name in self._RECORD_NAMES}

Testing Usage

from myspellchecker.data_pipeline.reporter import MockReporter

def test_pipeline_reports_progress():
    """run_pipeline reports three started/completed steps and one skipped step."""
    # Arrange
    reporter = MockReporter()

    # Act
    run_pipeline(reporter)

    # Assert
    # Step 3 is only reported as skipped, never started, so just 3 of the
    # 4 steps produce a start event (steps 1, 2 and 4).
    assert len(reporter.step_starts) == 3
    assert len(reporter.step_completes) == 3
    assert len(reporter.step_skips) == 1
    assert "Found 10,000 lines" in reporter.infos
    assert reporter.step_skips[0] == (3, 4, "POS Tagging", "No POS seed provided")

def test_pipeline_handles_errors():
    """A failing pipeline should have reported at least one "Failed" error."""
    mock = MockReporter()

    try:
        run_failing_pipeline(mock)
    except Exception:
        # Whether the failure propagates is not what this test checks;
        # only what was reported before it is.
        pass

    assert len(mock.errors) > 0
    first_error = mock.errors[0]
    assert "Failed" in first_error

Reset Between Scenarios

def test_multiple_scenarios():
    """One MockReporter can serve several scenarios if reset() runs in between."""
    mock = MockReporter()

    # First scenario: five info messages expected.
    run_pipeline_scenario_1(mock)
    assert len(mock.infos) == 5

    # Forget everything scenario 1 recorded before starting scenario 2.
    mock.reset()

    run_pipeline_scenario_2(mock)
    assert len(mock.infos) == 3

Integration with Pipeline

from myspellchecker.data_pipeline import Pipeline
from myspellchecker.data_pipeline.reporter import PipelineReporter
from myspellchecker.utils.console import PipelineConsole

class Pipeline:
    """Simplified pipeline showing how a reporter is injected.

    Any ReporterInterface implementation may be passed in; when none is
    given, a console-backed PipelineReporter is created.
    """

    def __init__(self, reporter: Optional[ReporterInterface] = None):
        self.reporter = (
            reporter
            if reporter is not None
            else PipelineReporter(PipelineConsole())
        )

    def run(self, input_path: Path, output_path: Path) -> None:
        """Run the pipeline, reporting progress (only step 1 is sketched here)."""
        progress = self.reporter
        progress.report_step_start(1, 4, "Ingesting")
        # ... processing ...
        progress.report_step_complete(1, 4, "Ingesting", "1.5s")

Custom Reporters

File-Based Reporter

class FileReporter(ReporterInterface):
    """Append pipeline progress to a plain-text log file, one line per event."""

    def __init__(self, log_path: Path):
        self.log_path = log_path

    def _write_line(self, line: str) -> None:
        """Append ``line`` plus a newline to the log file."""
        # The file is opened per event, so the log survives a crash mid-run.
        # Explicit UTF-8 keeps non-ASCII step titles (e.g. Myanmar text)
        # independent of the platform's locale encoding.
        with open(self.log_path, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    def report_step_start(self, step: int, total: int, title: str) -> None:
        self._write_line(f"[START] Step {step}/{total}: {title}")

    def report_step_complete(self, step: int, total: int, title: str, duration: str = "") -> None:
        # Only append "(duration)" when one was given; the default empty
        # string would otherwise leave a dangling "()" in the log.
        suffix = f" ({duration})" if duration else ""
        self._write_line(f"[DONE] Step {step}/{total}: {title}{suffix}")

    # ... implement other methods ...

JSON Reporter

import json
from datetime import datetime, timezone


class JSONReporter(ReporterInterface):
    """Collect pipeline progress as JSON-serializable events for API consumption.

    Each reported event is appended to ``self.events`` as a dict;
    ``to_json()`` serializes the whole list.
    """

    def __init__(self):
        self.events = []

    def report_step_start(self, step: int, total: int, title: str) -> None:
        self.events.append({
            "type": "step_start",
            "step": step,
            "total": total,
            "title": title,
            # Timezone-aware UTC timestamp (ISO 8601 with offset) so consumers
            # in other time zones can order events unambiguously.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    # ... implement other methods ...

    def to_json(self) -> str:
        """Serialize all recorded events as an indented JSON array."""
        return json.dumps(self.events, indent=2)

See Also