Snowflake Export

This guide covers the warehouse-native Snowflake export path introduced as the first implementation of the staged warehouse export framework.

For the DuckDB / local-analytical path see DuckDB Integration. For general architecture context see Architecture Overview.

Overview

The Snowflake export path uses a three-step staged-loading approach:

  1. Stage – Study records are serialised to Parquet files in a local directory (one file per batch).

  2. PUT – Each Parquet file is uploaded to a Snowflake internal stage via PUT file://....

  3. COPY INTO – Snowflake bulk-loads the staged file into the target table using COPY INTO ... FILE_FORMAT = (TYPE = PARQUET).

This is intentionally different from export_to_sql(), which writes rows with pandas DataFrame.to_sql() over a SQLAlchemy connection and is appropriate for transactional SQL targets (SQLite, PostgreSQL, etc.). Do not mix the two paths.
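The three steps above can be sketched as plain SQL strings. This is a minimal illustration and not the SDK's internal code: the helper names put_statement and copy_statement are hypothetical, and the AUTO_COMPRESS and MATCH_BY_COLUMN_NAME options are assumptions about what a Parquet load would typically need.

```python
from pathlib import PurePosixPath


def put_statement(local_file: str, stage: str, prefix: str) -> str:
    """Build a PUT command that uploads one local Parquet file to an internal stage."""
    # Assumption: AUTO_COMPRESS is disabled because Parquet is already compressed.
    return f"PUT file://{local_file} @{stage}/{prefix} AUTO_COMPRESS = FALSE"


def copy_statement(table: str, stage: str, prefix: str, local_file: str) -> str:
    """Build a COPY INTO command that loads the staged copy of `local_file`."""
    file_name = PurePosixPath(local_file).name
    return (
        f"COPY INTO {table} "
        f"FROM @{stage}/{prefix}/{file_name} "
        "FILE_FORMAT = (TYPE = PARQUET) "
        # Assumption: Parquet columns are matched to table columns by name.
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
    )


staged_file = "/tmp/stage/MY_STUDY_records_0.parquet"
print(put_statement(staged_file, "MY_STAGE", "imednet"))
print(copy_statement("RECORDS", "MY_STAGE", "imednet", staged_file))
```

Both statements would be run on an open Snowflake connection, PUT first and COPY INTO second, once per staged file.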

Important

Staging constraint — local staging only. The first version of this framework supports local intermediate staging only. Parquet files are written to the local filesystem and then uploaded to a Snowflake internal stage via PUT. Object-storage staging (S3, GCS, Azure Blob) is not yet supported. If your Snowflake account restricts PUT to external stages only, you will need to wait for a future release.

Installation

pip install 'imednet[snowflake]'

This installs snowflake-connector-python (≥ 3.0) and pyarrow (included transitively).

Quickstart — export_to_snowflake

The export_to_snowflake() convenience function handles batching, staging, and cleanup automatically:

import os
from imednet import ImednetSDK
from imednet.integrations import export_to_snowflake, SnowflakeSinkConfig

sdk = ImednetSDK(api_key=os.environ["IMEDNET_API_KEY"])

cfg = SnowflakeSinkConfig(
    account="myorg-myaccount",
    user="loader",
    password=os.environ["SNOWFLAKE_PASSWORD"],  # never hardcode
    database="IMEDNET_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    stage="MY_STAGE",
    table="RECORDS",
)

rows_loaded = export_to_snowflake(sdk, "MY_STUDY", config=cfg)
print(f"Loaded {rows_loaded} rows")

Advanced usage — SnowflakeExportSink directly

Use SnowflakeExportSink directly when you need per-batch control, custom batch_id keys, or integration with a workflow engine:

import os
from imednet import ImednetSDK
from imednet.integrations.sink_base import iter_batches
from imednet.integrations.warehouse import SnowflakeExportSink, SnowflakeSinkConfig

sdk = ImednetSDK(api_key=os.environ["IMEDNET_API_KEY"])
records = sdk.records.list(study_key="MY_STUDY")

cfg = SnowflakeSinkConfig(
    account="myorg-myaccount",
    user="loader",
    password=os.environ["SNOWFLAKE_PASSWORD"],  # never hardcode
    database="IMEDNET_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    stage="MY_STAGE",
    table="RECORDS",
    stage_prefix="imednet/MY_STUDY",
    batch_size=1000,
    max_retries=5,
    idempotent=True,
)

with SnowflakeExportSink(config=cfg) as sink:
    for i, batch in enumerate(iter_batches(records, cfg.batch_size)):
        rows = sink.write_batch(batch, batch_id=f"MY_STUDY/records/{i}")
        print(f"  batch {i}: {rows} rows")
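To make the batching in the loop above concrete, here is a stand-in for what a helper like iter_batches plausibly does. The function name chunked is hypothetical and the real iter_batches implementation may differ; the sketch only shows how fixed-size batches and index-based batch_id keys line up.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 2500 fake records with batch_size=1000 give batches of 1000, 1000 and 500.
for i, batch in enumerate(chunked(range(2500), 1000)):
    print(f"MY_STUDY/records/{i}: {len(batch)} rows")
```

Because the batch index is derived from position, re-running the same export over the same record order produces the same batch_id keys.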

Manifest output

Pass manifest_path to record every loaded batch in a JSON-lines file for audit purposes or replay:

cfg = SnowflakeSinkConfig(
    ...
    manifest_path="/tmp/imednet_manifest.jsonl",
)

Each entry contains:

{
    "batch_id":   "MY_STUDY/records/0",
    "stage_path": "@MY_STAGE/imednet/MY_STUDY_records_0.parquet",
    "row_count":  500,
    "loaded_at":  "2026-01-15T12:00:00+00:00"
}
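For audit or replay, the manifest can be read back with the standard library alone. The sketch below assumes each entry is stored as a single JSON object on its own line (the entry above is shown pretty-printed for readability); read_manifest and rows_by_batch are hypothetical helper names, not part of the SDK.

```python
import json
from pathlib import Path
from typing import Dict, List


def read_manifest(path) -> List[dict]:
    """Parse a JSON-lines manifest into a list of entries, skipping blank lines."""
    entries = []
    with Path(path).open(encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                entries.append(json.loads(line))
    return entries


def rows_by_batch(entries: List[dict]) -> Dict[str, int]:
    """Map each batch_id to the row count recorded for it."""
    return {entry["batch_id"]: entry["row_count"] for entry in entries}
```

Summing the values of rows_by_batch(read_manifest(path)) gives a total that can be compared against the row count in the target table.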

Idempotency

SnowflakeSinkConfig.idempotent (default True) maps to Snowflake’s FORCE = FALSE option in COPY INTO. When True, Snowflake skips files that have already been loaded, making re-runs of the same batch safe. Set idempotent = False (--force-reload in the CLI) to force re-ingestion.
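The mapping described above is small enough to state as code. This is a sketch of the relationship only; force_option is a hypothetical name and the SDK may assemble its COPY INTO options differently.

```python
def force_option(idempotent: bool = True) -> str:
    """Translate the `idempotent` flag into a COPY INTO FORCE option."""
    # idempotent=True  -> FORCE = FALSE: already-loaded files are skipped.
    # idempotent=False -> FORCE = TRUE: files are loaded again.
    return "FORCE = FALSE" if idempotent else "FORCE = TRUE"


print(force_option())       # default, safe re-runs
print(force_option(False))  # equivalent of --force-reload
```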

Error handling and retries

Every write_batch call retries up to SinkConfig.max_retries times (default 3) with exponential back-off (retry_backoff * 2 ** attempt, default base 1 s). After all retries are exhausted, ExportBatchError is raised with the failing batch_id. Connection failures during __init__ raise ExportConfigurationError.
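The retry behaviour can be illustrated with a self-contained sketch. It assumes that max_retries counts total attempts, that attempt numbering starts at 0, and that no sleep follows the final failure; none of these details is confirmed by the text above. BatchError is a local stand-in for ExportBatchError, and with_retries is a hypothetical helper.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class BatchError(RuntimeError):
    """Stand-in for ExportBatchError; carries the failing batch_id."""

    def __init__(self, batch_id: str, cause: Optional[BaseException]):
        super().__init__(f"batch {batch_id!r} failed: {cause}")
        self.batch_id = batch_id


def with_retries(
    operation: Callable[[], T],
    batch_id: str,
    max_retries: int = 3,
    retry_backoff: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `operation`, waiting retry_backoff * 2 ** attempt between failed attempts."""
    last_error: Optional[BaseException] = None
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception as exc:
            last_error = exc
            if attempt < max_retries - 1:
                sleep(retry_backoff * 2 ** attempt)
    raise BatchError(batch_id, last_error)
```

With the defaults, a batch that keeps failing waits 1 s and then 2 s before the error is raised. Passing a custom sleep callable keeps the helper testable without real delays.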

Stage lifecycle and cleanup

  • Temporary staging directory – When local_staging_dir is not set, a tempfile.TemporaryDirectory is created automatically and removed when the context manager exits, regardless of success or failure.

  • Persistent staging directory – When local_staging_dir is set, the directory is not cleaned up automatically; this lets you inspect or re-upload the Parquet files.

  • Snowflake stage cleanup – The SDK does not remove files from the Snowflake internal stage after loading. Use REMOVE @MY_STAGE/imednet/ from a Snowflake session to purge them when they are no longer needed.
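The difference between the temporary and persistent staging directories can be sketched with a small context manager. This mirrors the behaviour described above but is not the SDK's code; staging_dir is a hypothetical name, and creating a missing persistent directory is an assumption.

```python
import contextlib
import tempfile
from pathlib import Path
from typing import Iterator, Optional, Union


@contextlib.contextmanager
def staging_dir(local_staging_dir: Optional[Union[str, Path]] = None) -> Iterator[Path]:
    """Yield a directory for Parquet files; delete it on exit only if it was temporary."""
    if local_staging_dir is None:
        # Temporary case: removed on exit, whether the body succeeded or raised.
        with tempfile.TemporaryDirectory(prefix="imednet_stage_") as tmp:
            yield Path(tmp)
    else:
        # Persistent case: left in place so the files can be inspected or re-uploaded.
        path = Path(local_staging_dir)
        path.mkdir(parents=True, exist_ok=True)
        yield path


with staging_dir() as tmp_dir:
    (tmp_dir / "batch_0.parquet").write_bytes(b"")
print(tmp_dir.exists())
```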

CLI reference

imednet export snowflake --help

Minimal example:

imednet export snowflake MY_STUDY \
    myorg-myaccount loader "$SNOWFLAKE_PASSWORD" \
    IMEDNET_DB PUBLIC COMPUTE_WH MY_STAGE RECORDS

With options:

imednet export snowflake MY_STUDY \
    myorg-myaccount loader "$SNOWFLAKE_PASSWORD" \
    IMEDNET_DB PUBLIC COMPUTE_WH MY_STAGE RECORDS \
    --stage-prefix imednet/prod \
    --batch-size 1000 \
    --manifest-path /tmp/manifest.jsonl \
    --force-reload

Configuration reference

class imednet.integrations.warehouse.SnowflakeSinkConfig

Bases: SinkConfig

Configuration for SnowflakeExportSink.

Parameters

account:

Snowflake account identifier (<org>-<account> or legacy format).

user:

Snowflake user name.

password:

Snowflake password. Never logged.

database:

Target database.

schema:

Target schema.

warehouse:

Virtual warehouse used for the COPY INTO command.

stage:

Snowflake internal stage name (e.g. "MY_STAGE").

table:

Destination table name inside database.schema.

stage_prefix:

Path prefix inside the stage (default "imednet").

local_staging_dir:

Local directory used to write Parquet files before PUT. Defaults to a temporary directory created by tempfile.

manifest_path:

Optional path to a JSON-lines file where each loaded batch is recorded.

__init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, account='', user='', password='', database='', schema='PUBLIC', warehouse='', stage='', table='', stage_prefix='imednet', local_staging_dir=None, manifest_path=None)
Parameters:
  • batch_size (int) –

  • max_retries (int) –

  • retry_backoff (float) –

  • idempotent (bool) –

  • extra (dict[str, Any]) –

  • account (str) –

  • user (str) –

  • password (str) –

  • database (str) –

  • schema (str) –

  • warehouse (str) –

  • stage (str) –

  • table (str) –

  • stage_prefix (str) –

  • local_staging_dir (str | PathLike[str] | None) –

  • manifest_path (str | PathLike[str] | None) –

Return type:

None

account: str = ''
database: str = ''
extra: dict[str, Any]
local_staging_dir: Union[str, PathLike[str], None] = None
manifest_path: Union[str, PathLike[str], None] = None
password: str = ''
schema: str = 'PUBLIC'
stage: str = ''
stage_prefix: str = 'imednet'
table: str = ''
user: str = ''
warehouse: str = ''
imednet.integrations.warehouse.export_to_snowflake(sdk, study_key, *, config)

Export study records to Snowflake using SnowflakeExportSink.

Return type:

int

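Parameters:
  • sdk – ImednetSDK instance used to fetch the study records.

  • study_key – Key of the study to export (e.g. "MY_STUDY").

  • config – SnowflakeSinkConfig with connection and staging settings (keyword-only).
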
class imednet.integrations.warehouse.SnowflakeExportSink

Bases: ExportSink

Stage Parquet files and bulk-load them into Snowflake.

Parameters

config:

SnowflakeSinkConfig containing all connection details and staging paths.

Raises

~imednet.errors.ExportConfigurationError

When the Snowflake connector cannot be initialised or the required configuration values are missing.

ImportError

When snowflake-connector-python or pyarrow is not installed.

__init__(config=None)
Parameters:

config (SinkConfig | None) –

Return type:

None

close()

Close the Snowflake connection and clean up temporary staging files.

Return type:

None

flush()

No-op: each batch is committed individually.

Return type:

None

write_batch(records, *, batch_id)

Write records to Snowflake via Parquet staging + COPY INTO.

Return type:

int

Parameters:
  • records (Sequence[Any]) –

  • batch_id (str) –

Parameters

records:

Sequence of typed Record model instances or plain dicts.

batch_id:

Idempotency key (e.g. "MYSTUDY/FORM1/0").

Returns

int

Number of rows loaded.

Out of scope

  • Object-storage (S3/GCS/Azure) staging — planned for a future release.

  • Snowpark / Python connector Pandas integration — not used; Parquet staging is the preferred path for throughput.

  • Row-wise insert via SQLAlchemy — use export_to_sql() for transactional SQL targets instead.