Connection Pool
When multiple threads access the spell checker concurrently (e.g., in a web server), the connection pool manages SQLite connections safely — creating them on demand, validating before use, and recycling aged connections.

Overview

from myspellchecker.providers.connection_pool import ConnectionPool
from myspellchecker.core.config import ConnectionPoolConfig

pool = ConnectionPool("/path/to/database.db", pool_config=ConnectionPoolConfig(min_size=2, max_size=10))

with pool.checkout() as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM words WHERE word = ?", ("မြန်မာ",))
    result = cursor.fetchone()

Features

| Feature | Description |
|---|---|
| Thread-safe | Safe for concurrent access from multiple threads |
| Auto-scaling | Creates connections up to max_size on demand |
| Health checks | Validates connections before use |
| Connection aging | Recreates old connections automatically |
| Statistics | Monitors pool performance |

Configuration

ConnectionPoolConfig

from myspellchecker.core.config import ConnectionPoolConfig

config = ConnectionPoolConfig(
    min_size=2,              # Minimum connections to maintain
    max_size=10,             # Maximum connections allowed
    timeout=5.0,             # Seconds to wait for connection
    max_connection_age=3600, # Max age before recreation (seconds)
    check_same_thread=False, # Allow cross-thread connection use
)

pool = ConnectionPool("/path/to/db.sqlite", pool_config=config)

Configuration Options

| Option | Default | Description |
|---|---|---|
| min_size | 2 | Minimum pool size (pre-created) |
| max_size | 10 | Maximum pool size |
| timeout | 5.0 | Checkout timeout in seconds |
| max_connection_age | 3600 | Max connection age in seconds |
| check_same_thread | False | SQLite threading check |
| sqlite_timeout | 30.0 | SQLite busy timeout in seconds |

Basic Usage

Context Manager

from myspellchecker.providers.connection_pool import ConnectionPool

pool = ConnectionPool("/path/to/db.sqlite")

# Connection automatically returned to pool
with pool.checkout() as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM words")
    count = cursor.fetchone()[0]
    print(f"Total words: {count}")

Pool Lifecycle

# Create pool
pool_config = ConnectionPoolConfig(min_size=2, max_size=10)
pool = ConnectionPool("/path/to/db.sqlite", pool_config=pool_config)

# Use connections
with pool.checkout() as conn:
    # ... use connection ...
    pass

# Clean shutdown
pool.close_all()

As Context Manager

with ConnectionPool("/path/to/db.sqlite") as pool:
    with pool.checkout() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM words LIMIT 10")
# Pool automatically closed on exit

Thread Safety

Multi-threaded Usage

import threading
from myspellchecker.providers.connection_pool import ConnectionPool
from myspellchecker.core.config.validation_configs import ConnectionPoolConfig

pool_config = ConnectionPoolConfig(min_size=2, max_size=10)
pool = ConnectionPool("/path/to/db.sqlite", pool_config=pool_config)

def worker(word):
    with pool.checkout() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM words WHERE word = ?", (word,))
        return cursor.fetchone()

# Safe to use from multiple threads
threads = []
for word in ["မြန်မာ", "နိုင်ငံ", "ကျောင်း"]:
    t = threading.Thread(target=worker, args=(word,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Thread Isolation

By default, check_same_thread=False allows connections to be used across threads:
# Allow cross-thread connection use (default)
pool = ConnectionPool(db_path, pool_config=ConnectionPoolConfig(
    check_same_thread=False
))

# Strict thread checking (connections bound to creating thread)
pool = ConnectionPool(db_path, pool_config=ConnectionPoolConfig(
    check_same_thread=True
))

Connection Health

Automatic Health Checks

The pool validates connections before checkout and on return:
# Health check query
def _is_connection_healthy(conn):
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    cursor.fetchone()
    return True

Connection Recreation

Old or unhealthy connections are automatically recreated:
config = ConnectionPoolConfig(
    max_connection_age=1800,  # Recreate after 30 minutes
)

pool = ConnectionPool(db_path, pool_config=config)

# Connections older than max_connection_age are recreated on checkout

Error Handling

Timeout Handling

from myspellchecker.providers.connection_pool import ConnectionPool

config = ConnectionPoolConfig(timeout=2.0)  # 2 second timeout
pool = ConnectionPool(db_path, pool_config=config)

try:
    with pool.checkout() as conn:
        # ... use connection ...
        pass
except TimeoutError as e:
    print(f"Pool exhausted: {e}")
    # Consider increasing max_size or reducing hold time

Database Errors

try:
    with pool.checkout() as conn:
        cursor = conn.cursor()
        cursor.execute("INVALID SQL")
except sqlite3.Error as e:
    print(f"Database error: {e}")
    # Connection is still returned to pool (if healthy)

Statistics and Monitoring

Get Pool Stats

stats = pool.get_stats()

print(f"Pool size: {stats['pool_size']}")
print(f"Active: {stats['active_connections']}")
print(f"Available: {stats['available_connections']}")
print(f"Total checkouts: {stats['total_checkouts']}")
print(f"Avg wait time: {stats['average_wait_time_ms']}ms")
print(f"Peak active: {stats['peak_active']}")

Stats Fields

| Field | Description |
|---|---|
| pool_size | Current total connections |
| active_connections | Total connections created (including in-use and available) |
| available_connections | Connections ready for checkout |
| total_checkouts | Total checkout operations |
| average_wait_time_ms | Average wait for connection |
| peak_active | Maximum concurrent checkouts |
| min_size | Configured minimum pool size |
| max_size | Configured maximum pool size |

Monitoring Example

import time

def monitor_pool(pool, interval=5):
    while True:
        stats = pool.get_stats()
        print(f"[{time.strftime('%H:%M:%S')}] "
              f"Available: {stats['available_connections']}/{stats['max_size']}, "
              f"Checkouts: {stats['total_checkouts']}, "
              f"Avg wait: {stats['average_wait_time_ms']:.1f}ms")
        time.sleep(interval)

Integration with Providers

SQLiteProvider Usage

from myspellchecker.providers import SQLiteProvider

# SQLiteProvider uses connection pooling internally
# Defaults: pool_min_size=1, pool_max_size=5
provider = SQLiteProvider(
    database_path="/path/to/db.sqlite",
    pool_min_size=2,   # Override default of 1
    pool_max_size=10,  # Override default of 5
)

# Provider handles pool management
with provider:
    result = provider.is_valid_word("မြန်မာ")

SpellChecker Integration

from myspellchecker import SpellChecker
from myspellchecker.core.config import SpellCheckerConfig
from myspellchecker.core.config.validation_configs import ProviderConfig

config = SpellCheckerConfig(
    provider_config=ProviderConfig(
        pool_min_size=2,
        pool_max_size=10,
        pool_timeout=5.0,
    ),
)

checker = SpellChecker(config=config)

Performance Tuning

Pool Sizing

| Scenario | min_size | max_size | Notes |
|---|---|---|---|
| Single-threaded | 1 | 1 | No pooling needed |
| Light multi-threaded | 2 | 5 | Default configuration |
| Heavy multi-threaded | 5 | 20 | Web server workloads |
| Batch processing | 1 | CPU cores | Match worker threads |

Timeout Tuning

# Fast-fail for real-time applications
config = ConnectionPoolConfig(timeout=1.0)

# Patient waiting for batch processing
config = ConnectionPoolConfig(timeout=30.0)

Connection Age

# Short-lived connections (high churn environments)
config = ConnectionPoolConfig(max_connection_age=300)  # 5 minutes

# Long-lived connections (stable environments)
config = ConnectionPoolConfig(max_connection_age=7200)  # 2 hours

PooledConnection

Internal connection wrapper with metadata:
@dataclass
class PooledConnection:
    connection: sqlite3.Connection
    created_at: float    # Creation timestamp
    last_used: float     # Last checkout timestamp
    use_count: int       # Total checkouts

Best Practices

Always Use Context Manager

# Good: Connection automatically returned
with pool.checkout() as conn:
    do_work(conn)

# Bad: Manual management can leak connections
conn = pool.checkout()  # This doesn't work - use context manager!

Close Pool on Shutdown

# Explicit close
pool.close_all()

# Or use context manager
with ConnectionPool(db_path) as pool:
    pass  # ... use pool ...
# Pool closed automatically on exit

Monitor Pool Usage

# Log stats periodically
stats = pool.get_stats()
if stats['average_wait_time_ms'] > 100:
    logger.warning("High pool wait time - consider increasing max_size")

if stats['peak_active'] == stats['max_size']:
    logger.warning("Pool reached maximum - consider increasing max_size")

Handle Exhaustion Gracefully

import time

def resilient_checkout(pool, max_retries=3):
    # Returns the checkout context manager; the caller must still enter it:
    #     with resilient_checkout(pool) as conn: ...
    for attempt in range(max_retries):
        try:
            return pool.checkout()
        except TimeoutError:
            if attempt < max_retries - 1:
                time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
            else:
                raise

See Also