Getting started
This guide takes you from a clean machine to a workflow running locally in about five minutes. No accounts, no API keys, no remote services.
1. Install the SDK
2. Start the bundled engine
The bundled engine persists execution state in Postgres and streams inter-node data through S3-compatible storage (MinIO in the bundled setup). Both services are defined in a Docker Compose file in the repo, so one command brings them up:
git clone https://gitlab.com/aviradigital/ai_research/ai-evidence360/athena-sdk.git
cd athena-sdk
docker compose up -d # Postgres on :5432, MinIO on :9000
cp .env.example .env # defaults already match the compose file
Already have your own Postgres + S3 (AWS, GCP, on-prem MinIO)? Edit
.env to point at them — every variable is documented in
.env.example.
Just want to design workflows, not run them?
If you only need wf.validate(), wf.visualize(), wf.to_json() /
wf.to_yaml(), you don't need the engine running at all — those
are pure Python and work straight after pip install.
3. Build a workflow
from athena_sdk import Workflow

wf = Workflow("first-flow", description="One Python step.")
wf.python_transform(
    "echo",
    code="def transform(row):\n    return {'ok': True, 'row': row}\n",
)
Every typed builder (wf.python_transform, wf.postgres, wf.s3, …)
returns a Node you can chain together with >>:
load = wf.python_transform("load", code="def transform(row):\n    return [{'id': i} for i in range(3)]\n")
double = wf.python_transform("double", code="def transform(row):\n    return {'id': row['id'] * 2}\n")
load >> double  # equivalent to wf.connect(load, double)
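To make the chaining behaviour concrete, here is a minimal pure-Python sketch of how a >> operator can record an edge between two nodes. MiniWorkflow and MiniNode are hypothetical stand-ins written for this illustration, not the athena_sdk classes. The only facts taken from this guide are that builders return a node and that a >> b is equivalent to connecting a to b.

```python
from dataclasses import dataclass, field


@dataclass
class MiniWorkflow:
    """Hypothetical stand-in for a workflow: tracks node names and edges."""
    name: str
    nodes: list = field(default_factory=list)
    edges: list = field(default_factory=list)

    def add(self, node_name):
        # Register the node and hand back an object that supports >>
        self.nodes.append(node_name)
        return MiniNode(node_name, self)

    def connect(self, source, target):
        # Record a directed edge and return the target so chains can continue
        self.edges.append((source.name, target.name))
        return target


@dataclass
class MiniNode:
    """Hypothetical stand-in for a node returned by a builder."""
    name: str
    workflow: MiniWorkflow

    def __rshift__(self, other):
        # a >> b delegates to connect(a, b)
        return self.workflow.connect(self, other)


flow = MiniWorkflow("first-flow")
load_node = flow.add("load")
double_node = flow.add("double")
store_node = flow.add("store")

# >> is left-associative, so this records load -> double, then double -> store
load_node >> double_node >> store_node
print(flow.edges)
```

Returning the target from connect is what lets a chain of three or more nodes read left to right. Whether the real SDK returns the target node from >> is an assumption here.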
4. Validate before running
validate() does pure-Python structural checks — empty/duplicate names,
dangling edges, cycles, references to nodes that don't exist. It does
not touch the engine.
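The kinds of checks listed above can be sketched in plain Python. The check_structure function below is a hypothetical illustration that takes node names and (source, target) edge pairs. It is not the SDK's validate() implementation, and the message wording is invented for the example. Cycle detection uses Kahn's algorithm, which is one common choice.

```python
from collections import Counter, deque


def check_structure(node_names, edges):
    """Return a list of problems; an empty list means no structural issue was found."""
    problems = []

    # Empty and duplicate names
    for name, count in Counter(node_names).items():
        if not name:
            problems.append("empty node name")
        elif count > 1:
            problems.append(f"duplicate node name: {name}")

    # Edges whose endpoints are not declared nodes
    known = set(node_names)
    valid_edges = []
    for source, target in edges:
        missing = [n for n in (source, target) if n not in known]
        if missing:
            problems.append(
                f"edge {source} -> {target} references unknown node(s): {', '.join(missing)}"
            )
        else:
            valid_edges.append((source, target))

    # Kahn's algorithm: repeatedly remove nodes that have no incoming edges.
    # Any node that can never be removed sits on (or behind) a cycle.
    indegree = {name: 0 for name in known}
    children = {name: [] for name in known}
    for source, target in valid_edges:
        children[source].append(target)
        indegree[target] += 1
    ready = deque(name for name, degree in indegree.items() if degree == 0)
    visited = 0
    while ready:
        current = ready.popleft()
        visited += 1
        for child in children[current]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if visited < len(known):
        problems.append("cycle detected")

    return problems


print(check_structure(["load", "double"], [("load", "double")]))
print(check_structure(["a", "b"], [("a", "b"), ("b", "a")]))
```

Collecting every problem in one pass, rather than stopping at the first, means a single validation call can report everything that needs fixing.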
For a quick mental model, render the DAG:
In Jupyter, displaying the workflow object directly (wf on its own
line) renders the Mermaid diagram inline.
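For readers unfamiliar with Mermaid, the sketch below shows roughly what a Mermaid flowchart for a two-node DAG looks like as text. The to_mermaid helper is hypothetical and written only for this illustration. The exact text the SDK emits is not specified in this guide and may differ.

```python
def to_mermaid(node_names, edges, direction="LR"):
    """Build Mermaid flowchart source from node names and (source, target) pairs."""
    lines = [f"flowchart {direction}"]
    # Declare each node with a quoted label so it renders even without edges
    for name in node_names:
        lines.append(f'    {name}["{name}"]')
    # One arrow per edge
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


diagram = to_mermaid(["load", "double"], [("load", "double")])
print(diagram)
```

The sketch uses node names directly as Mermaid identifiers, which assumes they contain no spaces or punctuation that Mermaid treats specially.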
5. Run it
result = wf.run() # blocks until the workflow completes
print(result.ok) # True / False
print(result.duration) # wall-clock seconds
print(result.node_results["echo"].output)
The return value is a Result dataclass. It is truthy when the run
succeeded (a shortcut for result.ok) and carries the total duration
plus per-node NodeResult entries (status, duration, output, error).
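A dataclass with this shape can be sketched as follows. ResultSketch and NodeResultSketch are hypothetical names that mirror the fields listed above. The field types, the defaults, and the "succeeded" status string are assumptions for illustration, since this guide only names "failed" explicitly.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class NodeResultSketch:
    """Per-node outcome: mirrors the status, duration, output, error fields."""
    status: str
    duration: float
    output: Any = None
    error: Optional[str] = None


@dataclass
class ResultSketch:
    """Whole-run outcome; truthiness follows the ok flag."""
    ok: bool
    duration: float
    node_results: Dict[str, NodeResultSketch] = field(default_factory=dict)

    def __bool__(self):
        # Lets callers write `if result:` instead of `if result.ok:`
        return self.ok


sample = ResultSketch(
    ok=True,
    duration=0.25,
    node_results={
        # "succeeded" is a placeholder status chosen for this sketch
        "echo": NodeResultSketch(status="succeeded", duration=0.25, output={"ok": True}),
    },
)
print(bool(sample), sample.node_results["echo"].output)
```

Defining __bool__ is what makes a plain `if result:` or `if not result:` check work on an object that would otherwise always be truthy.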
6. Inspect failures
If anything goes wrong, the failing node is in result.node_results
with status="failed" and an error string:
result = wf.run()
if not result:
    for name, nr in result.node_results.items():
        if nr.status == "failed":
            print(f"{name}: {nr.error}")
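In scripts or CI jobs it can be handy to turn a failed run into an exception rather than printing. The helpers below are a hypothetical sketch built only on the attributes shown above (ok, node_results, status, error). They are not part of the SDK, and the SimpleNamespace objects stand in for a real result so the example runs without the engine.

```python
from types import SimpleNamespace


def summarize_failures(result):
    """Return one 'name: error' line per failed node."""
    return [
        f"{name}: {nr.error}"
        for name, nr in result.node_results.items()
        if nr.status == "failed"
    ]


def raise_for_failures(result):
    """Raise RuntimeError listing failed nodes; do nothing if the run succeeded."""
    if result.ok:
        return
    details = "; ".join(summarize_failures(result)) or "no per-node error recorded"
    raise RuntimeError(f"workflow failed: {details}")


# Stand-in for a failed run, using made-up node names and error text
fake_result = SimpleNamespace(
    ok=False,
    node_results={
        "load": SimpleNamespace(status="done", error=None),
        "double": SimpleNamespace(status="failed", error="KeyError: 'id'"),
    },
)
print(summarize_failures(fake_result))
```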
What's next
- Concepts: Workflow: triggers, variables, serialization, the lifecycle.
- Concepts: Connection: how data flows between nodes, multi-input/output ports, the >> operator.
- Guide: Build a workflow: building richer pipelines with multiple node types.
- Guide: Run remotely: for users who target a deployed nexus-backend instead of the bundled engine.