Documentation Index
Fetch the complete documentation index at: https://docs.myspellchecker.com/llms.txt
Use this file to discover all available pages before exploring further.
When checking files that don’t fit in memory (10 GB+ corpora, server log streams), the standard check() API won’t work. The streaming API keeps memory bounded by processing one line at a time and yielding results as they’re ready, with backpressure control to prevent downstream bottlenecks.
Quick Start
from myspellchecker import SpellChecker
from myspellchecker.core.streaming import StreamingChecker
checker = SpellChecker()
streaming = StreamingChecker(checker)

# Feed the open file handle straight to check_stream(); results are
# yielded while the file is being read, so memory use stays bounded.
with open("large_file.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        if not result.response.has_errors:
            continue  # clean line, nothing to report
        for error in result.response.errors:
            print(f"Line {result.line_number}: {error.text}")
StreamingConfig
from myspellchecker.core.streaming import StreamingConfig
config = StreamingConfig(
    chunk_size=100,  # Lines processed per chunk
    max_memory_mb=100,  # Memory limit (MB) before backpressure is applied
    sentence_boundary_pattern=r"[။!?]+",  # Regex for sentence endings (Myanmar "။", "!" and "?")
    enable_cross_sentence_context=True,  # Use the previous sentence as context
    progress_interval=1000,  # Lines between progress callbacks
    timeout_per_chunk=30.0,  # Async timeout per chunk, in seconds
)
streaming = StreamingChecker(checker, config=config)
| Option | Default | Description |
|---|---|---|
| `chunk_size` | `100` | Lines processed per chunk |
| `max_memory_mb` | `100` | Max memory (MB) before backpressure |
| `sentence_boundary_pattern` | `r"[။!?]+"` | Regex for sentence boundaries |
| `enable_cross_sentence_context` | `True` | Enable cross-sentence context |
| `progress_interval` | `1000` | Lines between progress callbacks |
| `timeout_per_chunk` | `30.0` | Async processing timeout (seconds) |
Configuration Presets
# High-throughput (large files on powerful hardware): bigger chunks, a higher
# memory ceiling, and less frequent progress callbacks.
high_throughput = StreamingConfig(chunk_size=500, max_memory_mb=500, progress_interval=5000)
# Low-memory (constrained environments): small chunks and a tight memory ceiling.
low_memory = StreamingConfig(chunk_size=10, max_memory_mb=50, progress_interval=100)
# Real-time (per-line processing): one line per chunk with a short per-chunk timeout.
realtime = StreamingConfig(chunk_size=1, max_memory_mb=100, timeout_per_chunk=5.0)
Synchronous Streaming
Processing Files
from myspellchecker.core.streaming import StreamingChecker
streaming = StreamingChecker(checker)

# Report every error together with the line number it was found on.
with open("document.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        if not result.response.has_errors:
            continue  # skip lines without errors
        for error in result.response.errors:
            print(f"Line {result.line_number}: {error.text}")
# From a file handle. The encoding is explicit because open() otherwise falls
# back to the platform's locale encoding, which can mis-decode Myanmar text.
with open("file.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        pass

# From StringIO
from io import StringIO

text_io = StringIO("Line 1\nLine 2\nLine 3")
for result in streaming.check_stream(text_io):
    pass

# From any iterable of lines
lines = ["Line 1", "Line 2", "Line 3"]
for result in streaming.check_stream(iter(lines)):
    pass
Sentence-by-Sentence Processing
Process text by sentences with context preservation:
text = """
ဤစာကြောင်းသည် ပထမစာကြောင်းဖြစ်သည်။
ဒုတိယစာကြောင်းသည် ပိုရှည်သည်။
"""

# check_sentences() takes the whole text and yields results sentence by sentence.
for result in streaming.check_sentences(text):
    print(f"Sentence {result.chunk_index}: {result.response.text}")
    if not result.response.has_errors:
        continue
    print(f" Errors: {result.response.errors}")
Cross-Sentence Context
When enable_cross_sentence_context=True, the checker validates using context from the previous sentence:
config = StreamingConfig(enable_cross_sentence_context=True)
streaming = StreamingChecker(checker, config=config)

# With the flag enabled, each sentence is validated using context from the
# previous sentence; the results are simply drained here.
for _result in streaming.check_sentences(text):
    pass
Async Streaming
Basic Async
import asyncio
import aiofiles
from myspellchecker import SpellChecker
from myspellchecker.core.streaming import StreamingChecker
async def process_file():
    """Spell-check ``large_file.txt`` asynchronously and print each error.

    The file is opened with aiofiles and handed to ``check_stream_async``.
    For every result that has errors, one line is printed per error with the
    line number, the offending text, and the suggestions.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    # Explicit UTF-8, matching the synchronous examples; without it the
    # platform's locale encoding is used and Myanmar text may fail to decode.
    async with aiofiles.open("large_file.txt", encoding="utf-8") as f:
        async for result in streaming.check_stream_async(f):
            if result.response.has_errors:
                for error in result.response.errors:
                    print(f"Line {result.line_number}: {error.text} → {error.suggestions}")


asyncio.run(process_file())
Async with Timeout
config = StreamingConfig(timeout_per_chunk=10.0)
streaming = StreamingChecker(checker, config=config)
async for result in streaming.check_stream_async(reader):
if "error" in result.response.metadata:
print(f"Timeout on line {result.line_number}")
Cancellation
async def cancellable_check(filepath: str, cancel_event: asyncio.Event):
    """Spell-check *filepath* until it is exhausted or *cancel_event* is set.

    Args:
        filepath: Path of the text file to check.
        cancel_event: Event polled both while reading lines and while
            consuming results; setting it stops the check cooperatively.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the surrounding
            task is cancelled, so cancellation still propagates to the caller.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    async def read_lines():
        import aiofiles

        # Explicit UTF-8: the locale default can mis-decode Myanmar text.
        async with aiofiles.open(filepath, encoding="utf-8") as f:
            async for line in f:
                if cancel_event.is_set():
                    return  # stop feeding new lines once cancellation is requested
                yield line

    try:
        async for result in streaming.check_stream_async(read_lines()):
            if cancel_event.is_set():
                break
            print(f"Checked line {result.line_number}")
    except asyncio.CancelledError:
        print("Processing was cancelled")
        raise  # never swallow CancelledError
Graceful Shutdown
import signal
async def graceful_streaming():
    """Stream-check generated lines and stop cleanly on SIGINT or SIGTERM.

    Signal handlers set a shared event. The line generator and the result
    loop both poll that event, so the loop exits instead of being
    interrupted mid-iteration. The handlers are removed again before the
    coroutine returns.

    Note:
        ``loop.add_signal_handler`` is only available on Unix event loops.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    shutdown_event = asyncio.Event()

    def handle_signal():
        shutdown_event.set()

    # Inside a coroutine, get_running_loop() is the supported way to obtain
    # the loop; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    handled_signals = (signal.SIGINT, signal.SIGTERM)
    for signum in handled_signals:
        loop.add_signal_handler(signum, handle_signal)

    async def read_lines():
        for i in range(10000):
            if shutdown_event.is_set():
                return
            yield f"Line {i}\n"
            await asyncio.sleep(0.001)

    try:
        async for result in streaming.check_stream_async(read_lines()):
            if shutdown_event.is_set():
                print("Shutting down gracefully...")
                break
            print(f"Checked line {result.line_number}")
    finally:
        # Do not leave the handlers installed on the loop after this
        # coroutine has finished.
        for signum in handled_signals:
            loop.remove_signal_handler(signum)
Progress Tracking
Progress Callbacks
from myspellchecker.core.streaming import StreamingConfig, StreamingStats
def on_progress(stats: StreamingStats):
    """Print a one-line summary of the current streaming statistics.

    Args:
        stats: Statistics object; its ``lines_processed``,
            ``lines_per_second`` and ``current_memory_mb`` attributes are read.
    """
    summary = (
        f"Progress: {stats.lines_processed} lines, "
        f"{stats.lines_per_second:.1f} lines/sec, "
        f"Memory: {stats.current_memory_mb:.1f}MB"
    )
    print(summary)
config = StreamingConfig(progress_interval=100)
streaming = StreamingChecker(checker, config=config)
stats = StreamingStats()

# Explicit UTF-8: the locale default can mis-decode Myanmar text.
with open("large_file.txt", encoding="utf-8") as f:
    # The same stats object is passed in so it can be inspected afterwards.
    for result in streaming.check_stream(f, on_progress=on_progress, stats=stats):
        pass

# Final statistics
print(f"Total time: {stats.elapsed_time:.2f}s")
print(f"Total errors: {stats.errors_found}")
With tqdm Progress Bar
from tqdm import tqdm
from myspellchecker.core.streaming import StreamingStats
def check_with_progress(filepath: str):
    """Spell-check *filepath* while showing a tqdm progress bar.

    The file is read twice: once to count lines so the bar has a total, and
    once to run the actual check. The bar's postfix shows the running error
    count and the processing rate taken from the shared stats object.

    Args:
        filepath: Path of the UTF-8 text file to check.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    # First pass: count lines lazily so the bar knows its total.
    with open(filepath, encoding="utf-8") as f:
        total_lines = sum(1 for _ in f)

    stats = StreamingStats()
    # Second pass: explicit UTF-8 again, since the locale default can
    # mis-decode Myanmar text.
    with open(filepath, encoding="utf-8") as f, tqdm(total=total_lines, desc="Checking") as pbar:
        for result in streaming.check_stream(f, stats=stats):
            pbar.update(1)
            pbar.set_postfix({
                "errors": stats.errors_found,
                "rate": f"{stats.lines_per_second:.0f}/s",
            })
With Rich Progress Bar
from rich.progress import Progress
# Fragment: `f`, `total_lines` and `streaming` come from the surrounding code.
with Progress() as progress:
    task_id = progress.add_task("Checking...", total=total_lines)

    def on_progress(stats):
        # Mirror the number of processed lines onto the Rich task.
        progress.update(task_id, completed=stats.lines_processed)

    for _result in streaming.check_stream(f, on_progress=on_progress):
        pass
StreamingStats
from myspellchecker.core.streaming import StreamingStats
stats = StreamingStats()
for result in streaming.check_stream(f, stats=stats):
    pass

# Available metrics, printed one per line in this order
metric_names = (
    "bytes_processed",      # Total bytes processed
    "lines_processed",      # Total lines processed
    "sentences_processed",  # Sentences processed
    "errors_found",         # Total errors found
    "chunks_processed",     # Chunks processed
    "elapsed_time",         # Seconds since start
    "lines_per_second",     # Processing rate
    "current_memory_mb",    # Current memory usage
)
for metric_name in metric_names:
    print(getattr(stats, metric_name))

# Serialize to dict
stats_dict = stats.to_dict()
ChunkResult
Each iteration yields a ChunkResult:
from myspellchecker.core.streaming import ChunkResult
# Attributes available on each yielded ChunkResult (shown as bare expressions
# for illustration only).
for result in streaming.check_stream(f):
    result.response  # Response object with errors
    result.line_number  # Line number in source file
    result.chunk_index  # Sequential chunk number
    result.is_final  # True for last chunk
Memory Management
Backpressure
When memory exceeds max_memory_mb, the streaming checker automatically applies backpressure:
- Sync mode: Triggers garbage collection and adds a small delay
- Async mode: Adds an async sleep to allow cleanup
config = StreamingConfig(max_memory_mb=50)
streaming = StreamingChecker(checker, config=config)

# Nothing extra to call: once usage passes max_memory_mb the checker itself
# triggers garbage collection and adds a small delay.
for _result in streaming.check_stream(f):
    pass
Bounded Memory Usage
# Process 10GB file with ~100MB memory
config = StreamingConfig(chunk_size=50, max_memory_mb=100)
streaming = StreamingChecker(checker, config=config)

# Explicit UTF-8: the locale default can mis-decode Myanmar text.
with open("huge_file.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        pass  # Memory stays bounded
Error Handling
Sync Error Recovery
def safe_process(filepath: str):
    """Check *filepath* line by line and print a status for every line.

    A result whose response metadata contains an ``"error"`` key is reported
    as failed; every other line is reported as OK. An ``IOError`` raised
    while opening or reading the file is caught and printed, not propagated.

    Args:
        filepath: Path of the UTF-8 text file to check.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    try:
        with open(filepath, encoding="utf-8") as f:
            for result in streaming.check_stream(f):
                line_failed = "error" in result.response.metadata
                if line_failed:
                    print(f"Line {result.line_number} had an error")
                    continue
                print(f"Line {result.line_number}: OK")
    except IOError as e:
        print(f"File error: {e}")
Async Timeout Recovery
config = StreamingConfig(timeout_per_chunk=5.0)
async for result in streaming.check_stream_async(reader):
if result.response.metadata.get("error") == "Timeout exceeded":
print(f"Timeout on line {result.line_number}")
continue
Best Practices
- Reuse `StreamingChecker` instances, because creating a new `SpellChecker` per file is expensive
- Choose chunk size by use case: 500 for throughput, 1 for real-time
- Use async for I/O-bound workloads such as network sources and file I/O with aiofiles
- Set an appropriate validation level: `SYLLABLE` for speed, `WORD` for thoroughness
Integration Examples
WebSocket Streaming
import websockets
import json
async def websocket_spellcheck(websocket):
    """Spell-check every message received on *websocket* and reply with JSON.

    Incoming messages are treated as lines of text. For each result, a JSON
    object with the keys ``line``, ``has_errors`` and ``errors`` is sent back
    over the same connection.

    Args:
        websocket: Connection object that supports ``async for`` to receive
            messages and an awaitable ``send()``.
    """
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    async def receive_lines():
        # Adapt the websocket's message stream into an async line source.
        async for message in websocket:
            yield message

    async for result in streaming.check_stream_async(receive_lines()):
        response = result.response
        payload = {
            "line": result.line_number,
            "has_errors": response.has_errors,
            "errors": [e.to_dict() for e in response.errors],
        }
        await websocket.send(json.dumps(payload))
FastAPI File Upload
from fastapi import FastAPI, UploadFile
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.post("/check-file")
async def check_file(file: UploadFile):
    """Spell-check an uploaded text file and stream results as NDJSON.

    Each non-empty line of the upload is decoded as UTF-8, stripped and
    checked with ``checker.check_async``. One JSON object per line, with the
    keys ``text`` and ``errors``, is streamed back.

    Args:
        file: The uploaded file.

    Returns:
        A ``StreamingResponse`` with media type ``application/x-ndjson``.
    """
    checker = SpellChecker()

    async def generate_results():
        # UploadFile itself is not iterable; its `.file` attribute is the
        # underlying binary file object, which yields one bytes line at a time.
        # NOTE(review): confirm the upload is still open while the response
        # is being streamed on the FastAPI version in use.
        for raw_line in file.file:
            text = raw_line.decode("utf-8").strip()
            if text:
                response = await checker.check_async(text)
                yield json.dumps({
                    "text": text,
                    "errors": [e.to_dict() for e in response.errors],
                }) + "\n"

    return StreamingResponse(generate_results(), media_type="application/x-ndjson")
Batch File Processing
import glob
def process_files(pattern: str):
    """Spell-check every file matching *pattern* with one shared checker.

    Args:
        pattern: Glob pattern selecting the files to check.

    Returns:
        A list of dicts, one per line that has errors, with the keys
        ``file`` (path), ``line`` (line number) and ``errors`` (the error
        objects from the response).
    """
    # One SpellChecker/StreamingChecker pair is reused for all files.
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    all_errors = []
    for filepath in glob.glob(pattern):
        # Explicit UTF-8: the locale default can mis-decode Myanmar text.
        with open(filepath, encoding="utf-8") as f:
            for result in streaming.check_stream(f):
                if result.response.has_errors:
                    all_errors.append({
                        "file": filepath,
                        "line": result.line_number,
                        "errors": result.response.errors,
                    })
    return all_errors
See Also