Rust ORM integration

For new Rust code, the native honker crate is the clean path. If you already have a sqlx app, load the SQLite extension and call the same honker_* functions inside your existing executors and transactions.

If you’re writing a new Rust app, skip this section. The honker crate wraps rusqlite and already gives you typed Queue, Stream, and Database::transaction() primitives. There is nothing to integrate — it is the integration.

use honker::Database;
let db = Database::open("app.db")?;
let q = db.queue("emails", Default::default());
q.enqueue(&serde_json::json!({"to": "alice@example.com"}), Default::default())?;

See the Getting Started guide (Rust tab) for the full native API. Use sqlx below only if you’re retrofitting into an existing sqlx-based codebase.

use sqlx::sqlite::SqliteConnectOptions;
use sqlx::{ConnectOptions, Executor};
use std::str::FromStr;

let opts = SqliteConnectOptions::from_str("sqlite:///app.db")?
    .extension("/path/to/libhonker_ext");
let mut conn = opts.connect().await?;
conn.execute("SELECT honker_bootstrap()").await?;

sqlx enables SQLite’s C load-extension API only while the requested extensions load, then disables it — so SQL-injected load_extension() calls can’t fire later in the session.

src/honker_ext.rs
use serde::Serialize;
use sqlx::sqlite::SqliteExecutor;

pub struct Queue {
    pub name: String,
    pub max_attempts: i32,
}

impl Queue {
    pub async fn enqueue<'e, E, P>(
        &self,
        executor: E,
        payload: &P,
        delay: Option<i32>,
        priority: i32,
    ) -> sqlx::Result<i64>
    where
        E: SqliteExecutor<'e>,
        P: Serialize,
    {
        let payload_json = serde_json::to_string(payload)
            .map_err(|e| sqlx::Error::Encode(Box::new(e)))?;
        let (id,): (i64,) = sqlx::query_as(
            "SELECT honker_enqueue($1, $2, NULL, $3, $4, $5, NULL)"
        )
        .bind(&self.name)
        .bind(payload_json)
        .bind(delay)
        .bind(priority)
        .bind(self.max_attempts)
        .fetch_one(executor)
        .await?;
        Ok(id)
    }
}

pub async fn notify<'e, E, P>(executor: E, channel: &str, payload: Option<&P>) -> sqlx::Result<()>
where
    E: SqliteExecutor<'e>,
    P: Serialize,
{
    let body = payload
        .map(serde_json::to_string)
        .transpose()
        .map_err(|e| sqlx::Error::Encode(Box::new(e)))?;
    sqlx::query("SELECT notify($1, $2)")
        .bind(channel)
        .bind(body)
        .execute(executor)
        .await?;
    Ok(())
}

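SqliteExecutor<'e> is sqlx’s “anything you can run a query against”. It is satisfied by &SqlitePool and &mut SqliteConnection. A Transaction<'_, Sqlite> dereferences to its connection, so pass it as &mut *tx; since sqlx 0.7, &mut Transaction no longer implements Executor directly. One helper therefore serves both transactional and non-transactional callers.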