
Orchestrator integration

The SDK ships an operator-shaped task you can drop into any Python-based DAG or orchestration framework.

from athena_sdk.integrations.orchestrator import AthenaWorkflowTask

run_ingest = AthenaWorkflowTask(
    task_id="run_ingest",
    workflow_id="wf-abc-123",
    wait_for_completion=True,
    poll_interval_seconds=10,
    timeout_seconds=3600,
)

The class implements the canonical __init__ + execute(context) shape that operator-style schedulers expect. Drop it into your DAG / pipeline / flow definition the same way you would any other Python operator.
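The contract is small: the scheduler constructs the task once with its configuration, then calls execute(context) when the task runs and treats the return value as the task's result. The sketch below illustrates that shape with a hypothetical EchoTask and a toy run_tasks driver standing in for a real scheduler. Neither is part of the SDK, and real schedulers pass a much richer context than the plain dict used here.

```python
from typing import Any, Dict, List


class EchoTask:
    """Hypothetical operator-shaped task: configure in __init__, do the work in execute()."""

    def __init__(self, task_id: str, message: str) -> None:
        self.task_id = task_id
        self.message = message

    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Real schedulers populate context with run metadata; here it is a plain dict.
        return {"task_id": self.task_id, "run_id": context.get("run_id"), "message": self.message}


def run_tasks(tasks: List[Any], run_id: str) -> Dict[str, Any]:
    """Toy stand-in for a scheduler: call execute(context) on each task in order."""
    results: Dict[str, Any] = {}
    for task in tasks:
        context = {"run_id": run_id, "task_id": task.task_id}
        results[task.task_id] = task.execute(context)
    return results


results = run_tasks([EchoTask("hello", "hi")], run_id="run-1")
print(results["hello"]["message"])  # hi
```

Because the scheduler only relies on __init__ and execute(context), any class with that shape can be driven the same way, which is what lets AthenaWorkflowTask sit alongside native operators.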

AthenaWorkflowTask:

  1. Triggers the workflow via client.executions.execute(workflow_id).
  2. Polls client.executions.get_status() every poll_interval_seconds.
  3. Returns the final status dict when the execution reaches a terminal state (completed / failed / cancelled).
  4. Raises TimeoutError if timeout_seconds elapses first.
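The trigger, poll, and timeout steps above can be sketched as a plain loop. This is a minimal sketch, not the SDK's implementation: FakeExecutions is a hypothetical in-memory stand-in for client.executions, and the shapes it uses (execute() returning an execution id string, get_status(execution_id) returning a dict with a "status" key) are assumptions made for illustration.

```python
import time
from typing import Any, Dict

TERMINAL_STATES = {"completed", "failed", "cancelled"}


class FakeExecutions:
    """Hypothetical stand-in for client.executions; finishes after a fixed number of polls."""

    def __init__(self, polls_until_done: int) -> None:
        self._remaining = polls_until_done

    def execute(self, workflow_id: str) -> str:
        # Assumption: execute() returns an execution id.
        return f"exec-{workflow_id}"

    def get_status(self, execution_id: str) -> Dict[str, Any]:
        # Assumption: get_status() takes the execution id and returns a dict with "status".
        self._remaining -= 1
        state = "completed" if self._remaining <= 0 else "running"
        return {"execution_id": execution_id, "status": state}


def run_and_wait(executions: Any, workflow_id: str,
                 poll_interval_seconds: float, timeout_seconds: float) -> Dict[str, Any]:
    execution_id = executions.execute(workflow_id)        # 1. trigger
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = executions.get_status(execution_id)      # 2. poll
        if status["status"] in TERMINAL_STATES:
            return status                                 # 3. terminal state: return the dict
        if time.monotonic() >= deadline:                  # 4. give up after the timeout
            raise TimeoutError(f"{workflow_id} not finished after {timeout_seconds}s")
        time.sleep(poll_interval_seconds)


final = run_and_wait(FakeExecutions(polls_until_done=3), "wf-abc-123",
                     poll_interval_seconds=0.01, timeout_seconds=5)
print(final["status"])  # completed
```

Measuring the deadline with time.monotonic() keeps the timeout unaffected by wall-clock adjustments on the worker.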

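Install: pip install 'athena-sdk[orchestrator]' (the quotes keep shells such as zsh from treating the brackets as a glob pattern). The SDK doesn't pin a DAG host, so installing and running your orchestrator is your responsibility.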

Making the host scheduler recognize AthenaWorkflowTask

Some Python orchestrators require their tasks to subclass the framework's own BaseOperator / Task class so the scheduler/executor can pick them up. AthenaWorkflowTask supports this via two environment variables:

# Example: any operator-style Python framework works; substitute its module path
export ATHENA_HOST_OPERATOR_MODULE=your_framework.models
export ATHENA_HOST_OPERATOR_ATTR=BaseOperator   # default

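The base class is resolved once, when the module is loaded, so set these variables before AthenaWorkflowTask is first imported. If the import succeeds, AthenaWorkflowTask subclasses that class. If the variables are unset or the import fails, it falls back to subclassing plain object; execute(context) still works when called directly, but the scheduler won't recognize the task as a framework-native operator.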