Airflow Integration

The SDK ships with hooks, operators, and sensors for building Airflow pipelines.

Installation

Install the SDK together with Airflow and the Amazon provider:

pip install imednet apache-airflow apache-airflow-providers-amazon

Example DAG

from datetime import datetime
from airflow import DAG
from imednet.integrations.airflow import ImednetToS3Operator, ImednetJobSensor

default_args = {"start_date": datetime(2024, 1, 1)}

with DAG(
    dag_id="imednet_example",
    schedule=None,
    default_args=default_args,
    catchup=False,
) as dag:
    # Export study records to S3 as JSON.
    export_records = ImednetToS3Operator(
        task_id="export_records",
        study_key="STUDY_KEY",
        s3_bucket="your-bucket",
        s3_key="imednet/records.json",
    )

    # Poll until the given export job (batch) completes.
    wait_for_job = ImednetJobSensor(
        task_id="wait_for_job",
        study_key="STUDY_KEY",
        batch_id="BATCH_ID",
        poke_interval=60,
    )

    export_records >> wait_for_job

Connections

Create an Airflow connection named imednet_default, or point imednet_conn_id at a different connection. Provide api_key and security_key via the login/password fields or in the extra JSON; base_url may also be added in extra for a non-standard environment. The hook merges these settings with values from imednet.config.load_config, so environment variables still apply. ImednetToS3Operator additionally uses an AWS connection (aws_default by default) when writing to S3.
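
As a minimal sketch, the connection can also be built programmatically and exported as an environment variable. The credential values below are placeholders, and conn_type="generic" is an assumption, since the hook only reads the login, password, and extra fields as described above.

import json
from airflow.models import Connection

# Placeholder credentials; the hook reads api_key from login and security_key from password.
conn = Connection(
    conn_id="imednet_default",
    conn_type="generic",  # assumption: the hook does not depend on the connection type
    login="YOUR_API_KEY",
    password="YOUR_SECURITY_KEY",
    extra=json.dumps({"base_url": "https://custom.example.com"}),  # optional, non-standard environments only
)

# Register it by exporting the printed URI as AIRFLOW_CONN_IMEDNET_DEFAULT, or create
# the same connection in the Airflow UI or with the airflow connections CLI.
print(conn.get_uri())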

Operators and Sensors

The Airflow integration organizes hooks, operators, and sensors in dedicated subpackages for clarity.

ImednetExportOperator saves records to a local file using helpers from imednet.integrations.export. ImednetToS3Operator sends JSON data to S3, and ImednetJobSensor waits for an export job to complete. All operators use ImednetHook to obtain an ImednetSDK instance from an Airflow connection.
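
As a sketch, a local export task declared inside a DAG like the one above might look as follows; study_key mirrors the DAG example, while output_path is a hypothetical parameter name, so check it against the operator's actual signature.

from imednet.integrations.airflow import ImednetExportOperator

# Hypothetical usage; output_path is an assumed parameter name for the local file target.
export_to_file = ImednetExportOperator(
    task_id="export_to_file",
    study_key="STUDY_KEY",
    output_path="/tmp/imednet/records.json",
)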

Testing with Airflow

The unit tests stub out the Airflow dependencies. When the airflow package is installed, tests/integration/test_airflow_dag.py runs a small DAG with ImednetToS3Operator and ImednetJobSensor: it creates a temporary S3 bucket with moto and executes the tasks via TaskInstance.run. The test is skipped automatically if Airflow is missing.
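
A trimmed-down sketch of that pattern is shown below, assuming moto 5's mock_aws decorator. It calls the operator's execute() directly instead of going through TaskInstance.run, and it assumes the iMednet API itself is stubbed or reachable through a configured imednet_default connection.

import boto3
from moto import mock_aws

from imednet.integrations.airflow import ImednetToS3Operator

@mock_aws
def test_export_records_to_s3():
    # moto intercepts boto3 calls, so the bucket exists only in memory.
    boto3.client("s3", region_name="us-east-1").create_bucket(Bucket="test-bucket")

    task = ImednetToS3Operator(
        task_id="export_records",
        study_key="STUDY_KEY",
        s3_bucket="test-bucket",
        s3_key="imednet/records.json",
    )
    # Simplified: the real integration test executes the task via TaskInstance.run.
    task.execute(context={})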