# Snowflake connector

> **CocoIndex v1.** This page documents CocoIndex **v1** — a ground-up redesign from v0. When writing code, ignore any v0 flow-builder DSL or deprecated decorators.
>
> Source: https://cocoindex.io/docs/connectors/snowflake/ · Docs index: https://cocoindex.io/docs/llms.txt · Agent skill: https://cocoindex.io/docs/skill.md
>
> v0→v1 quick map — if you reach for these v0 symbols, stop and use the v1 form: `@cocoindex.flow_def`/`FlowBuilder` → `coco.App` + a `@coco.fn` main function; `add_collector()`/`collect()`/`export()` → declare target states (`declare_row`, `declare_file`); `cocoindex.sources/functions/targets.*` → connector APIs (`localfs.walk_dir`, `coco.ops.*`, `postgres.declare_table_target`). Full mapping + API reference: https://cocoindex.io/docs/skill.md.

The `snowflake` connector provides target state APIs for writing rows to Snowflake tables. CocoIndex tracks the rows that should exist and applies table creation, upserts, updates, and deletes incrementally.

```python
from cocoindex.connectors import snowflake
```

**Note — Install**
Install the optional Snowflake dependency before using this connector:

```bash
pip install "cocoindex[snowflake]"
```

## Connection setup

Create a `ContextKey[snowflake.ConnectionConfig]` to identify the Snowflake connection, then provide it in your lifespan:

**Note**
The key name is load-bearing across runs - it's the stable identity CocoIndex uses to track managed rows. See [ContextKey as stable identity](/docs/programming_guide/context#contextkey-as-stable-identity) before renaming.

```python
import os
from collections.abc import Iterator

import cocoindex as coco
from cocoindex.connectors import snowflake

SNOWFLAKE = coco.ContextKey[snowflake.ConnectionConfig]("snowflake")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    builder.provide(
        SNOWFLAKE,
        snowflake.ConnectionConfig(
            account=os.environ["SNOWFLAKE_ACCOUNT"],
            user=os.environ["SNOWFLAKE_USER"],
            password=os.environ["SNOWFLAKE_PASSWORD"],
            warehouse=os.environ.get("SNOWFLAKE_WAREHOUSE"),
            role=os.environ.get("SNOWFLAKE_ROLE"),
        ),
    )
    yield
```

### ConnectionConfig

```python
@dataclass(frozen=True)
class ConnectionConfig:
    account: str
    user: str
    password: str
    warehouse: str | None = None
    role: str | None = None
```

**Parameters:**

- `account` - Snowflake account identifier.
- `user` - Snowflake username.
- `password` - Password for the user.
- `warehouse` - Optional warehouse to use for DDL and DML.
- `role` - Optional role for the session.

## As target

The connector's target state APIs write rows to Snowflake tables: declare a table target, then declare the rows it should contain.

### Tables

`declare_table_target` declares a table as a target state and returns a `TableTarget` for declaring rows.

```python
def declare_table_target(
    db: ContextKey[ConnectionConfig],
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    database: str | None = None,
    schema: str | None = None,
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```

**Parameters:**

- `db` - A `ContextKey[ConnectionConfig]` identifying the connection to use.
- `table_name` - Name of the table.
- `table_schema` - Schema definition including columns and primary key.
- `database` - Optional Snowflake database name.
- `schema` - Optional Snowflake schema name.
- `managed_by` - Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).

When `managed_by="system"`, CocoIndex creates the database, schema, and table if needed. Table changes use Snowflake DDL, and row changes use `MERGE` for upserts.
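
For orientation, a `MERGE`-based upsert has roughly the shape built below. This is an illustrative sketch only: the exact SQL the connector generates is not shown on this page, and `build_merge_sql` is a hypothetical helper that assumes already-validated identifiers and `%s` bind placeholders.

```python
def build_merge_sql(table: str, columns: list[str], primary_key: list[str]) -> str:
    """Hypothetical helper: build a single-row MERGE upsert statement.

    Assumes `table` and `columns` are already validated identifiers and that
    the row values are bound separately through `%s` placeholders.
    """

    def q(name: str) -> str:
        # Double-quote an identifier.
        return f'"{name}"'

    source = ", ".join(f"%s AS {q(c)}" for c in columns)
    on = " AND ".join(f"t.{q(c)} = s.{q(c)}" for c in primary_key)
    non_key = [c for c in columns if c not in primary_key]
    insert_cols = ", ".join(q(c) for c in columns)
    insert_vals = ", ".join(f"s.{q(c)}" for c in columns)

    parts = [
        f"MERGE INTO {q(table)} t",
        f"USING (SELECT {source}) s",
        f"ON {on}",
    ]
    if non_key:
        # Only non-key columns are updated when the primary key already exists.
        updates = ", ".join(f"t.{q(c)} = s.{q(c)}" for c in non_key)
        parts.append(f"WHEN MATCHED THEN UPDATE SET {updates}")
    parts.append(
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
    return "\n".join(parts)


print(build_merge_sql("product_index", ["id", "name", "price"], ["id"]))
```

The point of the shape is that one statement covers both cases: a row whose primary key already exists is updated, and any other row is inserted.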

### Rows

Once a `TableTarget` is resolved, declare rows to be upserted:

```python
def TableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None
```

**Parameters:**

- `row` - A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
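
Declared rows describe the desired contents of the table, keyed by primary key. As a conceptual sketch of what applying changes incrementally means (this is not CocoIndex's implementation, and `plan_changes` is a hypothetical function), comparing the previous run's rows with the current ones yields the upserts and deletes to apply:

```python
from typing import Any

Row = dict[str, Any]
Key = tuple[Any, ...]


def plan_changes(
    previous: list[Row], current: list[Row], primary_key: list[str]
) -> tuple[list[Row], list[Key]]:
    """Return (rows to upsert, primary keys to delete).

    A row is upserted when it is new or differs from the previous run;
    a key is deleted when it is no longer declared.
    """

    def key_of(row: Row) -> Key:
        # Raises KeyError if a primary key column is missing from the row.
        return tuple(row[col] for col in primary_key)

    before = {key_of(r): r for r in previous}
    after = {key_of(r): r for r in current}

    upserts = [row for key, row in after.items() if before.get(key) != row]
    deletes = [key for key in before if key not in after]
    return upserts, deletes
```

Unchanged rows appear in neither list, which is why every row must carry its primary key columns: the key is what lets two runs be compared.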

## Table schema: from Python class

Define the table structure using a Python class:

```python
from dataclasses import dataclass

@dataclass
class ProductRow:
    id: str
    name: str
    price: float
    metadata: dict[str, object]

# `from_class` is awaited, so run this inside an async function.
schema = await snowflake.TableSchema.from_class(
    ProductRow,
    primary_key=["id"],
)
```

Python types are automatically mapped to Snowflake column types:

| Python Type | Snowflake Type |
|-------------|----------------|
| `bool` | `BOOLEAN` |
| `int` | `NUMBER` |
| `float` | `FLOAT` |
| `decimal.Decimal` | `NUMBER` |
| `str` | `VARCHAR` |
| `bytes` | `BINARY` |
| `uuid.UUID` | `VARCHAR` |
| `datetime.date` | `DATE` |
| `datetime.time` | `TIME` |
| `datetime.datetime` | `TIMESTAMP_TZ` |
| `datetime.timedelta` | `NUMBER` |
| `list`, `dict`, nested structs | `VARIANT` |

`VARIANT` values are JSON-serialized and written with `PARSE_JSON`.
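
The mapping above can be mirrored in a small lookup to preview which column types a row class would get. This is a minimal sketch that covers only the types listed in the table: `preview_column_types` and `encode_variant` are hypothetical helpers, and the connector's own handling of optional or annotated fields may differ.

```python
import dataclasses
import datetime
import decimal
import json
import typing
import uuid

# Mirrors the documented table; exact-type lookup keeps bool distinct from int.
SCALAR_TYPES: dict[type, str] = {
    bool: "BOOLEAN",
    int: "NUMBER",
    float: "FLOAT",
    decimal.Decimal: "NUMBER",
    str: "VARCHAR",
    bytes: "BINARY",
    uuid.UUID: "VARCHAR",
    datetime.date: "DATE",
    datetime.time: "TIME",
    datetime.datetime: "TIMESTAMP_TZ",
    datetime.timedelta: "NUMBER",
}


def preview_column_types(row_class: type) -> dict[str, str]:
    """Hypothetical helper: map each dataclass field to a documented column type."""
    result: dict[str, str] = {}
    for name, hint in typing.get_type_hints(row_class).items():
        # list[float] -> list, dict[str, object] -> dict, plain types unchanged.
        origin = typing.get_origin(hint) or hint
        if origin in (list, dict) or dataclasses.is_dataclass(origin):
            result[name] = "VARIANT"
        else:
            result[name] = SCALAR_TYPES[origin]
    return result


def encode_variant(value: object) -> str:
    """Hypothetical helper: JSON text of the kind that PARSE_JSON would receive."""
    return json.dumps(value)


@dataclasses.dataclass
class ProductRow:
    id: str
    name: str
    price: float
    metadata: dict[str, object]


print(preview_column_types(ProductRow))
print(encode_variant({"tags": ["new"], "stock": 3}))
```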

### SnowflakeType

Use `SnowflakeType` to specify a custom Snowflake type and optional encoder:

```python
from dataclasses import dataclass
from typing import Annotated

from cocoindex.connectors.snowflake import SnowflakeType

@dataclass
class ProductRow:
    id: Annotated[int, SnowflakeType("NUMBER(38, 0)")]
    embedding: Annotated[list[float], SnowflakeType("ARRAY")]
```
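
`Annotated` leaves the Python type untouched and attaches the marker as metadata that a schema builder can read back. The sketch below shows that mechanism using only the standard library; `ColumnType` is a stand-in for illustration, not the real `SnowflakeType`, and `declared_column_types` is hypothetical.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class ColumnType:
    """Stand-in marker carrying a SQL type string (not the connector's class)."""

    sql_type: str


@dataclass
class ProductRow:
    id: Annotated[int, ColumnType("NUMBER(38, 0)")]
    embedding: Annotated[list[float], ColumnType("ARRAY")]
    name: str  # No marker: a schema builder would fall back to its default mapping.


def declared_column_types(row_class: type) -> dict[str, str]:
    """Return the explicitly annotated SQL type for each field that has one."""
    found: dict[str, str] = {}
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    for name, hint in get_type_hints(row_class, include_extras=True).items():
        if get_origin(hint) is Annotated:
            # get_args returns (underlying_type, *metadata).
            for meta in get_args(hint)[1:]:
                if isinstance(meta, ColumnType):
                    found[name] = meta.sql_type
    return found


print(declared_column_types(ProductRow))
```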

You can also pass `column_overrides` when constructing the schema:

```python
schema = await snowflake.TableSchema.from_class(
    ProductRow,
    primary_key=["id"],
    column_overrides={
        "id": snowflake.SnowflakeType("NUMBER(38, 0)"),
    },
)
```

## Example

```python
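from dataclasses import dataclass

import cocoindex as coco
from cocoindex.connectors import snowflake

# Same key as in "Connection setup"; its ConnectionConfig is provided in the lifespan.
SNOWFLAKE = coco.ContextKey[snowflake.ConnectionConfig]("snowflake")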

@dataclass
class ProductRow:
    id: str
    name: str
    price: float
    metadata: dict[str, object]

async def declare_products(rows: list[ProductRow]) -> None:
    table = await snowflake.mount_table_target(
        SNOWFLAKE,
        table_name="product_index",
        table_schema=await snowflake.TableSchema.from_class(
            ProductRow,
            primary_key=["id"],
        ),
        database="ANALYTICS",
        schema="PUBLIC",
    )

    for row in rows:
        table.declare_row(row=row)
```

See `examples/snowflake_target` for a runnable project.

## Identifier handling

Database, schema, table, and column names must be simple Snowflake identifiers: they may contain only letters, digits, and underscores, and must not start with a digit. The connector quotes identifiers when generating SQL.
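
The identifier rule can be checked ahead of time. The sketch below assumes the rule corresponds to the ASCII regular expression shown and that quoting means wrapping the name in double quotes; `quote_identifier` is a hypothetical helper, not the connector's code.

```python
import re

# Letters, digits, and underscores only; the first character is not a digit.
_SIMPLE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Hypothetical helper: validate a simple identifier and double-quote it."""
    if not _SIMPLE_IDENTIFIER.fullmatch(name):
        raise ValueError(f"Not a simple Snowflake identifier: {name!r}")
    return f'"{name}"'


print(quote_identifier("product_index"))
```

Snowflake treats double-quoted identifiers as case-sensitive, so `"product_index"` and `"PRODUCT_INDEX"` name different objects; keeping the casing consistent across runs avoids surprises.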
