Snowflake Export ================ This guide covers the warehouse-native Snowflake export path introduced as the first implementation of the staged warehouse export framework. For the DuckDB / local-analytical path see :doc:`duckdb_integration`. For general architecture context see :doc:`architecture`. Overview -------- The Snowflake export path uses a two-phase staged-loading approach: 1. **Stage** – Study records are serialised to Parquet files in a local directory (one file per batch). 2. **PUT** – Each Parquet file is uploaded to a Snowflake *internal* stage via ``PUT file://...``. 3. **COPY INTO** – Snowflake bulk-loads the staged file into the target table using ``COPY INTO ... FILE_FORMAT = (TYPE = PARQUET)``. This is intentionally different from :func:`~imednet.integrations.export_to_sql`, which uses SQLAlchemy ``DataFrame.to_sql()`` and is appropriate for transactional SQL targets (SQLite, PostgreSQL, etc.). Do **not** mix the two paths. .. important:: **Staging constraint — local staging only.** The first version of this framework supports **local intermediate staging** only. Parquet files are written to the local filesystem and then uploaded to a Snowflake *internal* stage via ``PUT``. Object-storage staging (S3, GCS, Azure Blob) is **not** yet supported. If your Snowflake account restricts ``PUT`` to external stages only, you will need to wait for a future release. Installation ------------ .. code-block:: bash pip install 'imednet[snowflake]' This installs ``snowflake-connector-python`` (≥ 3.0) and ``pyarrow`` (included transitively). Quickstart — ``export_to_snowflake`` ------------------------------------- The :func:`~imednet.integrations.export_to_snowflake` convenience function handles batching, staging, and cleanup automatically: .. code-block:: python import os from imednet import ImednetSDK from imednet.integrations import export_to_snowflake, SnowflakeSinkConfig sdk = ImednetSDK(api_key=os.environ["IMEDNET_API_KEY"]) cfg = SnowflakeSinkConfig( account="myorg-myaccount", user="loader", **{"password": os.environ["SNOWFLAKE_PASSWORD"]}, # never hardcode database="IMEDNET_DB", schema="PUBLIC", warehouse="COMPUTE_WH", stage="MY_STAGE", table="RECORDS", ) rows_loaded = export_to_snowflake(sdk, "MY_STUDY", config=cfg) print(f"Loaded {rows_loaded} rows") Advanced usage — ``SnowflakeExportSink`` directly -------------------------------------------------- Use :class:`~imednet.integrations.warehouse.SnowflakeExportSink` directly when you need per-batch control, custom ``batch_id`` keys, or integration with a workflow engine: .. code-block:: python import os from imednet import ImednetSDK from imednet.integrations.sink_base import iter_batches from imednet.integrations.warehouse import SnowflakeExportSink, SnowflakeSinkConfig sdk = ImednetSDK(api_key=os.environ["IMEDNET_API_KEY"]) records = sdk.records.list(study_key="MY_STUDY") cfg = SnowflakeSinkConfig( account="myorg-myaccount", user="loader", **{"password": os.environ["SNOWFLAKE_PASSWORD"]}, database="IMEDNET_DB", schema="PUBLIC", warehouse="COMPUTE_WH", stage="MY_STAGE", table="RECORDS", stage_prefix="imednet/MY_STUDY", batch_size=1000, max_retries=5, idempotent=True, ) with SnowflakeExportSink(config=cfg) as sink: for i, batch in enumerate(iter_batches(records, cfg.batch_size)): rows = sink.write_batch(batch, batch_id=f"MY_STUDY/records/{i}") print(f" batch {i}: {rows} rows") Manifest output --------------- Pass ``manifest_path`` to record every loaded batch in a JSON-lines file for audit purposes or replay: .. code-block:: python cfg = SnowflakeSinkConfig( ... manifest_path="/tmp/imednet_manifest.jsonl", ) Each entry contains: .. code-block:: json { "batch_id": "MY_STUDY/records/0", "stage_path": "@MY_STAGE/imednet/MY_STUDY_records_0.parquet", "row_count": 500, "loaded_at": "2026-01-15T12:00:00+00:00" } Idempotency ----------- ``SnowflakeSinkConfig.idempotent`` (default ``True``) maps to Snowflake's ``FORCE = FALSE`` option in ``COPY INTO``. When ``True``, Snowflake skips files that have already been loaded, making re-runs of the same batch safe. Set ``idempotent = False`` (``--force-reload`` in the CLI) to force re-ingestion. Error handling and retries -------------------------- Every ``write_batch`` call retries up to ``SinkConfig.max_retries`` times (default 3) with exponential back-off (``retry_backoff * 2 ** attempt``, default base 1 s). After all retries are exhausted, :class:`~imednet.errors.ExportBatchError` is raised with the failing ``batch_id``. Connection failures during ``__init__`` raise :class:`~imednet.errors.ExportConfigurationError`. Stage lifecycle and cleanup --------------------------- * **Temporary staging directory** – When ``local_staging_dir`` is not set, a :class:`tempfile.TemporaryDirectory` is created automatically and removed when the context manager exits, regardless of success or failure. * **Persistent staging directory** – When ``local_staging_dir`` is set, the directory is **not** cleaned up automatically; this lets you inspect or re-upload the Parquet files. * **Snowflake stage cleanup** – The SDK does **not** remove files from the Snowflake internal stage after loading. Use ``REMOVE @MY_STAGE/imednet/`` from a Snowflake session to purge them when they are no longer needed. CLI reference ------------- .. code-block:: bash imednet export snowflake --help Minimal example: .. code-block:: bash imednet export snowflake MY_STUDY \ myorg-myaccount loader "$SNOWFLAKE_PASSWORD" \ IMEDNET_DB PUBLIC COMPUTE_WH MY_STAGE RECORDS With options: .. code-block:: bash imednet export snowflake MY_STUDY \ myorg-myaccount loader "$SNOWFLAKE_PASSWORD" \ IMEDNET_DB PUBLIC COMPUTE_WH MY_STAGE RECORDS \ --stage-prefix imednet/prod \ --batch-size 1000 \ --manifest-path /tmp/manifest.jsonl \ --force-reload Configuration reference ----------------------- .. autoclass:: imednet.integrations.warehouse.SnowflakeSinkConfig :members: :undoc-members: .. autofunction:: imednet.integrations.warehouse.export_to_snowflake .. autoclass:: imednet.integrations.warehouse.SnowflakeExportSink :members: Out of scope ------------ * Object-storage (S3/GCS/Azure) staging — planned for a future release. * Snowpark / Python connector Pandas integration — not used; Parquet staging is the preferred path for throughput. * Row-wise insert via SQLAlchemy — use :func:`~imednet.integrations.export_to_sql` for transactional SQL targets instead.