Documentation Index
Fetch the complete documentation index at: https://docs.myspellchecker.com/llms.txt
Use this file to discover all available pages before exploring further.
When multiple threads access the spell checker concurrently (e.g., in a web server), the connection pool manages SQLite connections safely by creating them on demand, validating before use, and recycling aged connections.
Overview
from myspellchecker.providers.connection_pool import ConnectionPool
from myspellchecker.core.config import ConnectionPoolConfig
pool = ConnectionPool("/path/to/database.db", pool_config=ConnectionPoolConfig(min_size=2, max_size=10))
with pool.checkout() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM words WHERE word = ?", ("မြန်မာ",))
result = cursor.fetchone()
Features
| Feature | Description |
|---|
| Thread-safe | Safe for concurrent access from multiple threads |
| Auto-scaling | Creates connections up to max_size on demand |
| Health checks | Validates connections before use |
| Connection aging | Recreates old connections automatically |
| Statistics | Monitors pool performance |
Configuration
ConnectionPoolConfig
from myspellchecker.core.config import ConnectionPoolConfig
config = ConnectionPoolConfig(
min_size=2, # Minimum connections to maintain
max_size=10, # Maximum connections allowed
timeout=5.0, # Seconds to wait for connection
max_connection_age=3600, # Max age before recreation (seconds)
check_same_thread=False, # Allow cross-thread connection use
)
pool = ConnectionPool("/path/to/db.sqlite", pool_config=config)
Configuration Options
| Option | Default | Description |
|---|
min_size | 2 | Minimum pool size (pre-created) |
max_size | 10 | Maximum pool size |
timeout | 5.0 | Checkout timeout in seconds |
max_connection_age | 3600 | Max connection age in seconds |
check_same_thread | False | SQLite threading check |
sqlite_timeout | 30.0 | SQLite busy timeout in seconds |
skip_health_check | False | Skip connection health validation on checkout |
Basic Usage
Context Manager
from myspellchecker.providers.connection_pool import ConnectionPool
pool = ConnectionPool("/path/to/db.sqlite")
# Connection automatically returned to pool
with pool.checkout() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM words")
count = cursor.fetchone()[0]
print(f"Total words: {count}")
Pool Lifecycle
# Create pool
pool_config = ConnectionPoolConfig(min_size=2, max_size=10)
pool = ConnectionPool("/path/to/db.sqlite", pool_config=pool_config)
# Use connections
with pool.checkout() as conn:
# ... use connection ...
pass
# Clean shutdown
pool.close_all()
As Context Manager
with ConnectionPool("/path/to/db.sqlite") as pool:
with pool.checkout() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM words LIMIT 10")
# Pool automatically closed on exit
Thread Safety
Multi-threaded Usage
import threading
from myspellchecker.providers.connection_pool import ConnectionPool
from myspellchecker.core.config.validation_configs import ConnectionPoolConfig
pool_config = ConnectionPoolConfig(min_size=2, max_size=10)
pool = ConnectionPool("/path/to/db.sqlite", pool_config=pool_config)
def worker(word):
with pool.checkout() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM words WHERE word = ?", (word,))
return cursor.fetchone()
# Safe to use from multiple threads
threads = []
for word in ["မြန်မာ", "နိုင်ငံ", "ကျောင်း"]:
t = threading.Thread(target=worker, args=(word,))
threads.append(t)
t.start()
for t in threads:
t.join()
Thread Isolation
By default, check_same_thread=False allows connections to be used across threads:
# Enable SQLite's thread safety (default)
pool = ConnectionPool(db_path, pool_config=ConnectionPoolConfig(
check_same_thread=False
))
# Strict thread checking (connections bound to creating thread)
pool = ConnectionPool(db_path, pool_config=ConnectionPoolConfig(
check_same_thread=True
))
Connection Health
Automatic Health Checks
The pool validates connections before checkout and on return:
# Health check query
def _is_connection_healthy(conn):
cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()
return True
Connection Recreation
Old or unhealthy connections are automatically recreated:
config = ConnectionPoolConfig(
max_connection_age=1800, # Recreate after 30 minutes
)
pool = ConnectionPool(db_path, pool_config=config)
# Connections older than max_connection_age are recreated on checkout
Error Handling
Timeout Handling
from myspellchecker.providers.connection_pool import ConnectionPool
config = ConnectionPoolConfig(timeout=2.0) # 2 second timeout
pool = ConnectionPool(db_path, pool_config=config)
try:
with pool.checkout() as conn:
# ... use connection ...
pass
except TimeoutError as e:
print(f"Pool exhausted: {e}")
# Consider increasing max_size or reducing hold time
Database Errors
try:
with pool.checkout() as conn:
cursor = conn.cursor()
cursor.execute("INVALID SQL")
except sqlite3.Error as e:
print(f"Database error: {e}")
# Connection is still returned to pool (if healthy)
Statistics and Monitoring
Get Pool Stats
stats = pool.get_stats()
print(f"Pool size: {stats['pool_size']}")
print(f"Active: {stats['active_connections']}")
print(f"Available: {stats['available_connections']}")
print(f"Total checkouts: {stats['total_checkouts']}")
print(f"Avg wait time: {stats['average_wait_time_ms']}ms")
print(f"Peak active: {stats['peak_active']}")
Stats Fields
| Field | Description |
|---|
pool_size | Current total connections |
active_connections | Total connections created (including in-use and available) |
available_connections | Connections ready for checkout |
total_checkouts | Total checkout operations |
average_wait_time_ms | Average wait for connection |
peak_active | Maximum concurrent checkouts |
min_size | Configured minimum pool size |
max_size | Configured maximum pool size |
Monitoring Example
import time
def monitor_pool(pool, interval=5):
while True:
stats = pool.get_stats()
print(f"[{time.strftime('%H:%M:%S')}] "
f"Available: {stats['available_connections']}/{stats['max_size']}, "
f"Checkouts: {stats['total_checkouts']}, "
f"Avg wait: {stats['average_wait_time_ms']:.1f}ms")
time.sleep(interval)
Integration with Providers
SQLiteProvider Usage
from myspellchecker.providers import SQLiteProvider
# SQLiteProvider uses connection pooling internally
# Defaults: pool_min_size=1, pool_max_size=5 (from ProviderConfig)
provider = SQLiteProvider(
database_path="/path/to/db.sqlite",
pool_min_size=1, # Default: 1
pool_max_size=5, # Default: 5
)
# Provider handles pool management
with provider:
result = provider.is_valid_word("မြန်မာ")
SpellChecker Integration
from myspellchecker import SpellChecker
from myspellchecker.core.config import SpellCheckerConfig
from myspellchecker.core.config.validation_configs import ProviderConfig
config = SpellCheckerConfig(
provider_config=ProviderConfig(
pool_min_size=1,
pool_max_size=5,
pool_timeout=5.0,
),
)
checker = SpellChecker(config=config)
Pool Sizing
| Scenario | min_size | max_size | Notes |
|---|
| Single-threaded | 1 | 1 | No pooling needed |
| Light multi-threaded | 2 | 10 | Default configuration |
| Heavy multi-threaded | 5 | 20 | Web server workloads |
| Batch processing | 1 | CPU cores | Match worker threads |
Timeout Tuning
# Fast-fail for real-time applications
config = ConnectionPoolConfig(timeout=1.0)
# Patient waiting for batch processing
config = ConnectionPoolConfig(timeout=30.0)
Connection Age
# Short-lived connections (high churn environments)
config = ConnectionPoolConfig(max_connection_age=300) # 5 minutes
# Long-lived connections (stable environments)
config = ConnectionPoolConfig(max_connection_age=7200) # 2 hours
PooledConnection
Internal connection wrapper with metadata:
@dataclass
class PooledConnection:
connection: sqlite3.Connection
created_at: float # Creation timestamp
last_used: float # Last checkout timestamp
use_count: int # Total checkouts
Best Practices
Always Use Context Manager
# Good: Connection automatically returned
with pool.checkout() as conn:
do_work(conn)
# Bad: Manual management can leak connections
conn = pool.checkout() # This doesn't work - use context manager!
Close Pool on Shutdown
# Explicit close
pool.close_all()
# Or use context manager
with ConnectionPool(db_path) as pool:
# ... use pool ...
pass # Pool closed automatically
Monitor Pool Usage
# Log stats periodically
stats = pool.get_stats()
if stats['average_wait_time_ms'] > 100:
logger.warning("High pool wait time - consider increasing max_size")
if stats['peak_active'] == stats['max_size']:
logger.warning("Pool reached maximum - consider increasing max_size")
Handle Exhaustion Gracefully
import time
def resilient_checkout(pool, max_retries=3):
for attempt in range(max_retries):
try:
return pool.checkout()
except TimeoutError:
if attempt < max_retries - 1:
time.sleep(0.1 * (2 ** attempt)) # Exponential backoff
else:
raise
See Also