Queues
A honker queue is a named row group in the _honker_live table. Jobs have a payload, a priority, a run_at time, an attempts counter, and (optionally) an expiration. A worker claims jobs atomically via an indexed UPDATE ... RETURNING, runs them, and acks (which DELETEs the row).
Every language binding shares the same on-disk format, so a Python enqueuer and a Go consumer can run against the same .db file.
Enqueue
Section titled “Enqueue”import honker
db = honker.open("app.db")q = db.queue("emails")
q.enqueue({"to": "alice@example.com"})q.enqueue({"to": "bob@example.com"}, delay=60) # claimable in 60sq.enqueue({"to": "urgent@example.com"}, priority=10) # higher = picked firstq.enqueue({"to": "timely@example.com"}, expires=3600) # drops out after 1hconst { open } = require('@russellthehippo/honker-node');const db = open('app.db');const q = db.queue('emails');
q.enqueue({ to: 'alice@example.com' });q.enqueue({ to: 'bob@example.com' }, { delay: 60 }); // claimable in 60sq.enqueue({ to: 'urgent@example.com' }, { priority: 10 }); // higher = picked firstq.enqueue({ to: 'timely@example.com' }, { expires: 3600 }); // drops after 1huse honker::{Database, EnqueueOpts, QueueOpts};use serde_json::json;
let db = Database::open("app.db")?;let q = db.queue("emails", QueueOpts::default());
q.enqueue( &json!({"to": "alice@example.com"}), EnqueueOpts::default(),)?;q.enqueue( &json!({"to": "bob@example.com"}), EnqueueOpts { delay: Some(60), ..Default::default() },)?;import honker "github.com/russellromney/honker-go"
db, _ := honker.Open("app.db", "./libhonker_ext.dylib")defer db.Close()
q := db.Queue("emails", honker.QueueOptions{})
q.Enqueue(map[string]any{"to": "alice@example.com"}, honker.EnqueueOptions{})
delay := int64(60)q.Enqueue( map[string]any{"to": "bob@example.com"}, honker.EnqueueOptions{Delay: &delay},)require "honker"
db = Honker::Database.new("app.db", extension_path: "./libhonker_ext.dylib")q = db.queue("emails")
q.enqueue({to: "alice@example.com"})q.enqueue({to: "bob@example.com"}, delay: 60)q.enqueue({to: "urgent@example.com"}, priority: 10)import { open } from "@russellthehippo/honker-bun";
const db = open("app.db", "./libhonker_ext.dylib");const q = db.queue("emails");
q.enqueue({ to: "alice@example.com" });q.enqueue({ to: "bob@example.com" }, { delay: 60 });q.enqueue({ to: "urgent@example.com" }, { priority: 10 });{:ok, db} = Honker.open("app.db", extension_path: "./libhonker_ext.dylib")
Honker.Queue.enqueue(db, "emails", %{to: "alice@example.com"})Honker.Queue.enqueue(db, "emails", %{to: "bob@example.com"}, delay: 60)#include "honker.hpp"
int main() { honker::Database db{"app.db", "./libhonker_ext.dylib"}; auto q = db.queue("emails");
q.enqueue(R"({"to":"alice@example.com"})"); q.enqueue(R"({"to":"bob@example.com"})", 60); q.enqueue(R"({"to":"urgent@example.com"})", 0, 10);}.load ./libhonker_extSELECT honker_bootstrap();
-- honker_enqueue(queue, payload_json, run_at, delay, priority, max_attempts, expires)SELECT honker_enqueue('emails', '{"to":"alice"}', NULL, NULL, 0, 3, NULL);SELECT honker_enqueue('emails', '{"to":"bob"}', NULL, 60, 0, 3, NULL);Claim and ack
Section titled “Claim and ack”
Claiming moves a row to state='processing' and sets claim_expires_at = unixepoch() + visibility_timeout_s. If the worker doesn’t ack before that window elapses, another worker can reclaim the job.
# Async iterator — wakes on database updates or due deadlines.async for job in q.claim("worker-1"): try: await send_email(job.payload) job.ack() except Exception as e: job.retry(delay_s=60, error=str(e))// claimWaker() wakes on updates or due deadlines.const waker = q.claimWaker();while (true) { const job = await waker.next('worker-1'); if (!job) break; try { await sendEmail(job.payload); job.ack(); } catch (err) { job.retry(60, String(err)); }}let waker = q.claim_waker();while let Some(job) = waker.next("worker-1")? { match send_email(&job.payload) { Ok(_) => { job.ack()?; } Err(e) => { job.retry(60, &e.to_string())?; } }}for { job, err := q.ClaimOne("worker-1") if err != nil { log.Fatal(err) } if job == nil { time.Sleep(100 * time.Millisecond) continue } var payload map[string]any json.Unmarshal(job.Payload, &payload) if err := sendEmail(payload); err != nil { job.Retry(60, err.Error()) } else { job.Ack() }}loop do job = q.claim_one("worker-1") if job.nil? sleep 0.1 next end begin send_email(job.payload) job.ack rescue => e job.retry(delay_s: 60, error: e.message) endendwhile (running) { const job = q.claimOne("worker-1"); if (!job) { await Bun.sleep(100); continue; } try { await sendEmail(job.payload as { to: string }); job.ack(); } catch (e) { job.retry(60, String(e)); }}loop = fn loop -> case Honker.Queue.claim_one(db, "emails", "worker-1") do {:ok, nil} -> :timer.sleep(100) loop.(loop) {:ok, job} -> try do send_email(job.payload) Honker.Job.ack(db, job) rescue e -> Honker.Job.retry(db, job, 60, Exception.message(e)) end loop.(loop) endendloop.(loop)#include "honker.hpp"
#include <chrono>#include <thread>
int main() { honker::Database db{"app.db", "./libhonker_ext.dylib"}; auto q = db.queue("emails");
for (;;) { if (auto job = q.claim_one("worker-1")) { try { send_email(job->payload()); job->ack(); } catch (const std::exception& e) { job->retry(60, e.what()); } } else { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } }}-- Claim up to 32 jobs with a 300-second visibility timeout.-- Returns a JSON array: [{"id": ..., "queue": ..., "payload": ...,-- "worker_id": ..., "attempts": ...}, ...]SELECT honker_claim_batch('emails', 'worker-1', 32, 300);
-- Ack multiple jobs (DELETEs if the claim is still valid).SELECT honker_ack_batch('[1,2,3]', 'worker-1');Batch claim
Section titled “Batch claim”
For handlers that benefit from batching (DB writes, HTTP calls), claim N jobs in one transaction:
jobs = q.claim_batch("worker-1", n=100)for j in jobs: process(j.payload)q.ack_batch([j.id for j in jobs], "worker-1")let jobs = q.claim_batch("worker-1", 100)?;for job in &jobs { process(&job.payload); }// batch ack via raw SQL; honker-rs typed wrapper coming.jobs, _ := q.ClaimBatch("worker-1", 100)for _, j := range jobs { process(j.Payload); j.Ack() }#include "honker.hpp"
#include <vector>
int main() { honker::Database db{"app.db", "./libhonker_ext.dylib"}; auto q = db.queue("emails");
auto jobs = q.claim_batch("worker-1", 100); for (const auto& job : jobs) { process(job.payload()); }
std::vector<int64_t> ids; ids.reserve(jobs.size()); for (const auto& job : jobs) ids.push_back(job.id()); q.ack_batch(ids, "worker-1");}SELECT honker_claim_batch('emails', 'worker-1', 100, 300);-- process each row in the returned JSONSELECT honker_ack_batch('[1,2,...,100]', 'worker-1');Visibility timeout and heartbeat
Section titled “Visibility timeout and heartbeat”
Every claim has a claim_expires_at (default 300s). If the worker doesn’t ack or extend the claim before that time elapses, another worker can reclaim the job.
For long-running jobs, call heartbeat periodically to extend the window:
# Pythonimport asyncio
async def keepalive(job, stop): while not stop.is_set(): await asyncio.sleep(60) job.heartbeat(extend_s=300)
async for job in q.claim("worker-1"): stop = asyncio.Event() hb = asyncio.create_task(keepalive(job, stop)) try: await long_running(job.payload) job.ack() finally: stop.set() hb.cancel() try: await hb except asyncio.CancelledError: pass
Other bindings expose a plain job.heartbeat(extend_s) that you call on a timer.
Retry, fail, dead-letter
Section titled “Retry, fail, dead-letter”
- `job.retry(delay_s, error)` puts the row back in `_honker_live` with a new `run_at`. After `max_attempts` retries, it auto-moves to `_honker_dead`.
- `job.fail(error)` moves it to `_honker_dead` unconditionally.
- `_honker_dead` rows have `last_error` set; inspect with `SELECT * FROM _honker_dead WHERE queue = 'emails'`.
Where it lives
Section titled “Where it lives”
- `_honker_live`: all currently-claimable or in-flight jobs. Partial index on `(queue, priority DESC, run_at, id) WHERE state IN ('pending', 'processing')` for O(log n) claims.
- `_honker_dead`: exhausted or manually-failed jobs. Never auto-pruned; DELETE when you want.
A queue with 100k dead rows claims as fast as one with zero because the claim index excludes the dead state.