Celery

Requires Celery 5.6+.

pip install z4j-celery

Installing z4j-celery also pulls in z4j-core and z4j-bare as transitive dependencies. The adapter is auto-discovered: no explicit registration is needed if you use z4j-django, z4j-flask, or z4j-fastapi. The worker_init signal handler is wired in one of three ways:

  • by z4j-django (which eagerly imports z4j_celery at module-load time when in INSTALLED_APPS), or
  • by direct import z4j_celery in your celery.py (Flask/FastAPI/bare layouts), or
  • automatically by celery itself via the entry point.

When a Celery worker process starts, the worker_init signal fires and z4j_celery.worker_bootstrap._on_worker_init() runs. It:

  1. Confirms the process is celery worker (not celery inspect/control/purge/etc. - those don’t deserve an agent slot).
  2. Resolves the Celery app from sender.app or current_app.
  3. Calls install_agent(engines=[CeleryEngineAdapter(celery_app=...)]), which reads Z4J_BRAIN_URL / Z4J_TOKEN / Z4J_HMAC_SECRET / Z4J_PROJECT_ID / Z4J_AGENT_NAME from the environment.
  4. Logs INFO:z4j.celery.worker_bootstrap:z4j worker bootstrap: agent runtime started (celery_app=..., framework=...) on success.

If you don’t see that log line on worker boot, the agent is not running and tasks will not reach the brain. Check the Z4J_* env vars and that the worker is actually invoked as celery worker (the bootstrap signal only fires under that command).
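
As a quick pre-flight check for the environment part of that advice, a minimal sketch along these lines can be run in the same shell or container as the worker. The variable names are the ones listed in step 3; the helper `missing_z4j_env` is hypothetical (not part of z4j), and treating every variable as mandatory is an assumption of this sketch.

```python
import os

# Names taken from the bootstrap description; whether each one is strictly
# required by z4j is an assumption of this sketch.
Z4J_ENV_VARS = (
    "Z4J_BRAIN_URL",
    "Z4J_TOKEN",
    "Z4J_HMAC_SECRET",
    "Z4J_PROJECT_ID",
    "Z4J_AGENT_NAME",
)


def missing_z4j_env(environ=None):
    """Return the Z4J_* variables that are unset or empty (hypothetical helper)."""
    env = os.environ if environ is None else environ
    return [name for name in Z4J_ENV_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_z4j_env()
    if missing:
        print("unset or empty:", ", ".join(missing))
    else:
        print("all Z4J_* variables are set")
```

An empty result only shows that the variables are present, not that their values are accepted by the brain.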

| Signal | z4j event |
| --- | --- |
| before_task_publish | task_sent |
| task_prerun | task_started |
| task_postrun (state=SUCCESS) | task_succeeded |
| task_postrun (state=FAILURE) | task_failed |
| task_retry | task_retry |
| task_revoked | task_revoked |
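
The signal-to-event mapping above can be read as a simple lookup keyed on the signal name and, for task_postrun only, the task state. The following is an illustrative sketch, not the adapter's actual code; `SIGNAL_TO_EVENT` and `z4j_event_for` are hypothetical names.

```python
# (signal name, task state or None) -> z4j event name, transcribed from the table.
SIGNAL_TO_EVENT = {
    ("before_task_publish", None): "task_sent",
    ("task_prerun", None): "task_started",
    ("task_postrun", "SUCCESS"): "task_succeeded",
    ("task_postrun", "FAILURE"): "task_failed",
    ("task_retry", None): "task_retry",
    ("task_revoked", None): "task_revoked",
}


def z4j_event_for(signal_name, state=None):
    """Return the z4j event name for a signal, or None if the table has no entry."""
    # Only task_postrun is disambiguated by state; other signals ignore it.
    key_state = state if signal_name == "task_postrun" else None
    return SIGNAL_TO_EVENT.get((signal_name, key_state))
```

How z4j treats task_postrun with other states (for example RETRY) is not stated here, so the sketch simply returns None for them.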

Each event also carries the task payload: args, kwargs (redacted), queue, routing key, exchange, retry count, ETA, expires, and the parent task (for chord / group support).

| Verb | How z4j performs it |
| --- | --- |
| retry | app.send_task(name, args, kwargs, ...) with original payload; marks old task retried_as=<new_id> |
| cancel | app.control.revoke(task_id, terminate=False) - worker-level cancellation |
| cancel + terminate | requires operator+ role - sends SIGTERM to the worker |
| purge_queue | app.control.purge() on the queue |
| bulk_retry | loop native retry, capped at 10k |
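
To make the retry and cancel rows concrete, here is a hedged sketch of what those two calls look like against a Celery app object. `app.send_task` and `app.control.revoke` are real Celery APIs; the `CapturedTask` record, its field names, and the two helper functions are hypothetical and only illustrate the idea of linking the old task to the new one via retried_as.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class CapturedTask:
    """Hypothetical record of a captured task; field names are illustrative."""
    task_id: str
    name: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    retried_as: Optional[str] = None


def retry_task(app: Any, task: CapturedTask) -> str:
    """Re-enqueue a task by name with its captured payload and link old to new."""
    # send_task returns an AsyncResult; its .id is the id of the new task.
    result = app.send_task(task.name, args=task.args, kwargs=task.kwargs)
    task.retried_as = result.id
    return result.id


def cancel_task(app: Any, task_id: str) -> None:
    """Revoke a task without terminating a process that is already running it."""
    app.control.revoke(task_id, terminate=False)
```

Because the helpers only rely on `send_task` and `control.revoke`, they can be exercised against a small stand-in object in tests, without a broker.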

Celery’s chord and group primitives carry a group_id. z4j preserves this, letting the dashboard:

  • Show sub-tasks grouped under the parent.
  • Offer “retry whole chord” as a single button.
  • Attribute failures to the root group in alerts (v1.1 feature).

Known limitations:

  • Canvas signatures - .si() / .s() are not re-serialized exactly; the agent captures the resolved payload from before_task_publish. Re-enqueueing re-runs the task with the same captured args, not the same signature object. This is fine in the vast majority of cases.
  • eta/countdown on retry - z4j’s retry does not replay the original ETA. Retries run immediately.
  • Broker state visibility - queue length reporting requires the Redis broker; RabbitMQ is supported but depends on the rabbitmq-management plugin.
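
The group_id-based grouping described earlier in this section can be pictured with a small sketch. The event dictionaries below (keys `task_id`, `group_id`, `event`) are a hypothetical shape chosen for illustration, not z4j's wire format, and both functions are illustrative helpers.

```python
from collections import defaultdict


def group_by_group_id(events):
    """Bucket task ids by group_id; tasks without a group go under None."""
    grouped = defaultdict(list)
    for event in events:
        grouped[event.get("group_id")].append(event["task_id"])
    return dict(grouped)


def failed_groups(events):
    """Return the sorted group ids that contain at least one failed task."""
    return sorted(
        {
            event["group_id"]
            for event in events
            if event.get("group_id") and event["event"] == "task_failed"
        }
    )
```

A dashboard could use the first function to nest sub-tasks under their parent and the second to attribute a failure to its root group.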

The adapter takes no explicit config - it reads the Celery app’s config via the framework adapter.

For Django, the app is auto-detected by trying 5 candidates (see quickstart §Auto-detect). If your layout is unusual, set the path explicitly:

# settings.py - top-level Django setting, NOT inside the Z4J dict
CELERY_APP = "myproject.celery:app"
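
For readers unfamiliar with the "module:attribute" notation used in that setting, the sketch below shows how such a string is conventionally resolved with the standard library. It is an assumption that z4j resolves CELERY_APP in a similar way; `resolve_app_path` is a hypothetical helper.

```python
from importlib import import_module


def resolve_app_path(path):
    """Resolve a 'package.module:attribute' string to the object it names."""
    module_name, sep, attr = path.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    # Import the module, then fetch the named attribute from it.
    return getattr(import_module(module_name), attr)
```

With the setting above, `resolve_app_path("myproject.celery:app")` would import myproject.celery and return its `app` attribute.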

For Flask / FastAPI / bare-Python layouts, pass the app directly to the engine:

from z4j_celery.engine import CeleryEngineAdapter
from z4j_bare.install import install_agent

# my_celery_app is your existing Celery application instance
install_agent(
    engines=[CeleryEngineAdapter(celery_app=my_celery_app)],
    # brain_url / token / hmac_secret / project_id / agent_name
    # are read from Z4J_* env vars if not passed explicitly
)

Performance notes:

  • Event capture adds about 50 µs per task (measured on an Apple M3), which is negligible for most workloads.
  • The task_postrun signal is synchronous in Celery, so the z4j handler hands its work off to a separate thread to avoid blocking the task.
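
The hand-off pattern behind the second point can be sketched with the standard library alone: the signal handler only enqueues, and a background thread does the slow work. This illustrates the general technique, not z4j's implementation; `BackgroundEventSink` and its method names are hypothetical.

```python
import queue
import threading


class BackgroundEventSink:
    """Hypothetical sink: the handler enqueues, a background thread delivers."""

    def __init__(self, deliver):
        self._deliver = deliver  # callable that performs the slow work
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def on_task_postrun(self, **signal_kwargs):
        # Called synchronously by the signal, so it must return quickly.
        self._queue.put(signal_kwargs)

    def _run(self):
        while True:
            item = self._queue.get()
            try:
                if item is None:  # sentinel pushed by close()
                    return
                self._deliver(item)
            except Exception:
                pass  # a delivery error must not kill the thread
            finally:
                self._queue.task_done()

    def close(self):
        """Flush pending events and stop the background thread."""
        self._queue.put(None)
        self._thread.join()
```

In a real worker, `on_task_postrun` would be connected to Celery's task_postrun signal and `deliver` would forward the event to the brain; both wiring steps are omitted here.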

See scheduler: celery-beat.