
Bare Python

  • Standalone workers with no web framework (cron scripts, pure RQ workers, Dramatiq workers).
  • Custom frameworks z4j doesn’t provide an adapter for.
  • Library or SDK code that wants to ship its own agent.
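import asyncio
import os

from z4j_bare import Agent


async def main():
    # Entering the block connects to the brain; leaving it disconnects cleanly.
    async with Agent(
        brain_url="wss://z4j.example.com/ws",
        token=os.environ["Z4J_TOKEN"],
        project_id="default",
        agent_name="worker-01",
    ):
        # your_worker_loop() is your own coroutine.
        await your_worker_loop()


asyncio.run(main())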

The async with block will:

  1. Open the WebSocket, send hello, wait for ack.
  2. Install patches on auto-detected engines (scans installed z4j-* packages).
  3. Start the heartbeat task.
  4. On exit, send goodbye, await ack, close.
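The steps above can be sketched as an async context manager. This is a minimal illustration of the lifecycle only, not the z4j_bare implementation: FakeTransport, agent_session, and the message strings are hypothetical names, and step 2 (engine patching) is left out.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeTransport:
    """Hypothetical in-memory stand-in for the WebSocket connection."""

    def __init__(self):
        self.sent = []

    async def send(self, message):
        self.sent.append(message)

    async def wait_for_ack(self):
        # A real transport would block here until the brain replies.
        await asyncio.sleep(0)


async def _heartbeat(transport, interval):
    # Runs until the context manager cancels it on exit.
    while True:
        await asyncio.sleep(interval)
        await transport.send("heartbeat")


@asynccontextmanager
async def agent_session(transport, heartbeat_interval=0.01):
    # Step 1: handshake.
    await transport.send("hello")
    await transport.wait_for_ack()
    # Step 2 (installing engine patches) is omitted from this sketch.
    # Step 3: start the heartbeat in the background.
    task = asyncio.create_task(_heartbeat(transport, heartbeat_interval))
    try:
        yield transport
    finally:
        # Step 4: stop the heartbeat, then say goodbye and wait for the ack.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        await transport.send("goodbye")
        await transport.wait_for_ack()


async def demo():
    transport = FakeTransport()
    async with agent_session(transport):
        await asyncio.sleep(0.05)  # stands in for the worker loop
    return transport.sent
```

Running asyncio.run(demo()) returns the recorded messages in order: the hello first, some heartbeats while the body runs, and the goodbye last. Putting the teardown in a finally clause means the goodbye is still attempted when the worker loop raises.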

If your worker is sync, run the agent in a dedicated thread:

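import asyncio
import threading

from z4j_bare import Agent

def run_agent():
    # asyncio.run() gives the agent its own event loop on this thread.
    asyncio.run(Agent(brain_url=..., token=...).run_forever())

# daemon=True lets the process exit without waiting on the agent thread.
threading.Thread(target=run_agent, daemon=True).start()
your_sync_worker_loop()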

Agent.run_forever() is a convenience for this case. It never returns; send SIGTERM to stop.

If auto-discovery misbehaves (for example, multiple Celery apps in one process or unusual import patterns), register each engine explicitly:

from z4j_bare import Agent
from z4j_celery import CeleryAdapter

agent = Agent(...)
# Register each Celery app explicitly instead of relying on auto-discovery.
agent.register_engine(CeleryAdapter(app=my_primary_celery_app))
agent.register_engine(CeleryAdapter(app=my_analytics_celery_app))

Each registered adapter shows separately in the brain with its own queue namespace.

The dispatcher (z4j_bare.dispatcher.Dispatcher) batches events, applies redaction, and manages the WebSocket flush loop. You rarely need to touch it directly, but it is public, so custom adapters can reuse it.
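To make the batching and redaction responsibilities concrete, here is a minimal sketch of the general pattern. It is not the z4j_bare Dispatcher: MiniDispatcher, redact, the key pattern, and the size-based flush trigger are all assumptions made for illustration, and a real flush loop would probably also flush on a timer.

```python
import re
from typing import Callable

# Assumed heuristic: treat keys containing these words as secret-bearing.
SECRET_KEYS = re.compile(r"(password|token|secret)", re.IGNORECASE)


def redact(event: dict) -> dict:
    """Mask values whose key looks secret-bearing (top-level keys only)."""
    return {
        key: "[REDACTED]" if SECRET_KEYS.search(key) else value
        for key, value in event.items()
    }


class MiniDispatcher:
    """Hypothetical batching dispatcher; not the z4j_bare implementation."""

    def __init__(self, send: Callable[[list], None], batch_size: int = 3):
        self._send = send
        self._batch_size = batch_size
        self._buffer: list[dict] = []

    def emit(self, event: dict) -> None:
        # Redact before buffering so raw secrets are never held in the buffer.
        self._buffer.append(redact(event))
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        if not self._buffer:
            return
        # Swap the buffer out first so a failing send cannot resend old events.
        batch, self._buffer = self._buffer, []
        self._send(batch)


sent_batches: list[list[dict]] = []
dispatcher = MiniDispatcher(send=sent_batches.append, batch_size=2)
dispatcher.emit({"event_type": "task_sent", "api_token": "abc123"})
dispatcher.emit({"event_type": "task_started"})
dispatcher.emit({"event_type": "task_succeeded"})
dispatcher.flush()  # flush the partial final batch
```

In this sketch the first two events leave as one batch with api_token masked, and the explicit flush() sends the third on its own.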

z4j-bare ships pytest fixtures that stub the brain:

from z4j_bare.testing import stub_brain


def test_enqueue_emits_event(stub_brain):
    your_app.enqueue_something()
    # The first event captured by the stub should be the enqueue.
    assert stub_brain.events[0].event_type == "task_sent"
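For intuition about what such a stub records, the following sketch builds a tiny in-memory equivalent. StubBrain, Event, receive, and enqueue_something are hypothetical names chosen for this illustration; the actual fixture in z4j_bare.testing may be structured quite differently.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    event_type: str
    payload: dict = field(default_factory=dict)


class StubBrain:
    """Hypothetical in-memory brain: records events instead of sending them."""

    def __init__(self):
        self.events: list[Event] = []

    def receive(self, event_type: str, **payload) -> None:
        self.events.append(Event(event_type, payload))


def enqueue_something(brain: StubBrain) -> None:
    # Stand-in for application code that enqueues a task.
    brain.receive("task_sent", task_name="send_email")


brain = StubBrain()
enqueue_something(brain)
```

Because nothing leaves the process, assertions on brain.events run without a network connection or a live brain.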

python -m z4j_bare doctor runs the same probes the agent runtime does (buffer dir writable, brain DNS / TCP / TLS, WebSocket upgrade) using Z4J_* env vars or CLI flags. This is the canonical doctor implementation; the framework adapters (django/flask/fastapi) wrap the same logic.

# Use env vars (typical):
Z4J_BRAIN_URL=https://tasks.example.com \
  Z4J_TOKEN=... \
  Z4J_PROJECT_ID=ml-pipeline \
  Z4J_HMAC_SECRET=... \
  python -m z4j_bare doctor

# Or flags (useful for ad-hoc checks):
python -m z4j_bare doctor \
  --brain-url https://tasks.example.com \
  --token ... \
  --project-id ml-pipeline

# Skip the WS upgrade probe:
python -m z4j_bare doctor --no-websocket

# JSON for scripting:
python -m z4j_bare doctor --json

Exits 0 on all-green, 1 on any failure. The same probes are exposed programmatically via z4j_bare.diagnostics (probe_buffer_path, probe_dns, probe_tcp, probe_tls, probe_websocket, plus a run_all(config) orchestrator) so custom adapters can build their own doctor commands.
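As a rough picture of how a custom doctor command could combine probes into an exit code, here is a standard-library sketch. It deliberately does not call z4j_bare.diagnostics, whose signatures are not shown here: ProbeResult, check_buffer_path, check_dns, and run_checks are hypothetical stand-ins.

```python
import socket
import tempfile
from dataclasses import dataclass


@dataclass
class ProbeResult:
    name: str
    ok: bool
    detail: str = ""


def check_buffer_path(path: str) -> ProbeResult:
    """Verify that a file can be created inside the buffer directory."""
    try:
        # The temporary file is deleted again when the block exits.
        with tempfile.NamedTemporaryFile(dir=path):
            pass
        return ProbeResult("buffer_path", True)
    except OSError as exc:
        return ProbeResult("buffer_path", False, str(exc))


def check_dns(host: str) -> ProbeResult:
    """Verify that the host name resolves."""
    try:
        socket.getaddrinfo(host, None)
        return ProbeResult("dns", True)
    except socket.gaierror as exc:
        return ProbeResult("dns", False, str(exc))


def run_checks(checks) -> int:
    """Run each zero-argument check; return 0 if all pass, else 1."""
    results = [check() for check in checks]
    for result in results:
        status = "ok" if result.ok else "FAIL"
        print(f"{result.name}: {status} {result.detail}".rstrip())
    return 0 if all(r.ok for r in results) else 1
```

A script could then end with something like sys.exit(run_checks([lambda: check_buffer_path(buffer_dir), lambda: check_dns(host)])), where buffer_dir and host come from your own configuration. That mirrors the 0-on-green, 1-on-failure convention described above.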

See service-user deployments for the buffer-path failure mode the doctor catches.

Requires Python 3.10 or later. The package uses match statements and ParamSpec.

  • Agent - main entry point.
  • Dispatcher - batching + redaction + flush.
  • Redactor - pluggable secret filter.
  • HelloBuilder - constructs the capability map.
  • testing.stub_brain - test fixture.

See API § websocket-protocol for the wire-level reference.