## Overview

The `worker_context` module provides dependency injection for worker processes: a context is set up once per worker, then retrieved by processing functions without threading dependencies through every call.

```python
from typing import List

from myspellchecker.data_pipeline.worker_context import (
    WorkerContext,
    set_worker_context,
    get_worker_context,
    ContextManager,
)

# Set up context in worker process
context = WorkerContext(
    segmenter=segmenter,
    repairer=repairer,
    validator=validator,
)
set_worker_context(context)

# Access in processing function
def process_line(line: str) -> List[str]:
    ctx = get_worker_context()
    words = ctx.segmenter.segment_words(line)
    return ctx.repairer.repair(words)
```
## WorkerContext Dataclass

Holds dependencies for worker processes:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class WorkerContext:
    """Context holding dependencies for worker processes.

    Attributes:
        segmenter: Text segmentation component (DefaultSegmenter)
        repairer: Segmentation repair component (SegmentationRepair)
        validator: Syllable rule validator (SyllableRuleValidator)
        pos_map: POS tag mapping dictionary
        custom_data: Extension point for custom data
    """

    # Segmentation dependencies
    segmenter: Optional["DefaultSegmenter"] = None
    repairer: Optional["SegmentationRepair"] = None

    # Frequency building dependencies
    validator: Optional["SyllableRuleValidator"] = None
    pos_map: Optional[Dict[str, Any]] = None

    # Extension point
    custom_data: Dict[str, Any] = field(default_factory=dict)
```
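Every field has a default, so a context can be constructed with only the components a given task needs. A short sketch (the `pos_map` mapping and `custom_data` key below are illustrative, not part of the API):

```python
from myspellchecker.data_pipeline.worker_context import WorkerContext

# A partial context: a frequency-counting worker needs no segmenter.
ctx = WorkerContext(pos_map={"NN": "noun"})  # illustrative mapping

assert ctx.segmenter is None           # unset fields stay None
assert ctx.custom_data == {}           # each instance gets its own dict

# custom_data is the extension point for anything the
# built-in fields do not cover.
ctx.custom_data["batch_size"] = 100    # illustrative key
```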
## Thread-Local Storage

The module keeps the context in thread-local storage. Each worker process runs its own interpreter with its own storage instance, so a context set in one worker is never visible to another:

```python
import threading

# Thread-local storage for worker context
_worker_context_storage = threading.local()

def set_worker_context(context: WorkerContext) -> None:
    """Set the worker context for the current thread/process.

    Args:
        context: WorkerContext instance to set
    """
    _worker_context_storage.context = context

def get_worker_context() -> WorkerContext:
    """Get the worker context for the current thread/process.

    Returns:
        The current WorkerContext

    Raises:
        RuntimeError: If no context has been set
    """
    if not hasattr(_worker_context_storage, "context"):
        raise RuntimeError("No worker context set. Call set_worker_context first.")
    return _worker_context_storage.context

def has_worker_context() -> bool:
    """Check if a worker context is set.

    Returns:
        True if context is set, False otherwise
    """
    return hasattr(_worker_context_storage, "context")

def clear_worker_context() -> None:
    """Clear the current worker context."""
    if hasattr(_worker_context_storage, "context"):
        delattr(_worker_context_storage, "context")
```
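Together these four functions define a simple lifecycle; a short sketch of how they interact:

```python
from myspellchecker.data_pipeline.worker_context import (
    WorkerContext,
    set_worker_context,
    get_worker_context,
    has_worker_context,
    clear_worker_context,
)

set_worker_context(WorkerContext())
assert has_worker_context()
ctx = get_worker_context()            # returns the instance set above

clear_worker_context()
assert not has_worker_context()

try:
    get_worker_context()              # no context set: raises
except RuntimeError as err:
    print(err)                        # "No worker context set. ..."
```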
## ContextManager

Context manager for automatic cleanup:

```python
from myspellchecker.data_pipeline.worker_context import ContextManager, WorkerContext

# Create context with components
context = WorkerContext(
    segmenter=segmenter,
    repairer=repairer,
    validator=validator,
)

# Use context manager for automatic lifecycle
with ContextManager(context) as ctx:
    # Context is set and available
    result = process_with_context(data)

# Context is automatically cleared after block
```
### Implementation

```python
class ContextManager:
    """Context manager for automatic worker context lifecycle.

    Automatically sets context on enter and clears on exit.
    """

    def __init__(self, context: Optional[WorkerContext] = None):
        """Initialize context manager.

        Args:
            context: Optional pre-configured WorkerContext
        """
        self.context = context or WorkerContext()

    def __enter__(self) -> WorkerContext:
        """Set up worker context."""
        set_worker_context(self.context)
        return self.context

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up worker context."""
        clear_worker_context()
        return False
```
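Since `__exit__` returns `False`, an exception raised inside the block still propagates to the caller, but the context is cleared first. A quick sketch:

```python
from myspellchecker.data_pipeline.worker_context import (
    ContextManager,
    WorkerContext,
    has_worker_context,
)

try:
    with ContextManager(WorkerContext()):
        raise ValueError("simulated task failure")
except ValueError:
    pass

# __exit__ ran before the exception escaped, so no context lingers.
assert not has_worker_context()
```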
## Factory Functions

### create_segmentation_context

Creates a context optimized for segmentation tasks:

```python
from myspellchecker.data_pipeline.worker_context import create_segmentation_context
from myspellchecker.segmenters import DefaultSegmenter
from myspellchecker.data_pipeline.repair import SegmentationRepair

# Create segmenter and repairer
segmenter = DefaultSegmenter()
repairer = SegmentationRepair()

# Create configured context
context = create_segmentation_context(segmenter, repairer)
```

Implementation:

```python
def create_segmentation_context(
    segmenter: DefaultSegmenter,
    repairer: Optional[Any] = None,
) -> WorkerContext:
    """Create a context for segmentation tasks.

    Args:
        segmenter: Text segmenter instance
        repairer: Optional segmentation repair instance

    Returns:
        WorkerContext configured for segmentation
    """
    return WorkerContext(
        segmenter=segmenter,
        repairer=repairer,
    )
```
### create_frequency_context

Creates a context for frequency counting:

```python
from myspellchecker.data_pipeline.worker_context import create_frequency_context

def create_frequency_context(
    validator: SyllableRuleValidator,
    pos_map: Optional[Dict[str, Any]] = None,
) -> WorkerContext:
    """Create a context for frequency counting tasks.

    Args:
        validator: Syllable rule validator for filtering
        pos_map: Optional POS tag mapping

    Returns:
        WorkerContext configured for frequency counting
    """
    return WorkerContext(
        validator=validator,
        pos_map=pos_map,
    )
```
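Usage mirrors the segmentation factory. A sketch, assuming `SyllableRuleValidator` takes no constructor arguments and guessing at its import path (adjust both to the actual API); the `pos_map` entries are illustrative:

```python
from myspellchecker.data_pipeline.worker_context import (
    create_frequency_context,
    set_worker_context,
)
# Assumed import path and no-arg constructor; adjust to your project.
from myspellchecker.validators import SyllableRuleValidator

context = create_frequency_context(
    SyllableRuleValidator(),
    pos_map={"NN": "noun", "VB": "verb"},  # illustrative POS mapping
)
set_worker_context(context)
```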
## Multiprocessing Usage

### Pool Initializer

Use with a `multiprocessing.Pool` initializer:

```python
from multiprocessing import Pool
from typing import List

from myspellchecker.data_pipeline.worker_context import (
    WorkerContext,
    set_worker_context,
    get_worker_context,
)

def init_worker(engine: str):
    """Initialize worker with segmentation context."""
    from myspellchecker.segmenters import DefaultSegmenter
    from myspellchecker.data_pipeline.repair import SegmentationRepair

    context = WorkerContext(
        segmenter=DefaultSegmenter(word_engine=engine),
        repairer=SegmentationRepair(),
    )
    set_worker_context(context)

def process_line(line: str) -> List[str]:
    """Process a line using worker context."""
    ctx = get_worker_context()
    words = ctx.segmenter.segment_words(line)
    return ctx.repairer.repair(words)

# Use with Pool
with Pool(4, initializer=init_worker, initargs=("myword",)) as pool:
    results = pool.map(process_line, lines)
```
### ProcessPoolExecutor

Use with `concurrent.futures`:

```python
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import List

def process_batch(lines: List[str], engine: str) -> List[List[str]]:
    """Process a batch of lines."""
    # Initialize context for this process
    from myspellchecker.data_pipeline.worker_context import (
        create_segmentation_context,
        set_worker_context,
    )
    from myspellchecker.segmenters import DefaultSegmenter
    from myspellchecker.data_pipeline.repair import SegmentationRepair

    segmenter = DefaultSegmenter(word_engine=engine)
    repairer = SegmentationRepair()
    ctx = create_segmentation_context(segmenter, repairer)
    set_worker_context(ctx)

    results = []
    for line in lines:
        words = ctx.segmenter.segment_words(line)
        results.append(ctx.repairer.repair(words))
    return results

# Split work into batches
batches = [lines[i:i + 100] for i in range(0, len(lines), 100)]

with ProcessPoolExecutor(max_workers=4) as executor:
    process_fn = partial(process_batch, engine="myword")
    results = list(executor.map(process_fn, batches))
```
## Integration with Cython

The worker context integrates with the Cython batch processor:

```python
# In batch_processor.pyx
from typing import List

from myspellchecker.data_pipeline.worker_context import get_worker_context

def process_lines_parallel(lines: List[str]) -> List[List[str]]:
    """Process lines in parallel using worker context."""
    ctx = get_worker_context()
    results = []
    for line in lines:
        # Use context components
        words = ctx.segmenter.segment_words(line)
        if ctx.repairer is not None:
            words = ctx.repairer.repair(words)
        results.append(words)
    return results
```
## Error Handling

### Missing Context

```python
from typing import List

from myspellchecker.data_pipeline.worker_context import (
    get_worker_context,
    has_worker_context,
)

def safe_process(line: str) -> List[str]:
    """Process with fallback if no context."""
    if not has_worker_context():
        # Fallback to creating local components
        from myspellchecker.segmenters import RegexSegmenter
        segmenter = RegexSegmenter()
        return segmenter.segment_syllables(line)

    ctx = get_worker_context()
    return ctx.segmenter.segment_words(line)
```
### Context Validation

```python
def validated_process(line: str) -> List[str]:
    """Process with validation."""
    ctx = get_worker_context()
    if ctx.segmenter is None:
        raise RuntimeError("Worker context missing segmenter")

    words = ctx.segmenter.segment_words(line)
    if ctx.repairer is not None:
        words = ctx.repairer.repair(words)
    if ctx.validator is not None:
        words = [w for w in words if ctx.validator.is_valid_syllable(w)]
    return words
```
## Best Practices

### 1. Initialize Once Per Process

```python
# Good: Initialize once in pool initializer
def init_worker():
    segmenter = DefaultSegmenter()
    ctx = create_segmentation_context(segmenter)
    set_worker_context(ctx)

# Bad: Initialize in every task
def process_line(line):
    ctx = create_segmentation_context(DefaultSegmenter())  # Expensive!
    ...
```
### 2. Use Factory Functions

```python
# Good: Use predefined factory
segmenter = DefaultSegmenter(word_engine="myword")
ctx = create_segmentation_context(segmenter)

# Also good: Custom context when needed
ctx = WorkerContext(
    segmenter=custom_segmenter,
    custom_data={"custom_option": True},
)
```
### 3. Clean Up Resources

```python
# Good: Use context manager for cleanup
with ContextManager(context) as ctx:
    process(data)

# Or: Explicit cleanup
try:
    set_worker_context(ctx)
    process(data)
finally:
    clear_worker_context()
```
## See Also
- Stage Protocol - Pipeline stage interface
- Batch Processing - Parallel processing
- Segmenters - Text segmentation
- Segmentation Repair - Repair broken segments