Connection
A Connection is an edge between two Nodes. It's how data
flows through a workflow: the engine runs each node, captures its
output, and feeds that output to every connected downstream node.
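The run loop described above can be sketched in plain Python. This is a toy model, not the bundled engine. The node functions (`load`, `double`, `count`) and the `run` helper are hypothetical, edges are `(source, target)` name pairs, and the standard library's `graphlib.TopologicalSorter` supplies an order in which every node runs after all of its upstreams:

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List, Tuple

# Hypothetical node functions: each receives the list of upstream outputs
# and returns its own output.
def load(inputs: List[list]) -> list:
    return [{"id": 1, "amount": 500}, {"id": 2, "amount": 1500}]

def double(inputs: List[list]) -> list:
    # Double every amount received from upstream nodes.
    return [{**row, "amount": row["amount"] * 2} for rows in inputs for row in rows]

def count(inputs: List[list]) -> list:
    # Count rows received across all upstream nodes (a fan-in).
    return [sum(len(rows) for rows in inputs)]

def run(nodes: Dict[str, Callable[[List[list]], list]],
        edges: List[Tuple[str, str]]) -> Dict[str, list]:
    """Run each node once, in dependency order, feeding it upstream outputs."""
    predecessors: Dict[str, List[str]] = {name: [] for name in nodes}
    for source, target in edges:
        predecessors[target].append(source)
    outputs: Dict[str, list] = {}
    # static_order() yields each node only after all of its predecessors.
    for name in TopologicalSorter(predecessors).static_order():
        upstream = [outputs[p] for p in predecessors[name]]
        outputs[name] = nodes[name](upstream)
    return outputs

results = run(
    {"load": load, "double": double, "count": count},
    [("load", "double"), ("load", "count"), ("double", "count")],
)
print(results["count"])  # [4]
```

Here `count` receives two rows from `load` and two from `double`, which is the "feed the output to every connected downstream node" behavior in miniature.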
The >> operator
The shortest way to wire two nodes is the right-shift operator:
load = wf.postgres("load", operation="select", query="SELECT * FROM orders")
sink = wf.s3("archive", bucket="orders-archive", operation="write", file_path="snapshot.csv")
load >> sink
>> returns the target node, so chains compose naturally:
wf.postgres("load", operation="select", query="...") \
>> wf.python_transform("normalize", code="def transform(row): ...") \
>> wf.s3("write", bucket="...", operation="write", file_path="...")
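Chaining works because Python's `>>` is left-associative: `a >> b >> c` evaluates as `(a >> b) >> c`. If `a >> b` returns `b`, the next `>>` wires `b` to `c`. The toy class below (hypothetical, not the library's node type) implements `__rshift__` to record an edge and return its right-hand operand:

```python
from __future__ import annotations

from typing import List, Tuple

class Node:
    """Toy node that records edges in a shared list when wired with >>."""

    def __init__(self, name: str, edges: List[Tuple[str, str]]) -> None:
        self.name = name
        self.edges = edges

    def __rshift__(self, other: Node) -> Node:
        # Record the edge, then return the target so the next >> starts from it.
        self.edges.append((self.name, other.name))
        return other

edges: List[Tuple[str, str]] = []
load = Node("load", edges)
normalize = Node("normalize", edges)
write = Node("write", edges)

last = load >> normalize >> write
print(edges)      # [('load', 'normalize'), ('normalize', 'write')]
print(last.name)  # write
```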
wf.connect(source, target) is the equivalent method form. It is useful
when both nodes are already stored in variables and you want a
statement that reads as a verb, or when you create connections
inside a loop.
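A common loop pattern is wiring a list of stages pairwise. The sketch below uses a hypothetical stand-in `Workflow` class whose `connect(source, target)` just records the pair, and plain strings instead of node objects for brevity; with the real library you would pass the node objects themselves:

```python
from typing import List, Tuple

class Workflow:
    """Toy workflow with a connect(source, target) method (illustrative only)."""

    def __init__(self) -> None:
        self.connections: List[Tuple[str, str]] = []

    def connect(self, source: str, target: str) -> None:
        self.connections.append((source, target))

wf = Workflow()
stages = ["load", "dedupe", "normalize", "write"]

# Pair each stage with the next one: load -> dedupe -> normalize -> write.
for source, target in zip(stages, stages[1:]):
    wf.connect(source, target)

print(wf.connections)
```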
Fan-out and fan-in
A node can be the source of many edges (fan-out) or the target of many edges (fan-in). The workflow's DAG is the union of all its edges:
# Fan-out: one source, two sinks
load = wf.postgres("load", operation="select", query="SELECT * FROM events")
load >> wf.s3("archive", bucket="archive", operation="write", file_path="snap.csv")
load >> wf.postgres("warehouse", operation="upsert", table="events_wh", upsert_key="id")
# Fan-in: two upstreams merged into one node
wf.merge("combined", inputs=["postgres-extract", "api-extract"])
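Viewed as a plain list of `(source, target)` pairs, the two shapes are easy to spot: a fan-out node has more than one outgoing edge, a fan-in node more than one incoming edge. This sketch assumes that `merge`'s `inputs` list corresponds to one edge per named upstream, which is an assumption rather than documented behavior:

```python
from collections import Counter
from typing import List, Tuple

# Edge list mirroring the example above (names only).
edges: List[Tuple[str, str]] = [
    ("load", "archive"),
    ("load", "warehouse"),
    ("postgres-extract", "combined"),
    ("api-extract", "combined"),
]

out_degree = Counter(source for source, _ in edges)
in_degree = Counter(target for _, target in edges)

fan_out = sorted(node for node, n in out_degree.items() if n > 1)
fan_in = sorted(node for node, n in in_degree.items() if n > 1)
print(fan_out)  # ['load']
print(fan_in)   # ['combined']
```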
Cycles are not allowed: wf.validate() detects them and will not let
you run a workflow that contains one.
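How wf.validate() finds cycles is not described here. One standard technique is to attempt a topological sort, which fails exactly when the graph has a cycle; Python's standard-library `graphlib` raises `CycleError` in that case and reports one offending cycle. A sketch over `(source, target)` name pairs, with a hypothetical `find_cycle` helper, not the validator itself:

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set, Tuple

def find_cycle(edges: List[Tuple[str, str]]) -> Optional[List[str]]:
    """Return one cycle as a list of node names, or None if the graph is a DAG."""
    predecessors: Dict[str, Set[str]] = {}
    for source, target in edges:
        predecessors.setdefault(source, set())
        predecessors.setdefault(target, set()).add(source)
    try:
        # prepare() raises CycleError if the graph contains a cycle.
        TopologicalSorter(predecessors).prepare()
    except CycleError as exc:
        # args[1] holds the cycle; its first and last entries are the same node.
        return list(exc.args[1])
    return None

print(find_cycle([("a", "b"), ("b", "c")]))  # None
print(find_cycle([("a", "b"), ("b", "c"), ("c", "a")]))
```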
Named outputs and inputs
Most nodes expose a single output channel called output and a single
input channel called input. Some control-flow nodes (if_, split,
switch) expose named output handles so you can route different
data down different branches:
gate = wf.if_(
"high-value-only",
condition="$input.data.get('amount', 0) > 1000",
)
load >> gate
gate.true >> wf.api("notify", url="https://hooks.slack.com/...")
gate.false >> wf.s3("park", bucket="parked", operation="write", file_path="low.csv")
Conversely, fan-in nodes like merge expose multiple input ports.
The typed helpers wire these ports for you; if you build nodes with
add_node() instead, connect them with wf.connect(source, target) and
name the handles explicitly.
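Named handles amount to per-edge routing: each edge reads from one named output of its source. The toy model below is hypothetical and is not the library's `if_` implementation. It evaluates a condition per row, buckets rows under a `"true"` or `"false"` handle, and delivers each bucket along the edge that names that handle. Rows without an `amount` default to 0, so they take the false branch:

```python
from typing import Callable, Dict, List

Row = Dict[str, float]

def if_gate(rows: List[Row], condition: Callable[[Row], bool]) -> Dict[str, List[Row]]:
    """Toy if_ node: route each row to the 'true' or 'false' output handle."""
    outputs: Dict[str, List[Row]] = {"true": [], "false": []}
    for row in rows:
        outputs["true" if condition(row) else "false"].append(row)
    return outputs

# Each edge: (source node, source output handle, target node). Hypothetical layout.
edges = [
    ("high-value-only", "true", "notify"),
    ("high-value-only", "false", "park"),
]

rows = [{"amount": 250.0}, {"amount": 4200.0}, {}]
outputs = if_gate(rows, lambda r: r.get("amount", 0) > 1000)

delivered = {target: outputs[handle] for _, handle, target in edges}
print(delivered["notify"])  # [{'amount': 4200.0}]
print(delivered["park"])    # [{'amount': 250.0}, {}]
```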
Inspecting edges
wf.connections is a read-only view of every edge:
for edge in wf.connections:
print(f"{edge.source.name}.{edge.source_output} → {edge.target.name}.{edge.target_input}")
wf.visualize() walks this same view to render Mermaid or ASCII diagrams.
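The exact output of wf.visualize() is not specified here, but a walk over edges with these four fields is enough to emit Mermaid flowchart syntax. The `Edge` dataclass and `to_mermaid` function below are hypothetical, and node references are plain strings rather than node objects:

```python
from dataclasses import dataclass
from typing import Iterable, List

@dataclass(frozen=True)
class Edge:
    """Toy edge carrying the same four fields printed above (names as strings)."""
    source: str
    source_output: str
    target: str
    target_input: str

def to_mermaid(edges: Iterable[Edge]) -> str:
    lines: List[str] = ["graph LR"]
    for e in edges:
        # Label the arrow only when a non-default output handle is used.
        if e.source_output == "output":
            lines.append(f"    {e.source} --> {e.target}")
        else:
            lines.append(f"    {e.source} -->|{e.source_output}| {e.target}")
    return "\n".join(lines)

edges = [
    Edge("load", "output", "gate", "input"),
    Edge("gate", "true", "notify", "input"),
    Edge("gate", "false", "park", "input"),
]
print(to_mermaid(edges))
```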
Validation rules
wf.validate() enforces:
- Every edge references nodes that belong to this workflow (you can't connect nodes from two different Workflow instances).
- The DAG has at least one terminal node (a node with no successors).
- No cycles.
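The first two rules reduce to simple set checks over the edge list. In this sketch, `structural_errors` is a hypothetical helper and nodes are identified by name, a simplification: the real membership check is about which Workflow instance a node belongs to. Cycle detection is left out:

```python
from typing import List, Set, Tuple

def structural_errors(nodes: Set[str], edges: List[Tuple[str, str]]) -> List[str]:
    """Check edge membership and the presence of at least one terminal node."""
    errors: List[str] = []
    for source, target in edges:
        for name in (source, target):
            if name not in nodes:
                errors.append(f"edge {source} -> {target} references unknown node {name!r}")
    # A terminal node never appears as the source of an edge.
    sources = {source for source, _ in edges}
    if not nodes - sources:
        errors.append("no terminal node: every node has a successor")
    return errors

print(structural_errors({"load", "sink"}, [("load", "sink")]))   # []
print(structural_errors({"load", "sink"}, [("load", "other")]))
```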
The validator is structural only: it does not check whether node
configurations are sensible. A postgres node pointing at a
nonexistent table passes validation but fails at run time. To catch
configuration issues, run the workflow with wf.run() against the
bundled engine, or use the per-node typed builders, which already
validate config shape at build time.
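The gap between "config shape" and "config correctness" can be illustrated with a hypothetical typed config. `PostgresConfig` and its rules are invented for this sketch, modeled only on the parameters used in the examples above (`operation`, `query`, `table`, `upsert_key`); it is not the library's builder. Its constructor rejects a structurally incomplete config, yet happily accepts a table name that may not exist, because checking that requires a live database:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class PostgresConfig:
    """Hypothetical typed config: checks shape only, never contacts a database."""
    operation: str
    query: Optional[str] = None
    table: Optional[str] = None
    upsert_key: Optional[str] = None

    def __post_init__(self) -> None:
        if self.operation == "select" and not self.query:
            raise ValueError("select requires a query")
        if self.operation == "upsert" and not (self.table and self.upsert_key):
            raise ValueError("upsert requires table and upsert_key")

# Accepted: the shape is valid even though the table may not exist.
ok = PostgresConfig(operation="upsert", table="no_such_table", upsert_key="id")

try:
    PostgresConfig(operation="select")
except ValueError as exc:
    print(exc)  # select requires a query
```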