When you have many texts to check, check_batch() processes them in a single call with less per-call overhead. Combining it with a thread or process pool can raise throughput by up to roughly 10x over sequential checking.
Why Batch Processing?
Processing texts individually has overhead:
- Repeated initialization
- No parallelization
- Inefficient memory usage
Batch processing provides:
- Parallelization: Process multiple texts concurrently
- Reduced overhead: Share resources across texts
- Better throughput: Up to ~10x faster than sequential
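One way to check these claims on your own data is to time both approaches. The sketch below is a minimal, library-agnostic harness. The names `measure_throughput`, `fake_check_batch`, and `one_at_a_time` are hypothetical and introduced only for illustration; the stand-in function simulates a fixed per-call overhead and is not part of myspellchecker.

```python
import time

def measure_throughput(check_fn, texts):
    """Time one call of check_fn(texts); return (results, seconds, texts_per_second)."""
    start = time.perf_counter()
    results = check_fn(texts)
    elapsed = time.perf_counter() - start
    rate = len(texts) / elapsed if elapsed > 0 else float("inf")
    return results, elapsed, rate

# Hypothetical stand-in for a checker: a fixed setup cost per call plus
# trivial per-text work. Swap in checker.check_batch to measure real code.
def fake_check_batch(texts, call_overhead_s=0.002):
    time.sleep(call_overhead_s)
    return [len(t) for t in texts]

def one_at_a_time(texts):
    # Sequential pattern: pays the per-call overhead once per text.
    return [fake_check_batch([t])[0] for t in texts]

if __name__ == "__main__":
    sample = ["text"] * 50
    _, seq_s, seq_rate = measure_throughput(one_at_a_time, sample)
    _, batch_s, batch_rate = measure_throughput(fake_check_batch, sample)
    print(f"sequential: {seq_s:.3f}s ({seq_rate:.0f} texts/sec)")
    print(f"batch:      {batch_s:.3f}s ({batch_rate:.0f} texts/sec)")
```

To measure the real library, pass `checker.check_batch` as `check_fn` and compare it with a loop over `checker.check`.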
Basic Usage
Simple Batch Check
from myspellchecker import SpellChecker
checker = SpellChecker()
texts = [
"မြန်မာနိုင်ငံ",
"ကျေးဇူးတင်ပါသည်",
"နေကောင်းလား",
]
# Check all texts
results = checker.check_batch(texts)
for text, result in zip(texts, results):
    print(f"{text}: {len(result.errors)} errors")
With Validation Level
from myspellchecker.core.constants import ValidationLevel
# Check with syllable-level validation (faster)
results = checker.check_batch(texts, level=ValidationLevel.SYLLABLE)
# Check with word-level validation (more thorough)
results = checker.check_batch(texts, level=ValidationLevel.WORD)
Parallelization
OpenMP Parallelization
Cython extensions use OpenMP for parallel processing:
# Requires libomp on macOS: brew install libomp
# Automatically uses all available cores
results = checker.check_batch(texts)
Thread Pool Parallelization
Python-level parallelization for I/O-bound operations:
from concurrent.futures import ThreadPoolExecutor
checker = SpellChecker()
def check_single(text):
    return checker.check(text)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(check_single, texts))
Process Pool Parallelization
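For CPU-bound checking, where the GIL keeps threads from running Python code in parallel:
from concurrent.futures import ProcessPoolExecutor
from myspellchecker import SpellChecker
_checker = None  # One checker per worker process, created on first use
def check_in_process(text):
    global _checker
    if _checker is None:
        _checker = SpellChecker()
    return _checker.check(text)
if __name__ == "__main__":  # Needed on platforms that spawn worker processes
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(check_in_process, texts))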
Configuration
Batch Configuration
from myspellchecker import SpellChecker
from myspellchecker.core.config import get_profile
# Use a preset optimized for batch processing
config = get_profile("fast")
checker = SpellChecker(config=config)
results = checker.check_batch(texts)
Memory-Efficient Processing
# For very large datasets, use streaming
def check_stream(file_handle, batch_size=100):
    """Process file in memory-efficient batches."""
    batch = []
    for line in file_handle:
        batch.append(line.strip())
        if len(batch) >= batch_size:
            yield from checker.check_batch(batch)
            batch = []
    if batch:
        yield from checker.check_batch(batch)
# Usage
with open("large_file.txt", encoding="utf-8") as f:
    for result in check_stream(f):
        if result.has_errors:
            print(result.text)
Optimal Batch Size
| Text Length | Optimal Batch Size |
|---|---|
| Short (<50 chars) | 500-1000 |
| Medium (50-200 chars) | 100-500 |
| Long (>200 chars) | 50-100 |
def get_optimal_batch_size(texts):
    if not texts:
        return 50  # Nothing to measure; fall back to the smallest size
    avg_length = sum(len(t) for t in texts) / len(texts)
    if avg_length < 50:
        return 500
    elif avg_length < 200:
        return 100
    else:
        return 50
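Once a batch size is chosen, the input still has to be split into batches of that size. The helper below is a small sketch of one way to do that with the standard library. `chunked` is a hypothetical name introduced here, not a myspellchecker function, and it accepts any iterable, including an open file.

```python
from itertools import islice

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

if __name__ == "__main__":
    for batch in chunked(["a", "b", "c", "d", "e"], 2):
        print(batch)
```

Each yielded list can then be passed to `checker.check_batch()`, so only one batch of texts is held in memory at a time.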
Worker Count
import os
# Rule of thumb: 1-2 workers per CPU core
num_cores = os.cpu_count() or 4
optimal_workers = max(1, min(num_cores, len(texts) // 10))  # At least 10 texts per worker, never fewer than 1 worker
Cython Acceleration
Ensure Cython extensions are compiled:
# Check if Cython is being used
python -c "from myspellchecker.text.normalize_c import remove_zero_width_chars; print('Cython OK')"
# Rebuild if needed
python setup.py build_ext --inplace
Benchmarks
Throughput Comparison
Test: 10,000 texts, average 100 chars each
Hardware: 8-core CPU
Note: Illustrative numbers — actual results vary by hardware and text complexity.
Sequential (no batch):
Time: 45.2s
Throughput: 221 texts/sec
Batch (1 worker):
Time: 38.1s
Throughput: 262 texts/sec
Batch (4 workers):
Time: 12.3s
Throughput: 813 texts/sec
Batch (8 workers):
Time: 8.7s
Throughput: 1,149 texts/sec
Batch (8 workers + Cython):
Time: 4.2s
Throughput: 2,381 texts/sec
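The throughput and speedup figures follow directly from the wall-clock times. The short sketch below recomputes them from the illustrative numbers above; the dictionary simply restates those timings, and `throughput` and `speedup` are helper names introduced here.

```python
TOTAL_TEXTS = 10_000

# Wall-clock times in seconds, copied from the illustrative figures above.
timings = {
    "Sequential (no batch)": 45.2,
    "Batch (1 worker)": 38.1,
    "Batch (4 workers)": 12.3,
    "Batch (8 workers)": 8.7,
    "Batch (8 workers + Cython)": 4.2,
}

def throughput(seconds, total=TOTAL_TEXTS):
    """Texts processed per second."""
    return total / seconds

def speedup(seconds, baseline=timings["Sequential (no batch)"]):
    """How many times faster than the sequential baseline."""
    return baseline / seconds

if __name__ == "__main__":
    for name, seconds in timings.items():
        print(f"{name}: {throughput(seconds):,.0f} texts/sec, {speedup(seconds):.1f}x")
```

With these numbers, the fastest configuration works out to about 10.8x the sequential baseline, which is consistent with the "up to ~10x" figure quoted for batch processing.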
Memory Usage
Sequential: ~50MB base + 0.1MB per text
Batch (100): ~50MB base + 10MB buffer
Batch (1000): ~50MB base + 100MB buffer
Streaming: ~50MB base + 10MB buffer (constant)
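These figures fit a simple linear model: a fixed base plus roughly 0.1MB for each text held in memory at once. The sketch below encodes that model as a rough planning aid. `estimated_peak_mb` is a hypothetical helper, and its defaults are the approximate values listed above, not measured constants.

```python
def estimated_peak_mb(texts_in_memory, base_mb=50.0, per_text_mb=0.1):
    """Rough peak memory in MB: fixed base plus a per-text cost for texts held at once."""
    return base_mb + per_text_mb * texts_in_memory

if __name__ == "__main__":
    for batch_size in (100, 1000):
        print(f"batch of {batch_size}: ~{estimated_peak_mb(batch_size):.0f}MB")
```

Under this model, streaming with a batch size of 100 stays near 60MB regardless of file size, because only one batch is buffered at a time.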
API Reference
check_batch
def check_batch(
    texts: list[str],
    level: ValidationLevel = ValidationLevel.SYLLABLE,
) -> list[Response]:
    """
    Check multiple texts.
    Args:
        texts: List of texts to check
        level: Validation level (SYLLABLE or WORD)
    Returns:
        List of Response objects
    """
StreamingChecker
For memory-efficient streaming, use StreamingChecker:
from myspellchecker.core.streaming import StreamingChecker
streaming = StreamingChecker(checker)
for result in streaming.check_stream(file_handle):
    process_result(result)
Common Patterns
Progress Tracking
from tqdm import tqdm
def check_with_progress(texts, chunk_size=100):
    """Check texts chunk by chunk, updating a progress bar after each chunk."""
    results = []
    # check_batch() does not accept a progress callback, so process the
    # texts in chunks and advance the bar after each one
    with tqdm(total=len(texts)) as pbar:
        for i in range(0, len(texts), chunk_size):
            batch = texts[i:i + chunk_size]
            results.extend(checker.check_batch(batch))
            pbar.update(len(batch))
    return results
Note: For true streaming with callbacks, use StreamingChecker.check_stream() with on_progress.
Error Aggregation
from collections import Counter
def aggregate_errors(results):
    """Aggregate errors across all results."""
    all_errors = []
    error_type_counts = Counter()
    for result in results:
        for error in result.errors:
            error_type_counts[error.error_type] += 1
            all_errors.append(error)
    return {
        "total": len(all_errors),
        "by_type": dict(error_type_counts),
        # Possible keys: "invalid_syllable", "invalid_word",
        # "context_probability", "grammar_error", etc.
        "errors": all_errors,
    }
Parallel File Processing
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor
from myspellchecker import SpellChecker
def process_file(file_path: Path) -> dict:
    """Check every line of a single file and summarize the errors."""
    checker = SpellChecker()  # Created inside the worker process that handles this file
    with open(file_path, encoding="utf-8") as f:
        texts = [line.strip() for line in f]  # Drop trailing newlines before checking
    results = checker.check_batch(texts)
    return {
        "file": str(file_path),
        "total_lines": len(texts),
        "errors": sum(len(r.errors) for r in results),
    }
def process_directory(dir_path: Path, pattern: str = "*.txt") -> list:
    """Process all matching files in a directory, one file per task."""
    files = list(dir_path.glob(pattern))
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_file, files))
    return results
Chunked Processing for Very Large Files
def process_large_file(file_path: str, chunk_size: int = 10000):
    """Process a very large file in chunks.

    Only one chunk of input is buffered at a time, but all results are
    kept in memory; use StreamingChecker if that is too much.
    """
    all_results = []
    chunk = []
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            chunk.append(line.strip())
            if len(chunk) >= chunk_size:
                all_results.extend(checker.check_batch(chunk))
                chunk = []  # Release the processed chunk
                # SpellChecker has no clear_cache() method, so no cache is cleared here
    if chunk:
        all_results.extend(checker.check_batch(chunk))
    return all_results
Troubleshooting
Issue: No speedup with batch processing
Cause: GIL contention or I/O bottleneck
Solution: Use process-based parallelization:
from concurrent.futures import ProcessPoolExecutor
# check_in_process is the worker function defined under Process Pool Parallelization
with ProcessPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(check_in_process, texts))
Issue: Out of memory
Cause: Batch too large or results not processed
Solution: Use streaming:
from myspellchecker.core.streaming import StreamingChecker
streaming = StreamingChecker(checker)
for result in streaming.check_stream(text_iterator):
    process_result(result)  # Process immediately
    # Results are garbage collected
Issue: Slow with many small texts
Cause: Worker overhead dominates
Solution: Process larger batches at once:
# Process texts in larger chunks manually
chunk_size = 1000
for i in range(0, len(texts), chunk_size):
    chunk = texts[i:i + chunk_size]
    results = checker.check_batch(chunk)
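The same idea extends to a worker pool: dispatching one chunk per task, rather than one text per task, spreads the scheduling cost over many texts. The sketch below shows this with a thread pool. `check_in_chunks` is a hypothetical helper, and `check_batch` is any callable that maps a list of texts to a list of results. It assumes that callable is safe to invoke from several threads at once, which this page does not confirm for `checker.check_batch`.

```python
from concurrent.futures import ThreadPoolExecutor

def check_in_chunks(texts, check_batch, chunk_size=1000, max_workers=4):
    """Apply check_batch to chunks of texts in a thread pool, preserving input order."""
    chunks = [texts[i:i + chunk_size] for i in range(0, len(texts), chunk_size)]
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields chunk results in the order the chunks were submitted.
        for chunk_results in executor.map(check_batch, chunks):
            results.extend(chunk_results)
    return results

if __name__ == "__main__":
    # Stand-in batch function for demonstration: returns the length of each text.
    def demo_batch(batch):
        return [len(t) for t in batch]

    print(check_in_chunks(["a", "bb", "ccc"], demo_batch, chunk_size=2, max_workers=2))
```

With a process pool, the standard `Executor.map` also accepts a `chunksize` argument that groups items per task in a similar way.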
Next Steps