Bare Python
When to use z4j-bare directly
- Standalone workers with no web framework (cron scripts, pure RQ workers, Dramatiq workers).
- Custom frameworks z4j doesn’t provide an adapter for.
- Library or SDK code that wants to ship its own agent.
Agent lifecycle
    import asyncio
    import os

    from z4j_bare import Agent

    async def main():
        async with Agent(
            brain_url="wss://z4j.example.com/ws",
            token=os.environ["Z4J_TOKEN"],
            project_id="default",
            agent_name="worker-01",
        ):
            await your_worker_loop()

    asyncio.run(main())

async with will:
- Open the WebSocket, send hello, wait for ack.
- Install patches on auto-detected engines (scans installed z4j-* packages).
- Start the heartbeat task.
- On exit, send goodbye, await ack, close.
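The ordering above can be illustrated with a toy async context manager. This is a minimal sketch, not z4j-bare's implementation: FakeTransport, SketchAgent, the frame dictionaries, and the heartbeat interval are all hypothetical stand-ins, and the engine-patching step is only marked with a comment.

```python
import asyncio


class FakeTransport:
    """In-memory stand-in for the WebSocket (hypothetical); records sent frames."""

    def __init__(self):
        self.sent = []

    async def send(self, frame):
        self.sent.append(frame)

    async def recv(self):
        # Pretend the brain acknowledges the most recent frame immediately.
        return {"type": "ack", "for": self.sent[-1]["type"]}


class SketchAgent:
    """Hypothetical agent that shows only the enter/exit ordering."""

    def __init__(self, transport, heartbeat_interval=0.01):
        self.transport = transport
        self.heartbeat_interval = heartbeat_interval
        self._heartbeat = None

    async def _beat(self):
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            await self.transport.send({"type": "heartbeat"})

    async def __aenter__(self):
        await self.transport.send({"type": "hello"})
        ack = await self.transport.recv()
        if ack["for"] != "hello":
            raise RuntimeError("handshake was not acknowledged")
        # Engine patches would be installed here in a real agent.
        self._heartbeat = asyncio.create_task(self._beat())
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Stop the heartbeat before saying goodbye so no frame follows it.
        self._heartbeat.cancel()
        try:
            await self._heartbeat
        except asyncio.CancelledError:
            pass
        await self.transport.send({"type": "goodbye"})
        await self.transport.recv()
        return False  # never swallow exceptions from the worker loop


async def demo():
    transport = FakeTransport()
    async with SketchAgent(transport):
        await asyncio.sleep(0.05)  # stands in for your_worker_loop()
    return [frame["type"] for frame in transport.sent]


frames = asyncio.run(demo())
```

In this sketch the first recorded frame is always hello and the last is always goodbye, with heartbeats in between, which mirrors the sequence listed above.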
Sync workers
If your worker is sync, run the agent in a dedicated thread:

    import asyncio
    import threading

    from z4j_bare import Agent

    def run_agent():
        asyncio.run(Agent(brain_url=..., token=...).run_forever())

    threading.Thread(target=run_agent, daemon=True).start()
    your_sync_worker_loop()

Agent.run_forever() is a convenience for this case. It never returns; send SIGTERM to stop.
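For tests or embedded use, where sending SIGTERM is inconvenient, one option is to own the event loop yourself and cancel the task from the sync side. The sketch below uses only the standard library. LoopThread and fake_run_forever are hypothetical names, and fake_run_forever merely stands in for Agent(...).run_forever(); whether the real method shuts down cleanly on cancellation is an assumption to verify.

```python
import asyncio
import threading
import time


class LoopThread:
    """Runs one never-ending coroutine on a private event loop in a daemon thread."""

    def __init__(self, coro_factory):
        self._coro_factory = coro_factory
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._task = None
        self._started = threading.Event()

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._task = self._loop.create_task(self._coro_factory())
        self._started.set()
        try:
            self._loop.run_until_complete(self._task)
        except asyncio.CancelledError:
            pass  # expected when stop() cancels the task
        finally:
            self._loop.close()

    def start(self):
        self._thread.start()
        self._started.wait()  # ensure the task exists before stop() can run

    def stop(self, timeout=2.0):
        # Cancellation must be scheduled on the loop's own thread.
        # Assumes the coroutine is still running (it never returns on its own).
        self._loop.call_soon_threadsafe(self._task.cancel)
        self._thread.join(timeout)

    def is_alive(self):
        return self._thread.is_alive()


ticks = []


async def fake_run_forever():
    # Hypothetical stand-in for Agent(...).run_forever().
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


runner = LoopThread(fake_run_forever)
runner.start()
time.sleep(0.05)  # stands in for your_sync_worker_loop()
runner.stop()
```

The daemon flag still matters here: if the sync worker exits without calling stop(), the thread does not keep the process alive.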
Explicit engine registration
If auto-discovery misbehaves (multiple Celery apps in one process, unusual import patterns):

    from z4j_bare import Agent
    from z4j_celery import CeleryAdapter

    agent = Agent(...)
    agent.register_engine(CeleryAdapter(app=my_primary_celery_app))
    agent.register_engine(CeleryAdapter(app=my_analytics_celery_app))

Each registered adapter shows up separately in the brain with its own queue namespace.
Dispatcher internals
The dispatcher (z4j_bare.dispatcher.Dispatcher) batches events, applies redaction, and manages the WS flush loop. You rarely touch it directly - but it’s public, so custom adapters can reuse it.
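To make the batching and redaction idea concrete, here is a toy version of the pattern. It is a sketch under stated assumptions and does not reflect the real Dispatcher API: MiniDispatcher, redact, the key pattern, the batch size, and every event name other than task_sent are hypothetical.

```python
import re

# Hypothetical rule: any key containing one of these words is treated as secret.
SECRET_KEYS = re.compile(r"(password|token|secret|api_key)", re.IGNORECASE)


def redact(payload):
    """Return a copy of payload with secret-looking values masked, recursively."""
    if isinstance(payload, dict):
        return {
            key: "[REDACTED]" if SECRET_KEYS.search(str(key)) else redact(value)
            for key, value in payload.items()
        }
    if isinstance(payload, list):
        return [redact(item) for item in payload]
    return payload


class MiniDispatcher:
    """Buffers redacted events and hands them to `send` in batches."""

    def __init__(self, send, max_batch=3):
        self._send = send
        self._max_batch = max_batch
        self._pending = []

    def emit(self, event):
        # Redact before buffering so secrets never sit in the queue.
        self._pending.append(redact(event))
        if len(self._pending) >= self._max_batch:
            self.flush()

    def flush(self):
        if self._pending:
            batch, self._pending = self._pending, []
            self._send(batch)


sent_batches = []
dispatcher = MiniDispatcher(sent_batches.append, max_batch=2)
dispatcher.emit({"event_type": "task_sent", "kwargs": {"user": "ada", "api_token": "abc123"}})
dispatcher.emit({"event_type": "task_started"})    # hypothetical event name
dispatcher.emit({"event_type": "task_succeeded"})  # hypothetical event name
dispatcher.flush()  # push out the partial batch
```

A real flush loop would also flush on a timer so that a quiet worker does not hold events indefinitely; the sketch leaves that out to keep the batching logic visible.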
Test fixtures
z4j-bare ships pytest fixtures that stub the brain:

    from z4j_bare.testing import stub_brain

    def test_enqueue_emits_event(stub_brain):
        your_app.enqueue_something()
        assert stub_brain.events[0].event_type == "task_sent"

Verify with doctor
python -m z4j_bare doctor runs the same probes the agent runtime does (buffer dir writable, brain DNS / TCP / TLS, WebSocket upgrade) using Z4J_* env vars or CLI flags. This is the canonical doctor implementation; the framework adapters (django/flask/fastapi) wrap the same logic.
    # Use env vars (typical):
    Z4J_BRAIN_URL=https://tasks.example.com \
      Z4J_TOKEN=... \
      Z4J_PROJECT_ID=ml-pipeline \
      Z4J_HMAC_SECRET=... \
      python -m z4j_bare doctor

    # Or flags (useful for ad-hoc checks):
    python -m z4j_bare doctor \
      --brain-url https://tasks.example.com \
      --token ... \
      --project-id ml-pipeline

    # Skip the WS upgrade probe:
    python -m z4j_bare doctor --no-websocket

    # JSON for scripting:
    python -m z4j_bare doctor --json

Exits 0 on all-green, 1 on any failure. The same probes are exposed programmatically via z4j_bare.diagnostics (probe_buffer_path, probe_dns, probe_tcp, probe_tls, probe_websocket, plus a run_all(config) orchestrator) so custom adapters can build their own doctor commands.
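For readers who want a feel for what probes of this kind involve, the following standard-library sketch checks a writable buffer directory and a TCP connection, then maps the results to the 0/1 exit convention described above. It does not use or reproduce z4j_bare.diagnostics: check_buffer_dir, check_tcp, exit_code, and the result dictionary shape are hypothetical, and the TCP check runs against a local listener so the example works offline.

```python
import os
import socket
import tempfile


def check_buffer_dir(path):
    """Probe: can we create (and remove) a file inside the buffer directory?"""
    try:
        with tempfile.NamedTemporaryFile(dir=path):
            pass
        return {"probe": "buffer_path", "ok": True, "detail": path}
    except OSError as exc:
        return {"probe": "buffer_path", "ok": False, "detail": str(exc)}


def check_tcp(host, port, timeout=3.0):
    """Probe: can we open a TCP connection to host:port within the timeout?"""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return {"probe": "tcp", "ok": True, "detail": f"{host}:{port}"}
    except OSError as exc:
        return {"probe": "tcp", "ok": False, "detail": str(exc)}


def exit_code(results):
    """0 when every probe passed, 1 otherwise."""
    return 0 if all(result["ok"] for result in results) else 1


# Local listener so the TCP probe has something to connect to without a network.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
local_port = server.getsockname()[1]

with tempfile.TemporaryDirectory() as buffer_dir:
    good = [check_buffer_dir(buffer_dir), check_tcp("127.0.0.1", local_port)]
    # A directory that does not exist models the unwritable-buffer failure.
    bad = [check_buffer_dir(os.path.join(buffer_dir, "missing"))]

server.close()

good_code = exit_code(good)
bad_code = exit_code(bad)
```

A custom adapter's doctor command would more likely call the library's own probes; this sketch only shows the shape of a probe that reports a result instead of raising.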
See service-user deployments for the buffer-path failure mode the doctor catches.
Minimum Python
3.10. Uses match statements and ParamSpec.
What’s in the box
- Agent - main entry point.
- Dispatcher - batching + redaction + flush.
- Redactor - pluggable secret filter.
- HelloBuilder - constructs the capability map.
- testing.stub_brain - test fixture.
See API § websocket-protocol for the wire-level reference.