Python API
The Python shim wraps the Rust engine through a PyO3 binding and
ships abi3 wheels for CPython 3.10+. The high-level surface
(Queue / Worker / Job / QueueEvents) mirrors the Node API
shape so cross-language migrations stay mechanical.
Install:
```shell
pip install chasquimq
```

Import:

```python
from chasquimq import (
    Queue,
    Worker,
    Job,
    QueueEvents,
    QueueEvent,
    RepeatPattern,
    BackoffSpec,
    MissedFiresPolicy,
    RepeatableMeta,
    UnrecoverableError,
    NotSupportedError,
    version,
)
```

On this page

- Queue — async producer for one queue.
- Worker — async consumer with a user-supplied handler.
- Job — the dataclass passed to your handler.
- QueueEvents and QueueEvent — async-iterator subscriber.
- RepeatPattern, BackoffSpec, MissedFiresPolicy, RepeatableMeta — schedule and retry value types.
- Type aliases — Handler, DelayMsLike, BackoffLike, RepeatLike, MissedFiresLike.
- Errors — NotSupportedError, UnrecoverableError.
- Native power-user surface — the underscore-free Producer / Consumer / Scheduler re-exports.
Queue

```python
class Queue:
    def __init__(
        self,
        name: str,
        *,
        redis_url: str = "redis://127.0.0.1:6379",
        max_stream_len: int | None = None,
        max_delay_secs: int | None = None,
    ) -> None: ...
```

High-level producer for a single ChasquiMQ queue. Construct one per queue. The native producer pool is created lazily on the first call. Safe to share across asyncio tasks and across threads.
- name — queue name.
- redis_url — default "redis://127.0.0.1:6379".
- max_stream_len — XADD MAXLEN ~ cap. Default engine value: 1_000_000.
- max_delay_secs — reject add calls with delay > this. Default engine value: 30 days.
await queue.add(name, data, *, delay_ms, delay, attempts, backoff, job_id, repeat, missed_fires)
```python
async def add(
    self,
    name: str,
    data: Any,
    *,
    delay_ms: DelayMsLike | None = None,
    delay: timedelta | None = None,
    attempts: int | None = None,
    backoff: BackoffLike | None = None,
    job_id: str | None = None,
    repeat: RepeatLike | None = None,
    missed_fires: MissedFiresLike | None = None,
) -> Job
```

Enqueue a single job. Returns a Job whose id is the engine-minted ULID, or the resolved spec key when repeat is set.

- delay_ms — schedule for future delivery in milliseconds (BullMQ-compatible). Accepts int, float, or a datetime for an absolute fire time. Naive datetimes are treated as UTC.
- delay — parallel kwarg accepting a datetime.timedelta. Passing both delay_ms and delay raises ValueError.
- attempts — per-job override of the queue-wide max_attempts.
- backoff — per-job backoff override: int (fixed delay in ms), BackoffSpec, or a raw native dict.
- job_id — stable id for at-most-once / idempotent scheduling.
- repeat — upsert a repeatable spec. See RepeatPattern.
- missed_fires — catch-up policy. Only meaningful with repeat; raises ValueError otherwise.
Raises ValueError for whitespace-only job_id, negative or
non-finite delays, or missed_fires set without repeat. Raises
TypeError for wrong-typed delay.
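A minimal enqueue sketch, assuming a Redis server reachable at the default redis://127.0.0.1:6379; the job names and payloads are illustrative:

```python
import asyncio
from datetime import timedelta

from chasquimq import BackoffSpec, Queue

async def main() -> None:
    async with Queue("emails") as queue:
        # Immediate job; the returned Job carries the engine-minted ULID.
        job = await queue.add("welcome", {"to": "ada@example.com"})
        print(job.id)

        # Delayed job with a per-job retry policy.
        await queue.add(
            "reminder",
            {"to": "ada@example.com"},
            delay=timedelta(minutes=5),
            attempts=3,
            backoff=BackoffSpec.exponential(1_000, max_ms=30_000),
        )

asyncio.run(main())
```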
await queue.add_unique(name, data, *, job_id, ...)
```python
async def add_unique(
    self,
    name: str,
    data: Any,
    *,
    job_id: str,
    delay_ms: DelayMsLike | None = None,
    delay: timedelta | None = None,
    attempts: int | None = None,
    backoff: BackoffLike | None = None,
    repeat: RepeatLike | None = None,
) -> Job
```

Idempotent variant. Requires job_id. Same semantics as the Node addUnique: strict cross-process dedup on the delayed path; immediate-path dedup is bounded to a single producer.
await queue.add_bulk(jobs)
```python
async def add_bulk(self, jobs: Sequence[dict]) -> list[Job]
```

Enqueue many jobs. Each entry is a dict with keys name, data, and optional delay_ms, delay, attempts, backoff, job_id, repeat. When all entries lack per-job overrides the call routes through add_bulk_named (pipelined); otherwise it degrades to a per-entry add() loop.
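For illustration, the dict shape add_bulk expects; a sketch assuming a local Redis. Homogeneous entries with no per-job overrides take the pipelined path:

```python
import asyncio

from chasquimq import Queue

async def main() -> None:
    async with Queue("emails") as queue:
        jobs = await queue.add_bulk([
            {"name": "welcome", "data": {"to": "ada@example.com"}},
            {"name": "welcome", "data": {"to": "grace@example.com"}},
        ])
        print([job.id for job in jobs])

asyncio.run(main())
```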
await queue.cancel_delayed(job_id)
```python
async def cancel_delayed(self, job_id: str) -> bool
```

Atomically remove a delayed job by its stable id. Returns True if the job was still in the delayed ZSET and is now removed, False if it was already promoted (or never existed).
await queue.peek_dlq(limit=20)
```python
async def peek_dlq(self, limit: int = 20) -> list[dict]
```

Return up to limit DLQ entries, oldest first. Each dict carries dlq_id, source_id, reason, optional detail, name, and the raw payload bytes.
await queue.replay_dlq(limit=100)
```python
async def replay_dlq(self, limit: int = 100) -> int
```

Move up to limit DLQ entries back into the main stream with their attempt counters reset. Returns the number of entries actually replayed.
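An inspect-then-replay sketch under the same local-Redis assumption; the field access mirrors the documented dict keys:

```python
import asyncio

from chasquimq import Queue

async def main() -> None:
    async with Queue("emails") as queue:
        # Inspect first: entries come back oldest first.
        for entry in await queue.peek_dlq(limit=10):
            print(entry["dlq_id"], entry["reason"], entry.get("detail"))
        # Then move everything back with attempt counters reset.
        replayed = await queue.replay_dlq(limit=100)
        print(f"replayed {replayed} entries")

asyncio.run(main())
```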
await queue.upsert_repeatable_job(name, data, *, repeat, ...)
```python
async def upsert_repeatable_job(
    self,
    name: str,
    data: Any,
    *,
    repeat: RepeatPattern,
    limit: int | None = None,
    start_after_ms: int | None = None,
    end_before_ms: int | None = None,
    attempts: int | None = None,
    backoff: BackoffLike | None = None,
    job_id: str | None = None,
    missed_fires: MissedFiresLike | None = None,
) -> Job
```

Upsert a repeatable / cron spec. Returns a Job whose id is the resolved spec key — pair with remove_repeatable_by_key to delete. attempts / backoff / job_id are accepted for API symmetry with add but are not threaded into per-fire jobs in v1.
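A repeatable-spec sketch, assuming a local Redis; the job name and payload are illustrative:

```python
import asyncio

from chasquimq import MissedFiresPolicy, Queue, RepeatPattern

async def main() -> None:
    async with Queue("reports") as queue:
        # Nightly at 02:00 New York time; one catch-up fire after downtime.
        spec = await queue.upsert_repeatable_job(
            "nightly-report",
            {"kind": "daily"},
            repeat=RepeatPattern.cron("0 2 * * *", tz="America/New_York"),
            missed_fires=MissedFiresPolicy.fire_once(),
        )
        print(spec.id)  # the resolved spec key; keep it for removal
        await queue.remove_repeatable_by_key(spec.id)

asyncio.run(main())
```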
await queue.get_repeatable_jobs(limit=100)
```python
async def get_repeatable_jobs(self, limit: int = 100) -> list[RepeatableMeta]
```

List repeatable specs ordered by next fire time, ascending.
await queue.remove_repeatable_by_key(key)
```python
async def remove_repeatable_by_key(self, key: str) -> bool
```

Remove a repeatable spec by its resolved key.
await queue.get_job_result(job_id)
```python
async def get_job_result(self, job_id: str) -> Any
```

Read the stored result for job_id. Returns the msgpack-decoded value, or None for the three indistinguishable cases: the job has not yet completed, the result expired, or no result was ever written.
await queue.get_job_result_bulk(job_ids)
```python
async def get_job_result_bulk(self, job_ids: Sequence[str]) -> list[Any]
```

Pipelined bulk variant. Returns a list aligned by index with job_ids.
await queue.close()
```python
async def close(self) -> None
```

Drop the cached native producer. Idempotent. Compatible with async with:

```python
async with Queue("emails") as queue:
    await queue.add("welcome", {"to": "ada@example.com"})
# queue.close() runs at scope exit.
```

Properties

- queue.name — the queue name.
- queue.is_closed — True after the first close() call.
Worker
```python
class Worker:
    def __init__(
        self,
        queue_name: str,
        handler: Handler,
        *,
        redis_url: str = "redis://127.0.0.1:6379",
        concurrency: int = 100,
        max_attempts: int = 25,
        group: str = "default",
        consumer_id: str | None = None,
        read_block_ms: int | None = None,
        read_count: int | None = None,
        claim_min_idle_ms: int | None = None,
        max_payload_bytes: int | None = None,
        dlq_max_stream_len: int | None = None,
        events_enabled: bool = True,
        delayed_enabled: bool = True,
        run_scheduler: bool = True,
        scheduler_tick_ms: int | None = None,
        store_results: bool = False,
        result_ttl_ms: int | None = None,
    ) -> None: ...
```

High-level async worker for a single queue. Construction does not start the engine loop — call await worker.run(). The native consumer auto-embeds a scheduler task (controlled by run_scheduler); multiple workers cooperate via leader election on {chasqui:<queue>}:scheduler:lock.
- concurrency — max in-flight handler invocations. Default 100.
- max_attempts — total attempts per job. Default 25 (the engine cap; the Node shim's maxStalledCount defaults lower).
- group — consumer group name. Default "default".
- consumer_id — optional XREADGROUP consumer name.
- read_block_ms — XREADGROUP BLOCK timeout. Default engine: 5000.
- read_count — XREADGROUP COUNT. Default engine: 64.
- claim_min_idle_ms — CLAIM recovery threshold. Default engine: 30_000.
- max_payload_bytes — DLQ-route oversize payloads above this. Default engine: 1_048_576 (1 MiB).
- dlq_max_stream_len — MAXLEN ~ cap on the DLQ stream. Default engine: 100_000.
- events_enabled — write to the per-queue events stream. Default True.
- delayed_enabled — auto-spawn the delayed-job promoter. Default True.
- run_scheduler — auto-spawn the embedded scheduler. Default True.
- scheduler_tick_ms — scheduler tick interval. Default engine: 1000.
- store_results — persist handler return values to the result key. Default False.
- result_ttl_ms — TTL for stored results. Default engine: 3_600_000 (1 h). Rounded up to whole seconds at the FFI boundary.
await worker.run()
```python
async def run(self) -> None
```

Start the engine loop and resolve once it drains. Idempotent — calling run() more than once awaits the in-flight loop instead of starting a second one.
await worker.close()
```python
async def close(self) -> None
```

Signal shutdown. Trips the consumer's shutdown token; the in-flight run() returns promptly. Safe to call from any coroutine, any number of times. Does not await the engine task itself — that avoids the double-await race when a caller invokes close while run is still in flight.
Worker implements __aenter__ / __aexit__:
```python
async with Worker("emails", handler) as worker:
    await worker.run()
```

Properties

- worker.name
- worker.is_running — engine task in flight.
- worker.is_closed — True after the first close() call.
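Putting the pieces together, a minimal end-to-end worker sketch; it assumes a local Redis and a producer filling the emails queue:

```python
import asyncio

from chasquimq import Job, Worker

async def send_email(job: Job) -> dict:
    # job.data is the msgpack-decoded payload from Queue.add.
    print(f"attempt {job.attempt}: sending to {job.data['to']}")
    return {"status": "sent"}

async def main() -> None:
    worker = Worker("emails", send_email, concurrency=10, store_results=True)
    try:
        await worker.run()  # resolves once the queue drains
    finally:
        await worker.close()

asyncio.run(main())
```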
Type alias: Handler
```python
Handler = Callable[[Job], Awaitable[Any]]
```

The handler signature: an async function that takes one Job and returns anything msgpack-encodable (or None).
Job

```python
@dataclass
class Job:
    id: str
    name: str
    data: Any
    attempt: int
    created_at_ms: int
```

The lightweight value type passed to user handlers and returned from Queue.add. v1 deliberately does not round-trip through Redis for any field on this object — the engine streams jobs and does not persist progress, return values, or per-job state.
Properties
Section titled “Properties”id— engine-minted ULID, or resolved spec key for repeatable upserts.name— dispatch name from the enginenfield. Empty when the producer added the job without a name (or when the delayed / repeatable path re-encoded without it).data— msgpack-decoded payload.attempt— 1-indexed attempt counter on the consumer side;0on the producer side.created_at_ms— submission time in epoch ms.attempts_made— alias ofattempt, matches BullMQ naming.
The dataclass is frozen-by-equality on the public fields; an
optional internal _queue backreference is set when the job is
returned by Queue.add so wait_for_result
works without an extra connection. It is excluded from repr and
__eq__ to keep the dataclass shape stable.
await job.wait_for_result(*, timeout, poll_interval)
```python
async def wait_for_result(
    self,
    *,
    timeout: float = 30.0,
    poll_interval: float = 0.1,
) -> Any | None
```

Poll until the engine's stored result becomes readable, until timeout elapses, or until the surrounding asyncio.Task is cancelled. Returns the decoded handler return value on success.

- timeout — total seconds. Default 30.0.
- poll_interval — seconds between polls. Default 0.1.
Raises asyncio.TimeoutError on deadline. Raises RuntimeError
when called on a Job produced by a worker handler (no _queue
backreference). Raises ValueError for non-positive timeout or
poll_interval.
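A produce-then-wait sketch; it assumes a local Redis and a running worker started with store_results=True for this queue:

```python
import asyncio

from chasquimq import Queue

async def main() -> None:
    async with Queue("emails") as queue:
        job = await queue.add("welcome", {"to": "ada@example.com"})
        # Readable only once a worker has stored a result for this job.
        result = await job.wait_for_result(timeout=10.0, poll_interval=0.2)
        print(result)

asyncio.run(main())
```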
QueueEvents
```python
class QueueEvents:
    def __init__(
        self,
        queue_name: str,
        *,
        redis_url: str = "redis://127.0.0.1:6379",
        last_event_id: str = "$",
        block_ms: int = 5_000,
        count: int = 100,
    ) -> None: ...
```

Cross-process subscriber to the engine's events stream ({chasqui:<queue>}:events). Built on redis.asyncio because the events stream is a generic Redis Stream (ASCII fields, not msgpack), so a thin async-redis client is the simplest path.
- last_event_id — start id. Default "$" (only new events). Pass "0" to replay history.
- block_ms — XREAD BLOCK timeout. Default 5000.
- count — XREAD COUNT. Default 100.
Async iteration
```python
events = QueueEvents("emails")
async for ev in events:
    print(ev.name, ev.job_id, ev.fields)
```

Yields QueueEvent values. Iteration ends when close() is called.
await queue_events.close()
```python
async def close(self) -> None
```

Stop iteration and release the Redis connection.
QueueEvent
```python
@dataclass(frozen=True)
class QueueEvent:
    name: str
    job_id: str | None
    job_name: str
    fields: dict[str, Any]
```

One event from the engine.
- name — event identifier ("waiting", "active", "completed", "failed", "retry-scheduled", "delayed", "dlq", "drained").
- job_id — None for queue-scoped events ("drained").
- job_name — dispatch name from the n field. Empty when the engine omitted n.
- fields — remaining decoded fields, verbatim. Numeric fields documented by the engine schema (attempt, backoff_ms, delay_ms, duration_us, ts) are coerced to int at parse time; a malformed entry whose value can't be parsed silently falls back to the raw string.
RepeatPattern
```python
@dataclass(frozen=True)
class RepeatPattern:
    kind: str
    expression: str | None = None
    tz: str | None = None
    interval_ms: int | None = None
```

Recurring-job schedule descriptor. Build instances via the factory methods so kind is set consistently.
RepeatPattern.cron(expression, *, tz=None)
```python
@staticmethod
def cron(expression: str, *, tz: str | None = None) -> "RepeatPattern"
```

Cron-driven pattern. expression accepts both 5-field (m h dom mon dow) and 6-field (with leading seconds) syntax. tz may be "UTC" / "Z", a fixed offset ("+05:30"), or any IANA name ("America/New_York"). IANA names are DST-aware: a 2 AM cron fires at the local 02:00 wall clock on both sides of spring-forward / fall-back.
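The DST behavior can be illustrated with the standard library alone (no chasquimq needed): a zone-aware 02:00 keeps its wall-clock time across the US spring-forward transition while its UTC offset shifts.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

ny = ZoneInfo("America/New_York")

# Local 02:00 the day before and the day after the 2024-03-10
# spring-forward transition in the US.
before = datetime(2024, 3, 9, 2, 0, tzinfo=ny)
after = datetime(2024, 3, 11, 2, 0, tzinfo=ny)

# The wall clock reads 02:00 in both cases; the UTC offset absorbs
# the shift (UTC-5 before the transition, UTC-4 after).
assert before.utcoffset() == timedelta(hours=-5)
assert after.utcoffset() == timedelta(hours=-4)
```

A fixed-offset tz like "+05:30" never shifts; only IANA names track transitions.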
RepeatPattern.every(interval_ms)
```python
@staticmethod
def every(interval_ms: int) -> "RepeatPattern"
```

Fixed-interval pattern. The first fire lands one interval_ms after upsert (no immediate fire). interval_ms must be positive — the engine rejects zero.
pattern.to_dict()
Serializes to the wire-format dict the native producer accepts.
BackoffSpec
```python
@dataclass(frozen=True)
class BackoffSpec:
    kind: str
    delay_ms: int
    max_delay_ms: int | None = None
    multiplier: float | None = None
    jitter_ms: int | None = None
```

Per-job retry backoff override. Pair with attempts on Queue.add to scope retry behavior to a single job.
BackoffSpec.fixed(delay_ms, jitter_ms=None)
Fixed delay between retries.
BackoffSpec.exponential(initial_ms, *, multiplier=2.0, max_ms=None, jitter_ms=None)
Exponential backoff. multiplier defaults to 2.0.
```python
await queue.add(
    "send",
    payload,
    attempts=5,
    backoff=BackoffSpec.exponential(1_000, max_ms=30_000),
)
```

MissedFiresPolicy
```python
@dataclass(frozen=True)
class MissedFiresPolicy:
    kind: str
    max_catchup: int | None = None
```

Catch-up policy for windows missed during scheduler downtime.
MissedFiresPolicy.skip()
Drop missed windows; resume on the first future fire. This is the default: safe, with no thundering herd after a deploy or outage.
MissedFiresPolicy.fire_once()
Emit one job to represent the missed window(s), then advance.
MissedFiresPolicy.fire_all(max_catchup)
Replay each missed window, up to max_catchup fires. max_catchup must be a positive integer (>= 1); 0 is rejected because the engine's loop short-circuits there, making it wire-distinct but semantically equivalent to skip().
RepeatableMeta
```python
@dataclass(frozen=True)
class RepeatableMeta:
    key: str
    job_name: str
    pattern: RepeatPattern
    next_fire_ms: int
    limit: int | None = None
    start_after_ms: int | None = None
    end_before_ms: int | None = None
    missed_fires: MissedFiresPolicy | None = None
```

Wire-compatible projection returned by Queue.get_repeatable_jobs. Carries no payload — listing thousands of specs stays cheap.
Type aliases
DelayMsLike

```python
DelayMsLike = Union[int, float, datetime]
```

Anything Queue.add accepts as delay_ms:

- int — milliseconds (BullMQ-compatible).
- float — milliseconds; fractional values are accepted and truncated to int.
- datetime.datetime — absolute fire time. Naive datetimes are treated as UTC.
For seconds-friendly relative durations, pass a timedelta via the
parallel delay kwarg instead.
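The coercion rules above can be sketched in plain Python. This is a hypothetical helper mirroring the documented behavior, not the shim's actual implementation:

```python
import math
from datetime import datetime, timezone

def normalize_delay_ms(value):
    """Hypothetical helper mirroring the documented DelayMsLike coercions."""
    if isinstance(value, datetime):
        if value.tzinfo is None:           # naive datetimes are treated as UTC
            value = value.replace(tzinfo=timezone.utc)
        now_ms = datetime.now(timezone.utc).timestamp() * 1000
        return max(int(value.timestamp() * 1000 - now_ms), 0)
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise TypeError("delay_ms must be int, float, or datetime")
    if not math.isfinite(value):
        raise ValueError("delay_ms must be finite")
    if value < 0:
        raise ValueError("delay_ms must be non-negative")
    return int(value)                      # floats are truncated
```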
BackoffLike
```python
BackoffLike = Union[int, BackoffSpec, dict]
```

- int — fixed delay in ms.
- BackoffSpec — typed builder.
- dict — raw native shape (advanced; bypasses validation).
RepeatLike
```python
RepeatLike = Union[RepeatPattern, dict]
```

MissedFiresLike
```python
MissedFiresLike = Union[MissedFiresPolicy, dict]
```

Raw dict shape: {"kind": "skip" | "fire-once" | "fire-all", "max_catchup"?: int}.
Handler
```python
Handler = Callable[[Job], Awaitable[Any]]
```

The signature Worker accepts.
Errors
The shim raises typed exceptions:
- UnrecoverableError — raise from a handler to skip retries and route the job to the DLQ. Subclassing is supported via an MRO-aware issubclass check on the native side.
- NotSupportedError — the caller asked for a v1-stubbed feature.
```python
from chasquimq import UnrecoverableError

async def handler(job):
    if not job.data.get("user_id"):
        raise UnrecoverableError("missing user_id; no retry possible")
```

Native power-user surface
The underscore-free re-exports Producer, Consumer, Scheduler
are the unwrapped PyO3 binding classes. Reach for them only when
you have a measured reason to bypass the high-level layer — most
callers will never need them. The signatures live in
chasquimq-py/src/chasquimq/_native.pyi.
```python
from chasquimq import Producer, Consumer, Scheduler, version

print(version())  # engine binding crate version
```

- Producer(redis_url, queue_name, *, pool_size, max_stream_len, max_delay_secs) — sync constructor.
- Consumer(redis_url, queue_name, *, concurrency, max_attempts, group, ...) — sync constructor with the full engine config surface.
- Scheduler(redis_url, queue_name, *, tick_interval_ms, batch, max_stream_len, lock_ttl_secs, holder_id) — sync constructor.
These map directly onto the engine’s
ProducerConfig /
ConsumerConfig /
SchedulerConfig — see
the Rust reference for the canonical field meanings.
version()
```python
def version() -> str: ...
```

Returns the version of the binding crate.