Airflow Integration

Airflow support is provided via a standalone provider package.

Installation

Install the provider package:

pip install "apache-airflow>=3.2.0,<4.0.0" apache-airflow-providers-imednet

For ImednetToS3Operator support, install the provider’s amazon extra:

pip install "apache-airflow>=3.2.0,<4.0.0" "apache-airflow-providers-imednet[amazon]"

Production Reference DAG

Use examples/airflow/multi_study_pipeline.py as the recommended production pattern for deployments that rely on dynamic task mapping. It demonstrates a lightweight TaskFlow discovery task feeding mapped provider operators, so all export execution happens inside the mapped tasks.

"""Production reference DAG for multi-study exports with dynamic task mapping.

Pattern:
- Upstream TaskFlow discovery is lightweight and returns JSON-serializable work.
- Downstream mapped provider operator performs study-level export execution.

Operational guardrails:
- Discovery and export tasks use retries and execution timeouts.
- The DAG limits parallel workload with ``max_active_runs``/``max_active_tasks``.
- Exports run in an Airflow pool (``imednet_exports``) for shared resource control.
  Create this pool in Airflow with a slot count that matches your backend limits.
"""

from __future__ import annotations

import os
import re
from datetime import datetime, timedelta
from typing import Any

from airflow import DAG
from airflow.decorators import task

from apache_airflow_providers_imednet import ImednetExportOperator, ImednetHook

IMEDNET_CONN_ID = "imednet_default"
IMEDNET_EXPORT_POOL = "imednet_exports"
IMEDNET_EXPORT_ROOT = os.getenv("IMEDNET_EXPORT_ROOT", "/tmp/imednet_exports")

default_args = {
    "start_date": datetime(2024, 1, 1),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}


def _safe_study_path_fragment(study_key: str) -> str:
    """Return a filesystem-safe filename token from a study key."""
    return re.sub(r"[^A-Za-z0-9_-]", "_", study_key).strip("_-") or "study"


with DAG(
    dag_id="imednet_multi_study_reference",
    schedule=None,
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    max_active_tasks=8,
    tags=["imednet", "reference", "dynamic-mapping"],
) as dag:

    @task(
        task_id="discover_studies",
        retries=1,
        retry_delay=timedelta(minutes=2),
        execution_timeout=timedelta(minutes=5),
    )
    def discover_export_targets() -> list[dict[str, Any]]:
        """Enumerate per-study export targets only (no extraction or writes)."""
        hook = ImednetHook(IMEDNET_CONN_ID)
        try:
            study_keys = hook.list_study_keys()
        except Exception as exc:
            raise RuntimeError("Failed to discover study keys from iMednet.") from exc
        return [
            {
                "study_key": study_key,
                "output_path": (
                    f"{IMEDNET_EXPORT_ROOT}/{_safe_study_path_fragment(study_key)}.csv"
                ),
                "export_kwargs": {"index": False},
            }
            for study_key in study_keys
        ]

    ImednetExportOperator.partial(
        task_id="export_records",
        export_func="export_to_csv",
        imednet_conn_id=IMEDNET_CONN_ID,
        pool=IMEDNET_EXPORT_POOL,
        retries=3,
        retry_delay=timedelta(minutes=10),
        execution_timeout=timedelta(minutes=30),
    ).expand_kwargs(discover_export_targets())

Operational safeguards highlighted in the reference DAG:

  • Discovery only enumerates work items and returns JSON-serializable values.

  • Retries and execution_timeout are configured for both discovery and mapped export execution.

  • max_active_runs and max_active_tasks constrain DAG-level concurrency.

  • pool is set on the mapped operator for shared-resource throttling.
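Airflow enforces pools in its scheduler, not with thread semaphores, so the following is only an analogy for how the two limits interact: worker threads stand in for max_active_tasks, and a BoundedSemaphore stands in for the imednet_exports pool. The function name and the sleep-based fake export are illustrative assumptions.

```python
from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(n_tasks: int, pool_slots: int, max_active: int) -> int:
    """Simulate n_tasks exports: `max_active` worker threads compete for `pool_slots` slots.

    Returns the highest number of exports that were running at the same moment.
    """
    slots = threading.BoundedSemaphore(pool_slots)  # stands in for the Airflow pool
    lock = threading.Lock()
    running = 0
    peak = 0

    def fake_export(_: int) -> None:
        nonlocal running, peak
        with slots:  # block until a slot is free
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(0.01)  # placeholder for the real export call
            with lock:
                running -= 1

    # The thread count plays the role of max_active_tasks.
    with ThreadPoolExecutor(max_workers=max_active) as executor:
        list(executor.map(fake_export, range(n_tasks)))
    return peak
```

With 3 slots and 8 workers the peak never exceeds 3, and with 8 slots and 2 workers it never exceeds 2: effective export concurrency is bounded by the smaller limit. In a real deployment other limits (such as [core] parallelism, and other DAGs drawing from the same pool) can lower it further.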

Additional single-purpose DAGs are available in examples/airflow/ for focused operator/sensor usage patterns.

Connections

Create an Airflow connection named imednet_default, or point imednet_conn_id at a different connection. Provide api_key and security_key either through the login/password fields or in the extra JSON. For a non-standard environment, base_url may also be set in extra. The hook merges these settings with values from imednet.config.load_config, so environment variables still apply. The ImednetToS3Operator additionally uses an AWS connection (aws_default by default) when writing to S3.
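Besides the UI and CLI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> (connection ID upper-cased), and since Airflow 2.3 the value may be JSON. The helper below is a hypothetical convenience for building that value. The conn_type "imednet" is an assumption about the provider's registered connection type, and putting the keys in login/password is just one of the two options described above.

```python
from __future__ import annotations

import json


def imednet_connection_env(
    api_key: str,
    security_key: str,
    base_url: str | None = None,
    conn_id: str = "imednet_default",
) -> dict[str, str]:
    """Return {env_var_name: json_value} for an iMednet Airflow connection."""
    conn: dict[str, object] = {
        "conn_type": "imednet",  # assumption: the provider's connection type name
        "login": api_key,  # api_key via the login field
        "password": security_key,  # security_key via the password field
    }
    if base_url:
        # Only needed for a non-standard environment.
        conn["extra"] = {"base_url": base_url}
    return {f"AIRFLOW_CONN_{conn_id.upper()}": json.dumps(conn)}
```

In a local test session the result can be passed to os.environ.update(...) before Airflow resolves the connection; in production, a secrets backend is usually a better home for the keys.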

For task-mapping discovery steps, ImednetHook provides explicit helper methods:

  • get_sdk_client() returns the live ImednetSDK client; call it only inside a running task's execution context.

  • list_studies_metadata() returns JSON-serializable primitive dictionaries (with sensitive keys redacted) for safe mapped task expansion.

  • list_study_keys() returns primitive study keys only.

  • describe_connection() returns redacted connection metadata and never exposes raw credentials.

get_conn() is kept as a backward-compatible alias for get_sdk_client().
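The provider's exact redaction rules are not spelled out here. As a hypothetical sketch of what "JSON-serializable primitive dictionaries with sensitive keys redacted" can mean, the helper below recursively masks values whose key appears in an assumed SENSITIVE_KEYS set and coerces everything else to JSON-safe primitives. It is not the implementation behind list_studies_metadata() or describe_connection().

```python
from __future__ import annotations

import json
from typing import Any

# Hypothetical key list; the provider's actual list may differ.
SENSITIVE_KEYS = frozenset({"api_key", "security_key", "password", "token"})


def redact(value: Any) -> Any:
    """Recursively mask sensitive keys and reduce the value to JSON-safe primitives."""
    if isinstance(value, dict):
        return {
            str(k): "***" if str(k).lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(v) for v in value]
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    # Fall back to str() for anything else (SDK models, datetimes, ...).
    return str(value)


safe = redact({"studyKey": "S1", "api_key": "abc", "sites": ({"token": "t", "n": 2},)})
json.dumps(safe)  # succeeds: only primitives remain
```

Masking by key name keeps the structure visible for debugging mapped expansions while ensuring that credentials never reach XCom or the task logs.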

Operators and Sensors

The Airflow integration organizes hooks, operators, and sensors in dedicated subpackages for clarity.

ImednetExportOperator saves records to a local file using helpers from imednet.integrations.export. ImednetToS3Operator sends JSON data to S3, and ImednetJobSensor waits for an export job to complete. All operators use ImednetHook to obtain an ImednetSDK instance from an Airflow connection.

The production reference DAG above shows the recommended dynamic-mapping pattern: keep static settings (for example export_func and imednet_conn_id) in .partial(...) and map only runtime fields (study_key, output_path, export_kwargs).

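from datetime import datetime
from typing import Any

from airflow import DAG
from airflow.decorators import task

from apache_airflow_providers_imednet import ImednetExportOperator, ImednetHook

with DAG(
    dag_id="imednet_export_mapping_example",  # illustrative DAG ID
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    @task
    def export_targets() -> list[dict[str, Any]]:
        # Discovery only: return JSON-serializable kwargs, one dict per study.
        hook = ImednetHook("imednet_default")
        return [
            {
                "study_key": study_key,
                "output_path": f"/tmp/{study_key}.csv",
                "export_kwargs": {"index": False},
            }
            for study_key in hook.list_study_keys()
        ]

    # Static settings go in .partial(); per-study fields are mapped at runtime.
    ImednetExportOperator.partial(
        task_id="export_records",
        export_func="export_to_csv",
        imednet_conn_id="imednet_default",
    ).expand_kwargs(export_targets())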
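To make the .partial(...)/.expand_kwargs(...) split concrete, here is a plain-Python model of how the static and mapped kwargs combine. It is not Airflow's implementation: Airflow resolves the list at runtime from XCom and creates one mapped task instance per element, each with its own map index. The function name, the overlap check, and the sample study keys are illustrative choices of this sketch.

```python
from __future__ import annotations

from typing import Any


def expand_kwargs_model(
    static: dict[str, Any], mapped: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Return the kwargs each mapped task instance would see, ordered by map index."""
    instances: list[dict[str, Any]] = []
    for map_index, runtime in enumerate(mapped):
        overlap = static.keys() & runtime.keys()
        if overlap:
            # This sketch keeps static and runtime fields disjoint so each value has one source.
            raise ValueError(f"map index {map_index} redefines static field(s): {sorted(overlap)}")
        instances.append({**static, **runtime})
    return instances


static = {
    "task_id": "export_records",
    "export_func": "export_to_csv",
    "imednet_conn_id": "imednet_default",
}
mapped = [  # sample data standing in for export_targets() output
    {"study_key": "S1", "output_path": "/tmp/S1.csv", "export_kwargs": {"index": False}},
    {"study_key": "S2", "output_path": "/tmp/S2.csv", "export_kwargs": {"index": False}},
]
instances = expand_kwargs_model(static, mapped)
```

Each element of instances carries the shared export_func and imednet_conn_id plus its own study_key and output_path, which is why only the runtime fields belong in the discovery output.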

Testing with Airflow

The unit tests stub out the Airflow dependencies. When the airflow package is installed, tests/integration/test_airflow_dag.py runs a small DAG with ImednetToS3Operator and ImednetJobSensor. The test creates a temporary S3 bucket with moto and executes the tasks via TaskInstance.run. It is skipped automatically if Airflow is missing.
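The repository's own stubs and test files are not reproduced here. As a stdlib-only sketch of the two techniques described above, the following stubs airflow modules in sys.modules for a unit test and skips an integration-style test when Airflow is not installed. The stub contents (a BaseOperator that takes task_id) are hypothetical and far smaller than the real Airflow API; with pytest, pytest.importorskip("airflow") is a common alternative to skipUnless.

```python
from __future__ import annotations

import importlib.util
import sys
import types
import unittest
from unittest import mock

AIRFLOW_INSTALLED = importlib.util.find_spec("airflow") is not None


def airflow_stub_modules() -> dict[str, types.ModuleType]:
    """Build minimal fake airflow modules covering only what the test imports."""
    airflow = types.ModuleType("airflow")
    models = types.ModuleType("airflow.models")

    class BaseOperator:  # hypothetical stand-in, not Airflow's BaseOperator
        def __init__(self, task_id: str, **kwargs) -> None:
            self.task_id = task_id
            self.kwargs = kwargs

    models.BaseOperator = BaseOperator
    airflow.models = models
    return {"airflow": airflow, "airflow.models": models}


class StubbedAirflowTest(unittest.TestCase):
    def test_operator_builds_against_stub(self) -> None:
        # patch.dict restores sys.modules afterwards, so the stub cannot leak into other tests.
        with mock.patch.dict(sys.modules, airflow_stub_modules()):
            from airflow.models import BaseOperator

            op = BaseOperator(task_id="export_records", study_key="S1")
        self.assertEqual(op.task_id, "export_records")
        self.assertEqual(op.kwargs, {"study_key": "S1"})


@unittest.skipUnless(AIRFLOW_INSTALLED, "apache-airflow is not installed")
class RealAirflowTest(unittest.TestCase):
    def test_airflow_importable(self) -> None:
        import airflow

        self.assertTrue(hasattr(airflow, "__version__"))
```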