Synchronous spell checking blocks the event loop for ~50-200ms per call. The async API offloads CPU-bound checking to a thread pool, keeping your web server responsive under concurrent load.
Why Async?
Synchronous spell checking blocks the event loop:
# Blocking - can't handle other requests
result = checker.check(text) # Blocks for ~50-200ms
Async spell checking allows concurrent handling:
# Non-blocking - event loop can handle other tasks
result = await checker.check_async(text)
Basic Usage
Async Check
import asyncio
from myspellchecker import SpellChecker
async def main():
checker = SpellChecker()
# Single async check
result = await checker.check_async("မြန်မာနိုင်ငံ")
print(f"Errors: {len(result.errors)}")
asyncio.run(main())
Async Batch
async def check_multiple():
checker = SpellChecker()
texts = [
"မြန်မာနိုင်ငံ",
"ကျေးဇူးတင်ပါသည်",
"နေကောင်းလား",
]
# Async batch check
results = await checker.check_batch_async(texts)
for text, result in zip(texts, results):
print(f"{text}: {len(result.errors)} errors")
asyncio.run(check_multiple())
Concurrent Checks
async def concurrent_checks():
checker = SpellChecker()
texts = ["text1", "text2", "text3", "text4"]
# Run checks concurrently
tasks = [checker.check_async(text) for text in texts]
results = await asyncio.gather(*tasks)
return results
Web Framework Integration
FastAPI
from fastapi import FastAPI
from pydantic import BaseModel
from myspellchecker import SpellChecker
app = FastAPI()
checker = SpellChecker()
class CheckRequest(BaseModel):
text: str
class CheckResponse(BaseModel):
has_errors: bool
error_count: int
errors: list[dict]
@app.post("/check", response_model=CheckResponse)
async def check_spelling(request: CheckRequest):
result = await checker.check_async(request.text)
return CheckResponse(
has_errors=result.has_errors,
error_count=len(result.errors),
errors=[
{
"position": e.position,
"text": e.text,
"type": str(e.error_type),
"suggestions": e.suggestions[:3],
}
for e in result.errors
]
)
@app.post("/check/batch")
async def check_batch(texts: list[str]):
results = await checker.check_batch_async(texts)
return [
{
"text": r.text,
"has_errors": r.has_errors,
"error_count": len(r.errors),
}
for r in results
]
Starlette
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
from myspellchecker import SpellChecker
checker = SpellChecker()
async def check_endpoint(request):
data = await request.json()
text = data.get("text", "")
result = await checker.check_async(text)
return JSONResponse({
"has_errors": result.has_errors,
"errors": [e.to_dict() for e in result.errors],
})
app = Starlette(routes=[
Route("/check", check_endpoint, methods=["POST"]),
])
aiohttp
from aiohttp import web
from myspellchecker import SpellChecker
checker = SpellChecker()
async def check_handler(request):
data = await request.json()
text = data.get("text", "")
result = await checker.check_async(text)
return web.json_response({
"has_errors": result.has_errors,
"errors": [e.to_dict() for e in result.errors],
})
app = web.Application()
app.router.add_post("/check", check_handler)
Configuration
Async Settings
The async API runs CPU-intensive logic in a thread pool to avoid blocking the event loop. Configuration is handled at the method level:
from myspellchecker import SpellChecker
from myspellchecker.core.constants import ValidationLevel
checker = SpellChecker()
# Specify validation level for async calls
result = await checker.check_async(text, level=ValidationLevel.WORD)
# Control concurrency for batch async
results = await checker.check_batch_async(texts, max_concurrency=4)
Connection Pool for High Concurrency
from myspellchecker import SpellChecker
from myspellchecker.providers import SQLiteProvider
# Use connection pool for concurrent access
provider = SQLiteProvider(pool_max_size=10)
checker = SpellChecker(provider=provider)
Patterns
Rate Limiting
import asyncio
from asyncio import Semaphore
class RateLimitedChecker:
def __init__(self, max_concurrent: int = 10):
self.checker = SpellChecker()
self.semaphore = Semaphore(max_concurrent)
async def check(self, text: str):
async with self.semaphore:
return await self.checker.check_async(text)
async def check_batch(self, texts: list[str]):
tasks = [self.check(text) for text in texts]
return await asyncio.gather(*tasks)
# Usage
limited_checker = RateLimitedChecker(max_concurrent=5)
results = await limited_checker.check_batch(texts)
Timeout Handling
import asyncio
async def check_with_timeout(text: str, timeout: float = 5.0):
"""Check with timeout protection."""
try:
result = await asyncio.wait_for(
checker.check_async(text),
timeout=timeout
)
return result
except asyncio.TimeoutError:
# Note: this returns a dict marker rather than a Response object —
# re-raise or use a sentinel instead if callers expect a uniform type
return {"error": "timeout", "text": text}
Background Processing
import asyncio
from collections import deque
class BackgroundChecker:
def __init__(self):
self.checker = SpellChecker()
self.queue = deque()
self.results = {}
self._running = False
async def start(self):
"""Start background processing."""
self._running = True
while self._running:
if self.queue:
text_id, text = self.queue.popleft()
result = await self.checker.check_async(text)
self.results[text_id] = result
else:
await asyncio.sleep(0.01)
def submit(self, text_id: str, text: str):
"""Submit text for background processing."""
self.queue.append((text_id, text))
def get_result(self, text_id: str):
"""Get result if available."""
return self.results.get(text_id)
def stop(self):
self._running = False
Streaming WebSocket
from fastapi import FastAPI, WebSocket
from myspellchecker import SpellChecker
app = FastAPI()
checker = SpellChecker()
@app.websocket("/ws/check")
async def websocket_check(websocket: WebSocket):
await websocket.accept()
try:
while True:
text = await websocket.receive_text()
result = await checker.check_async(text)
await websocket.send_json({
"has_errors": result.has_errors,
"errors": [e.to_dict() for e in result.errors],
})
except Exception:
await websocket.close()
Async vs Sync Comparison
The async API significantly improves throughput under concurrent load by offloading CPU-bound work to a thread pool. Actual numbers depend on hardware, text length, validation level, and concurrency settings.
These are illustrative numbers from a single test run, not formal benchmarks. Run your own load tests for production capacity planning.
# Illustrative example (100 concurrent requests, 4-core CPU):
# Sync (sequential): ~6-7 req/sec
# Async (concurrent): ~30-35 req/sec
# Async (connection pool): ~50-55 req/sec
Optimization Tips
- Use connection pooling for database access
- Limit concurrency to prevent resource exhaustion
- Set appropriate timeouts for production
- Cache frequently checked texts
# Simple in-memory cache keyed by text (bound its size in production)
_result_cache: dict = {}
async def check_with_cache(text: str):
# Return the cached result if this text was checked before
cached = _result_cache.get(text)
if cached is not None:
return cached
# Cache miss: check asynchronously and store the result
result = await checker.check_async(text)
_result_cache[text] = result
return result
API Reference
check_async
async def check_async(
text: str,
level: ValidationLevel = ValidationLevel.SYLLABLE,
use_semantic: bool | None = None,
) -> Response:
"""
Asynchronously check text for spelling errors.
Args:
text: Text to check
level: Validation level (SYLLABLE or WORD)
use_semantic: Override semantic checking for this call
Returns:
Response object
"""
check_batch_async
async def check_batch_async(
texts: list[str],
level: ValidationLevel = ValidationLevel.SYLLABLE,
max_concurrency: int = 4,
use_semantic: bool | None = None,
) -> list[Response]:
"""
Asynchronously check multiple texts.
Args:
texts: List of texts to check
level: Validation level (SYLLABLE or WORD)
max_concurrency: Maximum concurrent checks (default: 4)
use_semantic: Override semantic checking (None uses config default)
Returns:
List of Response objects
"""
Troubleshooting
Issue: Event loop blocked
Cause: Synchronous code in async context
Solution: Ensure all I/O is async:
# Bad: blocks event loop
result = checker.check(text)
# Good: non-blocking
result = await checker.check_async(text)
Issue: "RuntimeError: Event loop is closed"
Cause: Checker used after loop closed
Solution: Create checker within async function scope:
async def main():
with SpellChecker() as checker:
result = await checker.check_async(text)
Note: SpellChecker uses synchronous context manager (with), not async (async with). The async methods work within this context.
Issue: Database connection errors
Cause: Concurrent access without pooling
Solution: Enable connection pooling:
provider = SQLiteProvider(pool_max_size=10)
checker = SpellChecker(provider=provider)
Issue: High memory usage under high concurrency
Cause: Too many concurrent checks
Solution: Limit concurrency:
results = await checker.check_batch_async(
texts,
max_concurrency=5 # Limit concurrent checks
)
Next Steps