
Connection

A Connection is an edge between two Nodes. It's how data flows through a workflow: the engine runs each node, captures its output, and feeds that output to every connected downstream node.
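Here is a minimal, self-contained sketch of that data-flow model. It orders the nodes topologically, runs each one, and hands its output to every downstream node. The `run_dag` function, the plain-dict graph, and the convention that a node is a callable receiving a list of upstream outputs are all hypothetical illustrations. None of this is this library's engine.

```python
from collections import defaultdict, deque
from typing import Any, Callable


def run_dag(
    nodes: dict[str, Callable[[list[Any]], Any]],
    edges: list[tuple[str, str]],
) -> dict[str, Any]:
    """Run every node once in dependency order and return each node's output.

    Hypothetical sketch: a node is a callable that receives the list of
    outputs produced by its upstream nodes (empty for source nodes).
    Assumes the graph is acyclic; nodes on a cycle would never run.
    """
    downstream: dict[str, list[str]] = defaultdict(list)
    upstream: dict[str, list[str]] = defaultdict(list)
    indegree = {name: 0 for name in nodes}
    for src, dst in edges:
        downstream[src].append(dst)
        upstream[dst].append(src)
        indegree[dst] += 1

    ready = deque(name for name, deg in indegree.items() if deg == 0)
    outputs: dict[str, Any] = {}
    while ready:
        name = ready.popleft()
        # Feed the captured outputs of all upstream nodes into this node.
        outputs[name] = nodes[name]([outputs[u] for u in upstream[name]])
        for nxt in downstream[name]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return outputs


# Usage: load -> normalize -> sink
results = run_dag(
    nodes={
        "load": lambda _: [{"id": 1, "amount": 5}, {"id": 2, "amount": 2000}],
        "normalize": lambda ins: [{**row, "amount": float(row["amount"])} for row in ins[0]],
        "sink": lambda ins: len(ins[0]),
    },
    edges=[("load", "normalize"), ("normalize", "sink")],
)
print(results["sink"])  # 2
```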

The >> operator

The shortest way to wire two nodes is the right-shift operator:

load = wf.postgres("load", operation="select", query="SELECT * FROM orders")
sink = wf.s3("archive", bucket="orders-archive", operation="write", file_path="snapshot.csv")

load >> sink

>> returns the target node, so chains compose naturally:

wf.postgres("load", operation="select", query="...") \
  >> wf.python_transform("normalize", code="def transform(row): ...") \
  >> wf.s3("write", bucket="...", operation="write", file_path="...")
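Python supports this chaining through operator overloading. `a >> b` calls `a.__rshift__(b)`, and because that method returns `b`, the next `>>` in the chain starts from `b`. The sketch below shows the mechanism with a hypothetical `ToyWorkflow`/`ToyNode` pair, which are not this library's classes. It says nothing about how the real `Node` stores its edges.

```python
from __future__ import annotations


class ToyWorkflow:
    """Hypothetical stand-in for a workflow: it only records edges by name."""

    def __init__(self) -> None:
        self.edges: list[tuple[str, str]] = []

    def node(self, name: str) -> ToyNode:
        return ToyNode(self, name)


class ToyNode:
    def __init__(self, workflow: ToyWorkflow, name: str) -> None:
        self.workflow = workflow
        self.name = name

    def __rshift__(self, other: ToyNode) -> ToyNode:
        # Record the edge self -> other, then return the *target*, so that
        # `a >> b >> c` (evaluated as `(a >> b) >> c`) becomes `b >> c`.
        self.workflow.edges.append((self.name, other.name))
        return other


wf = ToyWorkflow()
last = wf.node("load") >> wf.node("normalize") >> wf.node("write")
print(wf.edges)   # [('load', 'normalize'), ('normalize', 'write')]
print(last.name)  # write
```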

wf.connect(source, target) is the equivalent method form. It is useful when both nodes are already stored in variables and you want a statement that reads as a verb, or when you create connections inside a loop.
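For example, a method call in a loop is an easy way to wire a variable-length list of stages into a linear chain, with no hand-written `>>` chain needed. The sketch below uses a hypothetical `ToyWorkflow.connect` that only records `(source, target)` name pairs, and the stage names are made up. The real `wf.connect` may do more, such as checking that both nodes belong to the workflow.

```python
class ToyWorkflow:
    """Hypothetical stand-in whose connect() just records edges by name."""

    def __init__(self) -> None:
        self.edges: list[tuple[str, str]] = []

    def connect(self, source: str, target: str) -> None:
        self.edges.append((source, target))


wf = ToyWorkflow()
stages = ["load", "dedupe", "normalize", "enrich", "write"]

# Connect each stage to the next: load->dedupe, dedupe->normalize, ...
for source, target in zip(stages, stages[1:]):
    wf.connect(source, target)

print(len(wf.edges))  # 4
```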

Fan-out and fan-in

A node can be the source of many edges (fan-out) or the target of many edges (fan-in). The DAG is the union of all of them:

# Fan-out: one source, two sinks
load = wf.postgres("load", operation="select", query="SELECT * FROM events")
load >> wf.s3("archive", bucket="archive", operation="write", file_path="snap.csv")
load >> wf.postgres("warehouse", operation="upsert", table="events_wh", upsert_key="id")

# Fan-in: two upstreams merged into one node
wf.merge("combined", inputs=["postgres-extract", "api-extract"])
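Because the DAG is the union of its edges, you can find fan-out and fan-in with simple degree counts. A node with more than one outgoing edge fans out, and a node with more than one incoming edge fans in. The sketch below works on a plain list of `(source, target)` name pairs, which is an assumption for illustration. The edges mirror the examples above, with the merge's two inputs written as explicit edges.

```python
from collections import Counter

edges = [
    ("load", "archive"),
    ("load", "warehouse"),
    ("postgres-extract", "combined"),
    ("api-extract", "combined"),
]

# Count outgoing and incoming edges per node.
out_degree = Counter(src for src, _ in edges)
in_degree = Counter(dst for _, dst in edges)

fan_out = sorted(node for node, n in out_degree.items() if n > 1)
fan_in = sorted(node for node, n in in_degree.items() if n > 1)

print(fan_out)  # ['load']
print(fan_in)   # ['combined']
```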

Cycles are not allowed: wf.validate() detects them and refuses to let you run a workflow that contains one.

Named outputs and inputs

Most nodes expose a single output channel called output and a single input channel called input. Some control-flow nodes (if_, split, switch) expose named output handles so you can route different data down different branches:

gate = wf.if_(
    "high-value-only",
    condition="$input.data.get('amount') > 1000",
)
load >> gate
gate.true  >> wf.api("notify", url="https://hooks.slack.com/...")
gate.false >> wf.s3("park", bucket="parked", operation="write", file_path="low.csv")

Conversely, fan-in nodes like merge expose multiple input ports. The typed helpers wire these correctly for you; if you build via add_node(), use wf.connect(source, target) with explicit handles.
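Conceptually, a named handle is an extra label on each edge. The edge records which output of the source it leaves from, and the engine delivers only that output's data along it. The sketch below is a hypothetical model, not this library's implementation. The `Edge` dataclass, the `run_if_gate` helper, and the plain Python predicate standing in for the `$input...` expression string are all assumptions.

```python
from dataclasses import dataclass
from typing import Any, Callable

Row = dict[str, Any]


@dataclass(frozen=True)
class Edge:
    source: str
    source_output: str  # e.g. "output", "true", "false"
    target: str
    target_input: str = "input"


def run_if_gate(rows: list[Row], condition: Callable[[Row], bool]) -> dict[str, list[Row]]:
    """Split rows onto the gate's two named outputs, "true" and "false"."""
    outputs: dict[str, list[Row]] = {"true": [], "false": []}
    for row in rows:
        outputs["true" if condition(row) else "false"].append(row)
    return outputs


edges = [
    Edge("high-value-only", "true", "notify"),
    Edge("high-value-only", "false", "park"),
]
rows = [{"id": 1, "amount": 50}, {"id": 2, "amount": 5000}]
gate_outputs = run_if_gate(rows, lambda row: row.get("amount", 0) > 1000)

# Deliver each handle's data only to the node wired to that handle.
delivered = {e.target: gate_outputs[e.source_output] for e in edges}
print(delivered["notify"])  # [{'id': 2, 'amount': 5000}]
print(delivered["park"])    # [{'id': 1, 'amount': 50}]
```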

Inspecting edges

wf.connections is a read-only view of every edge:

for edge in wf.connections:
    print(f"{edge.source.name}.{edge.source_output} -> {edge.target.name}.{edge.target_input}")

This is what wf.visualize() walks to render Mermaid / ASCII diagrams.
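To illustrate that walk, the sketch below turns a list of edges into Mermaid `flowchart` text. It labels each arrow with its output handle unless the handle is the default `output`. The `Edge` tuple and the `to_mermaid` function are hypothetical, and here `source` and `target` are plain names rather than node objects. This is not `wf.visualize()` itself, whose exact output may differ.

```python
import re
from typing import NamedTuple


class Edge(NamedTuple):
    source: str
    source_output: str
    target: str
    target_input: str


def to_mermaid(edges: list[Edge]) -> str:
    """Render edges as a left-to-right Mermaid flowchart."""

    def node_id(name: str) -> str:
        # Keep Mermaid ids to word characters; the real name becomes the label.
        return re.sub(r"\W", "_", name)

    lines = ["flowchart LR"]
    for e in edges:
        src = f'{node_id(e.source)}["{e.source}"]'
        dst = f'{node_id(e.target)}["{e.target}"]'
        arrow = "-->" if e.source_output == "output" else f"-->|{e.source_output}|"
        lines.append(f"    {src} {arrow} {dst}")
    return "\n".join(lines)


edges = [
    Edge("load", "output", "high-value-only", "input"),
    Edge("high-value-only", "true", "notify", "input"),
    Edge("high-value-only", "false", "park", "input"),
]
print(to_mermaid(edges))
```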

Validation rules

wf.validate() enforces:

  • Every edge references nodes that belong to this workflow (you can't connect nodes from two different Workflow instances).
  • The DAG has at least one terminal node (a node with no successors).
  • No cycles.
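The three rules can be checked over a plain node set and edge list. The following is a hypothetical sketch, not the implementation of `wf.validate()`. `validate_structure` returns a list of error strings, and workflow membership is modelled as "both endpoints are in this workflow's node set". Cycles are detected with Kahn's algorithm: if a topological sort cannot consume every node, the leftover nodes lie on or downstream of a cycle.

```python
from collections import defaultdict, deque


def validate_structure(nodes: set[str], edges: list[tuple[str, str]]) -> list[str]:
    """Return structural errors; an empty list means the graph is valid."""
    errors: list[str] = []

    # Rule 1: every edge must reference nodes that belong to this workflow.
    for src, dst in edges:
        for name in (src, dst):
            if name not in nodes:
                errors.append(f"edge {src} -> {dst} references unknown node {name!r}")
    local = [(s, d) for s, d in edges if s in nodes and d in nodes]

    # Rule 2: at least one terminal node (a node with no successors).
    has_successor = {s for s, _ in local}
    if not nodes - has_successor:
        errors.append("no terminal node: every node has a successor")

    # Rule 3: no cycles (Kahn's algorithm must consume every node).
    indegree = {n: 0 for n in nodes}
    succ: dict[str, list[str]] = defaultdict(list)
    for s, d in local:
        succ[s].append(d)
        indegree[d] += 1
    queue = deque(n for n, deg in indegree.items() if deg == 0)
    visited = 0
    while queue:
        n = queue.popleft()
        visited += 1
        for d in succ[n]:
            indegree[d] -= 1
            if indegree[d] == 0:
                queue.append(d)
    if visited < len(nodes):
        stuck = sorted(n for n, deg in indegree.items() if deg > 0)
        errors.append(f"cycle detected among or downstream of: {stuck}")
    return errors


print(validate_structure({"load", "sink"}, [("load", "sink")]))  # []
# A two-node loop violates both rule 2 and rule 3.
print(validate_structure({"a", "b"}, [("a", "b"), ("b", "a")]))
```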

The validator is structural only: it doesn't check whether node configurations are sensible (a postgres node pointing at a nonexistent table validates fine but fails at run time). Catch configuration issues by running wf.run() against the bundled engine, or by using the per-node typed builders, which already validate config shape at build time.