Turbopuffer connector
Write rows to Turbopuffer namespaces with single or named vectors, per-namespace distance metric, and schemaless attributes — configured via VectorDef and NamespaceSchema.
The turbopuffer connector provides utilities for writing rows to Turbopuffer namespaces, with support for both single and named vectors.
from cocoindex.connectors import turbopuffer
This connector requires additional dependencies. Install with:
pip install cocoindex[turbopuffer]
Connection setup
Turbopuffer uses a single client object that owns the API key and region. Construct one using AsyncTurbopuffer:
import os

from cocoindex.connectors import turbopuffer

client = turbopuffer.AsyncTurbopuffer(
    region="gcp-us-central1",
    api_key=os.environ["TURBOPUFFER_API_KEY"],
)
turbopuffer.AsyncTurbopuffer is re-exported from the Turbopuffer Python SDK; importing it directly via from turbopuffer import AsyncTurbopuffer works too.
As target
The turbopuffer connector provides target state APIs for writing rows to namespaces. CocoIndex tracks what rows should exist and automatically handles upserts and deletions. Turbopuffer creates namespaces implicitly on the first write, so there is no separate “create namespace” step — but the connector still tracks namespace-level configuration (vector schema and distance metric) and clears the namespace if it must be rebuilt.
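To make the target-state idea concrete, here is a minimal, self-contained sketch of the kind of reconciliation described above: given the rows declared in the previous run and the rows declared now, work out what to upsert and what to delete. This is not the connector's implementation. `plan_changes` and its dict-based inputs are hypothetical names used only for illustration.

```python
from typing import Any


def plan_changes(
    previous: dict[str, Any],
    desired: dict[str, Any],
) -> tuple[dict[str, Any], list[str]]:
    """Return (rows to upsert, ids to delete) to move from `previous` to `desired`.

    Hypothetical helper: both arguments map a row id to that row's content.
    """
    # Upsert rows that are new, or whose content changed since the last run.
    upserts = {
        row_id: row
        for row_id, row in desired.items()
        if row_id not in previous or previous[row_id] != row
    }
    # Delete rows that existed before but are no longer declared.
    deletes = sorted(row_id for row_id in previous if row_id not in desired)
    return upserts, deletes


previous = {"a": {"text": "old"}, "b": {"text": "same"}, "c": {"text": "gone"}}
desired = {"a": {"text": "new"}, "b": {"text": "same"}, "d": {"text": "added"}}
upserts, deletes = plan_changes(previous, desired)
```

In this sketch the unchanged row `"b"` produces no write at all, which is the point of tracking target states instead of rewriting every row on each run.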
Declaring target states
Setting up a connection
Create a ContextKey[AsyncTurbopuffer] to identify your client, then provide it in your lifespan:
The key name is load-bearing across runs — it’s the stable identity CocoIndex uses to track managed namespaces. See ContextKey as stable identity before renaming.
import os
from typing import AsyncIterator

import cocoindex as coco
from cocoindex.connectors import turbopuffer

# The key name ("my_vectors") is the stable identity CocoIndex tracks across runs.
TPUF = coco.ContextKey[turbopuffer.AsyncTurbopuffer]("my_vectors")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    client = turbopuffer.AsyncTurbopuffer(
        region="gcp-us-central1",
        api_key=os.environ["TURBOPUFFER_API_KEY"],
    )
    builder.provide(TPUF, client)
    yield
Namespaces (parent state)
Declares a namespace as a target state. Returns a NamespaceTarget for declaring rows.
def declare_namespace_target(
    db: ContextKey[AsyncTurbopuffer],
    namespace_name: str,
    schema: NamespaceSchema,
    *,
    managed_by: Literal["system", "user"] = "system",
) -> NamespaceTarget[coco.PendingS]
Parameters:
- db: A ContextKey[AsyncTurbopuffer] identifying the client to use.
- namespace_name: Name of the namespace.
- schema: Schema definition specifying vector configuration and distance metric (see Namespace schema).
- managed_by: Whether CocoIndex manages the namespace lifecycle ("system") or assumes it exists ("user").
Returns: A pending NamespaceTarget. Use the convenience wrapper await turbopuffer.mount_namespace_target(TPUF, namespace_name, schema) to resolve.
Rows (child states)
Once a NamespaceTarget is resolved, declare rows to be upserted using turbopuffer.Row:
def NamespaceTarget.declare_row(
    self,
    row: turbopuffer.Row,
) -> None
Row is a small dataclass:
@dataclass
class Row:
    id: str | int
    vector: Sequence[float] | np.ndarray | dict[str, Sequence[float] | np.ndarray]
    attributes: dict[str, Any] | None = None
- id: Document id (string or integer).
- vector: For an unnamed-vector schema, pass a single sequence. For a named-vectors schema, pass a dict mapping vector field name to its sequence.
- attributes: Non-vector attributes (text, tags, metadata, etc.). Turbopuffer infers attribute types from the data.
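The two vector shapes described above can be checked before a row is declared. The following is a sketch only: `check_vector_shape` is a hypothetical helper, not part of the connector, and it assumes that every declared vector field must be present on each row, which the connector may or may not require.

```python
from collections.abc import Mapping, Sequence
from typing import Optional


def check_vector_shape(vector: object, vector_names: Optional[Sequence[str]]) -> None:
    """Raise ValueError if `vector` does not match the schema's shape.

    Hypothetical helper: `vector_names` is None for an unnamed-vector schema,
    or the declared vector field names for a named-vectors schema.
    """
    if vector_names is None:
        # Unnamed-vector schema: a single sequence, never a dict.
        if isinstance(vector, Mapping):
            raise ValueError("unnamed-vector schema expects a single sequence, got a dict")
        return
    # Named-vectors schema: a dict keyed by vector field name.
    if not isinstance(vector, Mapping):
        raise ValueError("named-vectors schema expects a dict of field name to sequence")
    missing = sorted(set(vector_names) - set(vector))
    unknown = sorted(set(vector) - set(vector_names))
    if missing or unknown:
        raise ValueError(f"vector fields mismatch: missing={missing}, unknown={unknown}")


# Both calls pass silently because the shapes match their schemas.
check_vector_shape([0.1, 0.2, 0.3], None)
check_vector_shape(
    {"text_embedding": [0.1], "image_embedding": [0.2]},
    ["text_embedding", "image_embedding"],
)
```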
Namespace schema
Define vector configuration and distance metric for a namespace using NamespaceSchema:
class NamespaceSchema:
    @classmethod
    async def create(
        cls,
        vectors: VectorDef | dict[str, VectorDef],
        *,
        distance: Literal["cosine_distance", "euclidean_squared"] = "cosine_distance",
    ) -> NamespaceSchema
Parameters:
- vectors: Either:
  - A single VectorDef for an unnamed vector (stored under turbopuffer’s default "vector" field).
  - A dict mapping vector names to VectorDef for named vectors.
- distance: Distance metric applied to all vector columns in the namespace. Turbopuffer applies a single distance metric per namespace.
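For intuition about the two distance values, the sketch below computes both in plain Python using their standard textbook definitions (cosine distance as one minus cosine similarity, and the squared Euclidean distance). The function names mirror the option strings for readability; they are illustrative and are not connector or SDK functions, and turbopuffer's own numerics may differ in precision.

```python
import math
from collections.abc import Sequence


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity: 0 for the same direction, 1 for orthogonal, 2 for opposite."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def euclidean_squared(a: Sequence[float], b: Sequence[float]) -> float:
    """Sum of squared per-dimension differences (no square root taken)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    return sum((x - y) ** 2 for x, y in zip(a, b))


orthogonal = cosine_distance([1.0, 0.0], [0.0, 1.0])
squared = euclidean_squared([0.0, 0.0], [3.0, 4.0])
```

Cosine distance ignores vector magnitude, while squared Euclidean distance does not, so the choice matters mainly when embeddings are not normalized.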
VectorDef
Specifies a vector field’s dimension and dtype:
class VectorDef(NamedTuple):
    schema: VectorSchemaProvider | ContextKey[VectorSchemaProvider]
The schema field accepts a VectorSchemaProvider, a ContextKey, or an explicit VectorSchema. The dtype on the VectorSchema (must be np.float32 or np.float16) controls turbopuffer’s vector type — [N]f32 or [N]f16.
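The mapping from dtype and dimension to the type strings mentioned above can be sketched as follows. `turbopuffer_vector_type` is a hypothetical helper written for this illustration; it takes the NumPy dtype name as a plain string so that the snippet has no dependencies, and it is not how the connector itself is implemented.

```python
def turbopuffer_vector_type(dtype_name: str, size: int) -> str:
    """Format a vector type string such as "[384]f32".

    Hypothetical helper: `dtype_name` is a NumPy dtype name like "float32".
    """
    suffixes = {"float32": "f32", "float16": "f16"}
    if dtype_name not in suffixes:
        # Only float32 and float16 are accepted, per the text above.
        raise ValueError(f"unsupported vector dtype: {dtype_name!r}")
    if size <= 0:
        raise ValueError("vector size must be positive")
    return f"[{size}]{suffixes[dtype_name]}"


f32_type = turbopuffer_vector_type("float32", 384)
f16_type = turbopuffer_vector_type("float16", 512)
```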
Single (unnamed) vector
For namespaces with a single unnamed vector:
from cocoindex.connectors import turbopuffer
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
# The embedder supplies the vector dimension and dtype for the schema.
schema = await turbopuffer.NamespaceSchema.create(
    vectors=turbopuffer.VectorDef(schema=embedder),
)
Rows pass the vector directly:
target.declare_row(turbopuffer.Row(
    id="doc-123",
    vector=embedding,
    attributes={"text": "...", "tags": ["a", "b"]},
))
Named vectors
Namespaces can have multiple named vector columns (turbopuffer supports up to two per namespace). The name "id" is reserved for the row id and cannot be used as a vector field name.
import numpy as np
from cocoindex.resources.schema import VectorSchema

schema = await turbopuffer.NamespaceSchema.create(
    vectors={
        "text_embedding": turbopuffer.VectorDef(
            schema=VectorSchema(dtype=np.float32, size=384),
        ),
        "image_embedding": turbopuffer.VectorDef(
            schema=VectorSchema(dtype=np.float32, size=512),
        ),
    },
    distance="cosine_distance",
)
Rows pass a dict of vectors:
target.declare_row(turbopuffer.Row(
    id="doc-123",
    vector={
        "text_embedding": text_vec,
        "image_embedding": image_vec,
    },
    attributes={"title": "..."},
))
Distance metrics
Turbopuffer applies a single distance_metric per namespace. Supported values:
- "cosine_distance": Cosine distance (default).
- "euclidean_squared": Squared Euclidean distance.
Example
import os
from typing import AsyncIterator

import cocoindex as coco
from cocoindex.connectors import turbopuffer
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

TPUF = coco.ContextKey[turbopuffer.AsyncTurbopuffer]("main_vectors")
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    client = turbopuffer.AsyncTurbopuffer(
        region="gcp-us-central1",
        api_key=os.environ["TURBOPUFFER_API_KEY"],
    )
    builder.provide(TPUF, client)
    yield

@coco.fn
async def process_document(
    doc_id: str,
    text: str,
    target: turbopuffer.NamespaceTarget,
) -> None:
    # Embed the text and declare one row per document.
    embedding = await embedder.embed(text)
    target.declare_row(turbopuffer.Row(
        id=doc_id,
        vector=embedding,
        attributes={"text": text},
    ))

@coco.fn
async def app_main() -> None:
    namespace = await turbopuffer.mount_namespace_target(
        TPUF,
        "documents",
        await turbopuffer.NamespaceSchema.create(
            vectors=turbopuffer.VectorDef(schema=embedder),
        ),
    )
    # `documents` is assumed to be an iterable of (doc_id, text) pairs defined elsewhere.
    for doc_id, text in documents:
        await coco.mount(
            coco.component_subpath("doc", doc_id),
            process_document,
            doc_id,
            text,
            namespace,
        )
Row IDs
Turbopuffer rows are identified by str or int. UUIDs should be passed as strings.
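As an illustration of passing UUIDs as strings, the sketch below derives a deterministic UUID from a source key with the standard library and converts it to `str` before it would be used as a row id. `stable_row_id` is a hypothetical helper, and deriving ids with `uuid5` is an assumption made for this example; any stable string or integer id works.

```python
import uuid


def stable_row_id(source_key: str) -> str:
    """Derive a deterministic UUID string from a source key (hypothetical helper)."""
    # uuid5 hashes the key within a namespace, so the same key always yields the same id.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, source_key))


row_id = stable_row_id("docs/intro.md")
```

A deterministic id keeps the same document mapped to the same row across runs, so a re-run updates the row in place instead of creating a new one.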
Attributes
Row attributes are schemaless; turbopuffer infers attribute types from the values you write. Supported scalar types include string, int, uint, float, bool, uuid, and datetime, plus their array variants. See Turbopuffer’s schema reference for the full list.
Reserved attribute names depend on the schema; putting any reserved name in Row.attributes raises a ValueError:
- id is always reserved: it’s the row id.
- For an unnamed-vector schema, vector is also reserved (it’s the wire-level vector field).
- For a named-vectors schema, each declared vector field name is reserved instead.
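The reserved-name rules above can be sketched as a small pre-flight check. `reserved_names` and `check_attributes` are hypothetical helpers written for this illustration; the connector performs its own validation, and its error messages will differ.

```python
from collections.abc import Mapping, Sequence
from typing import Any, Optional


def reserved_names(vector_names: Optional[Sequence[str]]) -> set[str]:
    """Names that may not appear in Row.attributes.

    Hypothetical helper: `vector_names` is None for an unnamed-vector schema,
    or the declared vector field names for a named-vectors schema.
    """
    if vector_names is None:
        return {"id", "vector"}
    return {"id", *vector_names}


def check_attributes(
    attributes: Optional[Mapping[str, Any]],
    vector_names: Optional[Sequence[str]],
) -> None:
    """Raise ValueError if any attribute name collides with a reserved name."""
    clashes = sorted(reserved_names(vector_names) & set(attributes or {}))
    if clashes:
        raise ValueError(f"reserved attribute names used: {clashes}")


# "vector" is only reserved when the schema is unnamed, so this call passes.
check_attributes({"vector": "v1", "title": "..."}, ["text_embedding"])
```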
Vector search
The connector focuses on writing rows. For vector search, use the turbopuffer client directly:
# `client` is the AsyncTurbopuffer instance; `query_embedding` is a NumPy array.
ns = client.namespace("documents")
result = await ns.query(
    rank_by=("vector", "ANN", query_embedding.tolist()),
    top_k=10,
)