Skip to content

Python ORM integration

Use these recipes when your Python app already owns the SQLite connection through SQLAlchemy, SQLModel, or Django. The ORM keeps the business transaction; Honker contributes SQL functions that enqueue, notify, or stream inside that transaction.

SQLModel’s Session inherits from SQLAlchemy’s, so the wiring is identical for both. The only difference is which wrapper you use on top.

from sqlalchemy import create_engine, event
from sqlalchemy.orm import Session
engine = create_engine("sqlite:///app.db")


@event.listens_for(engine, "connect")
def _load_honker(conn, _):
    """Load the Honker extension on each new connection made by ``engine``.

    Registered for the engine's "connect" event, so it runs once per new
    DBAPI connection rather than once per session.

    Args:
        conn: The connection object passed to the "connect" listener; it is
            used here through the sqlite3 extension-loading methods.
        _: Second listener argument, unused.
    """
    conn.enable_load_extension(True)
    try:
        conn.load_extension("/path/to/libhonker_ext")
    finally:
        # Switch extension loading back off even if the load fails, so SQL
        # run later on this connection cannot load further shared libraries.
        conn.enable_load_extension(False)
    conn.execute("SELECT honker_bootstrap()")

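Python’s stdlib sqlite3 must be built with extension loading enabled (SQLITE_ENABLE_LOAD_EXTENSION; when compiling CPython yourself, pass --enable-loadable-sqlite-extensions to configure). Homebrew Python and most Linux distros have it on; the python.org macOS installer does not. Check with hasattr(sqlite3.connect(":memory:"), "enable_load_extension"), which returns True only when support was compiled in.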

yourapp/honker_ext.py
import json
from typing import Any, Optional
from sqlalchemy import text
from sqlalchemy.orm import Session
class Queue:
    """Handle for one named Honker queue, driven through a SQLAlchemy session.

    The queue name and ``max_attempts`` are fixed at construction. The
    session is supplied per call, so the enqueue statement executes on the
    session the caller is already using.
    """

    def __init__(self, name: str, *, max_attempts: int = 3):
        self.max_attempts = max_attempts
        self.name = name

    def enqueue(
        self,
        s: Session,
        payload: Any,
        *,
        delay: Optional[int] = None,
        priority: int = 0,
    ) -> int:
        """Enqueue ``payload`` on this queue via ``honker_enqueue``.

        Args:
            s: Session on which the statement is executed.
            payload: Value serialized with ``json.dumps`` before binding.
            delay: Bound to ``:d``; ``None`` is passed through as SQL NULL.
            priority: Bound to ``:pr``.

        Returns:
            The single scalar produced by the statement (aliased ``id``).
        """
        # The third and seventh arguments of honker_enqueue are always NULL
        # here. NOTE(review): their meaning is not visible in this file.
        statement = text(
            "SELECT honker_enqueue( :q, :p, NULL, :d, :pr, :ma, NULL) AS id"
        )
        bound = dict(
            q=self.name,
            p=json.dumps(payload),
            d=delay,
            pr=priority,
            ma=self.max_attempts,
        )
        result = s.execute(statement, bound)
        return result.scalar_one()
def notify(s: Session, channel: str, payload: Any = None) -> None:
    """Run ``SELECT notify(channel, payload)`` on the given session.

    Args:
        s: Session on which the statement is executed.
        channel: Bound to ``:c``.
        payload: JSON-encoded before binding; ``None`` is sent as SQL NULL
            rather than as the JSON text ``null``.
    """
    if payload is None:
        encoded = None
    else:
        encoded = json.dumps(payload)
    s.execute(text("SELECT notify(:c, :p)"), {"c": channel, "p": encoded})

Call sites read the way you’d write them with Honker’s native API:

from sqlalchemy.orm import Session
from yourapp.honker_ext import Queue, notify
emails = Queue("emails")

# The ORM insert, the enqueue and the notify all go through the same session
# inside a single begin() block.
with Session(engine) as session:
    with session.begin():
        session.add(Order(user_id=42))
        emails.enqueue(session, {"to": "alice@example.com"})
        notify(session, "orders", {"id": 42})

SQLModel users get an extra move: use a Pydantic model as the payload type. Validation at enqueue, a typed parse() on the worker side.

yourapp/honker_ext.py
from typing import Generic, Type, TypeVar, Optional
from pydantic import BaseModel
from sqlalchemy import text
from sqlmodel import Session
T = TypeVar("T", bound=BaseModel)
class TypedQueue(Generic[T]):
    """Honker queue whose payloads are instances of one Pydantic model.

    ``enqueue`` serializes a model instance to JSON; ``parse`` turns a job
    payload back into a validated instance of the same model.
    """

    def __init__(self, name: str, model: Type[T], *, max_attempts: int = 3):
        self.name = name
        self.model = model
        self.max_attempts = max_attempts

    def enqueue(
        self,
        s: Session,
        payload: T,
        *,
        delay: Optional[int] = None,
        priority: int = 0,
    ) -> int:
        """Enqueue a model instance via ``honker_enqueue``.

        Args:
            s: Session on which the statement is executed.
            payload: Model instance, serialized with ``model_dump_json()``.
            delay: Bound to ``:d``; ``None`` is passed through as SQL NULL.
            priority: Bound to ``:pr``.

        Returns:
            The single scalar produced by the statement (aliased ``id``).
        """
        # The third and seventh arguments of honker_enqueue are always NULL
        # here. NOTE(review): their meaning is not visible in this file.
        return s.execute(
            text(
                "SELECT honker_enqueue("
                " :q, :p, NULL, :d, :pr, :ma, NULL"
                ") AS id"
            ),
            {
                "q": self.name,
                "p": payload.model_dump_json(),
                "d": delay,
                "pr": priority,
                "ma": self.max_attempts,
            },
        ).scalar_one()

    def parse(self, raw: object) -> T:
        """Worker-side: validate a job payload back into the model.

        Args:
            raw: Either JSON text (``str``, ``bytes`` or ``bytearray``) or an
                already-decoded value such as a ``dict``.

        Returns:
            A validated instance of ``self.model``.

        NOTE(review): whether the worker receives the payload as raw JSON
        text or as a decoded object is not visible here, so both are
        accepted.
        """
        if isinstance(raw, (str, bytes, bytearray)):
            return self.model.model_validate_json(raw)
        return self.model.model_validate(raw)

Usage:

from pydantic import BaseModel, EmailStr
from sqlmodel import Session
from yourapp.honker_ext import TypedQueue
# Payload schema for jobs placed on the "emails" queue.
class EmailJob(BaseModel):
    to: EmailStr
    subject: str
    body: str


emails = TypedQueue("emails", EmailJob)

# The order row and the email job go through the same session inside a
# single begin() block.
with Session(engine) as session:
    with session.begin():
        session.add(Order(user_id=42))
        welcome = EmailJob(to="alice@example.com", subject="Hi", body="...")
        emails.enqueue(session, welcome)

Worker, in a separate process:

import asyncio, honker
db = honker.open("app.db")


async def run():
    """Claim jobs from the "emails" queue as "worker-1" and send each one.

    Each payload is validated into an ``EmailJob`` before sending. ``ack()``
    is only reached after ``send`` has returned without raising.
    """
    email_queue = db.queue("emails")
    async for job in email_queue.claim("worker-1"):
        message: EmailJob = emails.parse(job.payload)  # validated
        await send(message.to, message.subject, message.body)
        job.ack()


asyncio.run(run())
settings.py
# Single SQLite database served by Django's built-in sqlite3 backend; the
# database file is app.db under BASE_DIR.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.sqlite3",
        "NAME": BASE_DIR / "app.db",
    },
}
# apps.py (or any module imported at startup)
from django.db.backends.signals import connection_created
from django.dispatch import receiver
@receiver(connection_created)
def _load_honker(sender, connection, **kwargs):
    """Load the Honker extension whenever Django opens a SQLite connection.

    Connected to the ``connection_created`` signal. Connections whose vendor
    is not "sqlite" are left untouched.

    Args:
        sender: Signal sender, unused.
        connection: Django connection wrapper that was just created.
        **kwargs: Remaining signal arguments, unused.
    """
    if connection.vendor != "sqlite":
        return
    raw = connection.connection  # underlying sqlite3.Connection
    raw.enable_load_extension(True)
    try:
        raw.load_extension("/path/to/libhonker_ext")
    finally:
        # Switch extension loading back off even if the load fails, so SQL
        # run later on this connection cannot load further shared libraries.
        raw.enable_load_extension(False)
    raw.execute("SELECT honker_bootstrap()")
yourapp/honker_ext.py
import json
from typing import Any, Optional
from django.db import connection
def enqueue(
    queue: str,
    payload: Any,
    *,
    delay: Optional[int] = None,
    priority: int = 0,
    max_attempts: int = 3,
) -> int:
    """Enqueue ``payload`` on ``queue`` through Django's default connection.

    Args:
        queue: Queue name, passed as the first ``honker_enqueue`` argument.
        payload: Value serialized with ``json.dumps`` before binding.
        delay: Passed through; ``None`` becomes SQL NULL.
        priority: Passed through as given.
        max_attempts: Passed through as given.

    Returns:
        The first column of the single row returned by ``honker_enqueue``.
    """
    # The third and seventh arguments of honker_enqueue are always NULL here.
    # NOTE(review): their meaning is not visible in this file.
    sql = "SELECT honker_enqueue(%s, %s, NULL, %s, %s, %s, NULL)"
    with connection.cursor() as cursor:
        args = [queue, json.dumps(payload), delay, priority, max_attempts]
        cursor.execute(sql, args)
        row = cursor.fetchone()
        return row[0]
def notify(channel: str, payload: Any = None) -> None:
    """Run ``SELECT notify(channel, payload)`` on Django's default connection.

    Args:
        channel: First argument to ``notify``.
        payload: JSON-encoded before binding; ``None`` is sent as SQL NULL
            rather than as the JSON text ``null``.
    """
    with connection.cursor() as cursor:
        if payload is None:
            encoded = None
        else:
            encoded = json.dumps(payload)
        cursor.execute("SELECT notify(%s, %s)", [channel, encoded])
from django.db import transaction
from yourapp.honker_ext import enqueue, notify
# The ORM insert, the enqueue and the notify all run inside one atomic() block.
with transaction.atomic():
    Order.objects.create(user_id=42)
    email_job = {"to": "alice@example.com"}
    enqueue("emails", email_job)
    order_event = {"id": 42}
    notify("orders", order_event)

Workers run as a Django management command (same pattern as Celery) using honker.open(settings.DATABASES["default"]["NAME"]).