Node¶
A Node is one step inside a Workflow — a database
query, an HTTP call, a Python transform, a control-flow branch. Every
node has a name (unique within the workflow), a node_type (e.g.
"postgres", "python_transform"), and a configuration dict whose
shape depends on the type.
You almost never instantiate Node directly. Use the typed builders
on Workflow:
load = wf.postgres(
    "load",
    operation="select",
    query="SELECT id, status FROM orders WHERE updated_at >= NOW() - INTERVAL '1 day'",
)
The builder returns the Node so you can chain it with >> (see
Connection) or store the reference for later use.
Categories¶
Nodes are grouped by category. Each category has its own module under
athena_sdk.nodes:
| Category | Examples | Builder method |
|---|---|---|
| Sources | twitter, reddit, pubmed, clinical_trials, edgar, … | wf.twitter() etc. |
| Actions | postgres, mysql, mssql, s3, excel, api, output, ai_tagging | wf.postgres() etc. |
| Transforms | filter, map, aggregate, join, sort, limit, python_transform, schema_transform | wf.filter() etc. |
| Controls | if_, split, merge, switch | wf.if_() etc. |
Typed builders fail at build time¶
The builders validate as much as possible before the workflow is ever
executed. For example, wf.postgres("bad", operation="upsert", table="t")
raises WorkflowBuildError immediately: the upsert operation requires
upsert_key, and the SDK refuses to construct an invalid node:
from athena_sdk import WorkflowBuildError

try:
    wf.postgres("bad", operation="upsert", table="t")
except WorkflowBuildError as e:
    print(e)
    # postgres node 'bad': operation='upsert' requires `upsert_key`.
This is by design: a typo in node configuration becomes a stack trace at build time, not a 30-second wait followed by an opaque engine error.
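The fail-fast idea can be sketched without the SDK. The snippet below is a toy stand-in, not the athena_sdk implementation: `ToyBuildError` and `build_postgres_node` are hypothetical names invented for illustration, and the only rule checked is the one described above (upsert requires `upsert_key`).

```python
class ToyBuildError(ValueError):
    """Hypothetical stand-in for a build-time error (not the SDK's class)."""


def build_postgres_node(name, operation, table=None, query=None, upsert_key=None):
    """Validate arguments up front and return a plain dict describing the node."""
    # Reject an invalid combination before any node object exists.
    if operation == "upsert" and not upsert_key:
        raise ToyBuildError(
            f"postgres node {name!r}: operation='upsert' requires `upsert_key`."
        )
    configuration = {"operation": operation}
    # Only keep the options the caller actually supplied.
    for key, value in (("table", table), ("query", query), ("upsert_key", upsert_key)):
        if value is not None:
            configuration[key] = value
    return {"name": name, "node_type": "postgres", "configuration": configuration}


# The invalid call fails at construction time, long before execution.
try:
    build_postgres_node("bad", operation="upsert", table="t")
except ToyBuildError as exc:
    error_message = str(exc)

# The same call with upsert_key supplied builds normally.
ok_node = build_postgres_node("warehouse", operation="upsert", table="t", upsert_key="id")
```

Because the check runs inside the builder, the error points at the line that made the mistake rather than at a later execution step.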
The escape hatch: add_node()¶
If a node type doesn't have a typed helper yet, drop down to
Workflow.add_node():
custom = wf.add_node(
    name="custom_step",
    node_type="my_custom",
    category="actions",
    configuration={"foo": 1, "bar": "baz"},
)
Use this only when you need a node type that isn't covered by the typed surface — opening an issue / PR with a typed builder is usually the better long-term move.
Connecting nodes¶
Two equivalent ways to wire two nodes together:
Multi-fan-out is the natural shape — the same node can be the source of many edges:
load = wf.postgres("load", operation="select", query="SELECT * FROM events")
load >> wf.s3("archive", bucket="archive", operation="write", file_path="snapshot.csv")
load >> wf.postgres("warehouse", operation="upsert", table="events_warehouse", upsert_key="id")
For details on multi-input ports, named outputs, and the Connection object itself, see Connection.
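To make the fan-out shape concrete, here is a minimal sketch of how a `>>` operator can record edges in plain Python. `ToyNode` and `ToyGraph` are hypothetical classes written for this illustration; how the SDK implements `>>` internally, and whether it returns the target node, is an assumption here.

```python
class ToyNode:
    """Hypothetical node that records edges when used with >>."""

    def __init__(self, name, edges):
        self.name = name
        self._edges = edges  # shared edge list owned by the graph

    def __rshift__(self, other):
        # `a >> b` appends the edge (a, b) to the shared list.
        self._edges.append((self.name, other.name))
        # Returning the target (an assumption) lets `a >> b >> c` form a chain.
        return other


class ToyGraph:
    """Hypothetical container that owns the edge list."""

    def __init__(self):
        self.edges = []

    def node(self, name):
        return ToyNode(name, self.edges)


graph = ToyGraph()
load = graph.node("load")

# Fan-out: the same source node appears on the left of two edges.
load >> graph.node("archive")
load >> graph.node("warehouse")

# Chaining: each >> returns its right-hand node.
graph.node("extract") >> graph.node("clean") >> graph.node("publish")
```

Keeping a reference to the source node (`load` above) is what makes fan-out possible; a pure chain never needs one.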
Inspecting nodes¶
wf.nodes is a read-only view of the nodes in insertion order:
Each node also exposes .configuration (the dict you built). To inspect
edges, iterate over wf.connections.
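As an illustration of what a read-only, insertion-ordered node listing can look like, the sketch below uses a toy container. `ToyWorkflow` is a hypothetical class, and returning a tuple snapshot is one possible design, not necessarily what wf.nodes does in the SDK.

```python
class ToyWorkflow:
    """Hypothetical workflow that exposes its nodes as a read-only sequence."""

    def __init__(self):
        self._nodes = []  # a list preserves insertion order

    def add_node(self, name, node_type, configuration):
        node = {"name": name, "node_type": node_type, "configuration": configuration}
        self._nodes.append(node)
        return node

    @property
    def nodes(self):
        # A tuple can be iterated and indexed, but not appended to or cleared.
        return tuple(self._nodes)


toy_wf = ToyWorkflow()
toy_wf.add_node("load", "postgres", {"operation": "select"})
toy_wf.add_node("clean", "python_transform", {})

# Iteration follows the order in which nodes were added.
names = [node["name"] for node in toy_wf.nodes]

# Attempting to mutate the returned sequence fails.
try:
    toy_wf.nodes.append({"name": "rogue"})
    mutation_blocked = False
except AttributeError:
    mutation_blocked = True
```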