"""Airflow operators for interacting with iMednet."""
from __future__ import annotations
import json
from typing import Any, Dict, Optional, Sequence
from .._airflow_compat import AirflowException, Context
_MISSING_AMAZON_PROVIDER_MESSAGE = (
"apache-airflow-providers-amazon package is required for "
"ImednetToS3Operator. Install with: "
'pip install "apache-airflow-providers-imednet[amazon]"'
)
try: # pragma: no cover - optional Airflow dependency
from airflow.models import BaseOperator # type: ignore
from airflow.providers.amazon.aws.hooks.s3 import S3Hook # type: ignore
except (ImportError, ModuleNotFoundError): # pragma: no cover - placeholder fallback
class BaseOperator: # type: ignore
template_fields: Sequence[str] = ()
def __init__(self, *args: Any, **kwargs: Any) -> None: # pragma: no cover
pass
class S3Hook: # type: ignore
def __init__(self, *args: Any, **kwargs: Any) -> None: # pragma: no cover
raise ImportError(_MISSING_AMAZON_PROVIDER_MESSAGE)
def load_string(self, *args: Any, **kwargs: Any) -> None: # pragma: no cover
raise ImportError(_MISSING_AMAZON_PROVIDER_MESSAGE)
from imednet.sdk import ImednetSDK # noqa: E402 - imported after optional Airflow stubs
from ..hooks import ImednetHook # noqa: E402 - imported after optional Airflow stubs
[docs]class ImednetToS3Operator(BaseOperator):
"""Fetch data from iMednet and store it in S3 as JSON."""
template_fields: Sequence[str] = ("study_key", "s3_key")
[docs] def __init__(
self,
*,
study_key: str,
s3_bucket: str,
s3_key: str,
endpoint: str = "records",
endpoint_kwargs: Optional[Dict[str, Any]] = None,
imednet_conn_id: str = "imednet_default",
aws_conn_id: str = "aws_default",
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
self.study_key = study_key
self.s3_bucket = s3_bucket
self.s3_key = s3_key
self.endpoint = endpoint
self.endpoint_kwargs = endpoint_kwargs or {}
self.imednet_conn_id = imednet_conn_id
self.aws_conn_id = aws_conn_id
def _get_sdk(self) -> ImednetSDK:
return ImednetHook(self.imednet_conn_id).get_conn()
[docs] def execute(self, context: Context) -> str:
sdk = self._get_sdk()
endpoint_obj = getattr(sdk, self.endpoint)
if hasattr(endpoint_obj, "list"):
data = endpoint_obj.list(self.study_key, **self.endpoint_kwargs)
else:
raise AirflowException(f"Endpoint '{self.endpoint}' has no list method")
records = [d.model_dump() if hasattr(d, "model_dump") else d for d in data]
hook = S3Hook(aws_conn_id=self.aws_conn_id)
hook.load_string(json.dumps(records), self.s3_key, self.s3_bucket, replace=True)
return self.s3_key
__all__ = ["ImednetToS3Operator"]