Working with databases in Python offers several approaches, each with distinct trade-offs. This guide covers the main options—raw SQL, query builders, and ORMs—with practical advice on when to reach for each.
The Spectrum of Abstraction
Database access in Python sits on a spectrum from low-level to high-level:
- Raw SQL with DB-API (sqlite3, psycopg2, mysql-connector)
- Query builders that construct SQL programmatically (PyPika, SQLAlchemy Core)
- Lightweight data mappers (records, databases)
- Full ORMs (SQLAlchemy, Django ORM, Peewee, Tortoise)
The right choice depends on your project's complexity, team expertise, and performance requirements.
Raw SQL: Maximum Control
Python's built-in sqlite3 module implements the DB-API 2.0 specification, meaning the patterns you learn transfer to PostgreSQL, MySQL, and other databases.
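One portability caveat: the DB-API standardises the call patterns but not the placeholder syntax. Each driver advertises its style in a module-level `paramstyle` attribute, so the same query text may need different placeholders per driver:

```python
import sqlite3

# Each DB-API module advertises its placeholder style.
# sqlite3 is "qmark":                WHERE email = ?
# psycopg2 is "pyformat":            WHERE email = %(email)s (or %s)
print(sqlite3.paramstyle)
```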
```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator

@contextmanager
def get_connection(db_path: str) -> Iterator[sqlite3.Connection]:
    """Context manager for database connections with automatic cleanup."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row  # Access columns by name
    try:
        yield conn
    finally:
        conn.close()

def create_schema(conn: sqlite3.Connection) -> None:
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            email TEXT UNIQUE NOT NULL,
            name TEXT NOT NULL,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        );

        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY,
            user_id INTEGER NOT NULL REFERENCES users(id),
            title TEXT NOT NULL,
            body TEXT,
            published_at TEXT
        );

        CREATE INDEX IF NOT EXISTS idx_posts_user_id ON posts(user_id);
    """)
    conn.commit()
```
Parameterised Queries
Never interpolate user input into SQL strings. Always use parameterised queries to prevent SQL injection:
```python
def create_user(conn: sqlite3.Connection, email: str, name: str) -> int:
    """Create a user and return their ID."""
    cursor = conn.execute(
        "INSERT INTO users (email, name) VALUES (?, ?)",
        (email, name)
    )
    conn.commit()
    return cursor.lastrowid

def find_user_by_email(conn: sqlite3.Connection, email: str) -> sqlite3.Row | None:
    """Find a user by email address."""
    cursor = conn.execute(
        "SELECT id, email, name, created_at FROM users WHERE email = ?",
        (email,)
    )
    return cursor.fetchone()

def search_posts(conn: sqlite3.Connection, title_contains: str, limit: int = 10) -> list[sqlite3.Row]:
    """Search posts by title with LIKE pattern."""
    cursor = conn.execute(
        "SELECT id, title, published_at FROM posts WHERE title LIKE ? LIMIT ?",
        (f"%{title_contains}%", limit)
    )
    return cursor.fetchall()
```
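One subtlety with LIKE: parameterisation stops injection, but `%` and `_` in user input still act as wildcards inside the pattern. When a literal match is needed, escape them and add an ESCAPE clause to the query (`escape_like` here is a helper name of our own, not a stdlib function):

```python
def escape_like(term: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so user input matches literally."""
    return (
        term.replace(escape, escape + escape)
            .replace("%", escape + "%")
            .replace("_", escape + "_")
    )

# Pair the escaped term with an ESCAPE clause:
#   "SELECT ... WHERE title LIKE ? ESCAPE '\\'"
pattern = f"%{escape_like('50%_off')}%"
```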
Transaction Management
Explicit transaction handling prevents partial updates and maintains data integrity:
```python
def transfer_posts(conn: sqlite3.Connection, from_user_id: int, to_user_id: int) -> int:
    """Transfer all posts from one user to another atomically."""
    cursor = conn.cursor()
    try:
        cursor.execute(
            "UPDATE posts SET user_id = ? WHERE user_id = ?",
            (to_user_id, from_user_id)
        )
        affected = cursor.rowcount
        conn.commit()
        return affected
    except Exception:
        conn.rollback()
        raise
```
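sqlite3 also offers a shorthand for this commit-or-rollback pattern: the connection object is itself a transaction context manager. A clean exit from the `with` block commits, an exception rolls back, and the connection stays open either way. A minimal sketch:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER)")

with conn:  # commits on success
    conn.execute("INSERT INTO posts (user_id) VALUES (?)", (1,))

try:
    with conn:  # rolls back on exception
        conn.execute("INSERT INTO posts (user_id) VALUES (?)", (2,))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

count = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]  # only the first insert survived
```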
When Raw SQL Shines
Raw SQL is the right choice when:

- you need complex queries that ORMs struggle to express cleanly (window functions, recursive CTEs, complex joins)
- performance is critical and you want full control over the generated SQL
- you're writing one-off scripts or data migrations
- you're working with database-specific features like PostgreSQL's JSONB operators or SQLite's FTS5
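As a concrete example of a query that's awkward in most ORMs but natural in SQL, here's a window function against the schema above that keeps each user's n most recent posts (window functions require SQLite 3.25+; the function name is our own illustration):

```python
import sqlite3

def latest_posts_per_user(conn: sqlite3.Connection, n: int = 1) -> list:
    """Keep each user's n most recent posts using ROW_NUMBER()."""
    return conn.execute(
        """
        SELECT id, user_id, title FROM (
            SELECT id, user_id, title,
                   ROW_NUMBER() OVER (
                       PARTITION BY user_id
                       ORDER BY published_at DESC
                   ) AS rn
            FROM posts
        ) WHERE rn <= ?
        """,
        (n,),
    ).fetchall()
```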
SQLAlchemy: The Swiss Army Knife
SQLAlchemy operates at two levels: Core (a SQL expression language) and ORM (a full object-relational mapper). You can use either independently or combine them.
SQLAlchemy Core
Core gives you programmatic SQL construction with type safety and database portability:
```python
from sqlalchemy import (
    create_engine, MetaData, Table, Column, Integer, String, Text,
    ForeignKey, select, func,
)

engine = create_engine("sqlite:///app.db", echo=False)
metadata = MetaData()

users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String(255), unique=True, nullable=False),
    Column("name", String(255), nullable=False),
)

posts = Table(
    "posts", metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey("users.id"), nullable=False),
    Column("title", String(255), nullable=False),
    Column("body", Text),
)

metadata.create_all(engine)

# Programmatic query construction
def get_user_post_counts(engine):
    """Get post counts per user using Core."""
    query = (
        select(users.c.name, func.count(posts.c.id).label("post_count"))
        .select_from(users.outerjoin(posts))
        .group_by(users.c.id)
        .order_by(func.count(posts.c.id).desc())
    )
    with engine.connect() as conn:
        return conn.execute(query).fetchall()
```
SQLAlchemy ORM
The ORM layer maps Python classes to database tables:
```python
from sqlalchemy import create_engine, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(unique=True)
    name: Mapped[str]

    posts: Mapped[list["Post"]] = relationship(
        back_populates="author", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"User(id={self.id}, email={self.email!r})"

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    title: Mapped[str]
    body: Mapped[str | None]

    author: Mapped[User] = relationship(back_populates="posts")

# Usage
engine = create_engine("sqlite:///app.db")
Base.metadata.create_all(engine)

def create_user_with_posts(engine, email: str, name: str, post_titles: list[str]) -> User:
    """Create a user with multiple posts in a single transaction."""
    # expire_on_commit=False keeps the returned object's attributes
    # readable after the session closes
    with Session(engine, expire_on_commit=False) as session:
        user = User(email=email, name=name)
        user.posts = [Post(title=title) for title in post_titles]
        session.add(user)
        session.commit()
        return user
```
Avoiding the N+1 Problem
The N+1 query problem occurs when you fetch a list of objects, then lazily load a relationship for each one. Use eager loading to prevent this:
```python
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload

def get_users_with_posts(engine) -> list[User]:
    """Fetch users with their posts in minimal queries."""
    with Session(engine) as session:
        # joinedload: single query with JOIN (good for one-to-one, small one-to-many)
        # selectinload: separate IN query (better for large one-to-many)
        stmt = select(User).options(selectinload(User.posts))
        return list(session.scalars(stmt))
```
Lightweight Alternatives
For simpler needs, consider these focused libraries.
Peewee
Peewee offers a simpler ORM with less ceremony:
```python
from peewee import SqliteDatabase, Model, CharField, TextField, ForeignKeyField

db = SqliteDatabase("app.db")

class BaseModel(Model):
    class Meta:
        database = db

class User(BaseModel):
    email = CharField(unique=True)
    name = CharField()

class Post(BaseModel):
    author = ForeignKeyField(User, backref="posts")
    title = CharField()
    body = TextField(null=True)

db.create_tables([User, Post])

# Simple queries read naturally (some_user is an existing User instance)
recent_posts = Post.select().where(Post.author == some_user).limit(10)
```
databases (async)
For async applications, the databases library provides a clean interface:
```python
import asyncio
from databases import Database

database = Database("sqlite:///app.db")

async def main():
    await database.connect()

    # Raw SQL with async/await
    query = "SELECT * FROM users WHERE email = :email"
    user = await database.fetch_one(query, values={"email": "alice@example.com"})

    await database.disconnect()

asyncio.run(main())
```
SQLModel
SQLModel merges SQLAlchemy and Pydantic into a single model definition—particularly useful with FastAPI:
```python
from sqlmodel import SQLModel, Field, Session, create_engine

class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    email: str = Field(unique=True)
    name: str

engine = create_engine("sqlite:///app.db")
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    user = User(email="alice@example.com", name="Alice")
    session.add(user)
    session.commit()

# Also works as a Pydantic model
user.model_dump_json()
```
Testing Database Code
Good database tests are isolated, repeatable, and fast. The key is using in-memory databases and pytest fixtures to give each test a fresh, predictable starting point.
The pattern below uses two fixtures: a base db_connection that creates an empty schema, and a seeded_db that builds on it with standard test data. This layering lets simple tests start clean while tests that need existing data get a consistent dataset without repetition.
```python
import pytest
import sqlite3

@pytest.fixture
def db_connection():
    """Provide a fresh in-memory database for each test."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    create_schema(conn)  # Your schema creation function
    yield conn
    conn.close()

@pytest.fixture
def seeded_db(db_connection):
    """Database with standard test data."""
    conn = db_connection
    conn.executemany(
        "INSERT INTO users (email, name) VALUES (?, ?)",
        [
            ("alice@example.com", "Alice"),
            ("bob@example.com", "Bob"),
        ],
    )
    conn.commit()
    return conn
```
How Fixtures Work
Pytest fixtures use dependency injection—when a test function declares a parameter matching a fixture name, pytest automatically calls that fixture and passes the result. This means db_connection in the test signature isn't just a variable name; pytest sees it matches the @pytest.fixture function and wires them together.
The yield in our fixtures is key: code before yield runs as setup, the yielded value gets passed to the test, and code after yield runs as teardown—even if the test fails. This guarantees connections get closed and resources get cleaned up.
Fixtures can also build on each other. Notice how seeded_db takes db_connection as a parameter—pytest resolves this chain automatically, creating the connection first, then passing it to seeded_db for population.
```python
class TestUserQueries:
    def test_create_user_returns_id(self, db_connection):
        user_id = create_user(db_connection, "test@example.com", "Test User")
        assert user_id == 1

    def test_find_user_by_email_returns_user(self, seeded_db):
        user = find_user_by_email(seeded_db, "alice@example.com")
        assert user is not None
        assert user["name"] == "Alice"

    def test_find_user_by_email_returns_none_for_missing(self, seeded_db):
        user = find_user_by_email(seeded_db, "nobody@example.com")
        assert user is None

    def test_duplicate_email_raises_integrity_error(self, seeded_db):
        with pytest.raises(sqlite3.IntegrityError):
            create_user(seeded_db, "alice@example.com", "Another Alice")
```
Testing with SQLAlchemy
```python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

@pytest.fixture
def engine():
    """Create a fresh in-memory database."""
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    return engine

@pytest.fixture
def session(engine):
    """Provide a transactional session that rolls back after each test."""
    with Session(engine) as session:
        yield session
        session.rollback()

class TestUserModel:
    def test_user_posts_relationship(self, session):
        user = User(email="test@example.com", name="Test")
        user.posts.append(Post(title="First Post"))
        session.add(user)
        session.flush()

        assert len(user.posts) == 1
        assert user.posts[0].author is user
```
Integration Testing Against Real Databases
In-memory SQLite is fast and convenient, but it won't catch everything. SQLite has different type handling, lacks features like JSONB or array columns, and its query planner behaves differently. If your production database is PostgreSQL, some bugs will only surface when you test against PostgreSQL.
The trade-off is speed and infrastructure. Integration tests against a real database are slower and require that database to be running somewhere—locally via Docker, or in CI. A practical approach is to run your full test suite against SQLite for fast feedback during development, then run a subset of critical path tests against the real database in CI.
The scope="session" on the postgres_url fixture means it runs once per test session rather than once per test, avoiding repeated environment lookups. The pytest.skip call lets the same test suite work both locally (where the database might not be running) and in CI (where it is).
```python
import os

import pytest
from sqlalchemy import create_engine

@pytest.fixture(scope="session")
def postgres_url():
    """Get PostgreSQL URL from environment, skip if not available."""
    url = os.environ.get("TEST_DATABASE_URL")
    if not url:
        pytest.skip("TEST_DATABASE_URL not set")
    return url

@pytest.fixture
def pg_engine(postgres_url):
    """Create tables and clean up after tests."""
    engine = create_engine(postgres_url)
    Base.metadata.create_all(engine)
    yield engine
    Base.metadata.drop_all(engine)
```
Mark tests that need the real database so you can run them selectively:
```python
@pytest.mark.integration
def test_jsonb_query(pg_engine):
    """This test requires PostgreSQL's JSONB support."""
    with Session(pg_engine) as session:
        # Test PostgreSQL-specific functionality
        ...
```
Then run with pytest -m integration when you want the full integration suite, or pytest -m "not integration" for fast local iteration.
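Registering the marker keeps pytest from warning about it (and lets you enable --strict-markers, which turns unregistered marks into errors). In pytest.ini:

```ini
[pytest]
markers =
    integration: requires a real database (set TEST_DATABASE_URL)
```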
Repository Pattern: Abstracting Database Access
For larger applications, wrap database operations in repository classes. This improves testability and separates concerns:
```python
import sqlite3
from abc import ABC, abstractmethod

from pydantic import BaseModel, ConfigDict

class UserDTO(BaseModel):
    """Data transfer object—decoupled from ORM."""
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    name: str

class UserRepository(ABC):
    @abstractmethod
    def get_by_id(self, user_id: int) -> UserDTO | None:
        pass

    @abstractmethod
    def get_by_email(self, email: str) -> UserDTO | None:
        pass

    @abstractmethod
    def create(self, email: str, name: str) -> UserDTO:
        pass

class SQLiteUserRepository(UserRepository):
    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn

    def get_by_id(self, user_id: int) -> UserDTO | None:
        row = self.conn.execute(
            "SELECT id, email, name FROM users WHERE id = ?",
            (user_id,)
        ).fetchone()
        return UserDTO.model_validate(dict(row)) if row else None

    def get_by_email(self, email: str) -> UserDTO | None:
        row = self.conn.execute(
            "SELECT id, email, name FROM users WHERE email = ?",
            (email,)
        ).fetchone()
        return UserDTO.model_validate(dict(row)) if row else None

    def create(self, email: str, name: str) -> UserDTO:
        # RETURNING requires SQLite 3.35+; fetch the row before committing
        row = self.conn.execute(
            "INSERT INTO users (email, name) VALUES (?, ?) RETURNING id, email, name",
            (email, name)
        ).fetchone()
        self.conn.commit()
        return UserDTO.model_validate(dict(row))

class FakeUserRepository(UserRepository):
    """In-memory implementation for testing."""

    def __init__(self):
        self.users: dict[int, UserDTO] = {}
        self.next_id = 1

    def get_by_id(self, user_id: int) -> UserDTO | None:
        return self.users.get(user_id)

    def get_by_email(self, email: str) -> UserDTO | None:
        return next((u for u in self.users.values() if u.email == email), None)

    def create(self, email: str, name: str) -> UserDTO:
        user = UserDTO(id=self.next_id, email=email, name=name)
        self.users[self.next_id] = user
        self.next_id += 1
        return user
```
Connection Pooling
When your application talks to a networked database like PostgreSQL or MySQL, establishing a connection is expensive—TCP handshake, authentication, and protocol negotiation on every request adds latency. Connection pooling solves this by maintaining a pool of open connections that your application reuses.
SQLite doesn't benefit from pooling since it's an embedded database with no network overhead. The examples below apply to PostgreSQL but work similarly for MySQL.
SQLAlchemy (Built-in)
SQLAlchemy includes a connection pool by default. The QueuePool maintains a set of connections and hands them out as needed:
```python
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=5,         # Maintain 5 connections
    max_overflow=10,     # Allow up to 10 more under load
    pool_timeout=30,     # Seconds to wait for a connection
    pool_recycle=1800,   # Recreate connections after 30 minutes
    pool_pre_ping=True,  # Verify connections are alive before using
)
```
The pool_pre_ping option is worth enabling—it catches stale connections that the database server closed due to inactivity, avoiding errors on first use after idle periods.
For different pooling strategies:
```python
from sqlalchemy.pool import NullPool, StaticPool

# No pooling—new connection each time (useful for testing or serverless)
engine = create_engine("postgresql://...", poolclass=NullPool)

# Single shared connection (only for SQLite in-memory with threads)
engine = create_engine("sqlite:///:memory:", poolclass=StaticPool)
```
Psycopg 3 (Direct)
If you're using psycopg 3 directly without SQLAlchemy, it provides its own connection pool:
```python
from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    "postgresql://user:pass@localhost/db",
    min_size=5,
    max_size=20,
    timeout=30,
)

# Use connections from the pool
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        row = cur.fetchone()
# Connection automatically returned to pool
```
For async applications:
```python
import asyncio
from psycopg_pool import AsyncConnectionPool

async def main():
    async with AsyncConnectionPool("postgresql://user:pass@localhost/db") as pool:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT * FROM users")
                rows = await cur.fetchall()

asyncio.run(main())
```
Peewee
Peewee requires explicit pool configuration via the playhouse extensions:
```python
from playhouse.pool import PooledPostgresqlDatabase

db = PooledPostgresqlDatabase(
    "my_database",
    max_connections=20,
    stale_timeout=300,  # Seconds before a connection is considered stale
    user="postgres",
    password="secret",
    host="localhost",
)
```
External Poolers (PgBouncer, pgcat)
For high-scale deployments, consider an external connection pooler that sits between your application and the database. This is particularly valuable when:

- multiple application instances are competing for database connections
- your database has a hard connection limit you keep hitting
- you're running in serverless environments where each invocation might otherwise create a new connection
PgBouncer in transaction mode is a common choice:
```ini
# pgbouncer.ini
[databases]
mydb = host=localhost dbname=mydb

[pgbouncer]
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 20
```
Your application connects to PgBouncer instead of the database directly. From the application's perspective, nothing changes—you can even disable SQLAlchemy's built-in pooling since PgBouncer handles it:
```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql://user:pass@localhost:6432/mydb",  # PgBouncer port
    poolclass=NullPool,  # Let PgBouncer handle pooling
)
```
Sizing Your Pool
A common mistake is making pools too large. More connections isn't always better—each connection consumes memory on the database server, and too many concurrent queries can thrash the disk or CPU.
A reasonable starting point: pool_size = (2 * cpu_cores) + disk_spindles. For a typical cloud database with SSDs, 10-20 connections per application instance is often plenty. Monitor your connection wait times and database CPU to tune from there.
Choose Your Approach
With raw SQL, query builders, ORMs, and everything in between, picking the right tool can feel overwhelming. The reality is simpler than it looks—most decisions come down to three factors: how complex your queries are, how naturally your schema maps to objects, and whether you need database portability.
The flowchart below walks through these decision points. Start with your project's primary need and follow the branches.
One thing the flowchart can't capture: these choices aren't permanent or exclusive. SQLAlchemy lets you drop to raw SQL for that one gnarly reporting query. A Peewee project can shell out to psycopg for a bulk insert. The best codebases use the right tool for each job rather than forcing everything through one abstraction.