Streaming API
When checking files that don’t fit in memory (10 GB+ corpora, server log streams), the standard check() API won’t work. The streaming API keeps memory bounded by processing one line at a time and yielding results as they’re ready, with backpressure control to prevent downstream bottlenecks.

Quick Start

from myspellchecker import SpellChecker
from myspellchecker.core.streaming import StreamingChecker

checker = SpellChecker()
streaming = StreamingChecker(checker)

# Process large file with bounded memory
with open("large_file.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        if result.response.has_errors:
            for error in result.response.errors:
                print(f"Line {result.line_number}: {error.text}")

StreamingConfig

from myspellchecker.core.streaming import StreamingConfig

config = StreamingConfig(
    chunk_size=100,                      # Lines per chunk
    max_memory_mb=100,                   # Memory limit before backpressure
    sentence_boundary_pattern=r"[။!?]+", # Myanmar sentence endings
    enable_cross_sentence_context=True,  # Context across sentences
    progress_interval=1000,              # Lines between callbacks
    timeout_per_chunk=30.0,              # Async timeout per chunk
)

streaming = StreamingChecker(checker, config=config)
| Option | Default | Description |
| --- | --- | --- |
| `chunk_size` | `100` | Lines processed per chunk |
| `max_memory_mb` | `100` | Max memory before backpressure |
| `sentence_boundary_pattern` | `r"[။!?]+"` | Regex for sentence boundaries |
| `enable_cross_sentence_context` | `True` | Enable cross-sentence context |
| `progress_interval` | `1000` | Lines between progress callbacks |
| `timeout_per_chunk` | `30.0` | Async processing timeout (seconds) |

Configuration Presets

# High-throughput (large files on powerful hardware)
high_throughput = StreamingConfig(chunk_size=500, max_memory_mb=500, progress_interval=5000)

# Low-memory (constrained environments)
low_memory = StreamingConfig(chunk_size=10, max_memory_mb=50, progress_interval=100)

# Real-time (per-line processing)
realtime = StreamingConfig(chunk_size=1, max_memory_mb=100, timeout_per_chunk=5.0)

Synchronous Streaming

Processing Files

from myspellchecker.core.streaming import StreamingChecker

streaming = StreamingChecker(checker)

with open("document.txt", encoding="utf-8") as f:
    for result in streaming.check_stream(f):
        if result.response.has_errors:
            for error in result.response.errors:
                print(f"Line {result.line_number}: {error.text}")

Processing Different Input Types

# From file handle
with open("file.txt") as f:
    for result in streaming.check_stream(f):
        pass

# From StringIO
from io import StringIO
text_io = StringIO("Line 1\nLine 2\nLine 3")
for result in streaming.check_stream(text_io):
    pass

# From any iterable
lines = ["Line 1", "Line 2", "Line 3"]
for result in streaming.check_stream(iter(lines)):
    pass

Sentence-by-Sentence Processing

Process text by sentences with context preservation:
text = """
ဤစာကြောင်းသည် ပထမစာကြောင်းဖြစ်သည်။
ဒုတိယစာကြောင်းသည် ပိုရှည်သည်။
"""

for result in streaming.check_sentences(text):
    print(f"Sentence {result.chunk_index}: {result.response.text}")
    if result.response.has_errors:
        print(f"  Errors: {result.response.errors}")

Cross-Sentence Context

When enable_cross_sentence_context=True, the checker validates using context from the previous sentence:
config = StreamingConfig(enable_cross_sentence_context=True)
streaming = StreamingChecker(checker, config=config)

for result in streaming.check_sentences(text):
    pass

Async Streaming

Basic Async

import asyncio
import aiofiles
from myspellchecker import SpellChecker
from myspellchecker.core.streaming import StreamingChecker

async def process_file():
    """Stream-check a large file asynchronously with bounded memory."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    async with aiofiles.open("large_file.txt") as f:
        async for result in streaming.check_stream_async(f):
            if result.response.has_errors:
                for error in result.response.errors:
                    # Separate the misspelled text from its suggestions;
                    # previously they were concatenated with no delimiter.
                    print(f"Line {result.line_number}: {error.text} -> {error.suggestions}")

asyncio.run(process_file())

Async with Timeout

config = StreamingConfig(timeout_per_chunk=10.0)
streaming = StreamingChecker(checker, config=config)

async for result in streaming.check_stream_async(reader):
    if "error" in result.response.metadata:
        print(f"Timeout on line {result.line_number}")

Cancellation

async def cancellable_check(filepath: str, cancel_event: asyncio.Event):
    """Spell-check *filepath* line by line, stopping early once *cancel_event* is set."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    async def line_source():
        # Stop yielding as soon as cancellation is requested.
        import aiofiles
        async with aiofiles.open(filepath) as handle:
            async for raw_line in handle:
                if cancel_event.is_set():
                    return
                yield raw_line

    try:
        async for result in streaming.check_stream_async(line_source()):
            if cancel_event.is_set():
                break
            print(f"Checked line {result.line_number}")
    except asyncio.CancelledError:
        print("Processing was cancelled")
        raise

Graceful Shutdown

import signal

async def graceful_streaming():
    """Stream-check synthetic lines, shutting down cleanly on SIGINT/SIGTERM."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    shutdown_event = asyncio.Event()

    def handle_signal():
        shutdown_event.set()

    # asyncio.get_event_loop() is deprecated inside a coroutine (since 3.10);
    # get_running_loop() is the correct way to reach the current loop here.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, handle_signal)
    loop.add_signal_handler(signal.SIGTERM, handle_signal)

    async def read_lines():
        # Stop producing lines as soon as a shutdown signal arrives.
        for i in range(10000):
            if shutdown_event.is_set():
                return
            yield f"Line {i}\n"
            await asyncio.sleep(0.001)

    async for result in streaming.check_stream_async(read_lines()):
        if shutdown_event.is_set():
            print("Shutting down gracefully...")
            break
        print(f"Checked line {result.line_number}")

Progress Tracking

Progress Callbacks

from myspellchecker.core.streaming import StreamingConfig, StreamingStats

def on_progress(stats: StreamingStats):
    print(f"Progress: {stats.lines_processed} lines, "
          f"{stats.lines_per_second:.1f} lines/sec, "
          f"Memory: {stats.current_memory_mb:.1f}MB")

config = StreamingConfig(progress_interval=100)
streaming = StreamingChecker(checker, config=config)

stats = StreamingStats()
with open("large_file.txt") as f:
    for result in streaming.check_stream(f, on_progress=on_progress, stats=stats):
        pass

# Final statistics
print(f"Total time: {stats.elapsed_time:.2f}s")
print(f"Total errors: {stats.errors_found}")

With tqdm Progress Bar

from tqdm import tqdm
from myspellchecker.core.streaming import StreamingStats

def check_with_progress(filepath: str):
    """Spell-check *filepath* while rendering a tqdm progress bar."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    # First pass: count lines so tqdm can show an accurate total.
    with open(filepath) as handle:
        total_lines = sum(1 for _ in handle)

    stats = StreamingStats()

    # Second pass: stream-check while updating the bar per line.
    with open(filepath) as handle, tqdm(total=total_lines, desc="Checking") as pbar:
        for _result in streaming.check_stream(handle, stats=stats):
            pbar.update(1)
            postfix = {
                "errors": stats.errors_found,
                "rate": f"{stats.lines_per_second:.0f}/s",
            }
            pbar.set_postfix(postfix)

With Rich Progress Bar

from rich.progress import Progress

with Progress() as progress:
    task = progress.add_task("Checking...", total=total_lines)

    def on_progress(stats):
        progress.update(task, completed=stats.lines_processed)

    for result in streaming.check_stream(f, on_progress=on_progress):
        pass

StreamingStats

from myspellchecker.core.streaming import StreamingStats

stats = StreamingStats()

for result in streaming.check_stream(f, stats=stats):
    pass

# Available metrics
print(stats.bytes_processed)       # Total bytes processed
print(stats.lines_processed)       # Total lines processed
print(stats.sentences_processed)   # Sentences processed
print(stats.errors_found)          # Total errors found
print(stats.chunks_processed)      # Chunks processed
print(stats.elapsed_time)          # Seconds since start
print(stats.lines_per_second)      # Processing rate
print(stats.current_memory_mb)     # Current memory usage

# Serialize to dict
stats_dict = stats.to_dict()

ChunkResult

Each iteration yields a ChunkResult:
from myspellchecker.core.streaming import ChunkResult

for result in streaming.check_stream(f):
    result.response       # Response object with errors
    result.line_number    # Line number in source file
    result.chunk_index    # Sequential chunk number
    result.is_final       # True for last chunk

Memory Management

Backpressure

When memory exceeds max_memory_mb, the streaming checker automatically applies backpressure:
  1. Sync mode: Triggers garbage collection and adds a small delay
  2. Async mode: Adds an async sleep to allow cleanup
config = StreamingConfig(max_memory_mb=50)
streaming = StreamingChecker(checker, config=config)

# Automatic GC and delay when limit exceeded
for result in streaming.check_stream(f):
    pass

Bounded Memory Usage

# Process 10GB file with ~100MB memory
config = StreamingConfig(chunk_size=50, max_memory_mb=100)
streaming = StreamingChecker(checker, config=config)

with open("huge_file.txt") as f:
    for result in streaming.check_stream(f):
        pass  # Memory stays bounded

Error Handling

Sync Error Recovery

def safe_process(filepath: str):
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    try:
        with open(filepath, encoding="utf-8") as f:
            for result in streaming.check_stream(f):
                if "error" in result.response.metadata:
                    print(f"Line {result.line_number} had an error")
                else:
                    print(f"Line {result.line_number}: OK")
    except IOError as e:
        print(f"File error: {e}")

Async Timeout Recovery

config = StreamingConfig(timeout_per_chunk=5.0)

async for result in streaming.check_stream_async(reader):
    if result.response.metadata.get("error") == "Timeout exceeded":
        print(f"Timeout on line {result.line_number}")
        continue

Best Practices

  1. Reuse StreamingChecker instances because creating a new SpellChecker per file is expensive
  2. Choose chunk size by use case: 500 for throughput, 1 for real-time
  3. Use async for I/O-bound workloads such as network sources and file I/O with aiofiles
  4. Set appropriate validation level: SYLLABLE for speed, WORD for thoroughness

Integration Examples

WebSocket Streaming

import websockets
import json

async def websocket_spellcheck(websocket):
    """Spell-check each incoming websocket message and send back a JSON result."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)

    async def incoming():
        # Bridge the websocket's message stream into the streaming checker.
        async for message in websocket:
            yield message

    async for result in streaming.check_stream_async(incoming()):
        payload = {
            "line": result.line_number,
            "has_errors": result.response.has_errors,
            "errors": [err.to_dict() for err in result.response.errors],
        }
        await websocket.send(json.dumps(payload))

FastAPI File Upload

from fastapi import FastAPI, UploadFile
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/check-file")
async def check_file(file: UploadFile):
    """Spell-check an uploaded file and stream results back as NDJSON."""
    checker = SpellChecker()

    async def generate_results():
        # FastAPI's UploadFile is not an async line iterator, so
        # `async for line in file` raises TypeError. Read the body once
        # and split it into lines instead.
        body = await file.read()
        for line in body.decode("utf-8").splitlines():
            text = line.strip()
            if text:
                response = await checker.check_async(text)
                yield json.dumps({
                    "text": text,
                    "errors": [e.to_dict() for e in response.errors],
                }) + "\n"

    return StreamingResponse(generate_results(), media_type="application/x-ndjson")

Batch File Processing

import glob

def process_files(pattern: str):
    """Spell-check every file matching *pattern*; return a list of error records."""
    checker = SpellChecker()
    streaming = StreamingChecker(checker)
    collected = []

    for filepath in glob.glob(pattern):
        with open(filepath) as handle:
            for result in streaming.check_stream(handle):
                if not result.response.has_errors:
                    continue
                collected.append({
                    "file": filepath,
                    "line": result.line_number,
                    "errors": result.response.errors,
                })

    return collected

See Also