Orchestrator integration
The SDK ships an operator-shaped task you can drop into any Python-friendly DAG / orchestration framework.
from athena_sdk.integrations.orchestrator import AthenaWorkflowTask

run_ingest = AthenaWorkflowTask(
    task_id="run_ingest",
    workflow_id="wf-abc-123",
    wait_for_completion=True,
    poll_interval_seconds=10,
    timeout_seconds=3600,
)
The class implements the canonical __init__ + execute(context) shape
that operator-style schedulers expect. Drop it into your DAG / pipeline /
flow definition the same way you would any other Python operator.
AthenaWorkflowTask:
- Triggers the workflow via client.executions.execute(workflow_id).
- Polls client.executions.get_status() every poll_interval_seconds.
- Returns the final status dict when the execution reaches a terminal state (completed/failed/cancelled).
- Raises TimeoutError if timeout_seconds elapses first.
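The trigger, poll, and timeout behavior listed above can be sketched roughly as follows. This is an illustrative approximation, not the SDK's actual implementation. `FakeExecutions`, `run_and_wait`, the `execution_id` key, the `status` key, and the argument passed to `get_status` are all assumptions made so the example runs without the SDK installed.

```python
import time

# Terminal states named in the list above.
TERMINAL_STATES = {"completed", "failed", "cancelled"}


class FakeExecutions:
    """Hypothetical stand-in for client.executions, for illustration only."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def execute(self, workflow_id):
        # Assumed return shape; the real SDK response may differ.
        return {"execution_id": "exec-1", "workflow_id": workflow_id}

    def get_status(self, execution_id):
        # Assumed signature; the docs show get_status() without arguments.
        return {"execution_id": execution_id, "status": next(self._statuses)}


def run_and_wait(executions, workflow_id, poll_interval_seconds=10,
                 timeout_seconds=3600):
    """Trigger a workflow, then poll until a terminal state or the timeout."""
    started = executions.execute(workflow_id)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = executions.get_status(started["execution_id"])
        if status["status"] in TERMINAL_STATES:
            return status  # final status dict
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"workflow {workflow_id} not finished after {timeout_seconds}s"
            )
        time.sleep(poll_interval_seconds)


# Usage with the stub: two non-terminal polls, then a terminal one.
final = run_and_wait(
    FakeExecutions(["running", "running", "completed"]),
    "wf-abc-123",
    poll_interval_seconds=0,
    timeout_seconds=5,
)
```

The sketch uses `time.monotonic()` for the deadline so wall-clock adjustments do not shorten or extend the timeout. Whether the SDK does the same is not stated here.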
Install: pip install athena-sdk[orchestrator]. The SDK doesn't pin a DAG host, so installing and
running one is your responsibility.
Making the host scheduler recognize AthenaWorkflowTask
Some Python orchestrators require their tasks to subclass the framework's
own BaseOperator / Task class so the scheduler/executor can pick them
up. AthenaWorkflowTask supports this via two environment variables:
# Example: any operator-style Python framework will do
export ATHENA_HOST_OPERATOR_MODULE=your_framework.models
export ATHENA_HOST_OPERATOR_ATTR=BaseOperator # default
If the import succeeds at module load, AthenaWorkflowTask subclasses
that class. If the env vars are unset or the import fails, it falls back
to plain object. In that case .execute(context) still works when called
directly, just without framework-native recognition.