Skip to content

Examples

Complete, runnable examples are available in the examples/ directory. SQLite and query builder examples run standalone with no external dependencies. PostgreSQL, ClickHouse, and bookstore examples require a running database server.

Repository

CRUD with SQLite

Basic insert, fetch, update, and delete operations using an in-memory SQLite database. No external dependencies required.

"""
Basic CRUD operations with SQLite (no external dependencies required).

Demonstrates:
 - Sqlite3Connection with in-memory database
 - fieldmapper record definitions
 - Repository insert, fetch, update, delete
 - fetch_where with conditions
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.sqlite import Sqlite3Connection


@fieldmapper(tablename="users", pk="id_user")
class User:
    id = "id_user"
    name = "name"
    email = "email"
    active = "active"


def create_schema(conn):
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE users (
                id_user INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT NOT NULL,
                active INTEGER DEFAULT 1
            )
            """
        )
        c.close()


def main():
    conn = Sqlite3Connection(":memory:")
    create_schema(conn)
    repo = Repository(conn, User)

    # Insert
    alice_id = repo.insert_pk(User(name="Alice", email="alice@example.com", active=1))
    bob_id = repo.insert_pk(User(name="Bob", email="bob@example.com", active=1))
    charlie_id = repo.insert_pk(User(name="Charlie", email="charlie@example.com", active=0))
    print("Inserted Alice (id={}), Bob (id={}), Charlie (id={})".format(alice_id, bob_id, charlie_id))

    # Fetch all
    print("\nAll users:")
    for user in repo.fetch_all():
        print("  {} - {} ({})".format(user.id, user.name, user.email))

    # Fetch by primary key
    alice = repo.fetch_pk(alice_id)
    print("\nFetched by pk: {} - {}".format(alice.name, alice.email))

    # Fetch with conditions
    active_users = repo.fetch_where([("active", "=", 1)])
    print("\nActive users:")
    for user in active_users:
        print("  {} - {}".format(user.name, user.email))

    # Update
    alice.name = "Alice Updated"
    repo.update(alice)
    alice = repo.fetch_pk(alice_id)
    print("\nAfter update: {} - {}".format(alice.name, alice.email))

    # Delete
    repo.delete_pk(charlie_id)
    print("\nAfter deleting Charlie:")
    for user in repo.fetch_all():
        print("  {} - {}".format(user.name, user.email))

    conn.close()


if __name__ == "__main__":
    main()

Source: examples/repository/crud_sqlite.py

Transactions

Transaction context manager with automatic commit on success and rollback on exception. Demonstrates a funds transfer between accounts.

"""
Transaction usage with automatic commit/rollback.

Demonstrates:
 - Repository.transaction() context manager
 - Automatic commit on success
 - Automatic rollback on exception
 - Nested transactions via savepoints
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.sqlite import Sqlite3Connection


@fieldmapper(tablename="accounts", pk="id_account")
class Account:
    id = "id_account"
    name = "name"
    balance = "balance"


@fieldmapper(tablename="transfers", pk="id_transfer")
class Transfer:
    id = "id_transfer"
    from_account = "from_account"
    to_account = "to_account"
    amount = "amount"


def create_schema(conn):
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE accounts (
                id_account INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                balance REAL NOT NULL DEFAULT 0
            )
            """
        )
        c.exec(
            """
            CREATE TABLE transfers (
                id_transfer INTEGER PRIMARY KEY AUTOINCREMENT,
                from_account INTEGER NOT NULL,
                to_account INTEGER NOT NULL,
                amount REAL NOT NULL
            )
            """
        )
        c.close()


def transfer_funds(account_repo, transfer_repo, from_id, to_id, amount):
    """Transfer funds between accounts inside a transaction."""
    with account_repo.transaction():
        sender = account_repo.fetch_pk(from_id)
        receiver = account_repo.fetch_pk(to_id)

        if sender.balance < amount:
            raise ValueError(
                "Insufficient funds: {} has {}, needs {}".format(
                    sender.name, sender.balance, amount
                )
            )

        sender.balance -= amount
        receiver.balance += amount
        account_repo.update(sender)
        account_repo.update(receiver)

        transfer_repo.insert_pk(
            Transfer(from_account=from_id, to_account=to_id, amount=amount)
        )
        print(
            "Transferred {} from {} to {}".format(amount, sender.name, receiver.name)
        )


def print_balances(repo):
    for acc in repo.fetch_all():
        print("  {} balance: {}".format(acc.name, acc.balance))


def main():
    conn = Sqlite3Connection(":memory:")
    create_schema(conn)

    account_repo = Repository(conn, Account)
    transfer_repo = Repository(conn, Transfer)

    # Setup accounts
    alice_id = account_repo.insert_pk(Account(name="Alice", balance=1000.0))
    bob_id = account_repo.insert_pk(Account(name="Bob", balance=500.0))

    print("Initial balances:")
    print_balances(account_repo)

    # Successful transfer
    print("\n--- Transfer $200 from Alice to Bob ---")
    transfer_funds(account_repo, transfer_repo, alice_id, bob_id, 200.0)
    print("\nBalances after transfer:")
    print_balances(account_repo)

    # Failed transfer (insufficient funds) - rolled back
    print("\n--- Attempt transfer $2000 from Bob to Alice (should fail) ---")
    try:
        transfer_funds(account_repo, transfer_repo, bob_id, alice_id, 2000.0)
    except ValueError as e:
        print("Transaction rolled back: {}".format(e))

    print("\nBalances after failed transfer (unchanged):")
    print_balances(account_repo)

    conn.close()


if __name__ == "__main__":
    main()

Source: examples/repository/transactions.py

Searchable, filterable, paginated data listings using DbGrid. Shows text search, match filters, sorting, pagination, and search types.

"""
DbGrid usage for searchable, filterable, paginated listings.

Demonstrates:
 - DbGrid with text search
 - Filtering with match_fields
 - Sorting and pagination
 - Different search types (ANY, START, END)
"""
from rick_db import fieldmapper, Repository, DbGrid
from rick_db.backend.sqlite import Sqlite3Connection


@fieldmapper(tablename="products", pk="id_product")
class Product:
    id = "id_product"
    name = "name"
    category = "category"
    price = "price"
    active = "active"


def create_schema(conn):
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE products (
                id_product INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                category TEXT NOT NULL,
                price REAL NOT NULL,
                active INTEGER DEFAULT 1
            )
            """
        )
        c.close()


def seed_data(repo):
    products = [
        Product(name="Python Cookbook", category="books", price=45.99, active=1),
        Product(name="SQL Fundamentals", category="books", price=39.99, active=1),
        Product(name="Python Sticker Pack", category="merch", price=9.99, active=1),
        Product(name="Database Design Guide", category="books", price=54.99, active=1),
        Product(name="SQL Cheat Sheet Poster", category="merch", price=14.99, active=0),
        Product(name="Advanced Python Patterns", category="books", price=49.99, active=1),
        Product(name="Python Mug", category="merch", price=12.99, active=1),
        Product(name="PostgreSQL Administration", category="books", price=59.99, active=1),
    ]
    for p in products:
        repo.insert_pk(p)


def print_results(total, rows):
    print("  Total matching: {}".format(total))
    for row in rows:
        print(
            "    [{}] {} - ${} ({})".format(
                row.category, row.name, row.price, "active" if row.active else "inactive"
            )
        )


def main():
    conn = Sqlite3Connection(":memory:")
    create_schema(conn)
    repo = Repository(conn, Product)
    seed_data(repo)

    # Search across name and category fields
    grid = DbGrid(repo, search_fields=[Product.name, Product.category])

    # Text search
    print('Search for "python":')
    total, rows = grid.run(search_text="python")
    print_results(total, rows)

    # Text search + filter by active
    print('\nSearch for "sql", active only:')
    total, rows = grid.run(
        search_text="sql",
        match_fields={Product.active: 1},
    )
    print_results(total, rows)

    # Filter by category, sorted by price descending
    print("\nAll books, sorted by price (desc):")
    total, rows = grid.run(
        match_fields={Product.category: "books"},
        sort_fields={Product.price: "DESC"},
    )
    print_results(total, rows)

    # Pagination: 3 items per page
    print("\nAll products, page 1 (3 per page):")
    total, rows = grid.run(
        limit=3,
        offset=0,
        sort_fields={Product.id: "ASC"},
    )
    print_results(total, rows)

    print("\nAll products, page 2 (3 per page):")
    total, rows = grid.run(
        limit=3,
        offset=3,
        sort_fields={Product.id: "ASC"},
    )
    print_results(total, rows)

    # Search type: starts with
    grid_start = DbGrid(
        repo,
        search_fields=[Product.name],
        search_type=DbGrid.SEARCH_START,
    )
    print('\nSearch starts with "python":')
    total, rows = grid_start.run(search_text="python")
    print_results(total, rows)

    conn.close()


if __name__ == "__main__":
    main()

Source: examples/repository/dbgrid_search.py

Bookstore (PostgreSQL)

Custom repository with JOINs and aggregate queries. Requires a running PostgreSQL instance.

from rick_db import fieldmapper, Repository
from rick_db.backend.pg import PgConnectionPool
from rick_db.sql import Select, Literal


@fieldmapper(tablename='publisher', pk='id_publisher')
class Publisher:
    id = 'id_publisher'
    name = 'name'


@fieldmapper(tablename='book', pk='id_book')
class Book:
    id = 'id_book'
    title = 'title'
    total_pages = 'total_pages'
    rating = 'rating'
    isbn = 'isbn'
    published = 'published_date'
    fk_publisher = 'fk_publisher'


@fieldmapper(tablename='author', pk='id_author')
class Author:
    id = 'id_author'
    first_name = 'first_name'
    middle_name = 'middle_name'
    last_name = 'last_name'


@fieldmapper(tablename='book_author', pk='id_book_author')
class BookAuthor:
    id = 'id_book_author'
    fk_book = 'fk_book'
    fk_author = 'fk_author'


class AuthorRepository(Repository):

    def __init__(self, db):
        super().__init__(db, Author)

    def calc_avg_rating(self, id_author: int):
        """
        Calculate average rating for a given author
        :param id_author:
        :return: average rating, if any
        """

        # generated query:
        # SELECT avg(rating) AS "rating" FROM "book" INNER JOIN "book_author" ON
        # "book"."id_book"="book_author"."fk_book" WHERE ("fk_author" = %s)
        qry = Select(self.dialect). \
            from_(Book, {Literal("avg({})".format(Book.rating)): 'rating'}). \
            join(BookAuthor, BookAuthor.fk_book, Book, Book.id). \
            where(BookAuthor.fk_author, '=', id_author)

        # retrieve result as list of type Book (to get the rating field)
        rset = self.fetch(qry, cls=Book)
        if len(rset) > 0:
            return rset.pop(0).rating
        return 0

    def books(self, id_author: int) -> list[Book]:
        """
        Retrieve all books for the given author
        :return: list[Book]
        """

        qry = Select(self.dialect). \
            from_(Book). \
            join(BookAuthor, BookAuthor.fk_book, Book, Book.id). \
            where(BookAuthor.fk_author, '=', id_author)

        return self.fetch(qry, cls=Book)


def dump_author_rating(repo: AuthorRepository):
    for author in repo.fetch_all():

        # calculate average
        rating = repo.calc_avg_rating(author.id)

        # print book list
        print("Books by {firstname} {lastname}:".format(firstname=author.first_name, lastname=author.last_name))
        for book in repo.books(author.id):
            print(book.title)

        # print average rating
        print("Average rating for {firstname} {lastname} is {rating}".
              format(firstname=author.first_name, lastname=author.last_name, rating=rating))


if __name__ == '__main__':
    db_cfg = {
        'dbname': "rickdb-bookstore",
        'user': "rickdb_user",
        'password': "rickdb_pass",
        'host': "localhost",
        'port': 5432,
        'sslmode': 'require'
    }

    pool = PgConnectionPool(**db_cfg)
    repo = AuthorRepository(pool)
    dump_author_rating(repo)

Source: examples/repository/example_bookstore.py

Query Builder

Fn Helper Functions

Aggregate, math, and general SQL functions using the Fn class. Covers COUNT, SUM, AVG, ROUND, COALESCE, CAST, nesting, GROUP BY with HAVING, and dict-style column aliases.

"""
Fn helper class for SQL aggregate and math functions.

Demonstrates:
 - Fn.count, Fn.sum, Fn.avg, Fn.min, Fn.max
 - Fn.round with nesting (Fn.round(Fn.avg(...)))
 - Fn.coalesce, Fn.cast
 - GROUP BY with HAVING
 - Dict-style column aliases with Fn expressions
"""
from rick_db.sql import Select, Fn, PgSqlDialect

dialect = PgSqlDialect()

# -- Basic aggregates with aliases --

# COUNT(*) with alias
qry, _ = Select(dialect).from_("orders", {Fn.count(): "total_orders"}).assemble()
# SELECT COUNT(*) AS "total_orders" FROM "orders"
print(qry)

# COUNT on a specific field
qry, _ = Select(dialect).from_("orders", {Fn.count("discount"): "discounted_orders"}).assemble()
# SELECT COUNT(discount) AS "discounted_orders" FROM "orders"
print(qry)


# -- Multiple aggregates with GROUP BY --

qry, _ = (
    Select(dialect)
    .from_(
        "orders",
        {
            "category": None,
            Fn.count(): "order_count",
            Fn.sum("amount"): "total_amount",
            Fn.avg("amount"): "avg_amount",
            Fn.min("amount"): "min_amount",
            Fn.max("amount"): "max_amount",
        },
    )
    .group("category")
    .assemble()
)
# SELECT "category",COUNT(*) AS "order_count",SUM(amount) AS "total_amount",...
print(qry)


# -- Nested functions: ROUND(AVG(...), 2) --

qry, _ = (
    Select(dialect)
    .from_(
        "orders",
        {
            "category": None,
            Fn.round(Fn.avg("amount"), 2): "avg_rounded",
        },
    )
    .group("category")
    .assemble()
)
# SELECT "category",ROUND(AVG(amount), 2) AS "avg_rounded" FROM "orders" GROUP BY "category"
print(qry)


# -- GROUP BY with HAVING --

qry, values = (
    Select(dialect)
    .from_(
        "orders",
        {
            "category": None,
            Fn.count(): "cnt",
            Fn.sum("amount"): "total",
        },
    )
    .group("category")
    .having(Fn.count(), ">", 10)
    .assemble()
)
# SELECT ... FROM "orders" GROUP BY "category" HAVING (COUNT(*) > %s)
print(qry)
print("values:", values)


# -- Math functions --

qry, _ = (
    Select(dialect)
    .from_(
        "measurements",
        {
            "sensor_id": None,
            Fn.abs("reading"): "abs_reading",
            Fn.ceil("reading"): "ceil_reading",
            Fn.floor("reading"): "floor_reading",
            Fn.round("reading", 3): "rounded",
            Fn.sqrt(Fn.abs("reading")): "sqrt_abs",
        },
    )
    .assemble()
)
print(qry)


# -- COALESCE and CAST --

qry, _ = (
    Select(dialect)
    .from_(
        "users",
        {
            "name": None,
            Fn.coalesce("nickname", "name"): "display_name",
            Fn.cast("created_at", "date"): "created_date",
        },
    )
    .assemble()
)
# SELECT "name",COALESCE(nickname, name) AS "display_name",CAST(created_at AS date) AS "created_date" FROM "users"
print(qry)

Source: examples/query_builder/fn_aggregation.py

Recursive CTEs

Common Table Expressions with the With builder. Includes recursive tree walks, number series generation, non-recursive CTEs, and multiple CTE clauses.

"""
Recursive CTE (Common Table Expression) for walking a tree structure.

Demonstrates:
 - With() builder for CTEs
 - Recursive CTEs with UNION ALL
 - Non-recursive CTEs
 - Multiple CTE clauses
"""
from rick_db import fieldmapper
from rick_db.sql import Select, With, Literal, Fn, Sql, PgSqlDialect

dialect = PgSqlDialect()


# -- Example 1: Recursive tree walk (e.g. folder hierarchy) --

@fieldmapper(tablename="folder", pk="id_folder")
class Folder:
    id = "id_folder"
    name = "name"
    parent = "fk_parent"


# Walk all descendants of folder id=1
# Base case: the root folder
base = Select(dialect).from_({Folder: "f1"}).where(Folder.id, "=", 1)

# Recursive case: join children via parent FK
recursive = (
    Select(dialect)
    .from_({Folder: "f2"})
    .join("folder_tree", Folder.parent, "f2", Folder.id)
)

# Combine with UNION ALL
union = Select(dialect).union([base, recursive], Sql.SQL_UNION_ALL)

# Build the CTE
qry, values = (
    With(dialect)
    .recursive()
    .clause("folder_tree", union)
    .query(Select(dialect).from_("folder_tree"))
    .assemble()
)
# WITH RECURSIVE "folder_tree" AS (
#   SELECT "f1".* FROM "folder" AS "f1" WHERE ("id_folder" = %s)
#   UNION ALL
#   SELECT "f2".* FROM "folder" AS "f2" INNER JOIN "folder_tree" ON "f2"."id_folder"="folder_tree"."fk_parent"
# ) SELECT "folder_tree".* FROM "folder_tree"
print("=== Recursive tree walk ===")
print(qry)
print("values:", values)


# -- Example 2: Recursive number series with column specification --

# Generate numbers 1 to 10 using a recursive CTE
base_num = Select(dialect).expr([Literal("1 AS n")])
recursive_num = Select(dialect).from_("nums", {Literal("n + 1"): "n"}).where("n", "<", 10)
union_num = Select(dialect).union([base_num, recursive_num], Sql.SQL_UNION_ALL)

qry, values = (
    With(dialect)
    .recursive()
    .clause("nums", union_num, columns=["n"])
    .query(Select(dialect).from_("nums"))
    .assemble()
)
# WITH RECURSIVE "nums"("n") AS (
#   SELECT 1 AS n UNION ALL SELECT n + 1 AS "n" FROM "nums" WHERE ("n" < %s)
# ) SELECT "nums".* FROM "nums"
print("\n=== Recursive number series ===")
print(qry)
print("values:", values)


# -- Example 3: Non-recursive CTE for readability --

# Use a CTE to pre-filter, then query the result
cte_query = (
    Select(dialect)
    .from_("orders", ["customer_id", "amount"])
    .where("amount", ">", 100)
)

main_query = (
    Select(dialect)
    .from_("big_orders", {"customer_id": None, Fn.sum("amount"): "total"})
    .group("customer_id")
)

qry, values = (
    With(dialect)
    .clause("big_orders", cte_query)
    .query(main_query)
    .assemble()
)
print("\n=== Non-recursive CTE ===")
print(qry)
print("values:", values)


# -- Example 4: Multiple CTE clauses --

active_users = (
    Select(dialect)
    .from_("users", ["id", "name"])
    .where("active", "=", True)
)

user_totals = (
    Select(dialect)
    .from_("orders", {"user_id": None, Fn.sum("amount"): "total"})
    .group("user_id")
)

main = (
    Select(dialect)
    .from_("au", ["name"])
    .join("ut", "user_id", "au", "id", cols=["total"])
)

qry, values = (
    With(dialect)
    .clause("au", active_users)
    .clause("ut", user_totals)
    .query(main)
    .assemble()
)
print("\n=== Multiple CTEs ===")
print(qry)
print("values:", values)

Source: examples/query_builder/cte_recursive.py

JSON Queries (PostgreSQL)

JSON field operations using PgJsonField. Covers text extraction (->>), object extraction (->), bracket notation for nested access, json_where(), json_extract(), contains checks, and path queries.

"""
JSON field operations with PostgreSQL.

Demonstrates:
 - PgJsonField extraction (->>, ->)
 - Bracket notation for nested access
 - json_where() for filtering on JSON fields
 - json_extract() for selecting JSON values
 - Contains and path queries

Note: These examples generate SQL strings; they require a PostgreSQL
connection to execute.
"""
from rick_db.sql import Select, PgJsonField, PgSqlDialect, Literal

dialect = PgSqlDialect()


# -- Basic JSON extraction --

jf = PgJsonField("metadata", dialect)

# Extract as text (uses ->>)
qry, _ = (
    Select(dialect)
    .from_("events", ["id", jf.extract_text("name", alias="event_name")])
    .assemble()
)
# SELECT "id","metadata"::jsonb->>'name' AS "event_name" FROM "events"
print("=== Extract text ===")
print(qry)


# Extract as JSON object (uses ->, preserves type)
qry, _ = (
    Select(dialect)
    .from_("events", ["id", jf.extract_object("payload", alias="event_payload")])
    .assemble()
)
print("\n=== Extract object ===")
print(qry)


# -- Bracket notation for nested access --

jf = PgJsonField("profile", dialect)

# Access nested fields: profile->"address"->"city"
# Bracket notation returns a PgJsonField; wrap with Literal() for use in where()
city_field = Literal(str(jf["address"]["city"]))
qry, values = (
    Select(dialect)
    .from_("users")
    .where(city_field, "=", "New York")
    .assemble()
)
print("\n=== Nested bracket notation ===")
print(qry)
print("values:", values)


# -- WHERE on JSON fields using json_where --

qry, values = (
    Select(dialect)
    .from_("users")
    .json_where("profile", "active", "=", True)
    .assemble()
)
print("\n=== json_where ===")
print(qry)
print("values:", values)


# -- JSON extraction in SELECT using json_extract --

qry, _ = (
    Select(dialect)
    .from_("users", ["id"])
    .json_extract("profile", "name", "user_name")
    .assemble()
)
print("\n=== json_extract ===")
print(qry)


# -- Contains check (@>) --

jf = PgJsonField("tags", dialect)
expr = jf.contains('["python", "sql"]')
print("\n=== Contains ===")
print(str(expr))


# -- Path query (PostgreSQL 12+, uses @?) --

jf = PgJsonField("data", dialect)
expr = jf.path_query("$.items[*].name")
print("\n=== Path query ===")
print(str(expr))


# -- Combined example: filter + extract + sort --

jf = PgJsonField("metadata", dialect)
qry, values = (
    Select(dialect)
    .from_("events", {
        "id": None,
        "event_type": None,
        jf.extract_text("severity", alias="severity"): None,
        jf.extract_text("source", alias="source"): None,
    })
    .where("event_type", "=", "error")
    .json_where("metadata", "severity", "=", "critical")
    .order("id", Select.ORDER_DESC)
    .limit(50)
    .assemble()
)
print("\n=== Combined filter + extract ===")
print(qry)
print("values:", values)

Source: examples/query_builder/json_queries.py

Complex Query (PostgreSQL)

Advanced SELECT with subquery joins, Literal expressions for multi-condition ON clauses, and PostgreSQL system catalog introspection.

"""
Complex example 1: Adapting PgInfo's list_foreign keys query using Query Builder

Features:
 - Select() column limitations;
 - Select() as a source table for JOIN;
 - Arbitrary Literal() expressions for JOIN ON complex expressions;

Original Query:

            SELECT sh.nspname AS table_schema,
              tbl.relname AS table_name,
              col.attname AS column_name,
              referenced_sh.nspname AS foreign_table_schema,
              referenced_tbl.relname AS foreign_table_name,
              referenced_field.attname AS foreign_column_name
            FROM pg_constraint c
                INNER JOIN pg_namespace AS sh ON sh.oid = c.connamespace
                INNER JOIN (SELECT oid, unnest(conkey) as conkey FROM pg_constraint) con ON c.oid = con.oid
                INNER JOIN pg_class tbl ON tbl.oid = c.conrelid
                INNER JOIN pg_attribute col ON (col.attrelid = tbl.oid AND col.attnum = con.conkey)
                INNER JOIN pg_class referenced_tbl ON c.confrelid = referenced_tbl.oid
                INNER JOIN pg_namespace AS referenced_sh ON referenced_sh.oid = referenced_tbl.relnamespace
                INNER JOIN (SELECT oid, unnest(confkey) as confkey FROM pg_constraint) conf ON c.oid = conf.oid
                INNER JOIN pg_attribute referenced_field ON
                    (referenced_field.attrelid = c.confrelid AND referenced_field.attnum = conf.confkey)
            WHERE c.contype = 'f' AND sh.nspname = %s and tbl.relname = %s

"""
from rick_db.backend.pg import PgConnection, ForeignKeyRecord
from rick_db.sql import Select, Literal

db_cfg = {
    'dbname': "rickdb-bookstore",
    'user': "rickdb_user",
    'password': "rickdb_pass",
    'host': "localhost",
    'port': 5432,
    'sslmode': 'require'
}

# Connect to Database
conn = PgConnection(**db_cfg)

schema = 'public'
table_name = 'acl_role_resource'

# Build Query
# subqueries that are used as source join tables
subqry1 = Select(conn.dialect()).from_('pg_constraint', ['oid', {Literal('unnest(conkey)'): 'conkey'}])
subqry2 = Select(conn.dialect()).from_('pg_constraint', ['oid', {Literal('unnest(confkey)'): 'confkey'}])

# Main query

qry, values = Select(conn.dialect()) \
    .from_({'pg_constraint': 'c'},
           # please note: Select() must always select columns from the initial table, so we settle on contype (always 'f')
           cols=['contype']) \
    .join({'pg_namespace': 'sh'}, 'oid', 'c', 'connamespace', cols=[{'nspname': ForeignKeyRecord.schema}]) \
    .join({subqry1: 'con'}, 'oid', 'c', 'oid') \
    .join({'pg_class': 'tbl'}, 'oid', 'c', 'conrelid', cols=[{'relname': ForeignKeyRecord.table}]) \
    .join({'pg_attribute': 'col'}, Literal('col.attrelid=tbl.oid AND col.attnum=con.conkey'),
          # join() does not support multi-condition join on, so we hardcode the condition as a Literal
          cols=[{'attname': ForeignKeyRecord.column}]) \
    .join({'pg_class': 'referenced_tbl'}, 'oid', 'c', 'conrelid', cols={'relname': ForeignKeyRecord.foreign_table}) \
    .join({'pg_namespace': 'referenced_sh'}, 'oid', 'referenced_tbl', 'relnamespace',
          cols=[{'nspname': ForeignKeyRecord.foreign_schema}]) \
    .join({subqry2: 'conf'}, 'oid', 'c', 'oid') \
    .join({'pg_attribute': 'referenced_field'},
          # join() does not support multi-condition join on, so we hardcode the condition as a Literal
          Literal('referenced_field.attrelid = c.confrelid AND referenced_field.attnum = conf.confkey'),
          cols={'attname': ForeignKeyRecord.foreign_column}) \
    .where('contype', '=', 'f') \
    .where({'sh': 'nspname'}, '=', schema) \
    .where({'tbl': 'relname'}, '=', table_name) \
    .assemble()

# execute query
with conn.cursor() as c:
    for r in c.exec(qry, values, cls=ForeignKeyRecord):
        print(r.asdict)

Source: examples/query_builder/complex_query_01.py

Migrations

Migration Workflow

Programmatic migration workflow using SQLite. Demonstrates installing the migration table, executing migrations, idempotent re-runs, and schema verification.

"""
Programmatic migration workflow using SQLite (no external dependencies).

Demonstrates:
 - Installing the migration tracking table
 - Executing migrations with SQL content
 - Listing applied migrations
 - Checking migration status
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.sqlite import Sqlite3Connection, Sqlite3Manager, Sqlite3MigrationManager
from rick_db.migrations import MigrationRecord


# SQL content for each migration
MIGRATIONS = [
    (
        "001_create_users",
        """
        CREATE TABLE users (
            id_user INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT NOT NULL UNIQUE,
            active INTEGER DEFAULT 1
        );
        """,
    ),
    (
        "002_create_orders",
        """
        CREATE TABLE orders (
            id_order INTEGER PRIMARY KEY AUTOINCREMENT,
            fk_user INTEGER NOT NULL,
            amount REAL NOT NULL,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (fk_user) REFERENCES users(id_user)
        );
        """,
    ),
    (
        "003_add_user_role",
        """
        ALTER TABLE users ADD COLUMN role TEXT DEFAULT 'user';
        """,
    ),
]


def run_migrations(conn):
    mgr = Sqlite3Manager(conn)
    mm = Sqlite3MigrationManager(mgr)

    # Install the migration tracking table if not already present
    if not mm.is_installed():
        result = mm.install()
        if result.success:
            print("Migration table installed.")
        else:
            print("Failed to install migration table: {}".format(result.error))
            return

    # Check which migrations are already applied
    applied = {m.name for m in mm.list()}
    print("Already applied: {}".format(applied if applied else "(none)"))

    # Execute pending migrations
    pending = [(name, sql) for name, sql in MIGRATIONS if name not in applied]
    if not pending:
        print("No pending migrations.")
        return

    for name, sql in pending:
        record = MigrationRecord(name=name)
        result = mm.execute(record, sql)
        if result.success:
            print("Applied: {}".format(name))
        else:
            print("Failed: {} - {}".format(name, result.error))
            return

    # List all applied migrations
    print("\nAll applied migrations:")
    for m in mm.list():
        print("  {} (applied: {})".format(m.name, m.applied))


def verify_schema(conn):
    """Verify the migrations worked by checking the schema."""
    mgr = Sqlite3Manager(conn)

    print("\nTables in database:")
    for table in mgr.tables():
        print("  {}".format(table))
        fields = mgr.table_fields(table)
        for f in fields:
            print("    - {} ({})".format(f.field, f.type))


def main():
    conn = Sqlite3Connection(":memory:")

    print("=== Running migrations ===")
    run_migrations(conn)

    print("\n=== Running again (should be idempotent) ===")
    run_migrations(conn)

    verify_schema(conn)

    # Verify by inserting data through the migrated schema
    @fieldmapper(tablename="users", pk="id_user")
    class User:
        id = "id_user"
        name = "name"
        email = "email"

    repo = Repository(conn, User)
    user_id = repo.insert_pk(User(name="Alice", email="alice@example.com"))
    user = repo.fetch_pk(user_id)
    print("\nInserted user via migrated schema: {} - {}".format(user.name, user.email))

    conn.close()


if __name__ == "__main__":
    main()

Source: examples/migrations/migration_workflow.py

ClickHouse

ClickHouse Example

Connection setup, schema introspection with ClickHouseManager, repository CRUD, and aggregate queries using the query builder. Requires a running ClickHouse server.

"""
ClickHouse connection, repository, and introspection example.

Demonstrates:
 - ClickHouseConnection setup
 - ClickHouseManager for schema introspection
 - Repository usage with ClickHouse
 - Query builder with ClickHouseSqlDialect

Requirements:
 - A running ClickHouse server (default: localhost:8123)
 - pip install clickhouse-connect
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.clickhouse import ClickHouseConnection, ClickHouseManager
from rick_db.sql import Select, Fn


@fieldmapper(tablename="events", pk="id")
class Event:
    id = "id"
    event_type = "event_type"
    user_id = "user_id"
    amount = "amount"
    created_at = "created_at"


# -- Connection setup --

db_cfg = {
    "host": "localhost",
    "port": 8123,
    "database": "default",
    "username": "default",
    "password": "",
}


def create_schema(conn):
    """Create the example table using raw SQL."""
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE IF NOT EXISTS events (
                id UInt64,
                event_type String,
                user_id UInt32,
                amount Float64,
                created_at DateTime DEFAULT now()
            ) ENGINE = MergeTree()
            ORDER BY (id, created_at)
            """
        )
        c.close()


def seed_data(conn):
    """Insert sample data."""
    with conn.cursor() as c:
        c.exec(
            """
            INSERT INTO events (id, event_type, user_id, amount) VALUES
                (1, 'purchase', 101, 29.99),
                (2, 'purchase', 102, 149.50),
                (3, 'refund', 101, 29.99),
                (4, 'purchase', 103, 75.00),
                (5, 'purchase', 101, 50.00),
                (6, 'purchase', 102, 200.00),
                (7, 'refund', 103, 75.00),
                (8, 'purchase', 104, 99.99)
            """
        )
        c.close()


def introspect(conn):
    """Database introspection with ClickHouseManager."""
    mgr = ClickHouseManager(conn)

    print("=== Introspection ===")
    print("Databases:", mgr.databases())
    print("Tables:", mgr.tables())

    if mgr.table_exists("events"):
        print("\nFields in 'events':")
        for f in mgr.table_fields("events"):
            print("  {} ({})".format(f.field, f.field_type))

        pk = mgr.table_pk("events")
        if pk:
            print("Primary key:", pk.field)


def query_examples(conn):
    """Query builder examples with ClickHouse dialect."""
    repo = Repository(conn, Event)
    dialect = conn.dialect()

    # Fetch all events
    print("\n=== All events ===")
    for event in repo.fetch_all():
        print(
            "  id={} type={} user={} amount={}".format(
                event.id, event.event_type, event.user_id, event.amount
            )
        )

    # Fetch with WHERE
    print("\n=== Purchases only ===")
    purchases = repo.fetch_where([("event_type", "=", "purchase")])
    for event in purchases:
        print("  user={} amount={}".format(event.user_id, event.amount))

    # Aggregation query using query builder
    qry = (
        Select(dialect)
        .from_(
            Event,
            {
                Event.event_type: None,
                Fn.count(): "cnt",
                Fn.sum(Event.amount): "total",
                Fn.round(Fn.avg(Event.amount), 2): "avg_amount",
            },
        )
        .group(Event.event_type)
    )
    print("\n=== Aggregation by event type ===")
    sql, values = qry.assemble()
    print("SQL:", sql)
    with conn.cursor() as c:
        for row in c.fetchall(sql, values):
            print("  {}".format(row))

    # Per-user purchase totals
    qry = (
        Select(dialect)
        .from_(
            Event,
            {
                Event.user_id: None,
                Fn.sum(Event.amount): "total_spent",
                Fn.count(): "num_purchases",
            },
        )
        .where(Event.event_type, "=", "purchase")
        .group(Event.user_id)
    )
    print("\n=== Per-user purchase totals ===")
    sql, values = qry.assemble()
    with conn.cursor() as c:
        for row in c.fetchall(sql, values):
            print("  {}".format(row))


def cleanup(conn):
    with conn.cursor() as c:
        c.exec("DROP TABLE IF EXISTS events")
        c.close()


def main():
    conn = ClickHouseConnection(**db_cfg)

    try:
        create_schema(conn)
        seed_data(conn)
        introspect(conn)
        query_examples(conn)
    finally:
        cleanup(conn)
        conn.close()


if __name__ == "__main__":
    main()

Source: examples/clickhouse/clickhouse_example.py

Running the Examples

SQLite and query builder examples run directly:

python examples/repository/crud_sqlite.py
python examples/query_builder/fn_aggregation.py

For PostgreSQL and ClickHouse examples, a Docker Compose file is provided:

# Start database services
docker compose -f examples/docker-compose.yml up -d --wait

# Run an example
python examples/clickhouse/clickhouse_example.py

# Stop services
docker compose -f examples/docker-compose.yml down

Test Harness

A pytest test harness validates all examples:

# Run only SQLite/query builder tests (no Docker needed)
./examples/run_tests.sh sqlite

# Run all tests (starts and stops Docker automatically)
./examples/run_tests.sh

# Run all tests, keep containers running afterward
./examples/run_tests.sh --no-teardown