
Build a workflow

This guide walks through the thinking behind a workflow, not just the API. By the end you'll know how to take a vague goal ("archive yesterday's orders to S3 once a day") and turn it into a working Workflow object — including how to pick the right nodes, how to compose them, and how to debug when something doesn't behave the way you expected.

If you've already read Concepts: Workflow and just want the API surface, skip to Common shapes below.

The mental model

Every workflow is a directed acyclic graph (DAG) of three rough stages:

   ┌────────┐    ┌────────────────┐    ┌────────┐
   │ source │ →  │ transform(s)   │ →  │  sink  │
   └────────┘    └────────────────┘    └────────┘
   bring data    reshape it             send it
   into the      (filter, map,          somewhere
   workflow      enrich, branch)        useful

Almost every real workflow is some elaboration of that shape — more sources, more transforms, branches, fan-out to multiple sinks. Once you can spot which stage each node belongs to, the API stops feeling like a long list of helpers and starts feeling like a small grammar.
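To see that grammar in miniature, here is the daily high-value-orders workflow from later on this page written as a plain Python dictionary that maps each node to its upstream dependencies. The standard library's graphlib (Python 3.9+) then produces a valid execution order. This only illustrates the DAG idea. The dictionary layout is an assumption made for the sketch, not how Workflow stores its graph.

```python
from graphlib import TopologicalSorter

# Illustrative only: not how athena_sdk represents a workflow.
# Each key is a node; its value is the set of upstream nodes it depends on.
dag = {
    "load_orders": set(),                  # source stage
    "high_value_only": {"load_orders"},    # transform stage
    "post_to_slack": {"high_value_only"},  # sink stage
}

# static_order() yields every node after all of its dependencies.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['load_orders', 'high_value_only', 'post_to_slack']
```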

The four kinds of nodes

The SDK groups every node into one of four categories. When you're building a workflow, ask "what do I need to do next?" and the category usually picks itself.

Category  | When to reach for it                                          | Examples
--------- | ------------------------------------------------------------- | ----------------------------------------
Source    | Bring external data into the workflow.                        | twitter, pubmed, clinical_trials
Action    | Talk to an external system: read or write.                    | postgres, s3, api, output
Transform | Reshape, filter, or enrich data already inside the workflow.  | filter, map, aggregate, python_transform
Control   | Decide what runs next: branch, fan-out, fan-in.               | if_, switch, split, merge

Sources and actions both talk to external systems; the difference is what they're for. A source only brings data in, while an action can read or write. A postgres SELECT is an action that reads (so it can serve as the source of your data); a postgres INSERT is an action that writes. The SDK doesn't enforce a hard line, so use whichever node makes sense at each step.

Building it: a recipe

A reliable order of operations when starting a new workflow:

  1. Write the goal in one sentence. "Once a day, pull yesterday's orders, filter for orders > $1000, and post them to Slack."
  2. Pick the source(s). Where does the data come from? In our example, a postgres SELECT.
  3. Pick the sink(s). Where does the result need to land? In our example, an api POST to Slack.
  4. Sketch the transforms in between. What needs to happen between source and sink? Here, a filter to keep only high-value orders.
  5. Add control flow if you need it. Branching, fan-out, and fan-in come last; most workflows need none of them.
  6. Wire it up with >>, validate, visualize, run.

That's literally it. Let's walk through the example end-to-end:

from athena_sdk import Trigger, Workflow, expr

wf = Workflow("daily-high-value-orders")
wf.trigger(Trigger.cron("0 9 * * *", timezone="UTC"))   # 9am UTC

# 1. Source
load = wf.postgres(
    "load_orders",
    operation="select",
    query="""
        SELECT id, customer_id, amount, status
        FROM orders
        WHERE created_at::date = CURRENT_DATE - 1
    """,
)

# 2. Transform
filter_ = wf.filter(
    "high_value_only",
    conditions=[{"field": "amount", "op": "gt", "value": 1000}],
)

# 3. Sink
notify = wf.api(
    "post_to_slack",
    url="https://hooks.slack.com/services/T000/B000/XXX",
    method="POST",
    body={"text": expr.raw("'High-value orders from yesterday: ' + str(len($input.data))")},
)

# 4. Wire it up
load >> filter_ >> notify

# 5. Sanity-check
issues = wf.validate()
assert not issues, issues
print(wf.visualize())

Running wf.run() (with the bundled engine up) executes the whole chain. If you want to work on the Slack notification before the Postgres source is ready, replace load with a python_transform that returns hard-coded rows while you iterate on the downstream pieces.
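Before any engine is involved, you can also check the data logic itself in plain Python. The sketch below uses a few invented fixture rows in place of the load_orders SELECT and reproduces what the high_value_only filter and the Slack message are meant to compute. It does not call the SDK; the message wording is illustrative.

```python
# Hypothetical fixture rows standing in for the load_orders SELECT.
rows = [
    {"id": 1, "customer_id": 10, "amount": 250, "status": "paid"},
    {"id": 2, "customer_id": 11, "amount": 1500, "status": "paid"},
    {"id": 3, "customer_id": 12, "amount": 4200, "status": "pending"},
]

# Same rule as the high_value_only filter: amount > 1000.
high_value = [row for row in rows if row["amount"] > 1000]

# A message like the one the Slack body builds from the filtered rows.
text = "High-value orders: " + str(len(high_value))
print(text)  # High-value orders: 2
```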

Naming conventions that pay off

Names are how you reference a node in expressions, in wf.connect(), in error messages, and in the rendered diagram. Two rules pay for themselves:

  • Use snake_case verb phrases: load_orders, filter_high_value, post_to_slack, not Step1, pgquery, or slack. Future you, and anyone else reading the diagram, can then read the workflow like a paragraph.
  • Prefix related nodes. When a workflow has parallel branches, prefix each branch (eu_sink, eu_archive, us_sink, us_archive) so the diagram groups visually.

Node names must be unique within a workflow — the SDK rejects duplicates at build time.
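If you generate node names programmatically (say, one branch per region), a small pre-flight check can enforce the mechanical parts of these rules, snake_case and uniqueness, before the SDK sees them. check_node_names is a hypothetical helper, not part of athena_sdk. It cannot tell whether a name is a verb phrase, and the SDK's own duplicate check is what actually rejects a workflow.

```python
import re
from collections import Counter

# Hypothetical helper (not part of athena_sdk).
# snake_case: a lowercase letter first, then lowercase/digit runs joined by single underscores.
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


def check_node_names(names):
    """Return human-readable problems with the given node names."""
    problems = []
    for name in names:
        if not SNAKE_CASE.match(name):
            problems.append(f"{name!r} is not snake_case")
    for name, count in Counter(names).items():
        if count > 1:
            problems.append(f"{name!r} is used {count} times")
    return problems


print(check_node_names(["load_orders", "Step1", "eu_sink", "eu_sink"]))
# ["'Step1' is not snake_case", "'eu_sink' is used 2 times"]
```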

Common shapes

The next four examples cover roughly 80% of the workflows you'll write. Skim them once, then come back to copy-paste the one you need. Each snippet assumes from athena_sdk import Workflow, as in the walkthrough above.

Linear ETL

The simplest shape: source → transform → sink.

wf = Workflow("orders-archive")
load    = wf.postgres("load",    operation="select", query="SELECT * FROM orders WHERE updated_at > NOW() - INTERVAL '1 day'")
shape   = wf.python_transform(
    "shape",
    code="def transform(row):\n    return {'order_id': row['id'], 'usd': row['amount'] / 100}\n",
)
archive = wf.s3("archive", bucket="orders-archive", operation="write", file_path="snapshot.csv")

load >> shape >> archive

Filter + branch

Two sinks, one for each branch of a condition:

wf = Workflow("orders-routing")

load = wf.postgres("load", operation="select", query="SELECT * FROM orders")
gate = wf.if_(
    "is_high_value",
    condition="$input.data.get('amount') > 1000",
)

high_path = wf.api(
    "notify_account_manager",
    url="https://hooks.example.com/high-value",
    method="POST",
)
low_path = wf.s3(
    "park_for_batch",
    bucket="low-value", operation="write", file_path="parked.csv",
)

load >> gate
gate.true  >> high_path
gate.false >> low_path

Multi-source merge

Two sources, joined together, written to one place:

wf = Workflow("enrich-orders")

orders    = wf.postgres("orders",    operation="select", query="SELECT id, customer_id FROM orders")
customers = wf.postgres("customers", operation="select", query="SELECT id AS customer_id, name, tier FROM customers")
joined    = wf.join("joined", how="inner", left_on="customer_id", right_on="customer_id")

orders    >> joined
customers >> joined

joined >> wf.s3("export", bucket="exports", operation="write", file_path="snapshot.csv")

merge is the more general fan-in — join is shorthand for the common case of two upstreams with a matching key.

N-way switch

When you have more than two branches, reach for switch instead of nested if_ chains:

wf = Workflow("region-router")

load   = wf.postgres("load", operation="select", query="SELECT * FROM events")
router = wf.switch("by_region", expr="$input.data.get('region')")

router.case("EU")   >> wf.s3("eu_sink",     bucket="eu-bucket",   operation="write", file_path="snapshot.csv")
router.case("US")   >> wf.s3("us_sink",     bucket="us-bucket",   operation="write", file_path="snapshot.csv")
router.case("APAC") >> wf.s3("apac_sink",   bucket="apac-bucket", operation="write", file_path="snapshot.csv")
router.default()    >> wf.s3("global_sink", bucket="global",      operation="write", file_path="snapshot.csv")

load >> router

Every row goes down exactly one branch. default() catches anything no case() matched.
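The routing rule is easy to model in plain Python: evaluate the switch expression once per row, send the row to the matching case, and fall back to default when nothing matches. route_rows is an illustrative helper, not an SDK function; it only mirrors the by_region semantics described above, and the event rows are invented.

```python
from collections import defaultdict


def route_rows(rows, key, cases, default="default"):
    """Illustrative model of switch: each row lands in exactly one bucket."""
    buckets = defaultdict(list)
    for row in rows:
        value = row.get(key)  # plays the role of $input.data.get('region')
        branch = value if value in cases else default
        buckets[branch].append(row)
    return dict(buckets)


events = [
    {"id": 1, "region": "EU"},
    {"id": 2, "region": "US"},
    {"id": 3, "region": "LATAM"},  # no matching case -> default
    {"id": 4},                     # missing region -> default
]
routed = route_rows(events, "region", cases={"EU", "US", "APAC"})
print({branch: [r["id"] for r in rs] for branch, rs in routed.items()})
# {'EU': [1], 'US': [2], 'default': [3, 4]}
```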

Picking the right transform

Not every reshape needs Python. The typed transforms cover most cases faster, render in the diagram, and validate at build time:

Need                                                                  | Reach for
--------------------------------------------------------------------- | ---------------------
Drop rows by a condition on a field                                   | wf.filter()
Rename / project columns                                              | wf.map()
Group + summarize                                                     | wf.aggregate()
Combine two upstreams on a key                                        | wf.join()
Order rows                                                            | wf.sort()
Keep top N                                                            | wf.limit()
Mustache-style template into a structured shape                       | wf.schema_transform()
Anything more complex (regex, branching logic, lookups, custom math)  | wf.python_transform()

Default to the typed transforms; drop down to python_transform only when the logic genuinely doesn't fit. A workflow with seven typed transforms is easier to reason about than one with one big python_transform doing everything.

Transforms by example

A working snippet for every transform helper. Each example assumes load is the upstream node feeding the transform — wire it up with load >> transform_node (omitted in the snippets to keep them readable).

filter — keep rows that match a condition

high_value = wf.filter(
    "high_value",
    conditions=[
        {"field": "amount", "op": "gt", "value": 1000},
        {"field": "status", "op": "eq", "value": "paid"},
    ],
    combine="and",   # "and" (default) or "or"
)

Supported op values: eq, ne, lt, lte, gt, gte, in, not_in, contains, starts_with, ends_with, is_null, is_not_null. The combine argument decides whether a row must satisfy every condition ("and", the default) or at least one ("or").
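To make the operator list concrete, here is a plain-Python model of evaluating a condition list against one row. OPS, condition_holds, and row_matches are hypothetical; details such as how a missing field or a null value interacts with a comparison are assumptions and may differ from the engine.

```python
import operator

# Hypothetical mapping from the documented op names to Python predicates.
# Each predicate receives (field_value, condition_value).
OPS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "lt": operator.lt,
    "lte": operator.le,
    "gt": operator.gt,
    "gte": operator.ge,
    "in": lambda value, options: value in options,
    "not_in": lambda value, options: value not in options,
    "contains": lambda value, needle: needle in value,
    "starts_with": lambda value, prefix: value.startswith(prefix),
    "ends_with": lambda value, suffix: value.endswith(suffix),
    "is_null": lambda value, _: value is None,
    "is_not_null": lambda value, _: value is not None,
}
NULL_OPS = {"is_null", "is_not_null"}


def condition_holds(row, condition):
    value = row.get(condition["field"])
    if value is None and condition["op"] not in NULL_OPS:
        return False  # assumption: a missing/null field never satisfies a comparison
    return OPS[condition["op"]](value, condition.get("value"))


def row_matches(row, conditions, combine="and"):
    """True if the row passes the condition list under the given combine mode."""
    checks = (condition_holds(row, c) for c in conditions)
    return all(checks) if combine == "and" else any(checks)


conditions = [
    {"field": "amount", "op": "gt", "value": 1000},
    {"field": "status", "op": "eq", "value": "paid"},
]
rows = [
    {"amount": 1500, "status": "paid"},
    {"amount": 1500, "status": "pending"},
    {"amount": 200, "status": "paid"},
    {"status": "refunded"},  # no amount at all
]
print([row_matches(r, conditions) for r in rows])        # [True, False, False, False]
print([row_matches(r, conditions, "or") for r in rows])  # [True, True, True, False]
```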

map — rename and project columns

shape = wf.map(
    "shape",
    mappings={
        "order_id": "$input.data.get('id')",
        "usd":      "$input.data.get('amount_cents') / 100",
        "customer": "$input.data.get('customer_email')",
    },
)

The right-hand side is an expression evaluated per-row — use $input.data.get('field') to read incoming columns. The output row contains exactly the keys you list (other columns are dropped). For expression authoring patterns, see Wiring up with expressions.
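Conceptually, map evaluates one expression per output key and builds a fresh row from the results, which is why unlisted columns disappear. The sketch below stands in Python lambdas for the $input.data expression strings; apply_mappings is illustrative, not the engine's evaluator, and the sample row is invented.

```python
def apply_mappings(row, mappings):
    """Illustrative model of map: the new row has exactly the keys in `mappings`."""
    return {out_key: fn(row) for out_key, fn in mappings.items()}


# Lambdas stand in for the expression strings in the wf.map() example.
mappings = {
    "order_id": lambda r: r.get("id"),
    "usd": lambda r: r.get("amount_cents") / 100,
    "customer": lambda r: r.get("customer_email"),
}

row = {"id": 7, "amount_cents": 12345, "customer_email": "a@example.com", "internal_note": "x"}
print(apply_mappings(row, mappings))
# {'order_id': 7, 'usd': 123.45, 'customer': 'a@example.com'}
```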

aggregate — group and summarize

totals = wf.aggregate(
    "daily_totals",
    group_by=["customer_id", "currency"],
    aggregations={
        "order_count": "count(*)",
        "gross":       "sum(amount)",
        "avg_amount":  "avg(amount)",
        "max_amount":  "max(amount)",
    },
)

Output: one row per (customer_id, currency) combination, with the aggregation columns listed. Common aggregation functions: count(*), count(col), sum, avg, min, max.
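When you want to check expected totals by hand, a plain-Python model of the same grouping helps. aggregate_rows is hypothetical, parses only the function forms listed above, and assumes the output row carries the group_by columns alongside the aggregation columns. The order data is invented.

```python
import re
from collections import defaultdict

# Hypothetical parser for "func(col)" / "count(*)" aggregation specs.
AGG_RE = re.compile(r"^(count|sum|avg|min|max)\((\*|\w+)\)$")


def aggregate_rows(rows, group_by, aggregations):
    """One output row per distinct group_by tuple, in first-seen order."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row.get(c) for c in group_by)].append(row)

    out = []
    for key, members in groups.items():
        result = dict(zip(group_by, key))  # assumption: group columns are kept
        for name, spec in aggregations.items():
            match = AGG_RE.match(spec)
            if match is None:
                raise ValueError(f"unsupported aggregation: {spec!r}")
            func, col = match.groups()
            if col == "*":
                result[name] = len(members)  # count(*) counts rows
                continue
            values = [m[col] for m in members if m.get(col) is not None]
            if func == "count":
                result[name] = len(values)  # count(col) skips nulls
            elif func == "sum":
                result[name] = sum(values)
            elif func == "avg":
                result[name] = sum(values) / len(values) if values else None
            elif func == "min":
                result[name] = min(values, default=None)
            else:  # max
                result[name] = max(values, default=None)
        out.append(result)
    return out


orders = [
    {"customer_id": 1, "currency": "USD", "amount": 100},
    {"customer_id": 1, "currency": "USD", "amount": 300},
    {"customer_id": 1, "currency": "EUR", "amount": 50},
    {"customer_id": 2, "currency": "USD", "amount": 70},
]
totals = aggregate_rows(
    orders,
    group_by=["customer_id", "currency"],
    aggregations={"order_count": "count(*)", "gross": "sum(amount)", "max_amount": "max(amount)"},
)
for row in totals:
    print(row)
# {'customer_id': 1, 'currency': 'USD', 'order_count': 2, 'gross': 400, 'max_amount': 300}
# {'customer_id': 1, 'currency': 'EUR', 'order_count': 1, 'gross': 50, 'max_amount': 50}
# {'customer_id': 2, 'currency': 'USD', 'order_count': 1, 'gross': 70, 'max_amount': 70}
```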

join — combine two upstreams on a key

joined = wf.join(
    "joined",
    how="inner",            # "inner" | "left" | "right" | "outer"
    left_on="customer_id",
    right_on="id",
)

orders    >> joined        # left side
customers >> joined        # right side

join requires two incoming edges. Order matters: the first edge becomes the left side, the second becomes the right.
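For intuition about how and the left/right distinction, here is a small hash-join sketch in plain Python that models only "inner" and "left". join_rows is hypothetical. In particular, how the engine resolves column-name collisions between the two sides isn't covered here, so this sketch simply lets right-side values win. The sample rows are invented.

```python
def join_rows(left, right, left_on, right_on, how="inner"):
    """Hash join modeling how="inner" and how="left" only."""
    if how not in ("inner", "left"):
        raise ValueError("this sketch only models 'inner' and 'left'")

    # Index the right side by its key so each left row is a dict lookup.
    index = {}
    for right_row in right:
        key = right_row.get(right_on)
        if key is not None:  # null keys never match, as in SQL
            index.setdefault(key, []).append(right_row)

    out = []
    for left_row in left:
        matches = index.get(left_row.get(left_on), [])
        for right_row in matches:
            # Assumption: on a column-name clash, the right side's value wins.
            out.append({**left_row, **right_row})
        if not matches and how == "left":
            # Unmatched left rows survive; right-side columns are simply absent here.
            out.append(dict(left_row))
    return out


orders = [{"order_id": 1, "customer_id": 10}, {"order_id": 2, "customer_id": 99}]
customers = [{"id": 10, "name": "Ada", "tier": "gold"}]

print(join_rows(orders, customers, "customer_id", "id"))
# [{'order_id': 1, 'customer_id': 10, 'id': 10, 'name': 'Ada', 'tier': 'gold'}]
print(len(join_rows(orders, customers, "customer_id", "id", how="left")))  # 2
```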

sort — order rows

ranked = wf.sort(
    "by_amount",
    by=["amount", "created_at"],
    ascending=[False, True],     # amount desc, created_at asc
)

ascending can be a single bool (applies to every key) or a list the same length as by.
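Mixed directions such as ascending=[False, True] are easy to get subtly wrong when you check results by hand. One standard way to model them in plain Python relies on sort stability: sort by the least significant key first and the most significant key last. sort_rows is an illustrative helper, not the engine's implementation, and the rows are invented.

```python
def sort_rows(rows, by, ascending=True):
    """Multi-key sort; `ascending` is one bool or a list matching `by`."""
    if isinstance(ascending, bool):
        ascending = [ascending] * len(by)
    if len(ascending) != len(by):
        raise ValueError("ascending must be a bool or match the length of by")
    result = list(rows)
    # list.sort is stable (also with reverse=True), so sorting by the last key
    # first and the first key last produces the combined ordering.
    for key, asc in reversed(list(zip(by, ascending))):
        result.sort(key=lambda r, k=key: r[k], reverse=not asc)
    return result


rows = [
    {"id": "a", "amount": 50, "created_at": "2024-01-02"},
    {"id": "b", "amount": 90, "created_at": "2024-01-03"},
    {"id": "c", "amount": 90, "created_at": "2024-01-01"},
]
ranked = sort_rows(rows, by=["amount", "created_at"], ascending=[False, True])
print([r["id"] for r in ranked])  # ['c', 'b', 'a']
```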

limit — keep the first N rows

top10 = wf.limit("top10", n=10)

Pair with sort to get a "top N by X" pattern (sink stands for whatever node consumes the result):

load >> wf.sort("by_amount", by=["amount"], ascending=False) >> wf.limit("top10", n=10) >> sink
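Sort-then-limit means "take the N largest". In plain Python, the standard library's heapq.nlargest expresses that directly and is documented as equivalent to sorted(..., reverse=True)[:n], which is handy for checking a workflow's output against a local sample. The sample data is invented.

```python
import heapq

rows = [{"id": i, "amount": amt} for i, amt in enumerate([300, 1200, 50, 980, 1200, 7])]

# Plain-Python equivalent of sort(by=["amount"], ascending=False) >> limit(n=3)
via_sort = sorted(rows, key=lambda r: r["amount"], reverse=True)[:3]
via_heap = heapq.nlargest(3, rows, key=lambda r: r["amount"])

print([r["id"] for r in via_sort])  # [1, 4, 3]
```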

python_transform — anything the typed transforms can't express

Three execution modes:

# Mode 1: function — the most common. Define `transform(row)`; return
# the new row, return None to drop, or return a list to fan out.
clean = wf.python_transform(
    "extract_domains",
    code=(
        "import re\n"
        "URL_RE = re.compile(r'https?://([^/]+)')\n"
        "def transform(row):\n"
        "    text = row.get('body') or ''\n"
        "    return {'id': row['id'], 'domains': sorted(set(URL_RE.findall(text)))}\n"
    ),
)

# Mode 2: script — top-to-bottom code; SDK returns whichever variable
# you name in `output_variable`.
generate = wf.python_transform(
    "fixture_rows",
    code=(
        "rows = [{'i': i, 'square': i * i} for i in range(20)]\n"
    ),
    execution_mode="script",
    output_variable="rows",
)

# Mode 3: transform — mutate the input dict in place.
enrich = wf.python_transform(
    "tag",
    code="row['tagged_at'] = row.get('updated_at')\n",
    execution_mode="transform",
)

Inside function mode, transform(row) runs once per upstream row — the engine handles the looping. Use function_name="my_func" if you want to call your function something other than transform.
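The three return conventions of function mode (a row replaces the input, None drops it, a list fans out into several rows) can be modeled as a small driver loop. run_function_mode is a hypothetical stand-in for the looping the engine does for you; it clarifies the contract and does not reproduce the engine's implementation.

```python
def run_function_mode(rows, transform):
    """Hypothetical driver: apply transform(row) to every row and flatten."""
    out = []
    for row in rows:
        result = transform(row)
        if result is None:
            continue            # None: drop the row
        if isinstance(result, list):
            out.extend(result)  # list: fan out into several rows
        else:
            out.append(result)  # anything else: the new row


    return out


def transform(row):
    # Drop rows without tags; emit one row per tag otherwise.
    if not row.get("tags"):
        return None
    return [{"id": row["id"], "tag": t} for t in row["tags"]]


rows = [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": []}, {"id": 3, "tags": ["c"]}]
print(run_function_mode(rows, transform))
# [{'id': 1, 'tag': 'a'}, {'id': 1, 'tag': 'b'}, {'id': 3, 'tag': 'c'}]
```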

Hoisting values for downstream ((var)) placeholders. Pass export_variables= to publish fields into workflow-scoped variables that downstream nodes (typically api() bodies) can interpolate via ((name)) placeholders:

build_payload = wf.python_transform(
    "build_payload",
    code=(
        "def transform(row):\n"
        "    return {'documents': row['rows'], 'topic_size': 3}\n"
    ),
    export_variables={"documents": "documents"},
)

# Downstream API node references the exported variable in its body:
wf.api(
    "topic_model",
    url="https://api.example.com/topics/model",
    method="POST",
    body='{"documents": ((documents)), "min_topic_size": 3}',
)

((name)) placeholders are resolved by the engine's export_processor at execution time. They live in a separate namespace from {{name}} workflow variables (set via wf.set_variable() — see Wiring up with expressions).
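The body in the example leaves ((documents)) unquoted, which suggests the exported value is spliced in as a JSON literal. The sketch below models that assumption with a regular expression and json.dumps. resolve_exports is hypothetical and is not the engine's export_processor; note that it leaves {{name}} tokens alone, matching the separate namespaces described above.

```python
import json
import re

# Matches ((name)) placeholders; {{name}} tokens are not touched.
PLACEHOLDER = re.compile(r"\(\((\w+)\)\)")


def resolve_exports(body, exports):
    """Hypothetical model: replace each ((name)) with the JSON encoding of exports[name]."""
    def substitute(match):
        name = match.group(1)
        if name not in exports:
            raise KeyError(f"no exported variable named {name!r}")
        return json.dumps(exports[name])

    return PLACEHOLDER.sub(substitute, body)


body = '{"documents": ((documents)), "min_topic_size": 3}'
resolved = resolve_exports(body, {"documents": ["doc one", "doc two"]})
print(resolved)  # {"documents": ["doc one", "doc two"], "min_topic_size": 3}
```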

schema_transform — Mustache-template into a structured shape

reshape = wf.schema_transform(
    "to_warehouse_schema",
    template={
        "order_id": "{{ id }}",
        "amount_cents": "{{ amount_cents }}",
        "customer": {
            "email": "{{ customer_email }}",
            "tier":  "{{ customer_tier }}",
        },
    },
    strict_mode=False,           # True → fail if a placeholder is missing
)

Useful when the source rows have flat fields but the downstream node expects a nested object — saves writing a python_transform for what is essentially a remap. Pass partials={...} to define reusable sub-templates referenced as {{> partial_name }}.
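Rendering a nested template comes down to walking the structure and substituting each {{ field }} from the row. render_template is a minimal stand-in for that idea: it handles only simple variable tags (no sections, partials, or HTML escaping) and always produces strings, so treat it as an illustration of strict_mode rather than a Mustache implementation. Whether schema_transform preserves non-string types is not covered here.

```python
import re

# Simple variable tags only, e.g. "{{ customer_email }}".
TAG = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(template, row, strict_mode=False):
    """Recursively render dicts/lists; substitute {{ field }} tags in strings."""
    if isinstance(template, dict):
        return {k: render_template(v, row, strict_mode) for k, v in template.items()}
    if isinstance(template, list):
        return [render_template(v, row, strict_mode) for v in template]
    if not isinstance(template, str):
        return template

    def substitute(match):
        field = match.group(1)
        if field not in row:
            if strict_mode:
                raise KeyError(f"missing placeholder value: {field!r}")
            return ""  # lenient mode: missing values render as empty strings
        return str(row[field])

    return TAG.sub(substitute, template)


template = {
    "order_id": "{{ id }}",
    "customer": {"email": "{{ customer_email }}", "tier": "{{ customer_tier }}"},
}
row = {"id": 42, "customer_email": "a@example.com"}
print(render_template(template, row))
# {'order_id': '42', 'customer': {'email': 'a@example.com', 'tier': ''}}
```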

Wiring up with expressions

Inside a node configuration, you can reference upstream nodes and workflow variables without string-templating. The expr module returns objects that serialize into the engine's expression syntax:

from athena_sdk import expr

wf.set_variable("slack_webhook", "https://hooks.slack.com/...")

wf.api(
    "notify",
    url=expr.variable("slack_webhook"),
    body={
        "text": expr.raw("'Imported ' + str(len($input.data)) + ' rows'"),
        "first_id": expr.input.get("id"),
    },
)

The four patterns to know:

  • expr.input.get("field") — current row's field. Most common.
  • expr.node("loader").get("field") — pull a field from a named upstream node's output. Useful when you fan out and need to refer back across branches.
  • expr.variable("name") — workflow variable set via wf.set_variable(). Equivalent to a {{name}} token in any node config string. Both local (wf.run()) and remote (wf.deploy()) paths resolve these — set once at the top of the workflow, reference many times below.
  • expr.raw("…") — escape hatch when you need an expression shape the builders don't cover (len(), arithmetic, string concat).

Why bother instead of plain {{ name }} templating?

  1. Refactor safety: cross-node references are explicit expr.node("load_orders") calls, so when you rename load_orders a single grep finds every reference to update. Names buried inside raw template strings are easy to miss.
  2. Validation: wf.validate() checks every expr.node("X") reference and warns if X doesn't exist. Typos surface at build time, not at run time.
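The reference check is simple to picture: collect every node name that expressions point at and report any that aren't defined, ideally with a spelling suggestion. find_unknown_refs is a hypothetical sketch of that idea using difflib from the standard library; it is not how wf.validate() is implemented.

```python
import difflib


def find_unknown_refs(node_names, referenced):
    """Hypothetical check: one message per reference that names no existing node."""
    messages = []
    for ref in referenced:
        if ref in node_names:
            continue
        close = difflib.get_close_matches(ref, node_names, n=1)
        hint = f" (did you mean {close[0]!r}?)" if close else ""
        messages.append(f"unknown node {ref!r}{hint}")
    return messages


nodes = ["load_orders", "high_value_only", "post_to_slack"]
print(find_unknown_refs(nodes, ["load_orders", "load_order"]))
# ["unknown node 'load_order' (did you mean 'load_orders'?)"]
```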

Build incrementally — validate every time

The fastest feedback loop is a tight build → validate → visualize cycle:

wf = Workflow("draft")
load = wf.postgres("load", operation="select", query="SELECT 1")
issues = wf.validate(); assert not issues, issues
print(wf.visualize())                # confirm the shape

step = wf.python_transform(
    "step",
    code="def transform(row):\n    return row\n",
)
load >> step
issues = wf.validate(); assert not issues, issues
print(wf.visualize())

You don't need a running engine for any of this — validate() and visualize() are pure Python. By the time you call wf.run() for the first time, the structure is already vouched for.

When to use python_transform

Three rules of thumb:

  • Yes: the logic involves regex, multi-step lookups, dataclasses, a third-party library, or anything you'd write naturally in Python.
  • Probably not: the logic is "filter rows where X" or "rename these columns" — wf.filter() / wf.map() already do that.
  • Definitely not: the logic is "loop over rows". That's what the engine already does. Write the per-row logic as the body of transform(row); the engine fans it out.

For example, pulling domains out of free text is a clear "yes":

wf.python_transform(
    "extract_domains",
    code=(
        "import re\n"
        "URL_RE = re.compile(r'https?://([^/]+)')\n"
        "def transform(row):\n"
        "    matches = URL_RE.findall(row.get('text') or '')\n"
        "    return {'order_id': row['id'], 'domains': sorted(set(matches))}\n"
    ),
)

The transform(row) function runs once per input row; the return value becomes the row that flows downstream. Return None to drop a row.
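Because code= is just a string, you can exercise it with the standard library before the engine ever sees it: exec the string into a fresh namespace and call transform on sample rows. This assumes the engine runs the code in a comparable way (a top-level transform(row) with access to the standard library); the sample rows are invented.

```python
# The same source string passed to wf.python_transform(code=...) above.
code = (
    "import re\n"
    "URL_RE = re.compile(r'https?://([^/]+)')\n"
    "def transform(row):\n"
    "    matches = URL_RE.findall(row.get('text') or '')\n"
    "    return {'order_id': row['id'], 'domains': sorted(set(matches))}\n"
)

namespace = {}
exec(code, namespace)  # defines re, URL_RE, and transform inside `namespace`
transform = namespace["transform"]

sample = {"id": 7, "text": "see https://example.com/a and http://docs.example.org/b, https://example.com/c"}
print(transform(sample))
# {'order_id': 7, 'domains': ['docs.example.org', 'example.com']}
print(transform({"id": 8}))  # {'order_id': 8, 'domains': []}
```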

Triggers — when should the workflow run?

wf.run() ignores triggers and runs the workflow once, immediately — that's what you want during development. Triggers matter when you deploy to a hosted backend that schedules workflows for you:

wf.trigger(Trigger.manual())                           # run on demand
wf.trigger(Trigger.cron("0 6 * * *", timezone="UTC"))  # 6am UTC daily
wf.trigger(Trigger.webhook(path="/ingest/orders"))     # HTTP POST starts a run

A workflow without a trigger is fine for local execution; a deployed workflow without a trigger is technically valid but will only run when something else calls the executions API.
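A five-field cron expression reads minute, hour, day of month, month, day of week, so "0 6 * * *" means 06:00 every day. To sanity-check when a daily schedule will next fire, a few lines of standard-library Python are enough. next_daily_run is a hypothetical helper, not part of the SDK. It deliberately handles only the daily "M H * * *" form, and the example uses UTC so daylight-saving shifts don't come into play.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(cron, now):
    """Hypothetical helper: next firing time for a daily cron of the form 'M H * * *'."""
    minute, hour, dom, month, dow = cron.split()
    if (dom, month, dow) != ("*", "*", "*"):
        raise ValueError("this sketch only handles daily 'M H * * *' schedules")
    candidate = now.replace(hour=int(hour), minute=int(minute), second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate


now = datetime(2024, 3, 10, 7, 30, tzinfo=timezone.utc)
print(next_daily_run("0 6 * * *", now))  # 2024-03-11 06:00:00+00:00
print(next_daily_run("0 9 * * *", now))  # 2024-03-10 09:00:00+00:00
```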

Things that go wrong (and how to spot them)

Symptom                                                      | Likely cause
------------------------------------------------------------ | ------------------------------------------------------------
WorkflowBuildError: operation='upsert' requires upsert_key   | Forgot a required keyword for that node type. Read the helper's docstring.
validate() returns "Workflow has a cycle"                    | An edge points back into an upstream node, most often a >> reused after a typo.
validate() returns "no terminal node"                        | Every node has a successor: your DAG contains a cycle, or you forgot the sink.
Run finishes ok=True but node_results[X].status == "failed"  | A non-terminal node failed and its downstream nodes were skipped. Inspect error on the failed node.
Run hangs forever                                            | The engine isn't running (start it with docker compose up -d), or .env points at the wrong DB.
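The two validate() messages in the table correspond to standard graph checks. The sketch below runs them on a plain list of (upstream, downstream) edges with graphlib; check_dag is hypothetical and only illustrates why a stray >> back into an upstream node gets reported. It is not how validate() is implemented.

```python
from graphlib import CycleError, TopologicalSorter


def check_dag(edges):
    """Hypothetical checks on (upstream, downstream) pairs; isolated nodes are ignored."""
    graph = {}
    for up, down in edges:
        graph.setdefault(up, set())
        graph.setdefault(down, set()).add(up)  # downstream depends on upstream

    problems = []
    try:
        TopologicalSorter(graph).prepare()  # raises CycleError if the graph has a cycle
    except CycleError as err:
        # err.args[1] holds the nodes of one detected cycle.
        problems.append("Workflow has a cycle: " + " -> ".join(err.args[1]))

    has_successor = {up for up, _ in edges}
    if graph and not set(graph) - has_successor:
        problems.append("no terminal node")
    return problems


ok = [("load", "filter"), ("filter", "notify")]
bad = [("load", "filter"), ("filter", "notify"), ("notify", "load")]
print(check_dag(ok))   # []
print(check_dag(bad))  # a cycle message, then "no terminal node"
```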

Where to next

  • Run remotely — deploy the same workflow to a hosted nexus-backend.
  • Expression helpers — typed wiring for cross-node references and variables.
  • CLI usage — validate / visualize / run from the shell, useful in CI.
  • API reference — every method on Workflow, every typed builder, signatures + examples.