Workflow

A Workflow is a directed acyclic graph (DAG) of Nodes connected by Connections. It's the top-level object you build, validate, serialize, and run.

from athena_sdk import Workflow, Trigger

wf = Workflow("daily-ingest", description="Pull yesterday's rows + archive.")
wf.trigger(Trigger.cron("0 6 * * *", timezone="UTC"))

Lifecycle

Every workflow goes through the same stages, and none of them requires an external service except the last:

Stage          Method                              Pure Python?
Build          wf.postgres(...), wf.api(...), …    yes
Validate       wf.validate()                       yes
Visualize      wf.visualize(fmt="mermaid")         yes
Serialize      wf.to_json(), wf.to_yaml()          yes
Run locally    wf.run() / wf.run_async()           needs engine
Run remotely   wf.deploy_and_run()                 needs backend

You can ship a workflow defined in Python, parsed from JSON, or rebuilt from YAML; all three produce the same internal representation (wf.to_workflow_data()).

Triggers

Triggers describe when a workflow should fire. They're optional for local execution (wf.run() ignores them and runs once) but required if you want the deployed backend to schedule the workflow.

from athena_sdk import Trigger

# Choose one of the following; each call replaces the previous trigger.
wf.trigger(Trigger.manual())                                # explicit "run now"
wf.trigger(Trigger.cron("0 6 * * *", timezone="UTC"))       # 6am UTC daily
wf.trigger(Trigger.webhook(path="/ingest/orders"))          # HTTP-triggered

wf.trigger() is last-write-wins: calling it again replaces the previous trigger instead of adding a second one.
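In standard five-field cron syntax, "0 6 * * *" means minute 0, hour 6, every day of the month, every month, every weekday, so the trigger fires once a day at 06:00 in the given timezone. The sketch below computes the next fire time for that daily pattern only. next_daily_fire is a hypothetical helper written for illustration; it is not part of athena_sdk, and the backend's actual scheduler supports far more of the cron grammar.

```python
from datetime import datetime, timedelta, timezone


def next_daily_fire(expr: str, now: datetime) -> datetime:
    """Next fire time for a cron expression of the form "M H * * *".

    Hypothetical helper: only minute and hour are honoured, and the day,
    month, and weekday fields must be "*". `now` must be timezone-aware.
    """
    fields = expr.split()
    if len(fields) != 5 or fields[2:] != ["*", "*", "*"]:
        raise ValueError(f"only 'M H * * *' patterns are supported, got {expr!r}")
    minute, hour = int(fields[0]), int(fields[1])
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has passed (or is exactly now), so the next one is tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 3, 1, 7, 30, tzinfo=timezone.utc)
print(next_daily_fire("0 6 * * *", now))  # 2024-03-02 06:00:00+00:00
```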

Variables

Workflow-scoped variables are referenced from any node configuration via {{ name }} substitution and from expressions via expr.variable("name").

wf.set_variable("env", "prod").set_variable("limit", 100)

wf.python_transform(
    "filter",
    code="def transform(row):\n    return row if row['count'] > {{ limit }} else None\n",
)

Variables are interpolated at execution time, so changing them between runs (in YAML, JSON, or via set_variable()) re-parameterizes the workflow without touching node configs.
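A minimal sketch of what {{ name }} substitution amounts to, assuming the engine does plain textual replacement at execution time. The interpolate function and its missing-variable behavior (raise KeyError) are assumptions for illustration; the SDK's real rules for whitespace, escaping, and undefined names may differ.

```python
import re

# Matches "{{ name }}" with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def interpolate(template: str, variables: dict) -> str:
    """Replace every {{ name }} placeholder with str(variables[name]).

    A placeholder with no matching variable raises KeyError, so a typo fails
    loudly instead of leaking "{{ ... }}" into the node configuration.
    """
    return _PLACEHOLDER.sub(lambda m: str(variables[m.group(1)]), template)


code = "def transform(row):\n    return row if row['count'] > {{ limit }} else None\n"

# Same template, two "runs" with different variables: no node config changes.
for limit in (100, 500):
    print(interpolate(code, {"env": "prod", "limit": limit}))
```

One consequence of textual substitution worth keeping in mind: a string variable such as env lands in Python code unquoted, so a template would need to write '{{ env }}' with quotes to get a string literal.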

Validation

wf.validate() returns a list of human-readable issues; an empty list means the workflow is structurally valid. It checks that:

  • Every node has a unique, non-empty name.
  • Every edge references nodes that belong to this workflow.
  • The DAG has at least one terminal node (a node with no successors).
  • There are no cycles.
  • Every expr.node("X") reference points at a node that exists.

issues = wf.validate()
if issues:
    for issue in issues:
        print(issue)

wf.assert_valid() is the strict variant: it raises WorkflowBuildError instead of returning a list. Use it in pre-deploy gates.
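The structural checks listed above can be sketched over a plain node list and edge list. This is an illustration under stated assumptions, not the SDK's implementation: validate and assert_valid here are hypothetical stand-ins, WorkflowBuildError is a local class named after the SDK's exception, cycle detection uses Kahn's algorithm, and the expr.node("X") check is omitted because it would need the expression model.

```python
class WorkflowBuildError(Exception):
    """Hypothetical stand-in for the SDK exception of the same name."""


def validate(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return human-readable issues; an empty list means structurally valid."""
    issues: list[str] = []

    # Every node has a unique, non-empty name.
    names: set[str] = set()
    for name in nodes:
        if not name:
            issues.append("node with an empty name")
        elif name in names:
            issues.append(f"duplicate node name {name!r}")
        names.add(name)

    # Every edge references nodes that belong to this workflow.
    for src, dst in edges:
        for end in (src, dst):
            if end not in names:
                issues.append(f"edge {src!r} -> {dst!r} references unknown node {end!r}")

    # At least one terminal node (a node with no outgoing edge).
    sources = {src for src, _ in edges}
    if names and not any(name not in sources for name in names):
        issues.append("no terminal node")

    # No cycles, via Kahn's algorithm: repeatedly remove nodes with no incoming
    # edges; anything never removed sits on, or downstream of, a cycle.
    indegree = {name: 0 for name in names}
    adjacency: dict[str, list[str]] = {name: [] for name in names}
    for src, dst in edges:
        if src in names and dst in names:
            adjacency[src].append(dst)
            indegree[dst] += 1
    ready = [name for name, deg in indegree.items() if deg == 0]
    removed = 0
    while ready:
        node = ready.pop()
        removed += 1
        for nxt in adjacency[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if removed < len(indegree):
        issues.append("cycle detected")

    return issues


def assert_valid(nodes: list[str], edges: list[tuple[str, str]]) -> None:
    """Strict variant: raise instead of returning the list of issues."""
    issues = validate(nodes, edges)
    if issues:
        raise WorkflowBuildError("; ".join(issues))


print(validate(["extract", "load"], [("extract", "load")]))  # []
print(validate(["a", "b"], [("a", "b"), ("b", "a")]))       # ['no terminal node', 'cycle detected']
```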

Visualization

print(wf.visualize())              # Mermaid (default)
print(wf.visualize(fmt="ascii"))   # ASCII text diagram

In Jupyter / IPython, the workflow object's _repr_html_ renders the Mermaid diagram inline:

wf                                 # last expression in a cell
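To show what a Mermaid diagram of a DAG looks like as text, and how the Jupyter hook fits in, here is a toy sketch. MiniWorkflow is hypothetical; the exact text wf.visualize() emits (direction, node shapes, labels) may differ. IPython calls _repr_html_ on the last expression of a cell; this sketch only wraps the escaped Mermaid source in a pre element, and actually drawing the diagram requires Mermaid's JavaScript on the page, which it does not load. Node names are assumed to be simple identifiers, since names with spaces or hyphens need quoting in Mermaid.

```python
import html


class MiniWorkflow:
    """Toy stand-in for a workflow: just directed edges between named nodes."""

    def __init__(self, edges: list[tuple[str, str]]):
        self.edges = edges

    def visualize(self) -> str:
        # Mermaid flowchart syntax: a top-down header, then one "src --> dst" per edge.
        lines = ["flowchart TD"]
        lines += [f"    {src} --> {dst}" for src, dst in self.edges]
        return "\n".join(lines)

    def _repr_html_(self) -> str:
        # IPython's rich-display hook; escape the text so "-->" is safe in HTML.
        return f'<pre class="mermaid">{html.escape(self.visualize())}</pre>'


wf = MiniWorkflow([("extract", "transform"), ("transform", "load")])
print(wf.visualize())
```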

Serialization

spec_json = wf.to_json(indent=2)   # pretty-printed JSON
spec_yaml = wf.to_yaml()           # requires the [yaml] extra

# Each of these rebuilds an equivalent Workflow:
restored = Workflow.from_json(spec_json)
restored = Workflow.from_yaml(spec_yaml)
restored = Workflow.from_workflow_data(wf.to_workflow_data())

Round-tripping through JSON or YAML produces an equivalent workflow. That makes it practical to store workflows in Git, generate them dynamically, or share them across services without sharing Python code.
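What "equivalent" means after a round trip can be made concrete with a small sketch using the standard library's dataclasses and json modules. NodeSpec and WorkflowSpec are hypothetical shapes chosen for illustration; the SDK's real schema is richer and is not shown in this page. Equivalence here is dataclass field-by-field equality.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class NodeSpec:
    name: str
    kind: str
    config: dict = field(default_factory=dict)


@dataclass
class WorkflowSpec:
    name: str
    nodes: list = field(default_factory=list)  # list of NodeSpec
    edges: list = field(default_factory=list)  # list of (src, dst) tuples
    variables: dict = field(default_factory=dict)

    def to_json(self, indent: Optional[int] = None) -> str:
        # asdict() recurses into nested dataclasses, lists, and tuples.
        return json.dumps(asdict(self), indent=indent)

    @classmethod
    def from_json(cls, text: str) -> "WorkflowSpec":
        data = json.loads(text)
        return cls(
            name=data["name"],
            nodes=[NodeSpec(**n) for n in data["nodes"]],
            # JSON has no tuple type: edges come back as lists, so convert them.
            edges=[tuple(e) for e in data["edges"]],
            variables=data["variables"],
        )


spec = WorkflowSpec(
    name="daily-ingest",
    nodes=[NodeSpec("extract", "postgres", {"query": "SELECT 1"}), NodeSpec("load", "api")],
    edges=[("extract", "load")],
    variables={"limit": 100},
)
restored = WorkflowSpec.from_json(spec.to_json(indent=2))
print(restored == spec)  # True
```

The tuple-to-list conversion is the kind of detail that breaks naive round trips: without it, restored.edges would hold lists and compare unequal to the original tuples.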

Running

The OSS-friendly path is wf.run(): it loads the bundled engine in-process and blocks until the workflow finishes. For code that already runs inside an event loop, there is an async variant:

result = wf.run()                  # synchronous, OSS, no remote
result = await wf.run_async()      # async, same engine; call from an async function or a Jupyter cell

The result is a Result dataclass with per-node outcomes, the total duration, and an ok shortcut for checking overall success.
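To illustrate what "run the DAG and collect per-node outcomes" involves, here is a minimal in-process executor built on the standard library's graphlib.TopologicalSorter (Python 3.9+). Everything in it is an assumption for illustration: the field names outcomes and duration_s, the policy of skipping nodes downstream of a failure, and running the async variant in a worker thread via asyncio.to_thread. The bundled engine may behave differently.

```python
import asyncio
import time
from dataclasses import dataclass, field
from graphlib import TopologicalSorter
from typing import Callable


@dataclass
class NodeOutcome:
    ok: bool
    output: object = None
    error: str = ""


@dataclass
class Result:
    outcomes: dict = field(default_factory=dict)  # node name -> NodeOutcome
    duration_s: float = 0.0

    @property
    def ok(self) -> bool:
        # Shortcut: True only if every node succeeded.
        return all(o.ok for o in self.outcomes.values())


Step = Callable[[dict], object]  # receives {upstream name: upstream output}


def run(steps: dict[str, Step], deps: dict[str, set[str]]) -> Result:
    """Run each step after its dependencies; skip anything downstream of a failure.

    deps maps a node to the set of nodes it depends on; it may only name keys of steps.
    """
    start = time.perf_counter()
    result = Result()
    graph = {name: deps.get(name, set()) for name in steps}
    for name in TopologicalSorter(graph).static_order():
        upstream = {d: result.outcomes[d] for d in graph[name]}
        if not all(o.ok for o in upstream.values()):
            result.outcomes[name] = NodeOutcome(ok=False, error="skipped: upstream failed")
            continue
        try:
            output = steps[name]({d: o.output for d, o in upstream.items()})
            result.outcomes[name] = NodeOutcome(ok=True, output=output)
        except Exception as exc:  # record the failure instead of aborting the run
            result.outcomes[name] = NodeOutcome(ok=False, error=repr(exc))
    result.duration_s = time.perf_counter() - start
    return result


async def run_async(steps: dict[str, Step], deps: dict[str, set[str]]) -> Result:
    # Same engine, executed in a worker thread so the event loop is not blocked.
    return await asyncio.to_thread(run, steps, deps)


steps = {
    "extract": lambda up: [1, 2, 3],
    "double": lambda up: [x * 2 for x in up["extract"]],
    "load": lambda up: len(up["double"]),
}
deps = {"double": {"extract"}, "load": {"double"}}

result = run(steps, deps)
print(result.ok, result.outcomes["load"].output)  # True 3
print(asyncio.run(run_async(steps, deps)).ok)     # True
```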

If you have a deployed nexus-backend, wf.deploy() pushes the definition and returns the workflow ID; wf.deploy_and_run() deploys, triggers, and blocks until the result is available. Both read connection details from NEXUS_API_URL / NEXUS_API_KEY (or an explicit AthenaClient); see Run remotely.
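The "explicit client or environment variables" rule can be sketched as a small resolver. This is a hypothetical helper, not the AthenaClient constructor (whose signature is not documented on this page); it assumes that explicitly passed values take precedence over NEXUS_API_URL / NEXUS_API_KEY, and it fails early with the names of any missing settings.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ConnectionSettings:
    api_url: str
    api_key: str


def resolve_settings(
    api_url: Optional[str] = None,
    api_key: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> ConnectionSettings:
    """Explicit arguments win; otherwise fall back to NEXUS_API_URL / NEXUS_API_KEY."""
    url = api_url or env.get("NEXUS_API_URL")
    key = api_key or env.get("NEXUS_API_KEY")
    missing = [n for n, v in (("NEXUS_API_URL", url), ("NEXUS_API_KEY", key)) if not v]
    if missing:
        # Name the missing settings, but never echo the key itself.
        raise RuntimeError(f"missing connection settings: {', '.join(missing)}")
    # Drop a trailing slash so later path joins are predictable.
    return ConnectionSettings(api_url=url.rstrip("/"), api_key=key)


settings = resolve_settings(
    env={"NEXUS_API_URL": "https://nexus.example.com/", "NEXUS_API_KEY": "secret"}
)
print(settings.api_url)  # https://nexus.example.com
```

Passing env explicitly keeps the function testable without touching the real process environment.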