Worker Context Module
The Worker Context module provides thread-local dependency injection for multiprocessing workers, enabling efficient sharing of expensive resources like segmenters and validators across parallel processing tasks.

Overview

from myspellchecker.data_pipeline.worker_context import (
    WorkerContext,
    set_worker_context,
    get_worker_context,
    ContextManager,
)

# Set up context in worker process (segmenter/repairer/validator built elsewhere)
context = WorkerContext(
    segmenter=segmenter,
    repairer=repairer,
    validator=validator,
)
set_worker_context(context)

# Access in processing function
def process_line(line: str) -> List[str]:
    """Segment one line and repair the segmentation via the shared context."""
    # NOTE(review): requires `from typing import List` — confirm the module imports it.
    ctx = get_worker_context()
    words = ctx.segmenter.segment_words(line)
    return ctx.repairer.repair(words)

WorkerContext Dataclass

Holds dependencies for worker processes:
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class WorkerContext:
    """Context holding dependencies for worker processes.

    Every field defaults to ``None`` (``custom_data`` to an empty dict),
    so a context can be built with only the components a task needs.

    Attributes:
        segmenter: Text segmentation component (DefaultSegmenter)
        repairer: Segmentation repair component (SegmentationRepair)
        validator: Syllable rule validator (SyllableRuleValidator)
        pos_map: POS tag mapping dictionary
        custom_data: Extension point for custom data
    """

    # Segmentation dependencies — string forward references: the concrete
    # classes live in other modules and are only needed for type checking.
    segmenter: Optional["DefaultSegmenter"] = None
    repairer: Optional["SegmentationRepair"] = None

    # Frequency building dependencies
    validator: Optional["SyllableRuleValidator"] = None
    pos_map: Optional[Dict[str, Any]] = None

    # Extension point; default_factory avoids a shared mutable default
    custom_data: Dict[str, Any] = field(default_factory=dict)

Thread-Local Storage

The module uses thread-local storage for process isolation:
import threading

# Thread-local storage for worker context: each thread (and, after fork,
# each worker process) sees its own independent ``context`` attribute.
_worker_context_storage = threading.local()

def set_worker_context(context: "WorkerContext") -> None:
    """Set the worker context for the current thread/process.

    Args:
        context: WorkerContext instance to set
    """
    _worker_context_storage.context = context

def get_worker_context() -> "WorkerContext":
    """Get the worker context for the current thread/process.

    Returns:
        The current WorkerContext

    Raises:
        RuntimeError: If no context has been set
    """
    # EAFP: one attribute access instead of hasattr() followed by a lookup.
    # `from None` keeps the AttributeError out of the caller's traceback.
    try:
        return _worker_context_storage.context
    except AttributeError:
        raise RuntimeError("No worker context set. Call set_worker_context first.") from None

def has_worker_context() -> bool:
    """Check if a worker context is set.

    Returns:
        True if context is set, False otherwise
    """
    return hasattr(_worker_context_storage, "context")

def clear_worker_context() -> None:
    """Clear the current worker context.

    Safe to call when no context is set (it is then a no-op).
    """
    # EAFP: removing an already-missing attribute is simply ignored.
    try:
        del _worker_context_storage.context
    except AttributeError:
        pass

ContextManager

Context manager for automatic cleanup:
from myspellchecker.data_pipeline.worker_context import ContextManager, WorkerContext

# Create context with components
# Create context with components (built elsewhere)
context = WorkerContext(
    segmenter=segmenter,
    repairer=repairer,
    validator=validator,
)

# Use context manager for automatic lifecycle
with ContextManager(context) as ctx:
    # Context is set and available to any code called in this block
    result = process_with_context(data)
# Context is automatically cleared after the block, even if it raises

Implementation

class ContextManager:
    """Scoped lifecycle manager for a worker context.

    Entering the ``with`` block installs the context via
    ``set_worker_context``; leaving it always clears the context,
    even when the block raises.
    """

    def __init__(self, context: Optional[WorkerContext] = None):
        """Remember the context to manage.

        Args:
            context: Optional pre-configured WorkerContext; an empty
                one is created when omitted.
        """
        self.context = WorkerContext() if not context else context

    def __enter__(self) -> WorkerContext:
        """Install the managed context and hand it back to the caller."""
        set_worker_context(self.context)
        return self.context

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Drop the context; never suppress exceptions from the block."""
        clear_worker_context()
        return False

Factory Functions

create_segmentation_context

Creates a context optimized for segmentation tasks:
from myspellchecker.data_pipeline.worker_context import create_segmentation_context
from myspellchecker.segmenters import DefaultSegmenter
from myspellchecker.data_pipeline.repair import SegmentationRepair

# Create segmenter and repairer
segmenter = DefaultSegmenter()
repairer = SegmentationRepair()

# Create configured context
context = create_segmentation_context(segmenter, repairer)
Signature:
def create_segmentation_context(
    segmenter: "DefaultSegmenter",
    repairer: Optional[Any] = None,
) -> "WorkerContext":
    """Create a context for segmentation tasks.

    Args:
        segmenter: Text segmenter instance
        repairer: Optional segmentation repair instance

    Returns:
        WorkerContext configured for segmentation
    """
    # Body was missing: without it the function returned None despite
    # the documented WorkerContext return value.
    return WorkerContext(
        segmenter=segmenter,
        repairer=repairer,
    )

create_frequency_context

Creates a context for frequency counting:
from myspellchecker.data_pipeline.worker_context import create_frequency_context

def create_frequency_context(
    validator: "SyllableRuleValidator",
    pos_map: Optional[Dict[str, Any]] = None,
) -> "WorkerContext":
    """Create a context for frequency counting tasks.

    Args:
        validator: Syllable rule validator for filtering
        pos_map: Optional POS tag mapping

    Returns:
        WorkerContext configured for frequency counting
    """
    # Annotations are string forward references (consistent with the
    # WorkerContext dataclass): unquoted names would raise NameError at
    # definition time unless the classes are imported here.
    return WorkerContext(
        validator=validator,
        pos_map=pos_map,
    )

Multiprocessing Usage

Pool Initializer

Use with multiprocessing.Pool initializer:
from multiprocessing import Pool
from myspellchecker.data_pipeline.worker_context import (
    WorkerContext,
    set_worker_context,
    get_worker_context,
)

def init_worker(engine: str):
    """Initialize worker with segmentation context.

    Runs once per worker process (via the Pool ``initializer`` hook), so
    the expensive segmenter is built once per process rather than per task.

    Args:
        engine: Word-engine name forwarded to DefaultSegmenter.
    """
    from myspellchecker.segmenters import DefaultSegmenter
    from myspellchecker.data_pipeline.repair import SegmentationRepair

    context = WorkerContext(
        segmenter=DefaultSegmenter(word_engine=engine),
        repairer=SegmentationRepair(),
    )
    set_worker_context(context)

def process_line(line: str) -> List[str]:
    """Process a line using worker context.

    Assumes init_worker already installed a context with both a
    segmenter and a repairer in this process.
    """
    ctx = get_worker_context()
    words = ctx.segmenter.segment_words(line)
    return ctx.repairer.repair(words)

# Use with Pool: each of the 4 workers runs init_worker("myword") once at startup
with Pool(4, initializer=init_worker, initargs=("myword",)) as pool:
    results = pool.map(process_line, lines)

ProcessPoolExecutor

Use with concurrent.futures:
from concurrent.futures import ProcessPoolExecutor
from functools import partial

def process_batch(lines: List[str], engine: str) -> List[List[str]]:
    """Process a batch of lines.

    NOTE(review): the context (and segmenter) is rebuilt on every batch
    call, even when the same worker process handles several batches — a
    pool initializer would amortize this; confirm whether that matters here.

    Args:
        lines: Batch of raw input lines.
        engine: Word-engine name for DefaultSegmenter.

    Returns:
        One list of repaired words per input line.
    """
    # Initialize context for this process
    from myspellchecker.data_pipeline.worker_context import create_segmentation_context, set_worker_context
    from myspellchecker.segmenters import DefaultSegmenter
    from myspellchecker.data_pipeline.repair import SegmentationRepair

    segmenter = DefaultSegmenter(word_engine=engine)
    repairer = SegmentationRepair()
    ctx = create_segmentation_context(segmenter, repairer)
    set_worker_context(ctx)

    results = []
    for line in lines:
        words = ctx.segmenter.segment_words(line)
        results.append(ctx.repairer.repair(words))
    return results

# Split work into batches of 100 lines to limit per-task overhead
batches = [lines[i:i+100] for i in range(0, len(lines), 100)]

with ProcessPoolExecutor(max_workers=4) as executor:
    # partial pins the engine so executor.map only varies the batch argument
    process_fn = partial(process_batch, engine="myword")
    results = list(executor.map(process_fn, batches))

Integration with Cython

The worker context integrates with Cython batch processor:
# In batch_processor.pyx
from myspellchecker.data_pipeline.worker_context import get_worker_context

def process_lines_parallel(lines: List[str]) -> List[List[str]]:
    """Segment every line via the worker context, repairing when possible."""
    ctx = get_worker_context()
    # Hoist the lookups that are invariant across lines.
    segment = ctx.segmenter.segment_words
    repairer = ctx.repairer

    def _one(text: str) -> List[str]:
        # Repair is optional: pass tokens through when no repairer is configured.
        tokens = segment(text)
        return tokens if repairer is None else repairer.repair(tokens)

    return [_one(line) for line in lines]

Error Handling

Missing Context

from myspellchecker.data_pipeline.worker_context import (
    get_worker_context,
    has_worker_context,
)

def safe_process(line: str) -> List[str]:
    """Segment *line*, falling back to a throwaway RegexSegmenter when
    no worker context has been installed."""
    if has_worker_context():
        return get_worker_context().segmenter.segment_words(line)

    # No context set: build local components for this call only.
    from myspellchecker.segmenters import RegexSegmenter
    return RegexSegmenter().segment_syllables(line)

Context Validation

def validated_process(line: str) -> List[str]:
    """Segment a line, applying repair and syllable validation whenever the
    corresponding components are present in the worker context.

    Raises:
        RuntimeError: If the context has no segmenter.
    """
    ctx = get_worker_context()

    # The segmenter is mandatory; everything else is best-effort.
    if ctx.segmenter is None:
        raise RuntimeError("Worker context missing segmenter")
    tokens = ctx.segmenter.segment_words(line)

    if ctx.repairer is not None:
        tokens = ctx.repairer.repair(tokens)
    if ctx.validator is not None:
        tokens = [t for t in tokens if ctx.validator.is_valid_syllable(t)]
    return tokens

Best Practices

1. Initialize Once Per Process

# Good: Initialize once in pool initializer
def init_worker():
    # Heavy components are built a single time per worker process.
    segmenter = DefaultSegmenter()
    ctx = create_segmentation_context(segmenter)
    set_worker_context(ctx)

# Bad: Initialize in every task
def process_line(line):
    # Pays the full construction cost on every single line processed.
    ctx = create_segmentation_context(DefaultSegmenter())  # Expensive!
    ...

2. Use Factory Functions

# Good: Use predefined factory for the common segmentation setup
segmenter = DefaultSegmenter(word_engine="myword")
ctx = create_segmentation_context(segmenter)

# Also good: Custom context when needed (custom_data is the extension point)
ctx = WorkerContext(
    segmenter=custom_segmenter,
    custom_data={"custom_option": True},
)

3. Clean Up Resources

# Good: Use context manager for cleanup
with ContextManager(context) as ctx:
    process(data)

# Or: Explicit cleanup — finally guarantees the clear even on error
try:
    set_worker_context(ctx)
    process(data)
finally:
    clear_worker_context()

See Also