Celery
Requires Celery 5.6+.
Install
```bash
pip install z4j-celery
```

pip transitively pulls in `z4j-core` and `z4j-bare`.

Auto-discovered - no explicit registration is needed if you use `z4j-django` / `z4j-flask` / `z4j-fastapi`. The `worker_init` signal handler is wired:
- by `z4j-django` (which eagerly imports `z4j_celery` at module-load time when it is in `INSTALLED_APPS`), or
- by a direct `import z4j_celery` in your `celery.py` (Flask/FastAPI/bare layouts), or
- automatically by Celery itself via the entry point.
Worker bootstrap
When a Celery worker process starts, the `worker_init` signal fires and `z4j_celery.worker_bootstrap._on_worker_init()`:
- Confirms the process is `celery worker` (not `celery inspect` / `control` / `purge` / etc. - those don’t deserve an agent slot).
- Resolves the Celery app from `sender.app` or `current_app`.
- Calls `install_agent(engines=[CeleryEngineAdapter(celery_app=...)])`, which reads `Z4J_BRAIN_URL` / `Z4J_TOKEN` / `Z4J_HMAC_SECRET` / `Z4J_PROJECT_ID` / `Z4J_AGENT_NAME` from the environment.
- Logs `INFO:z4j.celery.worker_bootstrap:z4j worker bootstrap: agent runtime started (celery_app=..., framework=...)` on success.
If you don’t see that log line on worker boot, the agent is not running and tasks will not reach the brain. Check the `Z4J_*` env vars and confirm the worker is actually invoked as `celery worker` (the bootstrap only runs under that command).
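The two conditions above (invoked as `celery worker`, `Z4J_*` variables present) can be checked before the worker starts. Below is a minimal stdlib-only preflight sketch; `is_worker_invocation()`, `preflight()` and the subcommand list are hypothetical helpers, not part of z4j, and the sketch assumes all five variables are required (relax `REQUIRED_VARS` if your deployment treats some as optional).

```python
import os
import sys
from typing import List, Mapping, Optional, Sequence

# Variable names come from the bootstrap steps above; treating all of them
# as required is an assumption of this sketch.
REQUIRED_VARS = (
    "Z4J_BRAIN_URL",
    "Z4J_TOKEN",
    "Z4J_HMAC_SECRET",
    "Z4J_PROJECT_ID",
    "Z4J_AGENT_NAME",
)

# Celery subcommands that should not get an agent (non-exhaustive, hypothetical list).
NON_WORKER_COMMANDS = {"inspect", "control", "purge", "beat", "shell", "call", "status", "result"}


def is_worker_invocation(argv: Sequence[str]) -> bool:
    """Heuristic: the first recognised subcommand in argv must be 'worker'."""
    for arg in argv[1:]:
        if arg.startswith("-"):
            continue  # skip options such as -A / --loglevel
        if arg == "worker":
            return True
        if arg in NON_WORKER_COMMANDS:
            return False
        # Anything else is probably an option value (e.g. the app after -A); keep scanning.
    return False


def preflight(argv: Optional[Sequence[str]] = None,
              environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return human-readable problems; an empty list means both checks pass."""
    argv = sys.argv if argv is None else argv
    environ = os.environ if environ is None else environ
    problems = []
    if not is_worker_invocation(argv):
        problems.append("not invoked as `celery worker`; the bootstrap will not run")
    problems.extend(f"{name} is not set" for name in REQUIRED_VARS if not environ.get(name))
    return problems
```

For example, `preflight(["celery", "-A", "proj", "worker"])` run from a deploy script returns an empty list when everything is in place, and one line per missing variable otherwise.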
What it captures
| Signal | z4j event |
|---|---|
before_task_publish | task_sent |
task_prerun | task_started |
task_postrun (state=SUCCESS) | task_succeeded |
task_postrun (state=FAILURE) | task_failed |
task_retry | task_retry |
task_revoked | task_revoked |
Each event also carries the task payload: args, kwargs (redacted), queue, routing key, exchange, retry count, ETA, expiry, and parent task (for chord / group support).
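The signal table reduces to a lookup keyed on the signal name, with the task state only mattering for `task_postrun`. A minimal sketch of that mapping plus kwargs redaction follows; `to_z4j_event()` and `redact_kwargs()` are illustrative names, and the key-substring redaction rule is a hypothetical stand-in because z4j’s actual redaction policy is not described here.

```python
from typing import Any, Mapping, Optional

# (Celery signal, task state) -> z4j event, transcribed from the table above.
# The state is only significant for task_postrun.
_EVENT_MAP = {
    ("before_task_publish", None): "task_sent",
    ("task_prerun", None): "task_started",
    ("task_postrun", "SUCCESS"): "task_succeeded",
    ("task_postrun", "FAILURE"): "task_failed",
    ("task_retry", None): "task_retry",
    ("task_revoked", None): "task_revoked",
}

# Hypothetical redaction rule: any kwarg whose key contains one of these parts.
SENSITIVE_KEY_PARTS = ("password", "secret", "token", "api_key")
REDACTED = "***"


def to_z4j_event(signal_name: str, state: Optional[str] = None) -> Optional[str]:
    """Return the z4j event name, or None when the table has no entry
    (e.g. task_postrun with a state other than SUCCESS/FAILURE)."""
    key_state = state if signal_name == "task_postrun" else None
    return _EVENT_MAP.get((signal_name, key_state))


def redact_kwargs(kwargs: Mapping[str, Any]) -> dict:
    """Copy kwargs, masking values whose key looks sensitive (case-insensitive)."""
    return {
        key: REDACTED if any(part in key.lower() for part in SENSITIVE_KEY_PARTS) else value
        for key, value in kwargs.items()
    }
```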
Actions
| Verb | How z4j performs it |
|---|---|
retry | app.send_task(name, args, kwargs, ...) with original payload; marks old task retried_as=<new_id> |
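cancel | app.control.revoke(task_id, terminate=False) - worker-level cancellation: workers skip the task if it has not started yet, but a task that is already running is not interrupted |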
cancel + terminate | requires operator+ role - sends SIGTERM to the worker |
purge_queue | app.control.purge() on the queue |
bulk_retry | loop native retry, capped at 10k |
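The retry and bulk_retry rows can be sketched as: re-send with the original payload, record the new id on the old task as `retried_as`, and stop after the cap. `TaskRecord`, `retry()` and `bulk_retry()` are hypothetical names; `send_task` is injected so the sketch runs without a broker (against a real app it could be `lambda name, **kw: app.send_task(name, **kw).id`, since Celery’s `send_task` returns an `AsyncResult`). Truncating at the cap, rather than rejecting the whole request, is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Optional

BULK_RETRY_CAP = 10_000  # cap from the Actions table


@dataclass
class TaskRecord:
    # Hypothetical stand-in for what the brain keeps about a captured task.
    task_id: str
    name: str
    args: tuple
    kwargs: dict
    retried_as: Optional[str] = None


def retry(record: TaskRecord, send_task: Callable[..., str]) -> str:
    """Re-send with the original payload and link the old record to the new id."""
    new_id = send_task(record.name, args=record.args, kwargs=record.kwargs)
    record.retried_as = new_id
    return new_id


def bulk_retry(records: Iterable[TaskRecord], send_task: Callable[..., str],
               cap: int = BULK_RETRY_CAP) -> List[str]:
    """Loop the single-task retry, stopping once `cap` tasks have been re-sent."""
    new_ids: List[str] = []
    for record in records:
        if len(new_ids) >= cap:
            break
        new_ids.append(retry(record, send_task))
    return new_ids
```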
Chord / group support
Celery’s chord and group primitives carry a `group_id`. z4j preserves this, letting the dashboard:
- Show sub-tasks grouped under the parent.
- Offer “retry whole chord” as a single button.
- Attribute failures to the root group in alerts (v1.1 feature).
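All three dashboard features start from the same step: bucketing captured tasks by `group_id`. A flat, stdlib-only sketch follows; the task-dict shape (`id`, `group_id`, `state`) and the helper names are hypothetical, and nested groups (walking up to the true root group) are deliberately ignored.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_subtasks(tasks: List[dict]) -> Tuple[Dict[str, List[dict]], List[dict]]:
    """Split captured tasks into {group_id: [sub-tasks]} plus standalone tasks."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    standalone: List[dict] = []
    for task in tasks:
        group_id = task.get("group_id")
        if group_id:
            groups[group_id].append(task)
        else:
            standalone.append(task)
    return dict(groups), standalone


def retry_targets(groups: Dict[str, List[dict]], group_id: str) -> List[str]:
    """Every sub-task id in one group, i.e. what a group-wide retry would re-send."""
    return [task["id"] for task in groups.get(group_id, [])]


def failing_groups(tasks: List[dict]) -> List[str]:
    """Group ids with at least one failed sub-task, for attributing alerts to the group."""
    return sorted({t["group_id"] for t in tasks if t.get("group_id") and t.get("state") == "FAILURE"})
```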
Caveats
- Canvas signatures - `.si()` / `.s()` are not re-serialized exactly; the agent captures the resolved payload from `before_task_publish`. Re-enqueueing re-runs the task with the same captured args, not the same signature object. In practice this rarely matters.
- eta/countdown on retry - z4j’s retry does not replay the original ETA or countdown. Retries run immediately.
- Broker state visibility - queue-length reporting requires either the Redis broker or, on RabbitMQ, the `rabbitmq_management` plugin.
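The first two caveats follow from what `before_task_publish` hands a handler. Under Celery’s message protocol v2 (the default since Celery 4), `body` is an `(args, kwargs, embed)` tuple, with canvas links in `embed`, and `headers` carries fields such as `id`, `task`, `eta`, `expires`, `retries`, `group` and `parent_id`. The sketch below uses hypothetical helpers `capture()` and `replay_request()`, not z4j functions, and assumes the replay carries only name, args and kwargs. The real retry may pass more options (the `...` in the Actions table), but per the caveat it does not replay the ETA.

```python
from typing import Any, Dict, Mapping, Sequence, Tuple


def capture(body: Tuple[Sequence[Any], Mapping[str, Any], Mapping[str, Any]],
            headers: Mapping[str, Any],
            exchange: str = "",
            routing_key: str = "") -> Dict[str, Any]:
    """Flatten a protocol-v2 publish into a plain record (what gets stored)."""
    args, kwargs, embed = body
    return {
        "task_id": headers.get("id"),
        "name": headers.get("task"),
        "args": list(args),
        "kwargs": dict(kwargs),
        "eta": headers.get("eta"),
        "expires": headers.get("expires"),
        "retries": headers.get("retries", 0),
        "group_id": headers.get("group"),
        "parent_id": headers.get("parent_id"),
        "exchange": exchange,
        "routing_key": routing_key,
        "embed": dict(embed or {}),  # canvas links; kept for display only here
    }


def replay_request(captured: Mapping[str, Any]) -> Dict[str, Any]:
    """What this sketch re-sends: no eta/countdown, no signature object."""
    return {"name": captured["name"], "args": captured["args"], "kwargs": captured["kwargs"]}
```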
Config
The adapter takes no explicit config - it reads the Celery app’s config via the framework adapter.
For Django, the app is auto-detected via 5 candidates (see quickstart §Auto-detect). If your layout is unusual:
```python
# settings.py - top-level Django setting, NOT inside the Z4J dict
CELERY_APP = "myproject.celery:app"
```

For Flask / FastAPI / bare-Python layouts, pass the app directly to the engine:

```python
from z4j_celery.engine import CeleryEngineAdapter
from z4j_bare.install import install_agent

# Example path: import your own Celery() instance from wherever it lives.
from myproject.celery import app as my_celery_app

install_agent(
    engines=[CeleryEngineAdapter(celery_app=my_celery_app)],
    # brain_url / token / hmac_secret / project_id / agent_name
    # are read from Z4J_* env vars if not passed explicitly
)
```

Performance notes
- Event capture adds ~50µs per task (measured on an Apple M3), which is negligible.
- The `task_postrun` signal is synchronous in Celery; the z4j handler hands its work off to a background thread so it does not block the task.
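The offloading pattern in the last bullet can be sketched with a bounded `queue.Queue` and one daemon thread: the signal handler only enqueues, and the thread does the slow work (serialising, sending to the brain). `BackgroundEmitter` and its drop-when-full policy are hypothetical design choices for illustration, not z4j’s implementation.

```python
import queue
import threading
from typing import Any, Callable, Dict

Event = Dict[str, Any]


class BackgroundEmitter:
    """Signal handlers call emit(); a daemon thread drains the queue into `sink`."""

    def __init__(self, sink: Callable[[Event], None], maxsize: int = 10_000) -> None:
        self._queue: "queue.Queue[Event]" = queue.Queue(maxsize=maxsize)
        self._sink = sink
        self.dropped = 0  # approximate counter; not synchronised
        self._thread = threading.Thread(target=self._run, name="event-emitter", daemon=True)
        self._thread.start()

    def emit(self, event: Event) -> None:
        """Never blocks the caller: if the queue is full, the event is dropped."""
        try:
            self._queue.put_nowait(event)
        except queue.Full:
            self.dropped += 1

    def _run(self) -> None:
        while True:
            event = self._queue.get()
            try:
                self._sink(event)
            except Exception:
                pass  # a real agent would log this instead of swallowing it
            finally:
                self._queue.task_done()

    def flush(self) -> None:
        """Block until every queued event has been handed to the sink."""
        self._queue.join()
```

A `task_postrun` receiver would then just call `emitter.emit({"task_id": task_id, "state": state})` and return. If you adapt this pattern under the prefork pool, create the emitter inside each child process (for example from the `worker_process_init` signal), because threads started before `fork()` do not exist in the children.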