Snowflake connector
Write CocoIndex target rows into Snowflake tables with managed DDL, MERGE upserts, deletes, and automatic mapping from Python row types.
The snowflake connector provides target state APIs for writing rows to Snowflake tables. CocoIndex tracks the rows that should exist and applies table creation, upserts, updates, and deletes incrementally.
from cocoindex.connectors import snowflake
Install the optional Snowflake dependency before using this connector:
pip install cocoindex[snowflake]
Connection setup
The key name is load-bearing across runs: it is the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming it.
Create a ContextKey[snowflake.ConnectionConfig] to identify the Snowflake connection, then provide it in your lifespan:
import os
from collections.abc import Iterator

import cocoindex as coco
from cocoindex.connectors import snowflake

SNOWFLAKE = coco.ContextKey[snowflake.ConnectionConfig]("snowflake")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    builder.provide(
        SNOWFLAKE,
        snowflake.ConnectionConfig(
            account=os.environ["SNOWFLAKE_ACCOUNT"],
            user=os.environ["SNOWFLAKE_USER"],
            password=os.environ["SNOWFLAKE_PASSWORD"],
            warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE"),
            role=os.environ.get("SNOWFLAKE_ROLE"),
        ),
    )
    yield
ConnectionConfig
@dataclass(frozen=True)
class ConnectionConfig:
    account: str
    user: str
    password: str
    warehouse: str | None = None
    role: str | None = None
Parameters:
- account - Snowflake account identifier.
- user - Snowflake username.
- password - Password for the user.
- warehouse - Optional warehouse to use for DDL and DML.
- role - Optional role for the session.
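The lifespan example above reads three required and two optional environment variables. A minimal sketch of failing early when a required variable is missing could look like the following. `ConnectionConfigSketch` and `config_from_env` are hypothetical names used only for illustration; the stand-in dataclass mirrors the fields listed above so the snippet runs without the connector installed.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectionConfigSketch:
    """Hypothetical stand-in with the same fields as ConnectionConfig."""

    account: str
    user: str
    password: str
    warehouse: str | None = None
    role: str | None = None


REQUIRED_VARS = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")


def config_from_env(env: Mapping[str, str] = os.environ) -> ConnectionConfigSketch:
    """Build a config from environment variables, reporting all gaps at once."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return ConnectionConfigSketch(
        account=env["SNOWFLAKE_ACCOUNT"],
        user=env["SNOWFLAKE_USER"],
        password=env["SNOWFLAKE_PASSWORD"],
        # Optional settings fall back to None, matching the defaults above.
        warehouse=env.get("SNOWFLAKE_WAREHOUSE") or None,
        role=env.get("SNOWFLAKE_ROLE") or None,
    )
```

Compared with indexing `os.environ` directly, this reports every missing variable in one error instead of raising `KeyError` on the first.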
As target
The snowflake connector provides target state APIs for writing rows to tables.
Tables
Declares a table as a target state. Returns a TableTarget for declaring rows.
def declare_table_target(
    db: ContextKey[ConnectionConfig],
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    database: str | None = None,
    schema: str | None = None,
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
Parameters:
- db - A ContextKey[ConnectionConfig] identifying the connection to use.
- table_name - Name of the table.
- table_schema - Schema definition including columns and primary key.
- database - Optional Snowflake database name.
- schema - Optional Snowflake schema name.
- managed_by - Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").
When managed_by="system", CocoIndex creates the database, schema, and table if needed. Table changes use Snowflake DDL, and row changes use MERGE for upserts.
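To make the MERGE-based upsert concrete, here is a rough sketch of the kind of statement such an upsert could use. It is not the SQL the connector actually generates; `build_merge_sql` is a hypothetical helper, and the `%s` placeholders assume pyformat-style parameter binding.

```python
def build_merge_sql(table: str, columns: list[str], primary_key: list[str]) -> str:
    """Return a Snowflake MERGE statement that upserts one row by primary key."""

    def q(name: str) -> str:
        # Double-quote the identifier, escaping embedded double quotes.
        return '"' + name.replace('"', '""') + '"'

    non_key = [c for c in columns if c not in primary_key]
    # One bound value per column, exposed as a single-row source "s".
    source = ", ".join(f"%s AS {q(c)}" for c in columns)
    on = " AND ".join(f"{q(table)}.{q(c)} = s.{q(c)}" for c in primary_key)
    insert_cols = ", ".join(q(c) for c in columns)
    insert_vals = ", ".join(f"s.{q(c)}" for c in columns)

    parts = [
        f"MERGE INTO {q(table)}",
        f"USING (SELECT {source}) AS s",
        f"ON {on}",
    ]
    if non_key:
        # A table with only key columns has nothing to update on a match.
        updates = ", ".join(f"{q(c)} = s.{q(c)}" for c in non_key)
        parts.append(f"WHEN MATCHED THEN UPDATE SET {updates}")
    parts.append(
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
    return "\n".join(parts)


print(build_merge_sql("product_index", ["id", "name", "price"], ["id"]))
```

A single MERGE covers both the insert and the update case, so the caller does not need to check first whether the row exists.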
Rows
Once a TableTarget is resolved, declare rows to be upserted:
def TableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None
Parameters:
- row - A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
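The primary key requirement can be checked before a row is declared. The sketch below uses hypothetical helpers (`row_as_dict`, `missing_primary_key_columns`) and covers dicts, dataclasses, and NamedTuples from the standard library; Pydantic models are left out to keep it dependency-free. Treating a `None` value as missing is an assumption, not documented connector behavior.

```python
from __future__ import annotations

import dataclasses
from collections.abc import Mapping, Sequence
from typing import Any


def row_as_dict(row: Any) -> dict[str, Any]:
    """Normalize a supported row object into a plain dict."""
    if isinstance(row, Mapping):
        return dict(row)
    if dataclasses.is_dataclass(row) and not isinstance(row, type):
        return {f.name: getattr(row, f.name) for f in dataclasses.fields(row)}
    if isinstance(row, tuple) and hasattr(row, "_asdict"):
        return dict(row._asdict())  # NamedTuple instances
    raise TypeError(f"Unsupported row type: {type(row).__name__}")


def missing_primary_key_columns(row: Any, primary_key: Sequence[str]) -> list[str]:
    """Return the primary key columns that are absent or None in the row."""
    values = row_as_dict(row)
    return [column for column in primary_key if values.get(column) is None]


# Usage: an empty list means the row carries every key column.
print(missing_primary_key_columns({"id": "p1", "name": "Lamp"}, ["id"]))
print(missing_primary_key_columns({"name": "Lamp"}, ["id"]))
```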
Table schema: from Python class
Define the table structure using a Python class:
from dataclasses import dataclass

@dataclass
class ProductRow:
    id: str
    name: str
    price: float
    metadata: dict[str, object]

schema = await snowflake.TableSchema.from_class(
    ProductRow,
    primary_key=["id"],
)
Python types are automatically mapped to Snowflake column types:
| Python Type | Snowflake Type |
|---|---|
| bool | BOOLEAN |
| int | NUMBER |
| float | FLOAT |
| decimal.Decimal | NUMBER |
| str | VARCHAR |
| bytes | BINARY |
| uuid.UUID | VARCHAR |
| datetime.date | DATE |
| datetime.time | TIME |
| datetime.datetime | TIMESTAMP_TZ |
| datetime.timedelta | NUMBER |
| list, dict, nested structs | VARIANT |
VARIANT values are JSON-serialized and written with PARSE_JSON.
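As an illustration of the mapping table and the VARIANT rule, the following sketch looks up a column type for a Python annotation and prepares a VARIANT value for binding. `snowflake_type_for` and `variant_bind` are hypothetical names, and falling back to VARIANT for every unlisted type is a simplification; the connector's real resolution logic may differ.

```python
from __future__ import annotations

import datetime
import decimal
import json
import uuid
from typing import Any, get_origin

# Scalar mappings copied from the table above.
SCALAR_TYPES: dict[type, str] = {
    bool: "BOOLEAN",
    int: "NUMBER",
    float: "FLOAT",
    decimal.Decimal: "NUMBER",
    str: "VARCHAR",
    bytes: "BINARY",
    uuid.UUID: "VARCHAR",
    datetime.date: "DATE",
    datetime.time: "TIME",
    datetime.datetime: "TIMESTAMP_TZ",
    datetime.timedelta: "NUMBER",
}


def snowflake_type_for(py_type: Any) -> str:
    """Look up the column type; containers such as list[float] become VARIANT."""
    origin = get_origin(py_type) or py_type  # list[float] -> list
    return SCALAR_TYPES.get(origin, "VARIANT")


def variant_bind(value: Any) -> tuple[str, str]:
    """Return the SQL expression and the JSON text to bind for a VARIANT value."""
    return "PARSE_JSON(%s)", json.dumps(value)


print(snowflake_type_for(dict[str, object]))
print(variant_bind({"color": "red", "tags": ["a", "b"]}))
```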
SnowflakeType
Use SnowflakeType to specify a custom Snowflake type and optional encoder:
from dataclasses import dataclass
from typing import Annotated
from cocoindex.connectors.snowflake import SnowflakeType

@dataclass
class ProductRow:
    id: Annotated[int, SnowflakeType("NUMBER(38, 0)")]
    embedding: Annotated[list[float], SnowflakeType("ARRAY")]
You can also pass column_overrides when constructing the schema:
schema = await snowflake.TableSchema.from_class(
    ProductRow,
    primary_key=["id"],
    column_overrides={
        "id": snowflake.SnowflakeType("NUMBER(38, 0)"),
    },
)
Example
from dataclasses import dataclass

@dataclass
class ProductRow:
    id: str
    name: str
    price: float
    metadata: dict[str, object]

async def declare_products(rows: list[ProductRow]) -> None:
    table = await snowflake.mount_table_target(
        SNOWFLAKE,
        table_name="product_index",
        table_schema=await snowflake.TableSchema.from_class(
            ProductRow,
            primary_key=["id"],
        ),
        database="ANALYTICS",
        schema="PUBLIC",
    )
    for row in rows:
        table.declare_row(row=row)
See examples/snowflake_target for a runnable project.
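As a mental model for the incremental behavior described at the top of this page, declaring rows amounts to stating the desired contents of the table, and the changes follow from comparing that with what was declared before. The sketch below is only a conceptual illustration with a hypothetical `plan_changes` function; how CocoIndex tracks state internally is not described here.

```python
from __future__ import annotations

from typing import Any

Row = dict[str, Any]


def plan_changes(
    previous: dict[str, Row], desired: dict[str, Row]
) -> tuple[list[Row], list[str]]:
    """Compare rows keyed by primary key and return (upserts, deleted keys)."""
    # New or changed rows need an upsert; unchanged rows are skipped.
    upserts = [row for key, row in desired.items() if previous.get(key) != row]
    # Rows that were declared before but not this time should be deleted.
    deletes = [key for key in previous if key not in desired]
    return upserts, deletes


previous = {"a": {"id": "a", "price": 1.0}, "b": {"id": "b", "price": 2.0}}
desired = {"a": {"id": "a", "price": 1.5}, "c": {"id": "c", "price": 3.0}}
upserts, deletes = plan_changes(previous, desired)
print(upserts)  # rows "a" (changed) and "c" (new)
print(deletes)  # key "b" (no longer declared)
```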
Identifier handling
Database, schema, table, and column names must be simple Snowflake identifiers: they may contain only letters, digits, and underscores, and must not start with a digit. The connector quotes identifiers when generating SQL.
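The identifier rule above can be expressed as a small validation step. This is a sketch under assumptions: `quote_identifier` is a hypothetical helper, and "letters" is read as ASCII letters, which the connector may or may not enforce.

```python
import re

# Letters, digits, and underscores; the first character must not be a digit.
_SIMPLE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Validate a simple identifier and wrap it in double quotes."""
    if not _SIMPLE_IDENTIFIER.fullmatch(name):
        raise ValueError(f"Not a simple Snowflake identifier: {name!r}")
    return f'"{name}"'


print(quote_identifier("product_index"))
```

Double-quoted identifiers are case-sensitive in Snowflake, so `"product_index"` and an unquoted `product_index` (which resolves to `PRODUCT_INDEX`) can refer to different objects. Whether the connector normalizes case before quoting is not stated here.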