Building a Storage Backend
This guide documents the expectations for implementing a new storage backend for PyTaskFlow.
Contract: JobStorage
Every backend must implement the abstract methods in pytaskflow/storage/base.py. These methods are used by the client, worker, and dashboard. Any missing or partially implemented method will cause runtime errors.
Key responsibilities:
- Persist job records (enqueue, schedule, get_job_data)
- Move jobs through states (set_job_state, acknowledge)
- Provide queues for workers (dequeue)
- Expose dashboard queries (get_jobs_by_state, get_state_job_count, get_job_history, get_servers, get_recurring_jobs)
- Track servers/heartbeats (server_heartbeat, remove_server)
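To make the contract concrete, here is a minimal sketch of a backend skeleton built around the method names listed above. The class name JobStorageSketch, every parameter name, and every signature are assumptions for illustration only; the real definitions live in pytaskflow/storage/base.py. If the real base class is declared with abc, as assumed here, an incomplete subclass fails at instantiation instead of partway through a worker run.

```python
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Optional


class JobStorageSketch(ABC):
    """Hypothetical stand-in for JobStorage; all signatures are assumptions."""

    # Persist job records
    @abstractmethod
    def enqueue(self, queue: str, job_data: dict[str, Any]) -> str: ...

    @abstractmethod
    def schedule(self, job_data: dict[str, Any], enqueue_at: datetime) -> str: ...

    @abstractmethod
    def get_job_data(self, job_id: str) -> Optional[dict[str, Any]]: ...

    # Move jobs through states
    @abstractmethod
    def set_job_state(self, job_id: str, state_name: str,
                      state_data: dict[str, Any]) -> None: ...

    @abstractmethod
    def acknowledge(self, job_id: str) -> None: ...

    # Provide queues for workers
    @abstractmethod
    def dequeue(self, queues: list[str], worker_id: str,
                server_id: str) -> Optional[str]: ...

    # Dashboard queries
    @abstractmethod
    def get_jobs_by_state(self, state_name: str, start: int,
                          count: int) -> list[dict[str, Any]]: ...

    @abstractmethod
    def get_state_job_count(self, state_name: str) -> int: ...

    @abstractmethod
    def get_job_history(self, job_id: str) -> list[dict[str, Any]]: ...

    @abstractmethod
    def get_servers(self) -> list[dict[str, Any]]: ...

    @abstractmethod
    def get_recurring_jobs(self) -> list[dict[str, Any]]: ...

    # Track servers/heartbeats
    @abstractmethod
    def server_heartbeat(self, server_id: str, info: dict[str, Any]) -> None: ...

    @abstractmethod
    def remove_server(self, server_id: str) -> None: ...


class HalfDoneStorage(JobStorageSketch):
    # Implements only enqueue; the other twelve abstract methods are missing,
    # so HalfDoneStorage() raises TypeError.
    def enqueue(self, queue, job_data):
        return "job-1"
```

Calling HalfDoneStorage() raises TypeError listing the unimplemented methods, which surfaces an incomplete backend at startup rather than in production traffic.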
Most Important Requirement: Atomic dequeue
The dequeue operation must be atomic. If two workers ask for a job at the same time, only one should receive it. This is critical to avoid duplicate processing and data loss.
Why it matters:
- If a worker crashes after marking a job as Processing but before completing it, the system needs to know that the job was in-flight.
- If two workers race on the same job and both process it, you can get duplicate side effects (emails sent twice, payments double-charged, etc.).
Recommended approach:
- Use a transactional "fetch and lock" strategy.
- In SQL databases, this is typically:
  - SELECT ... FOR UPDATE SKIP LOCKED (Postgres 9.5+, MySQL 8.0+)
  - Or a single atomic UPDATE ... WHERE status = 'enqueued' ... RETURNING pattern (Postgres, SQLite 3.35+; MySQL does not support RETURNING on UPDATE).
- The dequeue should also update the job state to Processing in the same transaction.
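As a sketch of the fetch-and-lock idea, the example below uses SQLite from the Python standard library. SQLite has no SELECT ... FOR UPDATE, so it gets the same guarantee by opening the transaction with BEGIN IMMEDIATE, which takes the database's single write lock before reading. The job_queue table, its columns, and the connect/init_db/dequeue helpers are hypothetical, not PyTaskFlow's schema or API.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

# Hypothetical queue table; the columns follow the metadata this guide
# recommends, but the layout is an assumption, not PyTaskFlow's schema.
SCHEMA = """
CREATE TABLE IF NOT EXISTS job_queue (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id     TEXT NOT NULL,
    queue      TEXT NOT NULL,
    status     TEXT NOT NULL DEFAULT 'enqueued',
    worker_id  TEXT,
    server_id  TEXT,
    fetched_at TEXT
);
"""


def connect(path: str) -> sqlite3.Connection:
    # isolation_level=None disables the sqlite3 module's implicit
    # transactions so we can issue BEGIN IMMEDIATE ourselves.
    return sqlite3.connect(path, timeout=30, isolation_level=None)


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute("PRAGMA journal_mode=WAL")
    conn.executescript(SCHEMA)


def dequeue(conn: sqlite3.Connection, queue: str,
            worker_id: str, server_id: str) -> Optional[str]:
    """Claim the oldest enqueued job in `queue`; return its job_id or None."""
    # BEGIN IMMEDIATE takes the write lock up front, so no other connection
    # can claim the same row between our SELECT and UPDATE.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, job_id FROM job_queue "
            "WHERE queue = ? AND status = 'enqueued' ORDER BY id LIMIT 1",
            (queue,),
        ).fetchone()
        if row is not None:
            # Mark as processing and record who fetched it, in the same
            # transaction as the claim.
            conn.execute(
                "UPDATE job_queue SET status = 'processing', worker_id = ?, "
                "server_id = ?, fetched_at = ? WHERE id = ?",
                (worker_id, server_id,
                 datetime.now(timezone.utc).isoformat(), row[0]),
            )
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    return None if row is None else row[1]
```

SQLite serializes all writers, so this is coarser than Postgres's row-level SKIP LOCKED, but it shows the essential property: the claim and the state change commit together or not at all.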
If the backend cannot guarantee atomic dequeue, it is not safe for multi-worker production use.
Processing Safety and Crash Recovery
At minimum, a backend should record:
- fetched_at or processing_started_at
- worker_id and server_id
- processing vs enqueued status in the queue
This enables future safety features such as:
- Re-queueing jobs stuck in Processing beyond a timeout
- Visibility into worker crashes
- Auditing job lifecycles
PyTaskFlow does not yet auto-requeue stuck jobs, but new backends should store the data needed to enable this safely later.
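To show why this metadata matters, here is a sketch of how a future recovery task could find stuck jobs from it. QueueRecord, find_stuck_jobs, and the 30-minute timeout in the test are hypothetical; PyTaskFlow does not ship this logic, and choosing a safe timeout depends on how long your jobs legitimately run.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class QueueRecord:
    # Hypothetical record shape holding the minimum metadata listed above.
    job_id: str
    status: str                          # "enqueued" or "processing"
    worker_id: Optional[str] = None
    server_id: Optional[str] = None
    fetched_at: Optional[datetime] = None


def find_stuck_jobs(records: list[QueueRecord], now: datetime,
                    timeout: timedelta) -> list[QueueRecord]:
    """Return processing records fetched longer ago than `timeout`."""
    return [
        r for r in records
        if r.status == "processing"
        and r.fetched_at is not None
        and now - r.fetched_at > timeout
    ]
```

Without fetched_at there is no way to tell a slow job from an abandoned one, and without worker_id and server_id there is no way to cross-check against server heartbeats before requeueing.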
State Transitions and History
Every call to set_job_state should:
- Update the job record
- Record a history entry (get_job_history is used in the dashboard)
- Keep state data serialized as JSON
If a backend drops history or fails to store state data, the dashboard will lose critical context.
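A minimal sketch of those three duties for a SQL backend follows, using the standard-library sqlite3 module. The jobs and job_history tables and the set_job_state/get_job_history signatures are hypothetical; the real methods take PyTaskFlow's own state objects. The point is that the job update and the history insert share one transaction, so the record and its history cannot drift apart.

```python
import json
import sqlite3
from datetime import datetime, timezone
from typing import Any

# Hypothetical tables; names and columns are assumptions for illustration.
SCHEMA = """
CREATE TABLE jobs (
    id          TEXT PRIMARY KEY,
    state_name  TEXT NOT NULL,
    state_data  TEXT NOT NULL,
    created_at  TEXT NOT NULL
);
CREATE TABLE job_history (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id      TEXT NOT NULL REFERENCES jobs(id),
    state_name  TEXT NOT NULL,
    state_data  TEXT NOT NULL,
    changed_at  TEXT NOT NULL
);
"""


def set_job_state(conn: sqlite3.Connection, job_id: str,
                  state_name: str, state_data: dict[str, Any]) -> None:
    payload = json.dumps(state_data)  # state data is stored as JSON text
    now = datetime.now(timezone.utc).isoformat()
    # `with conn` commits both statements together, or rolls both back if
    # anything inside raises.
    with conn:
        cur = conn.execute(
            "UPDATE jobs SET state_name = ?, state_data = ? WHERE id = ?",
            (state_name, payload, job_id),
        )
        if cur.rowcount == 0:
            raise KeyError(job_id)
        conn.execute(
            "INSERT INTO job_history (job_id, state_name, state_data, changed_at) "
            "VALUES (?, ?, ?, ?)",
            (job_id, state_name, payload, now),
        )


def get_job_history(conn: sqlite3.Connection, job_id: str) -> list[tuple[str, Any]]:
    # Oldest transition first, with state data decoded back from JSON.
    rows = conn.execute(
        "SELECT state_name, state_data FROM job_history "
        "WHERE job_id = ? ORDER BY id",
        (job_id,),
    ).fetchall()
    return [(name, json.loads(data)) for name, data in rows]
```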
Queues and Acknowledgements
Recommended queue behavior:
- enqueue inserts a queue record with status enqueued
- dequeue moves it to processing
- acknowledge removes it from the processing list
- set_job_state(..., EnqueuedState) should move a job back to enqueued
This mirrors the behavior of the Redis backend and keeps job lifecycle consistent.
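The four rules above can be modeled with a toy, single-process class to make the lifecycle explicit. InMemoryQueues and its requeue method (standing in for what set_job_state(..., EnqueuedState) should trigger) are hypothetical; the threading.Lock plays the role that a database transaction plays in a real backend.

```python
import threading
from collections import deque
from typing import Optional


class InMemoryQueues:
    """Toy model of the queue lifecycle; not a production backend."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._enqueued: dict[str, deque[str]] = {}
        self._processing: dict[str, str] = {}  # job_id -> queue name

    def enqueue(self, queue: str, job_id: str) -> None:
        with self._lock:
            self._enqueued.setdefault(queue, deque()).append(job_id)

    def dequeue(self, queue: str) -> Optional[str]:
        # Pop from enqueued and mark as processing under one lock,
        # so the move is atomic with respect to other callers.
        with self._lock:
            pending = self._enqueued.get(queue)
            if not pending:
                return None
            job_id = pending.popleft()
            self._processing[job_id] = queue
            return job_id

    def acknowledge(self, job_id: str) -> None:
        # The job is finished: drop it from the processing list.
        with self._lock:
            self._processing.pop(job_id, None)

    def requeue(self, job_id: str) -> None:
        # Move a processing job back to the tail of its enqueued list.
        with self._lock:
            queue = self._processing.pop(job_id, None)
            if queue is not None:
                self._enqueued.setdefault(queue, deque()).append(job_id)

    def processing(self) -> set[str]:
        with self._lock:
            return set(self._processing)
```

At any moment a job is in exactly one place (enqueued, processing, or gone), which is the invariant a real backend should preserve across crashes.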
Scheduled and Recurring Jobs
Backends should support:
- Scheduled jobs (schedule) with a durable enqueue_at
- Recurring jobs with a cron expression and last_execution
If a backend wants these to work under the worker schedulers, it should expose helper methods to move due scheduled/recurring jobs into the queue atomically.
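For scheduled jobs, such a helper can be sketched in SQLite as an INSERT ... SELECT plus a DELETE with the same cutoff inside one BEGIN IMMEDIATE transaction. The helper name enqueue_due_scheduled_jobs and both tables are hypothetical, and enqueue_at is assumed to be stored as Unix epoch seconds so comparisons are numeric. Recurring jobs would need a cron parser and are left out.

```python
import sqlite3
from datetime import datetime

# Hypothetical tables; enqueue_at holds Unix epoch seconds.
SCHEMA = """
CREATE TABLE scheduled_jobs (
    job_id     TEXT PRIMARY KEY,
    queue      TEXT NOT NULL,
    enqueue_at REAL NOT NULL
);
CREATE TABLE job_queue (
    id      INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id  TEXT NOT NULL,
    queue   TEXT NOT NULL,
    status  TEXT NOT NULL DEFAULT 'enqueued'
);
"""


def enqueue_due_scheduled_jobs(conn: sqlite3.Connection, now: datetime) -> int:
    """Move every scheduled job with enqueue_at <= now into job_queue.

    The connection must use isolation_level=None so that the explicit
    BEGIN IMMEDIATE below controls the transaction.
    """
    cutoff = now.timestamp()
    # Holding the write lock means two schedulers running at once cannot
    # both move the same job.
    conn.execute("BEGIN IMMEDIATE")
    try:
        cur = conn.execute(
            "INSERT INTO job_queue (job_id, queue) "
            "SELECT job_id, queue FROM scheduled_jobs "
            "WHERE enqueue_at <= ? ORDER BY enqueue_at",
            (cutoff,),
        )
        moved = cur.rowcount
        conn.execute("DELETE FROM scheduled_jobs WHERE enqueue_at <= ?", (cutoff,))
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    return moved
```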
Dashboard Queries
The dashboard expects these queries to be fast and deterministic:
- List jobs by state with pagination
- Get counts per state
- Get job details and history
- List active servers
- List recurring jobs
Add indexes on state_name, created_at, and any queue lookup columns to keep the UI responsive.
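Here is a sketch of an index and the two state queries it serves, again in SQLite with a hypothetical jobs table and hypothetical function signatures. "Deterministic" here means every page has a stable order: sorting on created_at alone leaves ties in arbitrary order, so id is added as a tie-breaker.

```python
import sqlite3

# Hypothetical jobs table plus a composite index for state listings.
SCHEMA = """
CREATE TABLE jobs (
    id          INTEGER PRIMARY KEY,
    state_name  TEXT NOT NULL,
    created_at  TEXT NOT NULL
);
-- Serves "jobs in state X, newest first" and per-state counts
-- without scanning the whole table.
CREATE INDEX idx_jobs_state_created ON jobs (state_name, created_at);
"""


def get_jobs_by_state(conn: sqlite3.Connection, state_name: str,
                      start: int, count: int) -> list[int]:
    # id breaks ties between equal created_at values, so pages never
    # overlap or skip rows between requests.
    return [row[0] for row in conn.execute(
        "SELECT id FROM jobs WHERE state_name = ? "
        "ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?",
        (state_name, count, start),
    )]


def get_state_job_count(conn: sqlite3.Connection, state_name: str) -> int:
    return conn.execute(
        "SELECT COUNT(*) FROM jobs WHERE state_name = ?", (state_name,)
    ).fetchone()[0]
```

OFFSET pagination still walks past skipped rows, so very deep pages get slower; keyset pagination (filtering on the last seen created_at and id) is the usual alternative if that becomes a problem.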
Summary Checklist
- Atomic dequeue with locking
- Durable processing metadata (worker, server, fetched_at)
- State transitions + history
- Scheduled and recurring storage
- Dashboard query support
- Reasonable indexes