Sentence Transformers Integration
The cocoindex.ops.sentence_transformers module provides integration with the sentence-transformers library for text embeddings.
Overview
The SentenceTransformerEmbedder class is a wrapper around SentenceTransformer models that:
- Implements VectorSchemaProvider for seamless integration with CocoIndex connectors
- Handles model caching and thread-safe GPU access automatically
- Provides simple embed() and embed_async() methods
- Returns properly typed numpy arrays
Installation
To use sentence transformers with CocoIndex, install with the sentence_transformers extra:
pip install cocoindex[sentence_transformers]
Or with uv:
uv pip install cocoindex[sentence_transformers]
Basic usage
Creating an embedder
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
# Initialize embedder with a pre-trained model
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
Embedding text
# Embed a single text (returns 1D array)
embedding = embedder.embed("Hello, world!")
print(f"Shape: {embedding.shape}") # Shape: (384,)
print(f"Dtype: {embedding.dtype}") # Dtype: float32
# Async embedding
import asyncio
async def embed_async_example():
    embedding = await embedder.embed_async("Hello, world!")
    return embedding
embedding = asyncio.run(embed_async_example())
Getting vector schema
The embedder automatically provides vector schema information:
schema = embedder.__coco_vector_schema__()
print(f"Dimension: {schema.size}") # 384
print(f"Dtype: {schema.dtype}") # float32
Using with CocoIndex connectors
The SentenceTransformerEmbedder implements VectorSchemaProvider, which allows it to be used directly in type annotations with CocoIndex connectors.
With Postgres
from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray
import cocoindex as coco
from cocoindex.connectors import postgres
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
# Create a global embedder instance
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
@dataclass
class DocEmbedding:
    filename: str
    text: str
    # Use embedder as a VectorSchemaProvider in type annotations
    embedding: Annotated[NDArray, embedder]

@coco.function
def setup_table(db: postgres.PgDatabase):
    return db.declare_table_target(
        table_name="doc_embeddings",
        table_schema=postgres.TableSchema(
            DocEmbedding,
            primary_key=["filename"],
        ),
        pg_schema_name="public",
    )
The connector will automatically:
- Extract the vector dimension from embedder.__coco_vector_schema__()
- Create the appropriate vector(384) column in Postgres
- Handle type conversions properly
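Under the hood this relies on standard Annotated metadata. The following sketch shows the mechanism using only the typing module; it illustrates how a connector can discover the schema, and is not the connector's actual implementation:
from typing import get_type_hints

# Resolve DocEmbedding's annotations, keeping Annotated metadata
hints = get_type_hints(DocEmbedding, include_extras=True)
for meta in hints["embedding"].__metadata__:
    if hasattr(meta, "__coco_vector_schema__"):
        schema = meta.__coco_vector_schema__()
        print(schema.size, schema.dtype)  # 384 float32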
With LanceDB
from cocoindex.connectors import lancedb
@dataclass
class CodeEmbedding:
    filename: str
    code: str
    embedding: Annotated[NDArray, embedder]

@coco.function
def setup_table(db: lancedb.LanceDatabase):
    return db.declare_table_target(
        table_name="code_embeddings",
        table_schema=lancedb.TableSchema(
            CodeEmbedding,
            primary_key=["filename"],
        ),
    )
With Qdrant
from cocoindex.connectors import qdrant
@dataclass
class DocEmbedding:
    id: str
    text: str
    embedding: Annotated[NDArray, embedder]

@coco.function
def setup_collection(db: qdrant.QdrantDatabase):
    return db.declare_collection_target(
        collection_name="doc_embeddings",
        table_schema=qdrant.TableSchema(
            DocEmbedding,
            primary_key=["id"],
        ),
    )
Example: text embedding pipeline
Here's a complete example of a text embedding pipeline:
import asyncio
import pathlib
from dataclasses import dataclass
from typing import Annotated, AsyncIterator
import asyncpg
from numpy.typing import NDArray
import cocoindex as coco
import cocoindex.asyncio as coco_aio
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
from cocoindex.resources.chunk import Chunk
# Global state and utilities
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
splitter = RecursiveSplitter()
@dataclass
class DocEmbedding:
    filename: str
    chunk_start: int
    chunk_end: int
    text: str
    embedding: Annotated[NDArray, embedder]

@coco.function
def setup_table(db: postgres.PgDatabase):
    return db.declare_table_target(
        table_name="doc_embeddings",
        table_schema=postgres.TableSchema(
            DocEmbedding,
            primary_key=["filename", "chunk_start"],
        ),
        pg_schema_name="public",
    )

@coco.function(memo=True)
async def process_chunk(
    filename: pathlib.PurePath,
    chunk: Chunk,
    table: postgres.TableTarget[DocEmbedding],
):
    table.declare_row(
        row=DocEmbedding(
            filename=str(filename),
            chunk_start=chunk.start.char_offset,
            chunk_end=chunk.end.char_offset,
            text=chunk.text,
            embedding=await embedder.embed_async(chunk.text),
        ),
    )

@coco.function(memo=True)
async def process_file(
    file: FileLike,
    table: postgres.TableTarget[DocEmbedding],
):
    text = file.read_text()
    chunks = splitter.split(text, chunk_size=2000, chunk_overlap=500)

    # Process chunks in parallel using explicit context management
    ctx = coco.get_component_context()

    async def process_with_context(chunk: Chunk):
        with ctx.attach():
            await coco_aio.mount_run(
                coco.component_subpath(str(chunk.start.char_offset)),
                process_chunk,
                file.relative_path,
                chunk,
                table,
            )

    await asyncio.gather(*(process_with_context(chunk) for chunk in chunks))

@coco.function
def app_main(sourcedir: pathlib.Path, db: postgres.PgDatabase):
    table = coco.mount_run(coco.component_subpath("setup"), setup_table, db).result()

    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["*.md"]),
    )
    for f in files:
        coco.mount(
            coco.component_subpath("file", str(f.relative_path)),
            process_file,
            f,
            table,
        )
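Once the pipeline has run, the table can be queried for nearest neighbors. A minimal sketch, assuming a pgvector-enabled Postgres database; the DSN, the search helper, and the cosine-distance query are illustrative and not part of the pipeline above:
import asyncpg

async def search(query: str, top_k: int = 5):
    # Embed the query with the same model used for indexing
    qvec = await embedder.embed_async(query)
    conn = await asyncpg.connect("postgresql://localhost/cocoindex")  # assumed DSN
    try:
        # pgvector's <=> operator is cosine distance; the vector is passed
        # in its text form and cast server-side
        rows = await conn.fetch(
            """
            SELECT filename, text, embedding <=> $1::vector AS distance
            FROM doc_embeddings
            ORDER BY distance
            LIMIT $2
            """,
            "[" + ",".join(str(x) for x in qvec) + "]",
            top_k,
        )
    finally:
        await conn.close()
    return rows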
API reference
SentenceTransformerEmbedder
::: cocoindex.ops.sentence_transformers.SentenceTransformerEmbedder
    options:
      show_root_heading: true
      show_source: false
Configuration options
Model selection
You can use any model from the sentence-transformers library:
# Small, fast model (384 dimensions)
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
# Larger, more accurate model (768 dimensions)
embedder = SentenceTransformerEmbedder("sentence-transformers/all-mpnet-base-v2")
# Multilingual model
embedder = SentenceTransformerEmbedder("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
# Local model
embedder = SentenceTransformerEmbedder("/path/to/local/model")
Normalization
By default, embeddings are normalized to unit length (suitable for cosine similarity):
# Default: normalized embeddings
embedder = SentenceTransformerEmbedder(
    "sentence-transformers/all-MiniLM-L6-v2",
    normalize_embeddings=True,  # Default
)

# Disable normalization if needed
embedder = SentenceTransformerEmbedder(
    "sentence-transformers/all-MiniLM-L6-v2",
    normalize_embeddings=False,
)
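A quick check of the default behavior (numpy's norm function here is purely illustrative):
import numpy as np

embedding = embedder.embed("Hello, world!")
print(np.linalg.norm(embedding))  # ~1.0 with normalize_embeddings=True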
Thread safety
The SentenceTransformerEmbedder is thread-safe:
- Model loading is lazy and uses double-checked locking
- GPU access is protected by a lock to prevent concurrent operations
- Safe to use in async contexts with asyncio.to_thread() (which embed_async() uses internally)
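This means a single embedder instance can be shared across worker threads. A minimal sketch (the thread pool and sample texts are illustrative):
from concurrent.futures import ThreadPoolExecutor

texts = ["first document", "second document", "third document"]

# One shared embedder; its internal lock serializes GPU access
with ThreadPoolExecutor(max_workers=4) as pool:
    embeddings = list(pool.map(embedder.embed, texts))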
Performance considerations
Model caching
The model is loaded only once per embedder instance and cached in memory:
# Good: Reuse the same embedder instance
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
for text in texts:
    embedding = embedder.embed(text)  # Model loaded only once

# Avoid: Creating new embedder instances repeatedly
for text in texts:
    embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
    embedding = embedder.embed(text)  # Model loaded every time!
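If embedders are constructed in several places, one way to guarantee a single instance per model name is a small cached factory. This get_embedder helper is a suggestion, not part of the library:
from functools import lru_cache

@lru_cache(maxsize=None)
def get_embedder(model_name: str) -> SentenceTransformerEmbedder:
    # Same model name -> same already-loaded embedder instance
    return SentenceTransformerEmbedder(model_name)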
Batch processing
For better performance when embedding many texts, use async processing with asyncio.gather():
# Process many texts in parallel (with thread pool)
async def embed_all(texts: list[str]):
    return await asyncio.gather(
        *(embedder.embed_async(text) for text in texts)
    )

embeddings = asyncio.run(embed_all(texts))
GPU usage
The embedder automatically uses a GPU when one is available. Explicit device selection is not currently exposed in the public API but can be added if needed; if it is, usage would likely look like this (the device parameter below is illustrative):
# Use a specific GPU
embedder = SentenceTransformerEmbedder(
    "sentence-transformers/all-MiniLM-L6-v2",
    device="cuda:0",
)

# Force CPU
embedder = SentenceTransformerEmbedder(
    "sentence-transformers/all-MiniLM-L6-v2",
    device="cpu",
)