Node

A Node is one step inside a Workflow — a database query, an HTTP call, a Python transform, a control-flow branch. Every node has a name (unique within the workflow), a node_type (e.g. "postgres", "python_transform"), and a configuration dict whose shape depends on the type.

You almost never instantiate Node directly. Use the typed builders on Workflow:

load = wf.postgres(
    "load",
    operation="select",
    query="SELECT id, status FROM orders WHERE updated_at >= NOW() - INTERVAL '1 day'",
)

The builder returns the Node so you can chain it with >> (see Connection) or store the reference for later use.

Categories

Nodes are grouped by category. Each category has its own module under athena_sdk.nodes:

| Category | Examples | Builder method |
|---|---|---|
| Sources | twitter, reddit, pubmed, clinical_trials, edgar, … | wf.twitter() etc. |
| Actions | postgres, mysql, mssql, s3, excel, api, output, ai_tagging | wf.postgres() etc. |
| Transforms | filter, map, aggregate, join, sort, limit, python_transform, schema_transform | wf.filter() etc. |
| Controls | if_, split, merge, switch | wf.if_() etc. |

Typed builders fail at build time

The builders validate as much as possible before the workflow is ever executed. For example, wf.postgres("bad", operation="upsert", table="t") raises WorkflowBuildError immediately, because upsert requires upsert_key and the SDK refuses to construct an invalid node:

from athena_sdk import WorkflowBuildError

try:
    wf.postgres("bad", operation="upsert", table="t")
except WorkflowBuildError as e:
    print(e)
    # postgres node 'bad': operation='upsert' requires `upsert_key`.

This is by design: a typo in node configuration becomes a stack trace at build time, not a 30-second wait followed by an opaque engine error.
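
To make the fail-at-build-time idea concrete, here is a minimal, self-contained sketch of how a typed builder could check its arguments before constructing a node. It does not import athena_sdk and is not the SDK's implementation: the `WorkflowBuildError` and `Node` classes, the `REQUIRED_KEYS` table, and the `build_postgres` function are all hypothetical stand-ins chosen to reproduce the error message shown above.

```python
from dataclasses import dataclass, field


class WorkflowBuildError(ValueError):
    """Stand-in for the SDK's build-time error (hypothetical definition)."""


@dataclass
class Node:
    """Stand-in node: a name, a type, and a configuration dict."""
    name: str
    node_type: str
    configuration: dict = field(default_factory=dict)


# Hypothetical rule table: operation -> configuration keys it requires.
REQUIRED_KEYS = {
    "select": ("query",),
    "upsert": ("table", "upsert_key"),
}


def build_postgres(name: str, operation: str, **config) -> Node:
    """Validate the configuration up front, then construct the node."""
    if operation not in REQUIRED_KEYS:
        raise WorkflowBuildError(
            f"postgres node {name!r}: unknown operation={operation!r}."
        )
    for key in REQUIRED_KEYS[operation]:
        if key not in config:
            raise WorkflowBuildError(
                f"postgres node {name!r}: operation={operation!r} requires `{key}`."
            )
    return Node(name, "postgres", {"operation": operation, **config})


try:
    build_postgres("bad", operation="upsert", table="t")
except WorkflowBuildError as e:
    message = str(e)
    print(message)
    # postgres node 'bad': operation='upsert' requires `upsert_key`.
```

The point of the pattern is that the invalid node is never created: the check runs inside the builder call, so the failure surfaces on the line that contains the mistake.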

The escape hatch: add_node()

If a node type doesn't have a typed helper yet, drop down to Workflow.add_node():

custom = wf.add_node(
    name="custom_step",
    node_type="my_custom",
    category="actions",
    configuration={"foo": 1, "bar": "baz"},
)

Use this only when you need a node type that isn't covered by the typed surface — opening an issue / PR with a typed builder is usually the better long-term move.
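
Until a typed builder exists upstream, one option is to wrap the add_node() call in a small local helper so the node type and category are spelled in only one place. The sketch below is an assumption-laden illustration: `StubWorkflow` is a stand-in that merely records calls (it is not the SDK's Workflow), and `my_custom` is a hypothetical helper name. Only the keyword arguments passed to add_node() are taken from the example above.

```python
class StubWorkflow:
    """Minimal stand-in that only records add_node() calls (not the SDK class)."""

    def __init__(self):
        self.added = []

    def add_node(self, name, node_type, category, configuration):
        record = {
            "name": name,
            "node_type": node_type,
            "category": category,
            "configuration": configuration,
        }
        self.added.append(record)
        return record


def my_custom(wf, name, *, foo: int, bar: str):
    """Hypothetical local helper: check arguments, then delegate to add_node()."""
    if not isinstance(foo, int):
        raise TypeError(f"my_custom node {name!r}: `foo` must be an int.")
    if not isinstance(bar, str):
        raise TypeError(f"my_custom node {name!r}: `bar` must be a str.")
    return wf.add_node(
        name=name,
        node_type="my_custom",
        category="actions",
        configuration={"foo": foo, "bar": bar},
    )


wf = StubWorkflow()
custom = my_custom(wf, "custom_step", foo=1, bar="baz")
print(custom["configuration"])
```

A helper like this keeps call sites short and gives a natural starting point for a typed builder contribution later.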

Connecting nodes

Two equivalent ways to wire two nodes together (load and double here are nodes already built on wf):

load >> double                  # operator form
wf.connect(load, double)        # method form

Multi-fan-out is the natural shape — the same node can be the source of many edges:

load = wf.postgres("load", operation="select", query="SELECT * FROM events")
load >> wf.s3("archive", bucket="archive", operation="write", file_path="snapshot.csv")
load >> wf.postgres("warehouse", operation="upsert", table="events_warehouse", upsert_key="id")

For details on multi-input ports, named outputs, and the Connection object itself, see Connection.
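
For readers curious how an operator form and a method form can stay equivalent, the following toy model shows one common way to do it in Python: `__rshift__` simply delegates to `connect()`. This is a sketch, not the SDK's code. The `Workflow` and `Node` classes here are stand-ins, the `node()` factory is hypothetical, edges are stored as plain name pairs, and returning the target node from `connect()` is an assumption made so that chains read left to right.

```python
class Workflow:
    """Toy workflow that records nodes and edges (not the SDK class)."""

    def __init__(self):
        self.nodes = []
        self.connections = []  # (source_name, target_name) pairs

    def node(self, name):
        n = Node(self, name)
        self.nodes.append(n)
        return n

    def connect(self, source, target):
        self.connections.append((source.name, target.name))
        return target  # assumption: returning the target allows a >> b >> c


class Node:
    def __init__(self, workflow, name):
        self.workflow = workflow
        self.name = name

    def __rshift__(self, other):
        # `a >> b` delegates to the method form, so both record the same edge.
        return self.workflow.connect(self, other)


wf = Workflow()
load = wf.node("load")
double = wf.node("double")
archive = wf.node("archive")

load >> double              # operator form
wf.connect(load, archive)   # method form; `load` now fans out to two targets

print(wf.connections)
# [('load', 'double'), ('load', 'archive')]
```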

Inspecting nodes

wf.nodes is a read-only view of the nodes in insertion order:

for node in wf.nodes:
    print(node.name, node.node_type, node.node_category)

Each node also exposes .configuration (the dict you built). To inspect a node's edges, iterate over wf.connections.
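
As an illustration of edge inspection, the sketch below groups edges by source node to answer "what does each node feed into?". It runs on stand-in data rather than real SDK objects: nodes are modelled as (name, node_type, node_category) tuples and connections as (source_name, target_name) pairs, which is an assumption about shape only. With the SDK you would read the equivalent fields from wf.nodes and wf.connections (see Connection for the actual attributes).

```python
from collections import defaultdict

# Stand-in data: nodes in insertion order and edges as name pairs.
# Real SDK objects will differ; this only mirrors the fan-out example.
nodes = [
    ("load", "postgres", "actions"),
    ("archive", "s3", "actions"),
    ("warehouse", "postgres", "actions"),
]
connections = [("load", "archive"), ("load", "warehouse")]

# Group targets by their source node.
downstream = defaultdict(list)
for source, target in connections:
    downstream[source].append(target)

# One entry per node, in insertion order; leaf nodes get an empty list.
summary = {name: downstream.get(name, []) for name, _type, _category in nodes}
print(summary)
# {'load': ['archive', 'warehouse'], 'archive': [], 'warehouse': []}
```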