# Postgres

The postgres connector provides utilities for reading rows from and writing rows to PostgreSQL databases, with built-in support for pgvector.

```python
from cocoindex.connectors import postgres
```

This connector requires additional dependencies. Install with:

```bash
pip install cocoindex[postgres]
```
## Connection setup

`create_pool()` is a thin wrapper around `asyncpg.create_pool()` that registers necessary extensions (e.g., pgvector) on each connection.

```python
async def create_pool(
    dsn: str | None = None,
    *,
    init: Callable[[asyncpg.Connection], Any] | None = None,
    **kwargs: Any,
) -> asyncpg.Pool
```

Parameters:

- `dsn` — PostgreSQL connection string (e.g., `"postgresql://user:pass@localhost/dbname"`).
- `init` — Optional callback to initialize each connection (called after extension registration).
- `**kwargs` — Additional arguments passed directly to `asyncpg.create_pool()`.

Returns: An asyncpg connection pool.
Example:

```python
async with await postgres.create_pool("postgresql://localhost/mydb") as pool:
    # Use pool for source or target operations
    ...
```
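The `init` callback runs on each new connection after extension registration. As a minimal sketch, assuming you want a per-connection statement timeout (the callback name and setting here are illustrative, not part of the connector API):

```python
import asyncpg

async def set_timeout(conn: asyncpg.Connection) -> None:
    # Illustrative: cap each statement at 30 seconds on this connection
    await conn.execute("SET statement_timeout = '30s'")

pool = await postgres.create_pool(
    "postgresql://localhost/mydb",
    init=set_timeout,
)
```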
## As source

Use `PgTableSource` to read rows from a PostgreSQL table. Its `fetch_rows()` method returns a `RowFetcher` that supports both synchronous and asynchronous iteration.

### PgTableSource

```python
class PgTableSource(Generic[RowT]):
    def __init__(
        self,
        pool: asyncpg.Pool,
        *,
        table_name: str,
        columns: Sequence[str] | None = None,
        pg_schema_name: str | None = None,
        row_type: type[RowT] | None = None,
        row_factory: Callable[[dict[str, Any]], RowT] | None = None,
    ) -> None

    def fetch_rows(self) -> RowFetcher[RowT]
```
Parameters:

- `pool` — An asyncpg connection pool.
- `table_name` — Name of the table to read from.
- `columns` — List of column names to select. If omitted with `row_type`, uses the record's field names. If omitted without `row_type`, uses `SELECT *`.
- `pg_schema_name` — Optional PostgreSQL schema name (defaults to `"public"`).
- `row_type` — Optional record type (dataclass, NamedTuple, or Pydantic model) for automatic row conversion. When provided, `columns` (if specified) must be a subset of the record's fields.
- `row_factory` — Optional callable to transform each row dict. Mutually exclusive with `row_type`.
### Row mapping

By default, rows are returned as `dict[str, Any]`, with PostgreSQL types converted to Python types using asyncpg's type conversion. You can configure automatic conversion to custom types using `row_type` or `row_factory`.
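For example, with neither `row_type` nor `row_factory`, each row is a plain dict keyed by column name (assuming a `products` table with `name` and `price` columns):

```python
# No row_type/row_factory: selects * and yields dict[str, Any] rows
source = postgres.PgTableSource(pool, table_name="products")
for row in source.fetch_rows():
    print(row["name"], row["price"])
```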
#### Using row_type

Pass a record type (dataclass, NamedTuple, or Pydantic model) to automatically convert rows. When `columns` is omitted, the record's field names are used:

```python
from dataclasses import dataclass

@dataclass
class Product:
    id: int
    name: str
    price: float

source = postgres.PgTableSource(
    pool,
    table_name="products",
    row_type=Product,  # columns inferred as ["id", "name", "price"]
)
```
#### Using row_factory

For custom transformations, pass a callable:

```python
source = postgres.PgTableSource(
    pool,
    table_name="products",
    columns=["id", "name", "price"],
    row_factory=lambda row: (row["name"], row["price"] * 1.1),  # Add 10% markup
)
```
### Iterating rows

`fetch_rows()` returns a `RowFetcher` that supports both sync and async iteration:

```python
# Synchronous iteration
for row in source.fetch_rows():
    print(row.name, row.price)

# Asynchronous iteration (streams rows using a cursor)
async for row in source.fetch_rows():
    print(row.name, row.price)
```
### Example

```python
from dataclasses import dataclass

import asyncpg
import cocoindex as coco
from cocoindex.connectors import postgres

@dataclass
class SourceProduct:
    product_id: str
    name: str
    description: str

@coco.function
async def app_main(pool: asyncpg.Pool) -> None:
    source = postgres.PgTableSource(
        pool,
        table_name="products",
        row_type=SourceProduct,
    )
    async for product in source.fetch_rows():
        coco.mount(
            coco.component_subpath("product", product.product_id),
            process_product,  # per-product processing function defined elsewhere
            product,
        )
```
## As target

The postgres connector provides target state APIs for writing rows to tables. With it, CocoIndex tracks what rows should exist and automatically handles upserts and deletions.
### Declaring target states

#### Database registration

Before declaring target states, register the connection pool with a stable key that identifies the logical database. This key allows CocoIndex to recognize the same database even when connection details change (e.g., username, password, or host address).

```python
def register_db(key: str, pool: asyncpg.Pool) -> PgDatabase
```

Parameters:

- `key` — A stable identifier for this database (e.g., `"main_db"`). Must be unique.
- `pool` — An asyncpg connection pool.

Returns: A `PgDatabase` handle for declaring target states.
The `PgDatabase` can be used as a context manager to automatically unregister on exit:

```python
async with await postgres.create_pool(DATABASE_URL) as pool:
    with postgres.register_db("my_db", pool) as db:
        # Use db to declare target states
        ...
    # db is automatically unregistered here
```
#### Tables (parent state)

`declare_table_target()` declares a table as a target state and returns a `TableTarget` for declaring rows.

```python
def PgDatabase.declare_table_target(
    self,
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    pg_schema_name: str | None = None,
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```

Parameters:

- `table_name` — Name of the table.
- `table_schema` — Schema definition including columns and primary key (see Table Schema).
- `pg_schema_name` — Optional PostgreSQL schema name (defaults to `"public"`).
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).

Returns: A pending `TableTarget`. Use `mount_run(...).result()` to wait for resolution, as sketched below.
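A minimal resolution sketch, assuming `db` is a registered `PgDatabase` and `schema` is a `TableSchema` built as shown later on this page (the full example at the end puts this in context):

```python
table = await coco_aio.mount_run(
    coco.component_subpath("setup", "table"),
    db.declare_table_target,
    table_name="products",
    table_schema=schema,
).result()  # waits until the pending TableTarget is resolved
```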
#### Rows (child states)

Once a `TableTarget` is resolved, declare rows to be upserted:

```python
def TableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None
```

Parameters:

- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
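For instance, a plain dict works as long as every primary key column is present (the column values here are illustrative):

```python
# Upsert one row; ["category", "name"] is the primary key in this sketch
table.declare_row(row={"category": "books", "name": "Dune", "price": 9.99})
```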
### Table schema: from Python class

Define the table structure using a Python class (dataclass, NamedTuple, or Pydantic model):

```python
def TableSchema.__init__(
    self,
    columns: type[RowT],
    primary_key: list[str],
    *,
    column_overrides: dict[str, PgType | VectorSchemaProvider] | None = None,
) -> None
```

Parameters:

- `columns` — A record type whose fields define table columns.
- `primary_key` — List of column names forming the primary key.
- `column_overrides` — Optional per-column overrides for type mapping or vector configuration.
Example:

```python
@dataclass
class OutputProduct:
    category: str
    name: str
    price: float
    embedding: Annotated[NDArray, embedder]

schema = postgres.TableSchema(
    OutputProduct,
    primary_key=["category", "name"],
)
```
Python types are automatically mapped to PostgreSQL types:

| Python Type | PostgreSQL Type |
|---|---|
| `bool` | `boolean` |
| `int` | `bigint` |
| `float` | `double precision` |
| `decimal.Decimal` | `numeric` |
| `str` | `text` |
| `bytes` | `bytea` |
| `uuid.UUID` | `uuid` |
| `datetime.date` | `date` |
| `datetime.time` | `time with time zone` |
| `datetime.datetime` | `timestamp with time zone` |
| `datetime.timedelta` | `interval` |
| `list`, `dict`, nested structs | `jsonb` |
| `NDArray` (with vector schema) | `vector(n)` or `halfvec(n)` |
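For instance, list and dict fields fall back to `jsonb` with no extra configuration (a sketch; the record and field names are illustrative):

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Article:
    id: int                   # mapped to bigint
    title: str                # mapped to text
    tags: list[str]           # mapped to jsonb
    metadata: dict[str, Any]  # mapped to jsonb
```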
To override the default mapping, provide a `PgType` or `VectorSchemaProvider` via:

- Type annotation — using `typing.Annotated` on the field
- `column_overrides` — passing overrides when constructing `TableSchema`
#### PgType

Use `PgType` to specify a custom PostgreSQL type:

```python
import datetime
from dataclasses import dataclass
from typing import Annotated

from cocoindex.connectors.postgres import PgType

@dataclass
class MyRow:
    id: Annotated[int, PgType("integer")]      # instead of bigint
    value: Annotated[float, PgType("real")]    # instead of double precision
    created_at: Annotated[datetime.datetime, PgType("timestamp")]  # without timezone
```
Or via `column_overrides`:

```python
schema = postgres.TableSchema(
    MyRow,
    primary_key=["id"],
    column_overrides={
        "created_at": postgres.PgType("timestamp"),
    },
)
```
#### VectorSchemaProvider

For `NDArray` fields, a `VectorSchemaProvider` specifies the vector dimension and dtype. Vector dimensions are typically determined by the embedding model—hardcoding them is error-prone and creates a maintenance burden when switching models. By using a `VectorSchemaProvider`, the dimension is derived automatically from the source configuration.

The connector has built-in pgvector support and automatically creates the extension when needed.

A `VectorSchemaProvider` can be:

- An embedding model (e.g., `SentenceTransformerEmbedder`) — dimension is inferred from the model
- A `VectorSchema` — for explicit size and dtype when not using an embedder
```python
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")

@dataclass
class Document:
    id: str
    content: str
    embedding: Annotated[NDArray, embedder]  # dimension inferred from model (384)
```

```python
import numpy as np

from cocoindex.resources.schema import VectorSchema

@dataclass
class Document:
    id: str
    content: str
    embedding: Annotated[NDArray, VectorSchema(dtype=np.float32, size=384)]
```
### Table schema: explicit column definitions

Define columns directly using `ColumnDef`:

```python
def TableSchema.__init__(
    self,
    columns: dict[str, ColumnDef],
    primary_key: list[str],
) -> None
```
Example:

```python
schema = postgres.TableSchema(
    {
        "category": postgres.ColumnDef(type="text", nullable=False),
        "name": postgres.ColumnDef(type="text", nullable=False),
        "price": postgres.ColumnDef(type="numeric"),
        "embedding": postgres.ColumnDef(type="vector(384)"),
    },
    primary_key=["category", "name"],
)
```
### Example

```python
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Annotated

import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import postgres

DATABASE_URL = "postgresql://localhost/mydb"
PG_DB = coco.ContextKey[postgres.PgDatabase]("pg_db")

@dataclass
class OutputProduct:
    category: str
    name: str
    description: str
    embedding: Annotated[NDArray, embedder]  # embedder as in the VectorSchemaProvider section above

@coco_aio.lifespan
async def coco_lifespan(builder: coco_aio.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await postgres.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, postgres.register_db("main_db", pool))
        yield

@coco.function
async def app_main() -> None:
    db = coco.use_context(PG_DB)

    # Declare table target state
    table = await coco_aio.mount_run(
        coco.component_subpath("setup", "table"),
        db.declare_table_target,
        table_name="products",
        table_schema=postgres.TableSchema(
            OutputProduct,
            primary_key=["category", "name"],
        ),
    ).result()

    # Declare rows
    for product in products:  # products: an iterable of OutputProduct built elsewhere
        table.declare_row(row=product)
```