Queues

A honker queue is a named row group in the _honker_live table. Jobs have a payload, a priority, a run_at time, an attempts counter, and (optionally) an expiration. A worker claims jobs atomically via an indexed UPDATE ... RETURNING, runs them, and acks (which DELETEs the row).

Every language binding shares the same on-disk format, so a Python enqueuer and a Go consumer can run against the same .db file.

import honker
db = honker.open("app.db")
q = db.queue("emails")
q.enqueue({"to": "alice@example.com"})
q.enqueue({"to": "bob@example.com"}, delay=60) # claimable in 60s
q.enqueue({"to": "urgent@example.com"}, priority=10) # higher = picked first
q.enqueue({"to": "timely@example.com"}, expires=3600) # drops out after 1h
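
One way to picture what these options mean for the stored row is the sketch below. It is not honker's implementation: `build_job_row` is a hypothetical helper, the column name `expires_at` is assumed, and the sketch assumes `delay` and `expires` are offsets in seconds from the enqueue time.

```python
import json
import time
from typing import Any, Dict, Optional


def build_job_row(
    queue: str,
    payload: Any,
    delay: float = 0,
    priority: int = 0,
    expires: Optional[float] = None,
    now: Optional[float] = None,
) -> Dict[str, Any]:
    """Translate enqueue options into one pending row (assumed column names)."""
    now = time.time() if now is None else now
    return {
        "queue": queue,
        "payload": json.dumps(payload),
        "state": "pending",
        "priority": priority,            # higher values are claimed first
        "run_at": now + delay,           # not claimable before this time
        "attempts": 0,
        # Assumed: the expiration is stored as an absolute timestamp.
        "expires_at": None if expires is None else now + expires,
    }


row = build_job_row("emails", {"to": "bob@example.com"}, delay=60, now=1_000)
print(row["run_at"])  # 1060
```

Passing `now` explicitly keeps the helper deterministic, which is convenient in tests.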

Claiming moves a row to state='processing' and sets claim_expires_at = unixepoch() + visibility_timeout_s. If the worker doesn’t ack before that window elapses, another worker can reclaim the job.

# Async iterator: wakes on database updates or due deadlines.
async for job in q.claim("worker-1"):
    try:
        await send_email(job.payload)
        job.ack()
    except Exception as e:
        job.retry(delay_s=60, error=str(e))
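
The claim step described above can be sketched with the standard-library sqlite3 module. This is an illustration under assumptions, not honker's actual SQL: the table name `live_jobs`, the `worker` column, and the `claim_one` helper are hypothetical, and the current time is passed in as `now` rather than read with unixepoch(). It requires SQLite 3.35 or newer for RETURNING.

```python
import sqlite3

# Stand-in schema: names beyond those mentioned in the text are assumptions.
SCHEMA = """
CREATE TABLE live_jobs (
    id               INTEGER PRIMARY KEY,
    queue            TEXT    NOT NULL,
    payload          TEXT    NOT NULL,
    state            TEXT    NOT NULL DEFAULT 'pending',
    priority         INTEGER NOT NULL DEFAULT 0,
    run_at           INTEGER NOT NULL,
    worker           TEXT,
    claim_expires_at INTEGER
);
"""

# One statement both picks and marks the job, so two workers cannot claim the same row.
CLAIM_SQL = """
UPDATE live_jobs
SET state = 'processing',
    worker = :worker,
    claim_expires_at = :now + :timeout
WHERE id = (
    SELECT id FROM live_jobs
    WHERE queue = :queue
      AND ((state = 'pending' AND run_at <= :now)
        OR (state = 'processing' AND claim_expires_at <= :now))
    ORDER BY priority DESC, run_at, id
    LIMIT 1
)
RETURNING id, payload
"""


def claim_one(conn, queue, worker, now, visibility_timeout_s=300):
    """Claim the best available job, or return None if nothing is claimable."""
    params = {"queue": queue, "worker": worker, "now": now,
              "timeout": visibility_timeout_s}
    with conn:  # commit on success, roll back on error
        rows = conn.execute(CLAIM_SQL, params).fetchall()
    return rows[0] if rows else None


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO live_jobs (queue, payload, priority, run_at) VALUES (?, ?, ?, ?)",
    [("emails", "normal", 0, 0), ("emails", "urgent", 10, 0)],
)
print(claim_one(conn, "emails", "worker-1", now=100))  # (2, 'urgent')
```

The second branch of the WHERE clause is what lets another worker pick up a job whose claim window has elapsed.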

For handlers that benefit from batching (DB writes, HTTP calls), claim N jobs in one transaction:

jobs = q.claim_batch("worker-1", n=100)
for j in jobs:
    process(j.payload)
q.ack_batch([j.id for j in jobs], "worker-1")
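
Since an ack DELETEs the row, a batch ack can be pictured as a single DELETE over the claimed ids. The sketch below uses the standard-library sqlite3 module with a stand-in table; the `live_jobs` name, the `worker` column, and the ownership check are assumptions made for illustration, not honker's confirmed behavior.

```python
import sqlite3
from typing import Iterable


def ack_batch(conn: sqlite3.Connection, job_ids: Iterable[int], worker: str) -> int:
    """Delete the given jobs if this worker still holds them; return how many were acked."""
    ids = list(job_ids)
    if not ids:
        return 0
    placeholders = ",".join("?" for _ in ids)
    with conn:  # one transaction for the whole batch
        cur = conn.execute(
            f"DELETE FROM live_jobs WHERE id IN ({placeholders}) "
            "AND state = 'processing' AND worker = ?",
            [*ids, worker],
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE live_jobs (id INTEGER PRIMARY KEY, state TEXT, worker TEXT)")
conn.executemany(
    "INSERT INTO live_jobs (id, state, worker) VALUES (?, ?, ?)",
    [(1, "processing", "worker-1"),
     (2, "processing", "worker-1"),
     (3, "processing", "worker-2")],
)
print(ack_batch(conn, [1, 2, 3], "worker-1"))  # 2
```

Job 3 survives because it is held by a different worker. Comparing the returned count with the batch size is one way to notice claims that expired mid-batch.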

Every claim carries a claim_expires_at deadline; the visibility timeout defaults to 300s. A worker that neither acks nor extends the claim before the deadline loses it, and another worker can reclaim the job.

For long-running jobs, call heartbeat periodically to extend the window:

# Python
import asyncio

async def keepalive(job, stop):
    while not stop.is_set():
        await asyncio.sleep(60)
        job.heartbeat(extend_s=300)

async for job in q.claim("worker-1"):
    stop = asyncio.Event()
    hb = asyncio.create_task(keepalive(job, stop))
    try:
        await long_running(job.payload)
        job.ack()
    finally:
        stop.set()
        hb.cancel()
        try:
            await hb
        except asyncio.CancelledError:
            pass

Other bindings expose a plain job.heartbeat(extend_s) that you call on a timer.
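
If several handlers need the same keepalive logic, the Python pattern above could be wrapped in an async context manager. This is a sketch, not part of honker: `heartbeating` is a hypothetical helper, and `FakeJob` is a stand-in that only records calls so the example runs without a database. The only call assumed on the job is heartbeat(extend_s=...).

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def heartbeating(job, every_s: float = 60, extend_s: int = 300):
    """Call job.heartbeat(extend_s=...) every `every_s` seconds while the body runs."""

    async def beat():
        while True:
            await asyncio.sleep(every_s)
            job.heartbeat(extend_s=extend_s)

    task = asyncio.create_task(beat())
    try:
        yield
    finally:
        # Stop the timer whether the body succeeded or raised.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task


class FakeJob:
    """Stand-in for a claimed job; records each heartbeat call."""

    def __init__(self):
        self.beats = []

    def heartbeat(self, extend_s):
        self.beats.append(extend_s)


async def demo():
    job = FakeJob()
    async with heartbeating(job, every_s=0.01, extend_s=300):
        await asyncio.sleep(0.05)  # pretend to do long-running work
    return job.beats


beats = asyncio.run(demo())
print(all(b == 300 for b in beats))
```

Keep `every_s` comfortably below `extend_s` so that a single delayed heartbeat does not let the claim lapse.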

  • job.retry(delay_s, error) puts the row back in _honker_live with a new run_at. After max_attempts retries, it auto-moves to _honker_dead.
  • job.fail(error) moves it to _honker_dead unconditionally.
  • _honker_dead rows have last_error set; inspect with SELECT * FROM _honker_dead WHERE queue = 'emails'.

  • _honker_live: all currently-claimable or in-flight jobs. Partial index on (queue, priority DESC, run_at, id) WHERE state IN ('pending', 'processing') for O(log n) claims.
  • _honker_dead: exhausted or manually-failed jobs. Never auto-pruned; DELETE when you want.
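
The retry path described above can be sketched as a reschedule-or-bury decision. Everything here is an assumption made for illustration: the stand-in tables `live_jobs` and `dead_jobs`, the hypothetical `retry` helper, the default of 3 for `max_attempts`, and the choice to count an attempt at retry time rather than at claim time.

```python
import sqlite3

# Stand-in tables; the real _honker_live and _honker_dead schemas may differ.
SCHEMA = """
CREATE TABLE live_jobs (
    id INTEGER PRIMARY KEY, queue TEXT, payload TEXT,
    state TEXT DEFAULT 'pending', run_at INTEGER, attempts INTEGER DEFAULT 0
);
CREATE TABLE dead_jobs (
    id INTEGER PRIMARY KEY, queue TEXT, payload TEXT,
    attempts INTEGER, last_error TEXT
);
"""


def retry(conn, job_id, delay_s, error, now, max_attempts=3):
    """Reschedule a job, or move it to the dead table once attempts run out.

    Returns 'pending' if the job was rescheduled and 'dead' if it was buried.
    """
    with conn:  # the INSERT and DELETE below commit together
        row = conn.execute(
            "SELECT queue, payload, attempts FROM live_jobs WHERE id = ?", (job_id,)
        ).fetchone()
        if row is None:
            raise KeyError(job_id)
        queue, payload, attempts = row
        attempts += 1
        if attempts >= max_attempts:
            conn.execute(
                "INSERT INTO dead_jobs (id, queue, payload, attempts, last_error) "
                "VALUES (?, ?, ?, ?, ?)",
                (job_id, queue, payload, attempts, error),
            )
            conn.execute("DELETE FROM live_jobs WHERE id = ?", (job_id,))
            return "dead"
        conn.execute(
            "UPDATE live_jobs SET state = 'pending', run_at = ?, attempts = ? WHERE id = ?",
            (now + delay_s, attempts, job_id),
        )
        return "pending"


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO live_jobs (id, queue, payload, run_at) VALUES (1, 'emails', '{}', 0)")
print(retry(conn, 1, delay_s=60, error="timeout", now=100))  # pending
```

A job.fail(error) equivalent would take the 'dead' branch directly, without checking the attempts counter.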

A queue with 100k dead rows claims as fast as one with zero because the claim index excludes the dead state.