Node.js API
The Node.js shim wraps the Rust engine through a NAPI-RS binding. The
high-level surface (Queue / Worker / Job / QueueEvents) is what
application code reaches for. The unwrapped native classes (Producer
/ Consumer / Scheduler) are exported for power users; most callers
should not need them.
Install:
```sh
npm install chasquimq
```

Import:

```ts
import { Queue, Worker, Job, QueueEvents, BackoffSpec } from "chasquimq";
```

On this page:

- Queue — produce jobs, schedule recurring specs, inspect the DLQ, fetch results.
- Worker — consume jobs, handle retries, surface lifecycle events.
- Job — the value type your processor receives and Queue.add returns.
- QueueEvents — subscribe to engine transitions across processes.
- Option types — every interface that crosses a public surface.
- Errors — typed error classes, with full text in error codes.
- Native power-user surface — the unwrapped NAPI bindings.
Queue

```ts
class Queue<DataType = unknown, ResultType = unknown, NameType extends string = string>
```

A producer for a single queue. Construct one per logical queue; the native producer pool is created lazily on the first call. Safe to share across async contexts.
new Queue(name, opts)
```ts
constructor(name: string, opts: QueueOptions)
```

- name — queue name, without the {chasqui:...} hash-tag wrapping.
- opts — connection and default-job options. See QueueOptions.

```ts
const queue = new Queue("emails", {
  connection: { host: "127.0.0.1", port: 6379 },
});
```

queue.add(name, data, opts?)
```ts
async add(
  name: NameType,
  data: DataType,
  opts?: JobsOptions,
): Promise<Job<DataType, ResultType, NameType>>
```

Enqueue a single job. name is the dispatch name carried on the
stream entry’s n field; the worker reads it back as job.name.
data is MessagePack-encoded on the way out and decoded on the way
in. Returns a Job whose id is the engine-minted ULID (or the
resolved spec key when opts.repeat is set).
When opts.delay > 0, routes through the delayed ZSET; otherwise
goes straight to the stream via XADD. When opts.repeat is set,
upserts a repeatable spec instead — see
Repeatable jobs.
Throws RangeError for non-finite or negative delay,
TypeError for whitespace-only jobId, NotSupportedError for
parent/child flows.
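The argument validation described above can be sketched as a small guard. This is a hypothetical helper (`validateAddArgs` is not part of the shim's API), written only to illustrate the documented throw behavior:

```ts
// Hypothetical sketch of the checks queue.add performs, based on the
// documented throws above. Not the shim's actual source.
function validateAddArgs(opts: { delay?: number; jobId?: string } = {}): void {
  const delay = opts.delay ?? 0;
  // Non-finite or negative delay -> RangeError
  if (!Number.isFinite(delay) || delay < 0) {
    throw new RangeError(`delay must be a finite, non-negative number; got ${delay}`);
  }
  // Whitespace-only jobId -> TypeError
  if (opts.jobId !== undefined && opts.jobId.trim() === "") {
    throw new TypeError("jobId must not be empty or whitespace-only");
  }
}
```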
queue.addUnique(name, data, opts?)
```ts
async addUnique(
  name: NameType,
  data: DataType,
  opts?: JobsOptions,
): Promise<Job<DataType, ResultType, NameType>>
```

Idempotent variant. Requires opts.jobId to be a non-empty string;
throws TypeError otherwise. Aside from that requirement, it behaves like
queue.add.
Idempotency guarantees differ by path:
- Delayed (delay > 0) — strict and cross-process. A SET NX EX Lua marker on {chasqui:<queue>}:dlid:<jobId> gates the ZADD. The marker outlives the fire time by 1h so a producer-retry can’t race promotion.
- Immediate (no delay) — strict within a single Queue instance. Redis 8.6 XADD IDMP <producer_id> <jobId> dedups at the wire layer, but the IDMP scope is the producer id (one per Queue).
For cross-process strict dedup on the immediate path, use delay > 0
with the same jobId.
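The delayed-path guarantee above can be modeled in a few lines. This is an illustrative sketch only: a `Set` stands in for Redis, and `DelayedDedup` is an invented name, not the engine's Lua script:

```ts
// Illustrative model of the delayed-path dedup: a marker key gates the ZADD,
// mirroring SET NX EX on {chasqui:<queue>}:dlid:<jobId>. A Set/Map stand in
// for Redis here; this is not the engine's implementation.
class DelayedDedup {
  private markers = new Set<string>();
  private zset = new Map<string, number>(); // jobId -> fireAtMs

  /** Returns true if newly scheduled, false if deduped by the marker. */
  schedule(queue: string, jobId: string, fireAtMs: number): boolean {
    const marker = `{chasqui:${queue}}:dlid:${jobId}`;
    if (this.markers.has(marker)) return false; // SET NX failed -> duplicate
    this.markers.add(marker); // in Redis the TTL outlives fireAt by 1h
    this.zset.set(jobId, fireAtMs);
    return true;
  }
}
```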
queue.addBulk(jobs)
```ts
async addBulk(
  jobs: Array<{ name: NameType; data: DataType; opts?: BulkJobOptions }>,
): Promise<Job<DataType, ResultType, NameType>[]>
```

Enqueue many jobs in one round trip. When all entries lack per-job
overrides (delay, jobId, attempts, backoff), routes through
the engine’s pipelined add_bulk_named path; otherwise degrades to
a per-entry add() loop and loses the bulk pipelining win.
Throws RangeError on any non-finite / negative delay,
NotSupportedError if any entry sets parent.
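The fast-path condition above amounts to a simple predicate over the batch. The helper name `canUseBulkFastPath` is an assumption made for illustration; only the override list comes from the text:

```ts
// Sketch of the check described above: the pipelined add_bulk_named path is
// only usable when no entry carries a per-job override. Hypothetical helper.
interface BulkEntry {
  name: string;
  data: unknown;
  opts?: { delay?: number; jobId?: string; attempts?: number; backoff?: unknown };
}

function canUseBulkFastPath(jobs: BulkEntry[]): boolean {
  return jobs.every((j) => {
    const o = j.opts;
    return !o || (o.delay === undefined && o.jobId === undefined &&
      o.attempts === undefined && o.backoff === undefined);
  });
}
```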
queue.getRepeatableJobs(limit?)
```ts
async getRepeatableJobs(limit?: number): Promise<RepeatableJobMeta[]>
```

List repeatable specs ordered by next fire time, ascending. Default
limit = 100. Payloads are intentionally not included to keep
listing thousands of specs cheap; reach for the spec hash directly
if you need them. See RepeatableJobMeta.
queue.removeRepeatableByKey(key)
```ts
async removeRepeatableByKey(key: string): Promise<boolean>
```

Remove a repeatable spec by its resolved key. Returns true if a
spec was removed, false if no spec with that key existed.
queue.getJobResult(jobId)
```ts
async getJobResult(jobId: string): Promise<ResultType | undefined>
```

Read the stored handler result. Returns undefined for three
indistinguishable cases: the job has not yet completed, the result
key already expired, or no result was written (handler returned
undefined, worker ran without storeResults, or job was DLQ’d).
The bytes are msgpack-decoded with the same wire format the worker encoded them with.
queue.peekDlq(limit?)
```ts
async peekDlq(limit?: number): Promise<DlqEntry[]>
```

Inspect up to limit DLQ entries oldest-first. Default
limit = 20. Each DlqEntry carries the relocated dlqId,
the original sourceId, the routing reason, optional detail,
the dispatch name, and the raw payload bytes.
queue.replayDlq(limit?)
```ts
async replayDlq(limit?: number): Promise<number>
```

Atomically move up to limit DLQ entries back into the main stream
with their attempt counter reset. Default limit = 100. Returns
the number of entries actually replayed.
queue.remove(jobId)
```ts
async remove(jobId: string): Promise<number>
```

Best-effort cancel of a delayed job. Returns 1 if the entry was
removed from the delayed ZSET, throws NotSupportedError otherwise
(in-stream entries can’t be removed by id alone).
queue.close()
```ts
async close(): Promise<void>
```

Drop the cached native producer. Idempotent. Compatible with
await using:
```ts
await using queue = new Queue("emails", { connection });
await queue.add("welcome", { to: "ada@example.com" });
// queue.close() runs automatically at scope exit.
```

queue.isClosed

```ts
get isClosed(): boolean
```

true after the first close() call.
Stubbed methods (NotSupportedError)
These throw NotSupportedError
in v1: getJob, getJobs, getJobCounts, getWaitingCount,
getActiveCount, getDelayedCount, getFailedCount, count,
pause, resume, drain, obliterate, clean. Some pure-stat
methods return 0 or 'unknown' instead. See the options
index and Thinking in
ChasquiMQ for the engine semantics that
make these intentionally unsupported.
Worker
```ts
class Worker<DataType = unknown, ResultType = unknown, NameType extends string = string> extends EventEmitter
```

Runs a user-supplied processor against a queue. The native
Consumer does all the scheduling, retry, DLQ, and ack work; this
class is a thin presentation layer.
new Worker(name, processor, opts)
```ts
constructor(
  name: string,
  processor: Processor<DataType, ResultType, NameType>,
  opts: WorkerOptions,
)
```

- name — queue name to consume.
- processor — a (job: Job) => Promise<ResultType> function. Sandboxed processors (string / URL paths) are not supported; passing one throws NotSupportedError.
- opts — see WorkerOptions.
By default the worker calls .run() on the next microtask. Disable
with opts.autorun = false if you need to attach listeners
synchronously after construction.
worker.run()
```ts
async run(): Promise<void>
```

Start the engine loop. Resolves once the engine drains (after
close() is called). Calling run() more than once returns the
same Promise — it does not start a second loop.
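The "same Promise" contract above is easy to model with a cached promise. This `RunOnce` class is a minimal stand-in written for illustration; the real Worker delegates to the native loop:

```ts
// Minimal model of "run() more than once returns the same Promise".
// Hypothetical sketch, not the Worker's actual implementation.
class RunOnce {
  private running?: Promise<void>;
  private resolveDrain?: () => void;

  run(): Promise<void> {
    // First call starts the loop; later calls return the cached promise.
    this.running ??= new Promise<void>((res) => { this.resolveDrain = res; });
    return this.running;
  }

  close(): void {
    // Engine drain resolves the promise run() handed out.
    this.resolveDrain?.();
  }
}
```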
worker.close(force?)
```ts
async close(force?: boolean): Promise<void>
```

Signal shutdown. The engine drains in-flight handlers up to its
configured deadline, then resolves. Idempotent; calling close()
more than once awaits the in-flight drain. The force parameter
is currently a no-op — engine-side hard-cancel is reserved.
worker.isClosed, worker.isRunning, worker.isPaused
```ts
get isClosed(): boolean
isRunning(): boolean
isPaused(): boolean // always false in v1
```

State predicates. pause() and resume() throw NotSupportedError
in v1 — close and re-create instead.
worker.rateLimit(expireTimeMs)
```ts
async rateLimit(expireTimeMs: number): Promise<void>
```

Throws NotSupportedError in v1.
Symbol.asyncDispose
Worker implements [Symbol.asyncDispose], routing through
close(). Use with await using.
Worker events
| Event | Args | Fires when |
|---|---|---|
| ready | () | .run() starts the engine loop. |
| active | (job, prev) | Before each processor invocation. prev reserved (always ''). |
| completed | (job, result, prev) | Processor resolves. Engine acks the job. |
| failed | (job, err, prev) | Processor rejects. Error rethrown into engine retry/DLQ path. |
| error | (err) | Engine-side error surfaced from the native loop. |
| closing | (msg) | Start of .close(). |
| closed | () | Shutdown completes. |
```ts
worker.on("completed", (job, result) => {
  console.log("completed", job.id, result);
});
```

Job

```ts
class Job<DataType = unknown, ResultType = unknown, NameType extends string = string>
```

The value type a processor receives, and what Queue.add /
Queue.addBulk return. v1 does not round-trip Job state through
Redis — the engine streams via XREADGROUP / XACK and does not
persist progress, return values, or per-job state metadata.
Properties
Section titled “Properties”| Property | Type | Meaning |
|---|---|---|
| id | string | Engine-minted ULID, or resolved spec key for repeatable upserts. |
| name | NameType | Dispatch name from the stream entry’s n field. Empty for unnamed jobs. |
| data | DataType | The msgpack-decoded payload. |
| opts | JobsOptions | The options the job was enqueued with. |
| attemptsMade | number | 1-indexed attempt count. 0 for never-yet-run; 1 on first invocation. |
| progress | JobProgress | In-memory only; engine does not persist. |
| returnvalue | ResultType? | Set after the processor resolves. |
| failedReason | string? | Set after the processor rejects. |
| stacktrace | string[] | Reserved; empty in v1. |
| timestamp | number | Submission time, ms since epoch. Defaults to Date.now(). |
| delay | number | Original delay in ms. |
| priority | number | Always 0 — Streams are FIFO. |
| processedOn | number? | Reserved. |
| finishedOn | number? | Reserved. |
| queue | Queue? | Backreference for waitForResult. Set on producer-side jobs only. |
job.updateProgress(progress)
```ts
async updateProgress(progress: JobProgress): Promise<void>
```

In-memory progress update. The engine does not persist progress;
the worker shim surfaces this via the progress event when called
from inside a processor.
job.waitForResult(opts?)
```ts
async waitForResult(opts?: WaitForResultOptions): Promise<ResultType | undefined>
```

Poll until the engine’s stored result for this job becomes
readable, until opts.timeoutMs elapses, or until opts.signal
fires. See WaitForResultOptions.
Throws WaitForResultTimeoutError
on timeout. Throws the abort reason on cancel.
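The poll loop described above can be sketched generically. `pollForResult` and its `fetchResult` callback are invented names standing in for the engine's stored-result read; only the timeout/interval/signal semantics come from the text:

```ts
// Sketch of the documented poll loop: retry a fetch until it yields a value,
// the time budget runs out, or the signal aborts. Not the shim's actual code.
async function pollForResult<T>(
  fetchResult: () => T | undefined | Promise<T | undefined>,
  opts: { timeoutMs?: number; intervalMs?: number; signal?: AbortSignal } = {},
): Promise<T> {
  const { timeoutMs = 30_000, intervalMs = 100, signal } = opts;
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    if (signal?.aborted) throw signal.reason; // abort reason is rethrown
    const value = await fetchResult();
    if (value !== undefined) return value;
    if (Date.now() >= deadline) throw new Error("waitForResult timed out");
    await new Promise((res) => setTimeout(res, intervalMs));
  }
}
```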
job.toJSON()
```ts
toJSON(): object
```

Plain-object snapshot of the job for logging / serialization.
Stubbed methods
log, getState, remove, retry, discard, update,
updateData, isCompleted, isFailed, isActive, isWaiting
all throw NotSupportedError or return a fixed false in v1 —
engine-side state queries land in a future slice. isDelayed()
returns delay > 0 from the in-memory option.
QueueEvents
```ts
class QueueEvents extends EventEmitter
```

Subscribe to a queue’s events stream
({chasqui:<queue>}:events) across processes. Backed by ioredis
because the events stream is a generic Redis stream (ASCII
fields, not msgpack), so a thin pure-JS subscriber is the simplest
path.
new QueueEvents(name, opts)
```ts
constructor(name: string, opts: QueueEventsOptions)
```

By default starts from $ (only events emitted after the
subscriber is opened). Pass opts.lastEventId = "0" to replay
history. Auto-runs on the next microtask unless autorun: false.
queueEvents.run(), queueEvents.close(), queueEvents.waitUntilReady()
Lifecycle. close() is idempotent and concurrency-safe; calling
it from multiple call sites awaits the same in-flight drain.
Implements [Symbol.asyncDispose].
Events
| Event | Args | Engine origin |
|---|---|---|
| waiting | ({ jobId, name }, eventId) | Producer added (or promoter promoted) the job. |
| active | ({ jobId, name, prev, attempt }, eventId) | Worker pulled the job; processor is about to run. |
| completed | ({ jobId, name, attempt, returnvalue }, eventId) | Processor resolved. |
| failed | ({ jobId, name, failedReason, attempt }, eventId) | Processor rejected. Fires before retry/DLQ relocation. |
| retry-scheduled | ({ jobId, name, attempt, backoffMs }, eventId) | Engine atomically rescheduled the job onto the delayed ZSET. |
| delayed | ({ jobId, name, delay }, eventId) | Producer enqueued with delay > 0. |
| dlq | ({ jobId, name, reason, attempt }, eventId) | DLQ relocator wrote the entry to the DLQ stream. |
| retries-exhausted | ({ jobId, name, attemptsMade, reason }, eventId) | Synthetic alias of dlq (chasquimq-specific). |
| drained | (eventId) | Engine drained (queue-scoped, no jobId). |
| unknown | ({ eventName, fields }, eventId) | Forward-compat sink for unrecognized event types. |
| error | (err) | Operational error during XREAD. |
Numeric fields (attempt, backoffMs, delay, duration_us,
ts) are coerced to number at parse time so subscribers don’t
have to remember which ones need an explicit cast.
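The coercion described above amounts to converting a known allow-list of fields. The helper below is a hypothetical mirror of that behavior, not the subscriber's actual parser:

```ts
// Sketch of the parse-time coercion: the documented numeric fields arrive as
// strings on the Redis stream and are converted once. Hypothetical helper.
const NUMERIC_FIELDS = new Set(["attempt", "backoffMs", "delay", "duration_us", "ts"]);

function coerceEventFields(
  raw: Record<string, string>,
): Record<string, string | number> {
  const out: Record<string, string | number> = {};
  for (const [key, value] of Object.entries(raw)) {
    out[key] = NUMERIC_FIELDS.has(key) ? Number(value) : value;
  }
  return out;
}
```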
Option types
QueueOptions

```ts
interface QueueOptions {
  connection: ConnectionOptions;
  prefix?: string;
  defaultJobOptions?: Partial<JobsOptions>;
}
```

- connection — required. See ConnectionOptions.
- prefix — accepted; ignored. ChasquiMQ uses {chasqui:<queue>} Cluster hash tags; there is no tunable prefix.
- defaultJobOptions — applied as defaults on every add() / addBulk() call.
ConnectionOptions
```ts
interface ConnectionOptions {
  host?: string;
  port?: number;
  password?: string;
  username?: string;
  db?: number;
  [key: string]: unknown;
}
```

- host — default "127.0.0.1".
- port — default 6379.
- password, username — optional auth.
- db — optional logical database number.
- Extra keys are accepted and silently ignored; the native pool manages its own connection lifetime.
WorkerOptions
```ts
interface WorkerOptions {
  connection: ConnectionOptions;
  concurrency?: number;
  autorun?: boolean;
  drainDelay?: number;
  maxStalledCount?: number;
  removeOnComplete?: unknown;
  removeOnFail?: unknown;
  prefix?: string;
  name?: string;
  runScheduler?: boolean;
  schedulerTickMs?: number;
  storeResults?: boolean;
  resultTtlMs?: number;
}
```

- concurrency — max in-flight handler invocations. Default 100.
- autorun — whether .run() is called on the next microtask. Default true.
- drainDelay — XREADGROUP BLOCK timeout in ms. Default 5000.
- maxStalledCount — total attempts per job (initial + retries). Maps to engine maxAttempts. Default 3.
- removeOnComplete, removeOnFail — accepted; no-ops. The engine XACKDELs on success and DLQ-relocates on failure.
- prefix — accepted; no-op.
- name — optional consumer ID for XREADGROUP CONSUMER.
- runScheduler — auto-spawn an embedded scheduler. Default true. Set false when running a separate scheduler process.
- schedulerTickMs — scheduler tick interval. Default 1000.
- storeResults — persist handler return values to {chasqui:<queue>}:result:<jobId>. Default false.
- resultTtlMs — TTL for stored results. Default 3_600_000 (1h). Rounded up to whole seconds at the FFI boundary.
JobsOptions
```ts
interface JobsOptions {
  delay?: number;
  attempts?: number;
  backoff?: number | BackoffOptions;
  removeOnComplete?: boolean | number;
  removeOnFail?: boolean | number;
  priority?: number;
  jobId?: string;
  lifo?: boolean;
  timestamp?: number;
  repeat?: RepeatOptions;
  repeatJobKey?: string;
  parent?: { id: string; queue: string };
}
```

- delay — ms before the job becomes processable. Default 0. Negative or non-finite → RangeError.
- attempts — total attempt budget for this job, overrides queue-wide maxAttempts. Default queue-wide.
- backoff — per-job backoff override. Either a plain number (fixed delay in ms) or a BackoffOptions object.
- removeOnComplete — accepted; no-op (engine XACKDELs).
- removeOnFail — accepted; reserved for future DLQ trim policy.
- priority — accepted; ignored with a one-time console warning. Streams are FIFO.
- jobId — stable id for at-most-once / idempotent scheduling.
- lifo — accepted; ignored with a one-time console warning.
- timestamp — submission time in ms. Default Date.now().
- repeat — schedule a recurring job. See RepeatOptions.
- repeatJobKey — stable key for the repeat spec. Default: engine derives <jobName>::<patternSignature>.
- parent — throws NotSupportedError. Parent/child flows are not supported.
BackoffOptions
```ts
interface BackoffOptions {
  type: "fixed" | "exponential";
  delay?: number;
  maxDelay?: number;
  multiplier?: number;
  jitterMs?: number;
}
```

- type — strategy. The NAPI binding rejects unknown strings up-front.
- delay — base delay in ms.
- maxDelay — cap on the computed backoff per attempt.
- multiplier — for exponential: delay * multiplier^(attempt - 1). Ignored for fixed. Default 2 when built via BackoffSpec.exponential.
- jitterMs — symmetric ±jitter applied per attempt.
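The formula above can be worked through in a standalone sketch. `computeBackoffMs` is a hypothetical helper mirroring the documented math, not the engine's code:

```ts
// Worked sketch of the documented backoff math:
// exponential -> delay * multiplier^(attempt - 1), capped at maxDelay,
// then optional symmetric ±jitter. Hypothetical helper, not engine code.
interface BackoffOpts {
  type: "fixed" | "exponential";
  delay?: number;
  maxDelay?: number;
  multiplier?: number;
  jitterMs?: number;
}

function computeBackoffMs(opts: BackoffOpts, attempt: number): number {
  const base = opts.delay ?? 0;
  let ms = opts.type === "exponential"
    ? base * Math.pow(opts.multiplier ?? 2, attempt - 1) // default multiplier 2
    : base;                                              // fixed ignores multiplier
  if (opts.maxDelay !== undefined) ms = Math.min(ms, opts.maxDelay);
  if (opts.jitterMs) ms += (Math.random() * 2 - 1) * opts.jitterMs; // ±jitter
  return Math.max(0, Math.round(ms));
}
```

For example, with `delay: 1000` and the default multiplier, attempts 1, 2, 3 back off by 1000, 2000, 4000 ms until `maxDelay` caps the curve.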
The BackoffSpec builder returns a BackoffOptions literal:
```ts
import { BackoffSpec } from "chasquimq";

await queue.add("send", payload, {
  attempts: 5,
  backoff: BackoffSpec.exponential(1_000, { maxDelayMs: 30_000 }),
});
```

RepeatOptions
```ts
interface RepeatOptions {
  pattern?: string;
  every?: number;
  limit?: number;
  immediately?: boolean;
  startDate?: Date | string | number;
  endDate?: Date | string | number;
  tz?: string;
  jobId?: string;
  missedFires?: MissedFiresOption;
}
```

Pass exactly one of pattern (cron) or every (ms); both or
neither is rejected.
- pattern — cron expression. Accepts both 5-field and 6-field syntax.
- every — fixed ms interval. First fire lands one interval after upsert.
- limit — max total fires before the spec is removed.
- immediately — accepted; no-op in v1.
- startDate, endDate — Date, ms since epoch, or ISO string.
- tz — "UTC" / "Z", fixed offset ("+05:30"), or any IANA name. IANA names are DST-aware. Ignored when every is set.
- jobId — reserved for future explicit-id-per-fire wiring.
- missedFires — catch-up policy. Default { kind: "skip" }. See MissedFiresOption.
MissedFiresOption
```ts
type MissedFiresOption =
  | { kind: "skip" }
  | { kind: "fire-once" }
  | { kind: "fire-all"; maxCatchup: number };
```

- skip — drop missed windows; resume on the first future fire. Default. Safe; no thundering herd.
- fire-once — emit one job to represent the missed window(s).
- fire-all — replay each missed window up to maxCatchup fires. maxCatchup must be >= 1.
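The three policies can be compared with a small model. `resolveMissedFires` is an invented illustration; in particular, which end of the missed window the `fire-all` cap keeps is an assumption here, not documented behavior:

```ts
// Illustrative model of the catch-up policies: given missed fire times,
// which fires are emitted? Hypothetical; not the scheduler's implementation.
type MissedFires =
  | { kind: "skip" }
  | { kind: "fire-once" }
  | { kind: "fire-all"; maxCatchup: number };

function resolveMissedFires(missedMs: number[], policy: MissedFires): number[] {
  if (missedMs.length === 0) return [];
  switch (policy.kind) {
    case "skip":
      return []; // drop everything; wait for the next future window
    case "fire-once":
      return [missedMs[missedMs.length - 1]]; // one job for the whole gap
    case "fire-all":
      // Bounded replay; keeping the oldest windows first is an assumption.
      return missedMs.slice(0, policy.maxCatchup);
  }
}
```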
RepeatableJobMeta
```ts
interface RepeatableJobMeta {
  key: string;
  jobName: string;
  patternKind: "cron" | "every";
  pattern?: string;
  tz?: string;
  every?: number;
  nextFireMs: number;
  limit?: number;
  startAfterMs?: number;
  endBeforeMs?: number;
  missedFires?: MissedFiresOption;
}
```

Wire-compatible projection returned by
Queue.getRepeatableJobs. No
payload — listing thousands of specs stays cheap.
QueueEventsOptions
```ts
interface QueueEventsOptions {
  connection: ConnectionOptions;
  prefix?: string;
  autorun?: boolean;
  lastEventId?: string;
  blockingTimeout?: number;
}
```

- prefix — accepted; ignored.
- autorun — default true.
- lastEventId — start id. Default "$" (only new events).
- blockingTimeout — XREAD BLOCK timeout in ms. Default 10_000.
WaitForResultOptions
```ts
interface WaitForResultOptions {
  timeoutMs?: number;
  intervalMs?: number;
  signal?: AbortSignal;
}
```

- timeoutMs — total time budget. Default 30_000.
- intervalMs — polling interval. Default 100.
- signal — cancel the poll loop. Aborts surface as the standard AbortError.
Processor<DataType, ResultType, NameType>
```ts
type Processor<DataType = unknown, ResultType = unknown, NameType extends string = string> =
  (job: Job<DataType, ResultType, NameType>) => Promise<ResultType>;
```

The function signature Worker accepts. Resolving the returned
promise acks the job; rejecting routes through the engine’s retry
path.
JobState, JobType, JobProgress
```ts
type JobState = "waiting" | "active" | "completed" | "failed" | "delayed" | "unknown";
type JobType = JobState | "paused" | "prioritized" | "waiting-children";
type JobProgress = number | object;
```

Type aliases for documentation. The engine itself does not
implement most of these states in v1 — see the Queue stub
methods.
Errors
The shim throws typed errors so application code can branch on
err.name:
- UnrecoverableError — throw from a processor to skip retries and route to the DLQ.
- WaitForResultTimeoutError — Job.waitForResult timed out.
- NotSupportedError — caller asked for a v1-stubbed feature.
- RateLimitError — reserved; Worker.rateLimit throws this in a future slice.
See error codes for the full table with When, Why, Fix, and See also for each.
Native power-user surface
The unwrapped NAPI bindings are re-exported for callers who want to bypass the shim. Reach for these only when you have a measured reason to skip the high-level layer.
```ts
import { Producer, Consumer, Scheduler, engineVersion } from "chasquimq";
```

- Producer.connect(redisUrl, opts) — async constructor.
- new Consumer(redisUrl, opts) — sync constructor.
- new Scheduler(redisUrl, opts) — sync constructor.
- engineVersion() — returns the engine binding crate version.
The full TypeScript surface lives in
chasquimq-node/index.d.ts. The low-level option types
(ProducerOpts, ConsumerOpts, SchedulerOpts, RetryOpts,
AddOptions, BackoffSpec (native), DlqEntry,
JobRetryOverride, NamedPayload, RepeatPattern,
RepeatableSpec, RepeatableMeta, MissedFiresPolicy) are also
exported.
These types collapse onto the engine’s
ProducerConfig /
ConsumerConfig /
SchedulerConfig — see
the Rust reference for the canonical field meanings.
Utilities
Section titled “Utilities”encodePayload(data) / decodePayload(buf)
```ts
function encodePayload(data: unknown): Buffer;
function decodePayload(buf: Buffer | Uint8Array): unknown;
```

The shim’s MessagePack helpers, exported for parity tests across
language boundaries. encodePayload returns a zero-copy view of
the bytes; the native producer performs exactly one copy at the
FFI boundary into engine-managed Bytes.
engineVersion()
```ts
function engineVersion(): string;
```

Returns the version of the binding crate. The npm package version tracks this 1:1.