Examples¶
Complete, runnable examples are available in the examples/ directory. SQLite and query builder examples run standalone with no external dependencies. PostgreSQL, ClickHouse, and bookstore examples require a running database server.
Repository¶
CRUD with SQLite¶
Basic insert, fetch, update, and delete operations using an in-memory SQLite database. No external dependencies required.
"""
Basic CRUD operations with SQLite (no external dependencies required).
Demonstrates:
- Sqlite3Connection with in-memory database
- fieldmapper record definitions
- Repository insert, fetch, update, delete
- fetch_where with conditions
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.sqlite import Sqlite3Connection
@fieldmapper(tablename="users", pk="id_user")
class User:
    # Attribute values are the underlying column names in the "users" table.
    id = "id_user"
    name = "name"
    email = "email"
    active = "active"
def create_schema(conn):
    """Create the users table on the given SQLite connection."""
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE users (
                id_user INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT NOT NULL,
                active INTEGER DEFAULT 1
            )
            """
        )
        c.close()
def main():
    """Run the CRUD demo against an in-memory SQLite database."""
    conn = Sqlite3Connection(":memory:")
    create_schema(conn)
    repo = Repository(conn, User)

    # Insert
    alice_id = repo.insert_pk(User(name="Alice", email="alice@example.com", active=1))
    bob_id = repo.insert_pk(User(name="Bob", email="bob@example.com", active=1))
    charlie_id = repo.insert_pk(User(name="Charlie", email="charlie@example.com", active=0))
    print("Inserted Alice (id={}), Bob (id={}), Charlie (id={})".format(alice_id, bob_id, charlie_id))

    # Fetch all
    print("\nAll users:")
    for user in repo.fetch_all():
        print(" {} - {} ({})".format(user.id, user.name, user.email))

    # Fetch by primary key
    alice = repo.fetch_pk(alice_id)
    print("\nFetched by pk: {} - {}".format(alice.name, alice.email))

    # Fetch with conditions
    active_users = repo.fetch_where([("active", "=", 1)])
    print("\nActive users:")
    for user in active_users:
        print(" {} - {}".format(user.name, user.email))

    # Update
    alice.name = "Alice Updated"
    repo.update(alice)
    alice = repo.fetch_pk(alice_id)
    print("\nAfter update: {} - {}".format(alice.name, alice.email))

    # Delete
    repo.delete_pk(charlie_id)
    print("\nAfter deleting Charlie:")
    for user in repo.fetch_all():
        print(" {} - {}".format(user.name, user.email))
    conn.close()


if __name__ == "__main__":
    main()
Source: examples/repository/crud_sqlite.py
Transactions¶
Transaction context manager with automatic commit on success and rollback on exception. Demonstrates a funds transfer between accounts.
"""
Transaction usage with automatic commit/rollback.
Demonstrates:
- Repository.transaction() context manager
- Automatic commit on success
- Automatic rollback on exception
- Nested transactions via savepoints
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.sqlite import Sqlite3Connection
@fieldmapper(tablename="accounts", pk="id_account")
class Account:
    # Attribute values are the underlying column names in the "accounts" table.
    id = "id_account"
    name = "name"
    balance = "balance"
@fieldmapper(tablename="transfers", pk="id_transfer")
class Transfer:
    # Attribute values are the underlying column names in the "transfers" table.
    id = "id_transfer"
    from_account = "from_account"
    to_account = "to_account"
    amount = "amount"
def create_schema(conn):
    """Create the accounts and transfers tables."""
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE accounts (
                id_account INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                balance REAL NOT NULL DEFAULT 0
            )
            """
        )
        c.exec(
            """
            CREATE TABLE transfers (
                id_transfer INTEGER PRIMARY KEY AUTOINCREMENT,
                from_account INTEGER NOT NULL,
                to_account INTEGER NOT NULL,
                amount REAL NOT NULL
            )
            """
        )
        c.close()
def transfer_funds(account_repo, transfer_repo, from_id, to_id, amount):
    """Transfer funds between accounts inside a transaction.

    Raises ValueError when the sender's balance is insufficient; the
    transaction context manager rolls back all writes in that case.
    """
    with account_repo.transaction():
        sender = account_repo.fetch_pk(from_id)
        receiver = account_repo.fetch_pk(to_id)
        if sender.balance < amount:
            # Raising inside the context manager triggers an automatic rollback
            raise ValueError(
                "Insufficient funds: {} has {}, needs {}".format(
                    sender.name, sender.balance, amount
                )
            )
        sender.balance -= amount
        receiver.balance += amount
        account_repo.update(sender)
        account_repo.update(receiver)
        # Record the movement in the transfers audit table
        transfer_repo.insert_pk(
            Transfer(from_account=from_id, to_account=to_id, amount=amount)
        )
        print(
            "Transferred {} from {} to {}".format(amount, sender.name, receiver.name)
        )
def print_balances(repo):
    """Print the name and balance of every account."""
    for acc in repo.fetch_all():
        print(" {} balance: {}".format(acc.name, acc.balance))
def main():
    """Demonstrate commit on success and rollback on failure."""
    conn = Sqlite3Connection(":memory:")
    create_schema(conn)
    account_repo = Repository(conn, Account)
    transfer_repo = Repository(conn, Transfer)

    # Setup accounts
    alice_id = account_repo.insert_pk(Account(name="Alice", balance=1000.0))
    bob_id = account_repo.insert_pk(Account(name="Bob", balance=500.0))
    print("Initial balances:")
    print_balances(account_repo)

    # Successful transfer
    print("\n--- Transfer $200 from Alice to Bob ---")
    transfer_funds(account_repo, transfer_repo, alice_id, bob_id, 200.0)
    print("\nBalances after transfer:")
    print_balances(account_repo)

    # Failed transfer (insufficient funds) - rolled back
    print("\n--- Attempt transfer $2000 from Bob to Alice (should fail) ---")
    try:
        transfer_funds(account_repo, transfer_repo, bob_id, alice_id, 2000.0)
    except ValueError as e:
        print("Transaction rolled back: {}".format(e))
    print("\nBalances after failed transfer (unchanged):")
    print_balances(account_repo)
    conn.close()


if __name__ == "__main__":
    main()
Source: examples/repository/transactions.py
DbGrid Search¶
Searchable, filterable, paginated data listings using DbGrid. Shows text search, match filters, sorting, pagination, and search types.
"""
DbGrid usage for searchable, filterable, paginated listings.
Demonstrates:
- DbGrid with text search
- Filtering with match_fields
- Sorting and pagination
- Different search types (ANY, START, END)
"""
from rick_db import fieldmapper, Repository, DbGrid
from rick_db.backend.sqlite import Sqlite3Connection
@fieldmapper(tablename="products", pk="id_product")
class Product:
    # Attribute values are the underlying column names in the "products" table.
    id = "id_product"
    name = "name"
    category = "category"
    price = "price"
    active = "active"
def create_schema(conn):
    """Create the products table."""
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE products (
                id_product INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                category TEXT NOT NULL,
                price REAL NOT NULL,
                active INTEGER DEFAULT 1
            )
            """
        )
        c.close()
def seed_data(repo):
    """Insert a fixed set of sample products for the grid demos."""
    products = [
        Product(name="Python Cookbook", category="books", price=45.99, active=1),
        Product(name="SQL Fundamentals", category="books", price=39.99, active=1),
        Product(name="Python Sticker Pack", category="merch", price=9.99, active=1),
        Product(name="Database Design Guide", category="books", price=54.99, active=1),
        Product(name="SQL Cheat Sheet Poster", category="merch", price=14.99, active=0),
        Product(name="Advanced Python Patterns", category="books", price=49.99, active=1),
        Product(name="Python Mug", category="merch", price=12.99, active=1),
        Product(name="PostgreSQL Administration", category="books", price=59.99, active=1),
    ]
    for p in products:
        repo.insert_pk(p)
def print_results(total, rows):
    """Print the match count and one formatted line per result row."""
    print(" Total matching: {}".format(total))
    for row in rows:
        print(
            " [{}] {} - ${} ({})".format(
                row.category, row.name, row.price, "active" if row.active else "inactive"
            )
        )
def main():
    """Demonstrate DbGrid search, filtering, sorting and pagination."""
    conn = Sqlite3Connection(":memory:")
    create_schema(conn)
    repo = Repository(conn, Product)
    seed_data(repo)

    # Search across name and category fields
    grid = DbGrid(repo, search_fields=[Product.name, Product.category])

    # Text search
    print('Search for "python":')
    total, rows = grid.run(search_text="python")
    print_results(total, rows)

    # Text search + filter by active
    print('\nSearch for "sql", active only:')
    total, rows = grid.run(
        search_text="sql",
        match_fields={Product.active: 1},
    )
    print_results(total, rows)

    # Filter by category, sorted by price descending
    print("\nAll books, sorted by price (desc):")
    total, rows = grid.run(
        match_fields={Product.category: "books"},
        sort_fields={Product.price: "DESC"},
    )
    print_results(total, rows)

    # Pagination: 3 items per page
    print("\nAll products, page 1 (3 per page):")
    total, rows = grid.run(
        limit=3,
        offset=0,
        sort_fields={Product.id: "ASC"},
    )
    print_results(total, rows)

    print("\nAll products, page 2 (3 per page):")
    total, rows = grid.run(
        limit=3,
        offset=3,
        sort_fields={Product.id: "ASC"},
    )
    print_results(total, rows)

    # Search type: starts with
    grid_start = DbGrid(
        repo,
        search_fields=[Product.name],
        search_type=DbGrid.SEARCH_START,
    )
    print('\nSearch starts with "python":')
    total, rows = grid_start.run(search_text="python")
    print_results(total, rows)
    conn.close()


if __name__ == "__main__":
    main()
Source: examples/repository/dbgrid_search.py
Bookstore (PostgreSQL)¶
Custom repository with JOINs and aggregate queries. Requires a running PostgreSQL instance.
from rick_db import fieldmapper, Repository
from rick_db.backend.pg import PgConnectionPool
from rick_db.sql import Select, Literal
@fieldmapper(tablename='publisher', pk='id_publisher')
class Publisher:
    # Attribute values are the underlying column names in the "publisher" table.
    id = 'id_publisher'
    name = 'name'
@fieldmapper(tablename='book', pk='id_book')
class Book:
    # Attribute values are the underlying column names in the "book" table.
    id = 'id_book'
    title = 'title'
    total_pages = 'total_pages'
    rating = 'rating'
    isbn = 'isbn'
    published = 'published_date'
    fk_publisher = 'fk_publisher'
@fieldmapper(tablename='author', pk='id_author')
class Author:
    # Attribute values are the underlying column names in the "author" table.
    id = 'id_author'
    first_name = 'first_name'
    middle_name = 'middle_name'
    last_name = 'last_name'
@fieldmapper(tablename='book_author', pk='id_book_author')
class BookAuthor:
    # Many-to-many link table between book and author.
    id = 'id_book_author'
    fk_book = 'fk_book'
    fk_author = 'fk_author'
class AuthorRepository(Repository):
    """Repository for Author records, with custom JOIN/aggregate queries."""

    def __init__(self, db):
        super().__init__(db, Author)

    def calc_avg_rating(self, id_author: int):
        """
        Calculate average rating for a given author
        :param id_author: author primary key
        :return: average rating, if any; 0 when the author has no books
        """
        # generated query:
        # SELECT avg(rating) AS "rating" FROM "book" INNER JOIN "book_author" ON
        # "book"."id_book"="book_author"."fk_book" WHERE ("fk_author" = %s)
        qry = Select(self.dialect). \
            from_(Book, {Literal("avg({})".format(Book.rating)): 'rating'}). \
            join(BookAuthor, BookAuthor.fk_book, Book, Book.id). \
            where(BookAuthor.fk_author, '=', id_author)
        # retrieve result as list of type Book (to get the rating field)
        rset = self.fetch(qry, cls=Book)
        if len(rset) > 0:
            return rset.pop(0).rating
        return 0

    def books(self, id_author: int) -> list[Book]:
        """
        Retrieve all books for the given author
        :param id_author: author primary key
        :return: list[Book]
        """
        qry = Select(self.dialect). \
            from_(Book). \
            join(BookAuthor, BookAuthor.fk_book, Book, Book.id). \
            where(BookAuthor.fk_author, '=', id_author)
        return self.fetch(qry, cls=Book)
def dump_author_rating(repo: AuthorRepository):
    """Print each author's book list and average rating."""
    for author in repo.fetch_all():
        # calculate average
        rating = repo.calc_avg_rating(author.id)
        # print book list
        print("Books by {firstname} {lastname}:".format(firstname=author.first_name, lastname=author.last_name))
        for book in repo.books(author.id):
            print(book.title)
        # print average rating
        print("Average rating for {firstname} {lastname} is {rating}".
              format(firstname=author.first_name, lastname=author.last_name, rating=rating))
if __name__ == '__main__':
    # Requires a running PostgreSQL server with the bookstore schema loaded
    db_cfg = {
        'dbname': "rickdb-bookstore",
        'user': "rickdb_user",
        'password': "rickdb_pass",
        'host': "localhost",
        'port': 5432,
        'sslmode': 'require'
    }
    pool = PgConnectionPool(**db_cfg)
    repo = AuthorRepository(pool)
    dump_author_rating(repo)
Source: examples/repository/example_bookstore.py
Query Builder¶
Fn Helper Functions¶
Aggregate, math, and general SQL functions using the Fn class. Covers COUNT, SUM, AVG,
ROUND, COALESCE, CAST, nesting, GROUP BY with HAVING, and dict-style column aliases.
"""
Fn helper class for SQL aggregate and math functions.
Demonstrates:
- Fn.count, Fn.sum, Fn.avg, Fn.min, Fn.max
- Fn.round with nesting (Fn.round(Fn.avg(...)))
- Fn.coalesce, Fn.cast
- GROUP BY with HAVING
- Dict-style column aliases with Fn expressions
"""
from rick_db.sql import Select, Fn, PgSqlDialect
dialect = PgSqlDialect()

# -- Basic aggregates with aliases --

# COUNT(*) with alias
qry, _ = Select(dialect).from_("orders", {Fn.count(): "total_orders"}).assemble()
# SELECT COUNT(*) AS "total_orders" FROM "orders"
print(qry)

# COUNT on a specific field
qry, _ = Select(dialect).from_("orders", {Fn.count("discount"): "discounted_orders"}).assemble()
# SELECT COUNT(discount) AS "discounted_orders" FROM "orders"
print(qry)

# -- Multiple aggregates with GROUP BY --
qry, _ = (
    Select(dialect)
    .from_(
        "orders",
        {
            "category": None,
            Fn.count(): "order_count",
            Fn.sum("amount"): "total_amount",
            Fn.avg("amount"): "avg_amount",
            Fn.min("amount"): "min_amount",
            Fn.max("amount"): "max_amount",
        },
    )
    .group("category")
    .assemble()
)
# SELECT "category",COUNT(*) AS "order_count",SUM(amount) AS "total_amount",...
print(qry)

# -- Nested functions: ROUND(AVG(...), 2) --
qry, _ = (
    Select(dialect)
    .from_(
        "orders",
        {
            "category": None,
            Fn.round(Fn.avg("amount"), 2): "avg_rounded",
        },
    )
    .group("category")
    .assemble()
)
# SELECT "category",ROUND(AVG(amount), 2) AS "avg_rounded" FROM "orders" GROUP BY "category"
print(qry)

# -- GROUP BY with HAVING --
qry, values = (
    Select(dialect)
    .from_(
        "orders",
        {
            "category": None,
            Fn.count(): "cnt",
            Fn.sum("amount"): "total",
        },
    )
    .group("category")
    .having(Fn.count(), ">", 10)
    .assemble()
)
# SELECT ... FROM "orders" GROUP BY "category" HAVING (COUNT(*) > %s)
print(qry)
print("values:", values)

# -- Math functions --
qry, _ = (
    Select(dialect)
    .from_(
        "measurements",
        {
            "sensor_id": None,
            Fn.abs("reading"): "abs_reading",
            Fn.ceil("reading"): "ceil_reading",
            Fn.floor("reading"): "floor_reading",
            Fn.round("reading", 3): "rounded",
            Fn.sqrt(Fn.abs("reading")): "sqrt_abs",
        },
    )
    .assemble()
)
print(qry)

# -- COALESCE and CAST --
qry, _ = (
    Select(dialect)
    .from_(
        "users",
        {
            "name": None,
            Fn.coalesce("nickname", "name"): "display_name",
            Fn.cast("created_at", "date"): "created_date",
        },
    )
    .assemble()
)
# SELECT "name",COALESCE(nickname, name) AS "display_name",CAST(created_at AS date) AS "created_date" FROM "users"
print(qry)
Source: examples/query_builder/fn_aggregation.py
Recursive CTEs¶
Common Table Expressions with the With builder. Includes recursive tree walks, number series
generation, non-recursive CTEs, and multiple CTE clauses.
"""
Recursive CTE (Common Table Expression) for walking a tree structure.
Demonstrates:
- With() builder for CTEs
- Recursive CTEs with UNION ALL
- Non-recursive CTEs
- Multiple CTE clauses
"""
from rick_db import fieldmapper
from rick_db.sql import Select, With, Literal, Fn, Sql, PgSqlDialect
dialect = PgSqlDialect()


# -- Example 1: Recursive tree walk (e.g. folder hierarchy) --
@fieldmapper(tablename="folder", pk="id_folder")
class Folder:
    # Adjacency-list table: fk_parent points at the parent folder's id_folder.
    id = "id_folder"
    name = "name"
    parent = "fk_parent"
# Walk all descendants of folder id=1

# Base case: the root folder
base = Select(dialect).from_({Folder: "f1"}).where(Folder.id, "=", 1)

# Recursive case: join children via parent FK
recursive = (
    Select(dialect)
    .from_({Folder: "f2"})
    .join("folder_tree", Folder.parent, "f2", Folder.id)
)

# Combine with UNION ALL
union = Select(dialect).union([base, recursive], Sql.SQL_UNION_ALL)

# Build the CTE
qry, values = (
    With(dialect)
    .recursive()
    .clause("folder_tree", union)
    .query(Select(dialect).from_("folder_tree"))
    .assemble()
)
# WITH RECURSIVE "folder_tree" AS (
#   SELECT "f1".* FROM "folder" AS "f1" WHERE ("id_folder" = %s)
#   UNION ALL
#   SELECT "f2".* FROM "folder" AS "f2" INNER JOIN "folder_tree" ON "f2"."id_folder"="folder_tree"."fk_parent"
# ) SELECT "folder_tree".* FROM "folder_tree"
print("=== Recursive tree walk ===")
print(qry)
print("values:", values)

# -- Example 2: Recursive number series with column specification --
# Generate numbers 1 to 10 using a recursive CTE
base_num = Select(dialect).expr([Literal("1 AS n")])
recursive_num = Select(dialect).from_("nums", {Literal("n + 1"): "n"}).where("n", "<", 10)
union_num = Select(dialect).union([base_num, recursive_num], Sql.SQL_UNION_ALL)
qry, values = (
    With(dialect)
    .recursive()
    .clause("nums", union_num, columns=["n"])
    .query(Select(dialect).from_("nums"))
    .assemble()
)
# WITH RECURSIVE "nums"("n") AS (
#   SELECT 1 AS n UNION ALL SELECT n + 1 AS "n" FROM "nums" WHERE ("n" < %s)
# ) SELECT "nums".* FROM "nums"
print("\n=== Recursive number series ===")
print(qry)
print("values:", values)

# -- Example 3: Non-recursive CTE for readability --
# Use a CTE to pre-filter, then query the result
cte_query = (
    Select(dialect)
    .from_("orders", ["customer_id", "amount"])
    .where("amount", ">", 100)
)
main_query = (
    Select(dialect)
    .from_("big_orders", {"customer_id": None, Fn.sum("amount"): "total"})
    .group("customer_id")
)
qry, values = (
    With(dialect)
    .clause("big_orders", cte_query)
    .query(main_query)
    .assemble()
)
print("\n=== Non-recursive CTE ===")
print(qry)
print("values:", values)

# -- Example 4: Multiple CTE clauses --
active_users = (
    Select(dialect)
    .from_("users", ["id", "name"])
    .where("active", "=", True)
)
user_totals = (
    Select(dialect)
    .from_("orders", {"user_id": None, Fn.sum("amount"): "total"})
    .group("user_id")
)
main = (
    Select(dialect)
    .from_("au", ["name"])
    .join("ut", "user_id", "au", "id", cols=["total"])
)
qry, values = (
    With(dialect)
    .clause("au", active_users)
    .clause("ut", user_totals)
    .query(main)
    .assemble()
)
print("\n=== Multiple CTEs ===")
print(qry)
print("values:", values)
Source: examples/query_builder/cte_recursive.py
JSON Queries (PostgreSQL)¶
JSON field operations using PgJsonField. Covers text extraction (->>), object extraction (->),
bracket notation for nested access, json_where(), json_extract(), contains checks, and path queries.
"""
JSON field operations with PostgreSQL.
Demonstrates:
- PgJsonField extraction (->>, ->)
- Bracket notation for nested access
- json_where() for filtering on JSON fields
- json_extract() for selecting JSON values
- Contains and path queries
Note: These examples generate SQL strings; they require a PostgreSQL
connection to execute.
"""
from rick_db.sql import Select, PgJsonField, PgSqlDialect, Literal
dialect = PgSqlDialect()

# -- Basic JSON extraction --
jf = PgJsonField("metadata", dialect)

# Extract as text (uses ->>)
qry, _ = (
    Select(dialect)
    .from_("events", ["id", jf.extract_text("name", alias="event_name")])
    .assemble()
)
# SELECT "id","metadata"::jsonb->>'name' AS "event_name" FROM "events"
print("=== Extract text ===")
print(qry)

# Extract as JSON object (uses ->, preserves type)
qry, _ = (
    Select(dialect)
    .from_("events", ["id", jf.extract_object("payload", alias="event_payload")])
    .assemble()
)
print("\n=== Extract object ===")
print(qry)

# -- Bracket notation for nested access --
jf = PgJsonField("profile", dialect)
# Access nested fields: profile->"address"->"city"
# Bracket notation returns a PgJsonField; wrap with Literal() for use in where()
city_field = Literal(str(jf["address"]["city"]))
qry, values = (
    Select(dialect)
    .from_("users")
    .where(city_field, "=", "New York")
    .assemble()
)
print("\n=== Nested bracket notation ===")
print(qry)
print("values:", values)

# -- WHERE on JSON fields using json_where --
qry, values = (
    Select(dialect)
    .from_("users")
    .json_where("profile", "active", "=", True)
    .assemble()
)
print("\n=== json_where ===")
print(qry)
print("values:", values)

# -- JSON extraction in SELECT using json_extract --
qry, _ = (
    Select(dialect)
    .from_("users", ["id"])
    .json_extract("profile", "name", "user_name")
    .assemble()
)
print("\n=== json_extract ===")
print(qry)

# -- Contains check (@>) --
jf = PgJsonField("tags", dialect)
expr = jf.contains('["python", "sql"]')
print("\n=== Contains ===")
print(str(expr))

# -- Path query (PostgreSQL 12+, uses @?) --
jf = PgJsonField("data", dialect)
expr = jf.path_query("$.items[*].name")
print("\n=== Path query ===")
print(str(expr))

# -- Combined example: filter + extract + sort --
jf = PgJsonField("metadata", dialect)
qry, values = (
    Select(dialect)
    .from_("events", {
        "id": None,
        "event_type": None,
        jf.extract_text("severity", alias="severity"): None,
        jf.extract_text("source", alias="source"): None,
    })
    .where("event_type", "=", "error")
    .json_where("metadata", "severity", "=", "critical")
    .order("id", Select.ORDER_DESC)
    .limit(50)
    .assemble()
)
print("\n=== Combined filter + extract ===")
print(qry)
print("values:", values)
Source: examples/query_builder/json_queries.py
Complex Query (PostgreSQL)¶
Advanced SELECT with subquery joins, Literal expressions for multi-condition ON clauses, and PostgreSQL system catalog introspection.
"""
Complex example 1: Adapting PgInfo's list_foreign_keys query using the Query Builder
Features:
- Select() column limitations;
- Select() as a source table for JOIN;
- Arbitrary Literal() expressions for JOIN ON complex expressions;
Original Query:
SELECT sh.nspname AS table_schema,
tbl.relname AS table_name,
col.attname AS column_name,
referenced_sh.nspname AS foreign_table_schema,
referenced_tbl.relname AS foreign_table_name,
referenced_field.attname AS foreign_column_name
FROM pg_constraint c
INNER JOIN pg_namespace AS sh ON sh.oid = c.connamespace
INNER JOIN (SELECT oid, unnest(conkey) as conkey FROM pg_constraint) con ON c.oid = con.oid
INNER JOIN pg_class tbl ON tbl.oid = c.conrelid
INNER JOIN pg_attribute col ON (col.attrelid = tbl.oid AND col.attnum = con.conkey)
INNER JOIN pg_class referenced_tbl ON c.confrelid = referenced_tbl.oid
INNER JOIN pg_namespace AS referenced_sh ON referenced_sh.oid = referenced_tbl.relnamespace
INNER JOIN (SELECT oid, unnest(confkey) as confkey FROM pg_constraint) conf ON c.oid = conf.oid
INNER JOIN pg_attribute referenced_field ON
(referenced_field.attrelid = c.confrelid AND referenced_field.attnum = conf.confkey)
WHERE c.contype = 'f' AND sh.nspname = %s and tbl.relname = %s
"""
from rick_db.backend.pg import PgConnection, ForeignKeyRecord
from rick_db.sql import Select, Literal
db_cfg = {
    'dbname': "rickdb-bookstore",
    'user': "rickdb_user",
    'password': "rickdb_pass",
    'host': "localhost",
    'port': 5432,
    'sslmode': 'require'
}

# Connect to Database
conn = PgConnection(**db_cfg)

schema = 'public'
table_name = 'acl_role_resource'

# Build Query
# subqueries that are used as source join tables
subqry1 = Select(conn.dialect()).from_('pg_constraint', ['oid', {Literal('unnest(conkey)'): 'conkey'}])
subqry2 = Select(conn.dialect()).from_('pg_constraint', ['oid', {Literal('unnest(confkey)'): 'confkey'}])

# Main query
qry, values = Select(conn.dialect()) \
    .from_({'pg_constraint': 'c'},
           # please note: Select() must always select columns from the initial table, so we settle on contype (always 'f')
           cols=['contype']) \
    .join({'pg_namespace': 'sh'}, 'oid', 'c', 'connamespace', cols=[{'nspname': ForeignKeyRecord.schema}]) \
    .join({subqry1: 'con'}, 'oid', 'c', 'oid') \
    .join({'pg_class': 'tbl'}, 'oid', 'c', 'conrelid', cols=[{'relname': ForeignKeyRecord.table}]) \
    .join({'pg_attribute': 'col'}, Literal('col.attrelid=tbl.oid AND col.attnum=con.conkey'),
          # join() does not support multi-condition join on, so we hardcode the condition as a Literal
          cols=[{'attname': ForeignKeyRecord.column}]) \
    .join({'pg_class': 'referenced_tbl'}, 'oid', 'c', 'confrelid', cols={'relname': ForeignKeyRecord.foreign_table}) \
    .join({'pg_namespace': 'referenced_sh'}, 'oid', 'referenced_tbl', 'relnamespace',
          cols=[{'nspname': ForeignKeyRecord.foreign_schema}]) \
    .join({subqry2: 'conf'}, 'oid', 'c', 'oid') \
    .join({'pg_attribute': 'referenced_field'},
          # join() does not support multi-condition join on, so we hardcode the condition as a Literal
          Literal('referenced_field.attrelid = c.confrelid AND referenced_field.attnum = conf.confkey'),
          cols={'attname': ForeignKeyRecord.foreign_column}) \
    .where('contype', '=', 'f') \
    .where({'sh': 'nspname'}, '=', schema) \
    .where({'tbl': 'relname'}, '=', table_name) \
    .assemble()
# NOTE(review): the referenced_tbl join above now uses 'confrelid' to match the
# documented original query (INNER JOIN pg_class referenced_tbl ON
# c.confrelid = referenced_tbl.oid); the page previously showed 'conrelid'.

# execute query
with conn.cursor() as c:
    for r in c.exec(qry, values, cls=ForeignKeyRecord):
        print(r.asdict)
Source: examples/query_builder/complex_query_01.py
Migrations¶
Migration Workflow¶
Programmatic migration workflow using SQLite. Demonstrates installing the migration table, executing migrations, idempotent re-runs, and schema verification.
"""
Programmatic migration workflow using SQLite (no external dependencies).
Demonstrates:
- Installing the migration tracking table
- Executing migrations with SQL content
- Listing applied migrations
- Checking migration status
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.sqlite import Sqlite3Connection, Sqlite3Manager, Sqlite3MigrationManager
from rick_db.migrations import MigrationRecord
# SQL content for each migration: (name, sql) pairs, applied in listed order
MIGRATIONS = [
    (
        "001_create_users",
        """
        CREATE TABLE users (
            id_user INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            email TEXT NOT NULL UNIQUE,
            active INTEGER DEFAULT 1
        );
        """,
    ),
    (
        "002_create_orders",
        """
        CREATE TABLE orders (
            id_order INTEGER PRIMARY KEY AUTOINCREMENT,
            fk_user INTEGER NOT NULL,
            amount REAL NOT NULL,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (fk_user) REFERENCES users(id_user)
        );
        """,
    ),
    (
        "003_add_user_role",
        """
        ALTER TABLE users ADD COLUMN role TEXT DEFAULT 'user';
        """,
    ),
]
def run_migrations(conn):
    """Install the migration table if needed, then apply pending migrations."""
    mgr = Sqlite3Manager(conn)
    mm = Sqlite3MigrationManager(mgr)

    # Install the migration tracking table if not already present
    if not mm.is_installed():
        result = mm.install()
        if result.success:
            print("Migration table installed.")
        else:
            print("Failed to install migration table: {}".format(result.error))
            return

    # Check which migrations are already applied
    applied = {m.name for m in mm.list()}
    print("Already applied: {}".format(applied if applied else "(none)"))

    # Execute pending migrations
    pending = [(name, sql) for name, sql in MIGRATIONS if name not in applied]
    if not pending:
        print("No pending migrations.")
        return
    for name, sql in pending:
        record = MigrationRecord(name=name)
        result = mm.execute(record, sql)
        if result.success:
            print("Applied: {}".format(name))
        else:
            # Stop on first failure so later migrations don't run out of order
            print("Failed: {} - {}".format(name, result.error))
            return

    # List all applied migrations
    print("\nAll applied migrations:")
    for m in mm.list():
        print(" {} (applied: {})".format(m.name, m.applied))
def verify_schema(conn):
    """Verify the migrations worked by checking the schema."""
    mgr = Sqlite3Manager(conn)
    print("\nTables in database:")
    for table in mgr.tables():
        print(" {}".format(table))
        fields = mgr.table_fields(table)
        for f in fields:
            print(" - {} ({})".format(f.field, f.type))
def main():
    """Run migrations twice (idempotency check), then verify the schema."""
    conn = Sqlite3Connection(":memory:")
    print("=== Running migrations ===")
    run_migrations(conn)
    print("\n=== Running again (should be idempotent) ===")
    run_migrations(conn)
    verify_schema(conn)

    # Verify by inserting data through the migrated schema
    @fieldmapper(tablename="users", pk="id_user")
    class User:
        id = "id_user"
        name = "name"
        email = "email"

    repo = Repository(conn, User)
    user_id = repo.insert_pk(User(name="Alice", email="alice@example.com"))
    user = repo.fetch_pk(user_id)
    print("\nInserted user via migrated schema: {} - {}".format(user.name, user.email))
    conn.close()


if __name__ == "__main__":
    main()
Source: examples/migrations/migration_workflow.py
ClickHouse¶
ClickHouse Example¶
Connection setup, schema introspection with ClickHouseManager, repository CRUD, and aggregate
queries using the query builder. Requires a running ClickHouse server.
"""
ClickHouse connection, repository, and introspection example.
Demonstrates:
- ClickHouseConnection setup
- ClickHouseManager for schema introspection
- Repository usage with ClickHouse
- Query builder with ClickHouseSqlDialect
Requirements:
- A running ClickHouse server (default: localhost:8123)
- pip install clickhouse-connect
"""
from rick_db import fieldmapper, Repository
from rick_db.backend.clickhouse import ClickHouseConnection, ClickHouseManager
from rick_db.sql import Select, Fn
@fieldmapper(tablename="events", pk="id")
class Event:
    # Attribute values are the underlying column names in the "events" table.
    id = "id"
    event_type = "event_type"
    user_id = "user_id"
    amount = "amount"
    created_at = "created_at"
# -- Connection setup --
# Defaults match a local ClickHouse server on the HTTP port (8123).
db_cfg = {
    "host": "localhost",
    "port": 8123,
    "database": "default",
    "username": "default",
    "password": "",
}
def create_schema(conn):
    """Create the example table using raw SQL."""
    with conn.cursor() as c:
        c.exec(
            """
            CREATE TABLE IF NOT EXISTS events (
                id UInt64,
                event_type String,
                user_id UInt32,
                amount Float64,
                created_at DateTime DEFAULT now()
            ) ENGINE = MergeTree()
            ORDER BY (id, created_at)
            """
        )
        c.close()
def seed_data(conn):
    """Insert sample data."""
    with conn.cursor() as c:
        c.exec(
            """
            INSERT INTO events (id, event_type, user_id, amount) VALUES
            (1, 'purchase', 101, 29.99),
            (2, 'purchase', 102, 149.50),
            (3, 'refund', 101, 29.99),
            (4, 'purchase', 103, 75.00),
            (5, 'purchase', 101, 50.00),
            (6, 'purchase', 102, 200.00),
            (7, 'refund', 103, 75.00),
            (8, 'purchase', 104, 99.99)
            """
        )
        c.close()
def introspect(conn):
    """Database introspection with ClickHouseManager."""
    mgr = ClickHouseManager(conn)
    print("=== Introspection ===")
    print("Databases:", mgr.databases())
    print("Tables:", mgr.tables())
    if mgr.table_exists("events"):
        print("\nFields in 'events':")
        for f in mgr.table_fields("events"):
            print(" {} ({})".format(f.field, f.field_type))
        pk = mgr.table_pk("events")
        if pk:
            print("Primary key:", pk.field)
def query_examples(conn):
    """Query builder examples with ClickHouse dialect."""
    repo = Repository(conn, Event)
    dialect = conn.dialect()

    # Fetch all events
    print("\n=== All events ===")
    for event in repo.fetch_all():
        print(
            " id={} type={} user={} amount={}".format(
                event.id, event.event_type, event.user_id, event.amount
            )
        )

    # Fetch with WHERE
    print("\n=== Purchases only ===")
    purchases = repo.fetch_where([("event_type", "=", "purchase")])
    for event in purchases:
        print(" user={} amount={}".format(event.user_id, event.amount))

    # Aggregation query using query builder
    qry = (
        Select(dialect)
        .from_(
            Event,
            {
                Event.event_type: None,
                Fn.count(): "cnt",
                Fn.sum(Event.amount): "total",
                Fn.round(Fn.avg(Event.amount), 2): "avg_amount",
            },
        )
        .group(Event.event_type)
    )
    print("\n=== Aggregation by event type ===")
    sql, values = qry.assemble()
    print("SQL:", sql)
    with conn.cursor() as c:
        for row in c.fetchall(sql, values):
            print(" {}".format(row))

    # Per-user purchase totals
    qry = (
        Select(dialect)
        .from_(
            Event,
            {
                Event.user_id: None,
                Fn.sum(Event.amount): "total_spent",
                Fn.count(): "num_purchases",
            },
        )
        .where(Event.event_type, "=", "purchase")
        .group(Event.user_id)
    )
    print("\n=== Per-user purchase totals ===")
    sql, values = qry.assemble()
    with conn.cursor() as c:
        for row in c.fetchall(sql, values):
            print(" {}".format(row))
def cleanup(conn):
    """Drop the example table so repeated runs start from a clean state."""
    with conn.cursor() as c:
        c.exec("DROP TABLE IF EXISTS events")
        c.close()
def main():
    """Run the full ClickHouse demo; always clean up and close the connection."""
    conn = ClickHouseConnection(**db_cfg)
    try:
        create_schema(conn)
        seed_data(conn)
        introspect(conn)
        query_examples(conn)
    finally:
        cleanup(conn)
        conn.close()


if __name__ == "__main__":
    main()
Source: examples/clickhouse/clickhouse_example.py
Running the Examples¶
SQLite and query builder examples run directly with a standard Python interpreter; no database server or additional setup is required.
For PostgreSQL and ClickHouse examples, a Docker Compose file is provided:
# Start database services
docker compose -f examples/docker-compose.yml up -d --wait
# Run an example
python examples/clickhouse/clickhouse_example.py
# Stop services
docker compose -f examples/docker-compose.yml down
Test Harness¶
A pytest test harness is provided that validates all of the examples.