Snowflake Export
This guide covers the warehouse-native Snowflake export path introduced as the first implementation of the staged warehouse export framework.
For the DuckDB / local-analytical path see DuckDB Integration. For general architecture context see Architecture Overview.
Overview
The Snowflake export path uses a three-step staged-loading approach:
1. Stage – Study records are serialised to Parquet files in a local directory (one file per batch).
2. PUT – Each Parquet file is uploaded to a Snowflake internal stage via PUT file://....
3. COPY INTO – Snowflake bulk-loads the staged file into the target table using COPY INTO ... FILE_FORMAT = (TYPE = PARQUET).
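The steps above can be sketched as plain SQL strings. This is a minimal illustration, not the SDK's internal code: the helper names are hypothetical, the file-name rule is inferred from the manifest sample later in this guide (slashes in batch_id become underscores), and the AUTO_COMPRESS and MATCH_BY_COLUMN_NAME options are assumptions about what a Parquet load would typically want.

```python
def stage_file_name(batch_id: str) -> str:
    """Flatten a batch_id such as 'MY_STUDY/records/0' into a Parquet file name."""
    return batch_id.replace("/", "_") + ".parquet"


def build_put(local_path: str, stage: str, prefix: str) -> str:
    """Build a PUT statement that uploads one local file to an internal stage."""
    # Parquet is already compressed, so gzip-on-upload is switched off (assumption).
    return f"PUT file://{local_path} @{stage}/{prefix} AUTO_COMPRESS = FALSE"


def build_copy(table: str, stage: str, prefix: str, file_name: str) -> str:
    """Build a COPY INTO statement that loads one staged Parquet file."""
    return (
        f"COPY INTO {table} FROM @{stage}/{prefix}/{file_name} "
        "FILE_FORMAT = (TYPE = PARQUET) "
        # One way to map Parquet columns onto table columns (assumption).
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
    )


name = stage_file_name("MY_STUDY/records/0")
put_sql = build_put(f"/tmp/stage/{name}", "MY_STAGE", "imednet")
copy_sql = build_copy("RECORDS", "MY_STAGE", "imednet", name)
print(put_sql)
print(copy_sql)
```

Keeping statement construction separate from execution makes it easy to log or review the SQL before anything touches the warehouse.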
This is intentionally different from export_to_sql(), which
uses SQLAlchemy DataFrame.to_sql() and is appropriate for transactional SQL targets
(SQLite, PostgreSQL, etc.). Do not mix the two paths.
Important
Staging constraint — local staging only.
The first version of this framework supports local intermediate staging only.
Parquet files are written to the local filesystem and then uploaded to a Snowflake
internal stage via PUT. Object-storage staging (S3, GCS, Azure Blob) is
not yet supported. Because PUT can only target internal stages, accounts that are
restricted to external stages cannot use this export path until a future release
adds object-storage staging.
Installation
pip install 'imednet[snowflake]'
This installs snowflake-connector-python (≥ 3.0) and pyarrow (included
transitively).
Quickstart — export_to_snowflake
The export_to_snowflake() convenience function handles
batching, staging, and cleanup automatically:
import os

from imednet import ImednetSDK
from imednet.integrations import export_to_snowflake, SnowflakeSinkConfig

sdk = ImednetSDK(api_key=os.environ["IMEDNET_API_KEY"])

cfg = SnowflakeSinkConfig(
    account="myorg-myaccount",
    user="loader",
    password=os.environ["SNOWFLAKE_PASSWORD"],  # never hardcode
    database="IMEDNET_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    stage="MY_STAGE",
    table="RECORDS",
)

rows_loaded = export_to_snowflake(sdk, "MY_STUDY", config=cfg)
print(f"Loaded {rows_loaded} rows")
Advanced usage — SnowflakeExportSink directly
Use SnowflakeExportSink directly when you need
per-batch control, custom batch_id keys, or integration with a workflow engine:
import os

from imednet import ImednetSDK
from imednet.integrations.sink_base import iter_batches
from imednet.integrations.warehouse import SnowflakeExportSink, SnowflakeSinkConfig

sdk = ImednetSDK(api_key=os.environ["IMEDNET_API_KEY"])
records = sdk.records.list(study_key="MY_STUDY")

cfg = SnowflakeSinkConfig(
    account="myorg-myaccount",
    user="loader",
    password=os.environ["SNOWFLAKE_PASSWORD"],  # never hardcode
    database="IMEDNET_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    stage="MY_STAGE",
    table="RECORDS",
    stage_prefix="imednet/MY_STUDY",
    batch_size=1000,
    max_retries=5,
    idempotent=True,
)

# The context manager closes the connection and cleans up temporary staging files.
with SnowflakeExportSink(config=cfg) as sink:
    for i, batch in enumerate(iter_batches(records, cfg.batch_size)):
        rows = sink.write_batch(batch, batch_id=f"MY_STUDY/records/{i}")
        print(f"  batch {i}: {rows} rows")
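To make the batching step concrete, here is a rough sketch of what a helper like iter_batches might do. The function below (iter_batches_sketch) is a hypothetical stand-in written for illustration; the real helper's behaviour is not documented in this guide.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches_sketch(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # input exhausted
        yield batch


batches = list(iter_batches_sketch(range(5), 2))
# Deriving batch_id from the batch index keeps the ids stable between runs.
batch_ids = [f"MY_STUDY/records/{i}" for i, _ in enumerate(batches)]
print(batches)
print(batch_ids)
```

Because the ids depend only on the study key and the batch index, re-running the same export over the same ordered records produces the same batch_id values.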
Manifest output
Pass manifest_path to record every loaded batch in a JSON-lines file for audit
purposes or replay:
cfg = SnowflakeSinkConfig(
    # ... connection settings as in the examples above ...
    manifest_path="/tmp/imednet_manifest.jsonl",
)
Each entry contains:
{
  "batch_id": "MY_STUDY/records/0",
  "stage_path": "@MY_STAGE/imednet/MY_STUDY_records_0.parquet",
  "row_count": 500,
  "loaded_at": "2026-01-15T12:00:00+00:00"
}
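For auditing, a manifest can be summarised with a few lines of standard-library Python. The sketch below assumes the file holds one JSON object per line with the fields shown above; summarise_manifest is a hypothetical helper, not part of the SDK.

```python
import json
from collections import Counter
from typing import Iterable, List, Tuple


def summarise_manifest(lines: Iterable[str]) -> Tuple[int, List[str]]:
    """Return (total rows, batch_ids that appear more than once)."""
    total_rows = 0
    seen: Counter = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        entry = json.loads(line)
        total_rows += entry["row_count"]
        seen[entry["batch_id"]] += 1
    repeated = sorted(batch_id for batch_id, count in seen.items() if count > 1)
    return total_rows, repeated


sample = [
    '{"batch_id": "MY_STUDY/records/0", "row_count": 500}',
    '{"batch_id": "MY_STUDY/records/1", "row_count": 120}',
    '{"batch_id": "MY_STUDY/records/0", "row_count": 500}',
]
total, repeated = summarise_manifest(sample)
print(total, repeated)
```

To run it against a real file, pass an open file handle, for example `with open("/tmp/imednet_manifest.jsonl", encoding="utf-8") as fh: summarise_manifest(fh)`. A repeated batch_id is not necessarily an error; it may simply indicate a re-run.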
Idempotency
SnowflakeSinkConfig.idempotent (default True) maps to Snowflake’s
FORCE = FALSE option in COPY INTO. When True, Snowflake skips files that
have already been loaded, making re-runs of the same batch safe. Set
idempotent = False (--force-reload in the CLI) to force re-ingestion.
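The mapping between the flag, the CLI switch, and the COPY INTO option can be written out explicitly. Both helpers below are hypothetical names used only for illustration; the SDK's own implementation is not shown in this guide.

```python
def force_option(idempotent: bool = True) -> str:
    """Return the COPY INTO FORCE option implied by the idempotent flag."""
    # idempotent=True  -> files already loaded are skipped
    # idempotent=False -> files are loaded again
    return "FORCE = FALSE" if idempotent else "FORCE = TRUE"


def idempotent_from_cli(force_reload: bool) -> bool:
    """Translate the --force-reload switch into the config value."""
    return not force_reload


print(force_option(idempotent_from_cli(force_reload=False)))
print(force_option(idempotent_from_cli(force_reload=True)))
```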
Error handling and retries
Every write_batch call retries up to SinkConfig.max_retries times (default 3)
with exponential back-off (retry_backoff * 2 ** attempt, default base 1 s). After
all retries are exhausted, ExportBatchError is raised with the
failing batch_id. Connection failures during __init__ raise
ExportConfigurationError.
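The retry schedule can be illustrated with a small stand-alone loop. This is a sketch under stated assumptions: attempt is taken to be zero-based, max_retries is read as the number of retries after the first attempt, and ExportBatchErrorSketch and call_with_retries are hypothetical stand-ins rather than the SDK's classes.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ExportBatchErrorSketch(RuntimeError):
    """Stand-in for ExportBatchError; carries the failing batch_id."""

    def __init__(self, batch_id: str) -> None:
        super().__init__(f"batch {batch_id!r} failed after all retries")
        self.batch_id = batch_id


def call_with_retries(
    action: Callable[[], T],
    *,
    batch_id: str,
    max_retries: int = 3,
    retry_backoff: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run action, retrying with exponential back-off on any exception."""
    attempt = 0
    while True:
        try:
            return action()
        except Exception as exc:
            if attempt >= max_retries:
                raise ExportBatchErrorSketch(batch_id) from exc
            # Delay grows as retry_backoff * 2 ** attempt: 1 s, 2 s, 4 s, ...
            sleep(retry_backoff * 2 ** attempt)
            attempt += 1
```

The sleep parameter is injected so the schedule can be checked in tests without waiting; with the defaults, a batch that never succeeds waits 1 s, 2 s, and 4 s before the error is raised.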
Stage lifecycle and cleanup
- Temporary staging directory – When local_staging_dir is not set, a tempfile.TemporaryDirectory is created automatically and removed when the context manager exits, regardless of success or failure.
- Persistent staging directory – When local_staging_dir is set, the directory is not cleaned up automatically; this lets you inspect or re-upload the Parquet files.
- Snowflake stage cleanup – The SDK does not remove files from the Snowflake internal stage after loading. Use REMOVE @MY_STAGE/imednet/ from a Snowflake session to purge them when they are no longer needed.
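The difference between the temporary and persistent staging directories can be reproduced with the standard library alone. The staging_dir context manager below is a hypothetical sketch of the behaviour described above, not the SDK's code.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Optional, Union


@contextmanager
def staging_dir(local_staging_dir: Optional[Union[str, Path]] = None) -> Iterator[Path]:
    """Yield a directory for Parquet files; remove it only if it was temporary."""
    if local_staging_dir is not None:
        # Persistent: create if needed and leave the files in place afterwards.
        path = Path(local_staging_dir)
        path.mkdir(parents=True, exist_ok=True)
        yield path
        return
    # Temporary: TemporaryDirectory removes the tree on exit, even after an error.
    with tempfile.TemporaryDirectory(prefix="imednet_stage_") as tmp:
        yield Path(tmp)


with staging_dir() as tmp_path:
    (tmp_path / "batch_0.parquet").write_bytes(b"placeholder")
    existed_inside = tmp_path.is_dir()
print(existed_inside, tmp_path.exists())
```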
CLI reference
imednet export snowflake --help
Minimal example:
imednet export snowflake MY_STUDY \
myorg-myaccount loader "$SNOWFLAKE_PASSWORD" \
IMEDNET_DB PUBLIC COMPUTE_WH MY_STAGE RECORDS
With options:
imednet export snowflake MY_STUDY \
myorg-myaccount loader "$SNOWFLAKE_PASSWORD" \
IMEDNET_DB PUBLIC COMPUTE_WH MY_STAGE RECORDS \
--stage-prefix imednet/prod \
--batch-size 1000 \
--manifest-path /tmp/manifest.jsonl \
--force-reload
Configuration reference
- class imednet.integrations.warehouse.SnowflakeSinkConfig
Bases: SinkConfig
Configuration for SnowflakeExportSink.
Parameters
- account: Snowflake account identifier (<org>-<account> or legacy format).
- user: Snowflake user name.
- password: Snowflake password. Never logged.
- database: Target database.
- schema: Target schema.
- warehouse: Virtual warehouse used for the COPY INTO command.
- stage: Snowflake internal stage name (e.g. "MY_STAGE").
- table: Destination table name inside database.schema.
- stage_prefix: Path prefix inside the stage (default "imednet").
- local_staging_dir: Local directory used to write Parquet files before PUT. Defaults to a temporary directory created by tempfile.
- manifest_path: Optional path to a JSON-lines file where each loaded batch is recorded.
- __init__(batch_size=500, max_retries=3, retry_backoff=1.0, idempotent=True, extra=<factory>, account='', user='', password='', database='', schema='PUBLIC', warehouse='', stage='', table='', stage_prefix='imednet', local_staging_dir=None, manifest_path=None)
- Parameters:
  batch_size (int)
  max_retries (int)
  retry_backoff (float)
  idempotent (bool)
  extra (dict[str, Any])
  account (str)
  user (str)
  password (str)
  database (str)
  schema (str)
  warehouse (str)
  stage (str)
  table (str)
  stage_prefix (str)
  local_staging_dir (str | PathLike[str] | None)
  manifest_path (str | PathLike[str] | None)
- Return type: None
- account: str = ''
- database: str = ''
- extra: dict[str, Any]
- local_staging_dir: Union[str, PathLike[str], None] = None
- manifest_path: Union[str, PathLike[str], None] = None
- password: str = ''
- schema: str = 'PUBLIC'
- stage: str = ''
- stage_prefix: str = 'imednet'
- table: str = ''
- user: str = ''
- warehouse: str = ''
- imednet.integrations.warehouse.export_to_snowflake(sdk, study_key, *, config)
Export study records to Snowflake using SnowflakeExportSink.
- Parameters:
  sdk (ImednetSDK)
  study_key (str)
  config (SnowflakeSinkConfig)
- Return type: int
- class imednet.integrations.warehouse.SnowflakeExportSink
Bases: ExportSink
Stage Parquet files and bulk-load them into Snowflake.
Parameters
- config: SnowflakeSinkConfig containing all connection details and staging paths.
Raises
- imednet.errors.ExportConfigurationError: When the Snowflake connector cannot be initialised or the required configuration values are missing.
- ImportError: When snowflake-connector-python or pyarrow is not installed.
- __init__(config=None)
  - Parameters: config (SinkConfig | None)
  - Return type: None
- close()
  Close the Snowflake connection and clean up temporary staging files.
  - Return type: None
- write_batch(records, *, batch_id)
  Write records to Snowflake via Parquet staging + COPY INTO.
  - Parameters: records (Sequence[Any]), batch_id (str)
  - Return type: int
  Parameters
  - records: Sequence of typed Record model instances or plain dicts.
  - batch_id: Idempotency key (e.g. "MYSTUDY/FORM1/0").
  Returns
  - int: Number of rows loaded.
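Since write_batch accepts either typed models or plain dicts, a sink has to normalise both into rows before writing Parquet. The sketch below shows one way that could work; it assumes the typed models expose a Pydantic-style model_dump() method, which this guide does not confirm, and both to_row and FakeRecord are hypothetical names.

```python
from typing import Any, Dict, List, Mapping, Sequence


def to_row(record: Any) -> Dict[str, Any]:
    """Convert a mapping or a model-like object into a plain dict."""
    if isinstance(record, Mapping):
        return dict(record)
    dump = getattr(record, "model_dump", None)  # assumed Pydantic-style API
    if callable(dump):
        return dict(dump())
    raise TypeError(f"unsupported record type: {type(record).__name__}")


def to_rows(records: Sequence[Any]) -> List[Dict[str, Any]]:
    """Normalise a mixed batch into a list of dict rows."""
    return [to_row(record) for record in records]


class FakeRecord:
    """Stand-in for a typed record model, used only for this example."""

    def __init__(self, record_id: int) -> None:
        self.record_id = record_id

    def model_dump(self) -> Dict[str, Any]:
        return {"record_id": self.record_id}


rows = to_rows([FakeRecord(1), {"record_id": 2}])
print(rows)
```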
Out of scope
Object-storage (S3/GCS/Azure) staging — planned for a future release.
Snowpark / Python connector Pandas integration — not used; Parquet staging is the preferred path for throughput.
Row-wise insert via SQLAlchemy — use
export_to_sql()for transactional SQL targets instead.