LanceDB
The lancedb connector provides target state APIs for writing rows to LanceDB tables.
from cocoindex.connectors import lancedb
This connector requires additional dependencies. Install with:
pip install cocoindex[lancedb]
Connection setup
LanceDB connections are created directly via the LanceDB library. CocoIndex exposes thin wrappers:
async def connect_async(uri: str, **options: Any) -> LanceAsyncConnection
def connect(uri: str, **options: Any) -> lancedb.DBConnection
Parameters:
uri— LanceDB URI (local path like"./lancedb_data"or cloud URI like"s3://bucket/path").**options— Additional options passed directly tolancedb.connect_async()/lancedb.connect().
Returns: A LanceDB connection.
Example:
conn = await lancedb.connect_async("./lancedb_data")
As target
The lancedb connector provides target state APIs for writing rows to tables. With it, CocoIndex tracks what rows should exist and automatically handles upserts and deletions.
Declaring target states
Database registration
Before declaring target states, register the connection with a stable key that identifies the logical database. This key allows CocoIndex to recognize the same database even when connection details change.
def register_db(key: str, conn: LanceAsyncConnection) -> LanceDatabase
Parameters:
key— A stable identifier for this database (e.g.,"main_db"). Must be unique.conn— An async LanceDB connection (fromconnect_async()).
Returns: A LanceDatabase handle for declaring target states.
The LanceDatabase can be used as a context manager to automatically unregister on exit:
conn = await lancedb.connect_async("./lancedb_data")
with lancedb.register_db("my_db", conn) as db:
# Use db to declare target states
...
# db is automatically unregistered here
Tables (parent state)
Declares a table as a target state. Returns a TableTarget for declaring rows.
def LanceDatabase.declare_table_target(
self,
table_name: str,
table_schema: TableSchema[RowT],
*,
managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
Parameters:
table_name— Name of the table.table_schema— Schema definition including columns and primary key (see Table Schema).managed_by— Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").
Returns: A pending TableTarget. Use mount_run(...).result() to wait for resolution.
Rows (child states)
Once a TableTarget is resolved, declare rows to be upserted:
def TableTarget.declare_row(
self,
*,
row: RowT,
) -> None
Parameters:
row— A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
Table schema: from Python class
Define the table structure using a Python class (dataclass, NamedTuple, or Pydantic model):
def TableSchema.__init__(
self,
columns: type[RowT],
primary_key: list[str],
*,
column_specs: dict[str, LanceType | VectorSchemaProvider] | None = None,
) -> None
Parameters:
columns— A record type whose fields define table columns.primary_key— List of column names forming the primary key.column_specs— Optional per-column overrides for type mapping or vector configuration.
Example:
@dataclass
class OutputDocument:
doc_id: str
title: str
content: str
embedding: Annotated[NDArray, embedder]
schema = lancedb.TableSchema(
OutputDocument,
primary_key=["doc_id"],
)
Python types are automatically mapped to PyArrow types:
| Python Type | PyArrow Type |
|---|---|
bool | bool |
int | int64 |
float | float64 |
str | string |
bytes | binary |
list, dict, nested structs | string (JSON encoded) |
NDArray (with vector schema) | fixed_size_list<float> |
To override the default mapping, provide a LanceType or VectorSchemaProvider via:
- Type annotation — using
typing.Annotatedon the field column_specs— passing overrides when constructingTableSchema
LanceType
Use LanceType to specify a custom PyArrow type or encoder:
from typing import Annotated
from cocoindex.connectors.lancedb import LanceType
import pyarrow as pa
@dataclass
class MyRow:
id: Annotated[int, LanceType(pa.int32())]
value: Annotated[float, LanceType(pa.float32())]
VectorSchemaProvider
For NDArray fields, a VectorSchemaProvider specifies the vector dimension and dtype. Vector dimensions are typically determined by the embedding model—hardcoding them is error-prone and creates maintenance burden when switching models. By using a VectorSchemaProvider, the dimension is derived automatically from the source configuration.
A VectorSchemaProvider can be:
- An embedding model (e.g.,
SentenceTransformerEmbedder) — dimension is inferred from the model - A
VectorSchema— for explicit size and dtype when not using an embedder
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
@dataclass
class Document:
id: str
content: str
embedding: Annotated[NDArray, embedder] # dimension inferred from model (384)
from cocoindex.resources.schema import VectorSchema
@dataclass
class Document:
id: str
content: str
embedding: Annotated[NDArray, VectorSchema(dtype=np.float32, size=384)]
Table schema: explicit column definitions
Define columns directly using ColumnDef:
def TableSchema.__init__(
self,
columns: dict[str, ColumnDef],
primary_key: list[str],
) -> None
Example:
schema = lancedb.TableSchema(
{
"doc_id": lancedb.ColumnDef(type=pa.string(), nullable=False),
"title": lancedb.ColumnDef(type=pa.string()),
"content": lancedb.ColumnDef(type=pa.string()),
"embedding": lancedb.ColumnDef(type=pa.list_(pa.float32(), list_size=384)),
},
primary_key=["doc_id"],
)
Example
import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import lancedb
LANCEDB_URI = "./lancedb_data"
LANCE_DB = coco.ContextKey[lancedb.LanceDatabase]("lance_db")
@dataclass
class OutputDocument:
doc_id: str
title: str
content: str
embedding: Annotated[NDArray, embedder]
@coco_aio.lifespan
async def coco_lifespan(builder: coco_aio.EnvironmentBuilder) -> AsyncIterator[None]:
conn = await lancedb.connect_async(LANCEDB_URI)
builder.provide(LANCE_DB, lancedb.register_db("main_db", conn))
yield
@coco.function
async def app_main() -> None:
db = coco.use_context(LANCE_DB)
# Declare table target state
table = await coco_aio.mount_run(
coco.component_subpath("setup", "table"),
db.declare_table_target,
table_name="documents",
table_schema=lancedb.TableSchema(
OutputDocument,
primary_key=["doc_id"],
),
).result()
# Declare rows
for doc in documents:
table.declare_row(row=doc)