Python ORM integration
Use these recipes when your Python app already owns the SQLite connection through SQLAlchemy, SQLModel, or Django. The ORM keeps the business transaction; Honker contributes SQL functions that enqueue, notify, or stream inside that transaction.
SQLAlchemy / SQLModel
SQLModel’s Session inherits from SQLAlchemy’s, so the wiring is identical for both. The only difference is which wrapper you use on top.
Load the extension
from sqlalchemy import create_engine, event
from sqlalchemy.orm import Session
engine = create_engine("sqlite:///app.db")
@event.listens_for(engine, "connect")
def _load_honker(conn, _):
    """Load the Honker extension into a newly opened connection.

    Registered as a listener for the engine's "connect" event. ``conn`` is
    used as a raw ``sqlite3.Connection``; the second positional argument is
    unused.
    """
    conn.enable_load_extension(True)
    try:
        conn.load_extension("/path/to/libhonker_ext")
    finally:
        # Turn extension loading back off so later SQL on this connection
        # cannot pull in arbitrary shared libraries via load_extension().
        conn.enable_load_extension(False)
    conn.execute("SELECT honker_bootstrap()")


# NOTE: Python’s stdlib sqlite3 must be built with
# SQLITE_ENABLE_LOAD_EXTENSION. Homebrew Python and most Linux distros have
# it on; the python.org macOS installer does not. Check with
# hasattr(sqlite3.connect(":memory:"), "enable_load_extension").
Wrapper: a thin Queue class
import json
from typing import Any, Optional

from sqlalchemy import text
from sqlalchemy.orm import Session
class Queue:
    """Thin handle for one named queue, enqueuing through a caller-owned Session."""

    def __init__(self, name: str, *, max_attempts: int = 3):
        """Store the queue name and the ``max_attempts`` value sent with each job."""
        self.max_attempts = max_attempts
        self.name = name

    def enqueue(
        self,
        s: Session,
        payload: Any,
        *,
        delay: Optional[int] = None,
        priority: int = 0,
    ) -> int:
        """Enqueue ``payload`` (JSON-encoded) on this queue using session ``s``.

        Runs ``honker_enqueue`` on the caller's session and returns the single
        scalar it yields (selected as ``id``). The third and seventh SQL
        arguments are always passed as NULL.

        Raises:
            TypeError: if ``payload`` is not JSON-serializable.
        """
        statement = text(
            "SELECT honker_enqueue("
            " :q, :p, NULL, :d, :pr, :ma, NULL"
            ") AS id"
        )
        bindings = {
            "q": self.name,
            "p": json.dumps(payload),
            "d": delay,
            "pr": priority,
            "ma": self.max_attempts,
        }
        return s.execute(statement, bindings).scalar_one()
def notify(s: Session, channel: str, payload: Any = None) -> None:
    """Run the SQL ``notify`` function for ``channel`` on session ``s``.

    ``payload`` is JSON-encoded before being bound; ``None`` is passed through
    as SQL NULL rather than the JSON string ``"null"``.
    """
    encoded = None if payload is None else json.dumps(payload)
    s.execute(text("SELECT notify(:c, :p)"), {"c": channel, "p": encoded})


# Call sites read the way you’d write them with Honker’s native API:
from sqlalchemy.orm import Session
from yourapp.honker_ext import Queue, notify
emails = Queue("emails")
with Session(engine) as s, s.begin():
    s.add(Order(user_id=42))
    emails.enqueue(s, {"to": "alice@example.com"})
    notify(s, "orders", {"id": 42})

SQLModel: typed payloads with Pydantic
SQLModel users get an extra move: use a Pydantic model as the payload type. That gives you validation at enqueue and a typed parse() on the worker side.
from typing import Generic, Type, TypeVar, Optional

from pydantic import BaseModel
from sqlalchemy import text
from sqlmodel import Session
T = TypeVar("T", bound=BaseModel)
class TypedQueue(Generic[T]):
    """Queue handle whose payloads are instances of one Pydantic model."""

    def __init__(self, name: str, model: Type[T], *, max_attempts: int = 3):
        """Store the queue name, the payload model class, and ``max_attempts``."""
        self.name = name
        self.model = model
        self.max_attempts = max_attempts

    def enqueue(
        self,
        s: Session,
        payload: T,
        *,
        delay: Optional[int] = None,
        priority: int = 0,
    ) -> int:
        """Enqueue ``payload`` on this queue using session ``s``.

        The payload is serialized with ``model_dump_json()`` and passed to
        ``honker_enqueue``; the single scalar the query yields (selected as
        ``id``) is returned.

        Raises:
            TypeError: if ``payload`` is not an instance of this queue's model.
        """
        # Reject the wrong model here; otherwise the job is queued and only
        # fails later, when the worker calls parse() on it.
        if not isinstance(payload, self.model):
            raise TypeError(
                f"queue {self.name!r} expects {self.model.__name__}, "
                f"got {type(payload).__name__}"
            )
        return s.execute(
            text(
                "SELECT honker_enqueue("
                " :q, :p, NULL, :d, :pr, :ma, NULL"
                ") AS id"
            ),
            {
                "q": self.name,
                "p": payload.model_dump_json(),
                "d": delay,
                "pr": priority,
                "ma": self.max_attempts,
            },
        ).scalar_one()

    def parse(self, raw: str) -> T:
        """Worker-side: validate a job payload back into the model."""
        return self.model.model_validate_json(raw)


# Usage:
from pydantic import BaseModel, EmailStr
from sqlmodel import Session
from yourapp.honker_ext import TypedQueue
class EmailJob(BaseModel):
    """Payload model for an email job: recipient, subject line, and body."""

    to: EmailStr  # recipient address, typed as Pydantic's EmailStr
    subject: str
    body: str
emails = TypedQueue("emails", EmailJob)
with Session(engine) as s, s.begin():
    s.add(Order(user_id=42))
    emails.enqueue(s, EmailJob(
        to="alice@example.com", subject="Hi", body="..."))

Worker, in a separate process:
import asyncio, honker
db = honker.open("app.db")
async def run():
    """Claim jobs from the "emails" queue as "worker-1" and send each one.

    Relies on module-level ``db``, ``emails`` and ``send``. Each claimed
    job's payload is parsed into an ``EmailJob``, sent, and then acked.
    """
    claimed_jobs = db.queue("emails").claim("worker-1")
    async for claimed in claimed_jobs:
        message: EmailJob = emails.parse(claimed.payload)  # validated
        await send(message.to, message.subject, message.body)
        claimed.ack()
asyncio.run(run())

Django
Wiring
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.sqlite3",
        "NAME": BASE_DIR / "app.db",
    },
}
# apps.py (or any module imported at startup)
from django.db.backends.signals import connection_created
from django.dispatch import receiver
@receiver(connection_created)
def _load_honker(sender, connection, **kwargs):
    """Load the Honker extension when Django opens a SQLite connection.

    Receiver for the ``connection_created`` signal. Connections whose
    ``vendor`` is not "sqlite" are left untouched.
    """
    if connection.vendor != "sqlite":
        return
    raw = connection.connection  # underlying sqlite3.Connection
    raw.enable_load_extension(True)
    try:
        raw.load_extension("/path/to/libhonker_ext")
    finally:
        # Turn extension loading back off so later SQL on this connection
        # cannot pull in arbitrary shared libraries via load_extension().
        raw.enable_load_extension(False)
    raw.execute("SELECT honker_bootstrap()")


# Wrapper
import json
from typing import Any, Optional

from django.db import connection
def enqueue(
    queue: str,
    payload: Any,
    *,
    delay: Optional[int] = None,
    priority: int = 0,
    max_attempts: int = 3,
) -> int:
    """Enqueue ``payload`` (JSON-encoded) on ``queue`` via Django's connection.

    Runs ``honker_enqueue`` on a cursor from the default connection and
    returns the first column of the single row it yields. The third and
    seventh SQL arguments are always passed as NULL.

    Raises:
        TypeError: if ``payload`` is not JSON-serializable.
    """
    sql = "SELECT honker_enqueue(%s, %s, NULL, %s, %s, %s, NULL)"
    with connection.cursor() as cursor:
        params = [queue, json.dumps(payload), delay, priority, max_attempts]
        cursor.execute(sql, params)
        row = cursor.fetchone()
        return row[0]
def notify(channel: str, payload: Any = None) -> None:
    """Run the SQL ``notify`` function for ``channel`` via Django's connection.

    ``payload`` is JSON-encoded before being bound; ``None`` is passed through
    as SQL NULL rather than the JSON string ``"null"``.
    """
    with connection.cursor() as cursor:
        encoded = None if payload is None else json.dumps(payload)
        cursor.execute("SELECT notify(%s, %s)", [channel, encoded])


from django.db import transaction
from yourapp.honker_ext import enqueue, notify
with transaction.atomic():
    Order.objects.create(user_id=42)
    enqueue("emails", {"to": "alice@example.com"})
    notify("orders", {"id": 42})

Workers run as a Django management command (same pattern as Celery) using honker.open(settings.DATABASES["default"]["NAME"]).