# Sharing resources via context

Use `ContextKey`, `builder.provide()`, and `use_context()` to share connections, models, and config across processing components. This page covers change detection on context values and accessing context outside mount boundaries.
CocoIndex provides a context mechanism for sharing resources across your pipeline. This is useful for database connections, API clients, configuration objects, or any resource that multiple processing components need to access.
## ContextKey
A `ContextKey[T]` is a typed key that identifies a resource. Define keys at module level:
```python
import asyncpg

import cocoindex as coco
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

# Database connection — no change detection (swapping credentials shouldn't reprocess)
PG_DB = coco.ContextKey[asyncpg.Pool]("text_embedding_db")

# Embedding model — with change detection (switching models should reprocess)
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder", detect_change=True)
```
The type parameter (`asyncpg.Pool`, `SentenceTransformerEmbedder`) enables type checking — when you retrieve the value, your editor knows its type.
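To see why the type parameter matters, here is a minimal sketch of a generic key class (hypothetical names, not CocoIndex's implementation) showing how a type checker infers the value type from the key:

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")

@dataclass(frozen=True)
class ToyContextKey(Generic[T]):
    # Hypothetical stand-in for coco.ContextKey, for illustration only.
    name: str

def toy_use_context(registry: dict, key: ToyContextKey[T]) -> T:
    # The key's type parameter tells the type checker what comes back.
    return registry[key]

POOL_KEY = ToyContextKey[str]("db_pool")  # pretend the resource is a plain str
registry = {POOL_KEY: "connection-pool"}
value = toy_use_context(registry, POOL_KEY)  # type checkers infer: str
```

Because `toy_use_context` returns `T`, an editor knows `value` is a `str` here without any cast.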
## Change detection
By default, context keys have change detection disabled: changing the provided value between runs does not invalidate memoized functions that consumed it via `use_context()`. To opt in, pass `detect_change=True`. When enabled, context changes are their own dependency category — tracked by `use_context()` at the call site, independent of `@coco.fn`. CocoIndex fingerprints the provided value; when that fingerprint changes, any memoized function whose execution involved a `use_context()` call on that key is invalidated.

Use `detect_change=True` for resources that affect computation results — models, configuration objects, etc. — so that memoized functions re-execute when those values change. Resources that don't affect computation results — database connections, loggers, debug flags, monitoring clients — can use the default (`detect_change=False`).
Change detection is transitive: if function `foo` (memoized) calls function `bar`, and `bar` calls `use_context(key)` on a change-detected key, then `foo`'s memo is also invalidated when the context value changes.
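The transitive behavior can be pictured with a toy memoization layer. This is an illustrative model only — names and mechanics are hypothetical, not CocoIndex's actual machinery. Every in-flight memoized call records any context key consumed beneath it, and the memo entry stores a fingerprint of those values:

```python
_context = {}        # key name -> provided value
_dep_stack = []      # dependency sets for in-flight memoized calls
_memo = {}           # (function name, args) -> (result, {key: fingerprint})
calls = []           # records each real execution of foo

def use_context(name):
    for deps in _dep_stack:      # every enclosing memoized call depends on it
        deps.add(name)
    return _context[name]

def memo(fn):
    def wrapper(*args):
        cached = _memo.get((fn.__name__, args))
        if cached is not None:
            result, fingerprints = cached
            # Valid only if no consumed context value changed (a real system
            # would hash the values; plain equality is enough for a toy).
            if all(_context.get(k) == v for k, v in fingerprints.items()):
                return result
        deps = set()
        _dep_stack.append(deps)
        try:
            result = fn(*args)
        finally:
            _dep_stack.pop()
        _memo[(fn.__name__, args)] = (result, {k: _context[k] for k in deps})
        return result
    return wrapper

def bar(x):
    # Not memoized itself, but its use_context() call still marks foo.
    return use_context("model") + ":" + x

@memo
def foo(x):
    calls.append(x)
    return bar(x)

_context["model"] = "MiniLM"
foo("a")
foo("a")                  # memo hit: nothing consumed has changed
_context["model"] = "mpnet"
foo("a")                  # foo re-executes, because bar consumed "model"
```

After this runs, `foo` has executed twice: the second call was a memo hit, and the model swap invalidated the memo even though `use_context` was called in `bar`, not `foo`.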
## ContextKey as stable identity
Beyond sharing resources, a `ContextKey` also serves as the stable identity of the resource it points to. When you anchor sources or targets to a `ContextKey`, CocoIndex treats the key itself — not the underlying value — as the identifier across runs.
This has two consequences:
- The underlying value can change without losing tracked state. Rotating credentials, moving a database, or relocating a directory won't invalidate memoization or managed state, as long as the same `ContextKey` is used.
- Renaming a `ContextKey` is a breaking change. Two different keys are two different resources, even if they point to the same physical backend. Existing tracked state will be treated as orphaned. When migrating code, reuse the previous key name to preserve continuity.
Pick a `ContextKey` name that reflects the logical role of the resource, not its current address. The name is what CocoIndex persists.

- Applications: use any descriptive name — e.g., `"text_embedding_db"`, `"docs_root"`.
- Libraries: prefix with your package name and a `/` to avoid collisions with application keys or other libraries — e.g., `"my_library/db"`, `"cocoindex.connectors.postgres/pool"`.
## Providing values
In your lifespan function, use `builder.provide()` to make resources available:
```python
from typing import AsyncIterator

from cocoindex.connectors import postgres

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await asyncpg.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, pool)
        builder.provide(EMBEDDER, SentenceTransformerEmbedder(EMBED_MODEL))
        yield
```
The resource is available for the lifetime of the environment. When the lifespan exits (after `yield`), cleanup happens automatically if you use a context-manager pattern.
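The setup/teardown ordering can be sketched with a plain async generator. This toy (hypothetical names; not CocoIndex internals) mimics how everything before `yield` runs at startup, and the context manager's cleanup runs at shutdown:

```python
import asyncio
import contextlib

events = []

# Toy stand-in for a connection pool that cleans itself up on exit.
@contextlib.asynccontextmanager
async def fake_pool():
    events.append("pool opened")
    try:
        yield "pool"
    finally:
        events.append("pool closed")

# Shaped like the lifespan above: code before `yield` is startup,
# and the enclosing `async with` exits only at shutdown.
async def lifespan():
    async with fake_pool() as pool:
        events.append(f"provided {pool}")
        yield

async def run_environment():
    gen = lifespan()
    await gen.asend(None)          # startup: run up to `yield`
    events.append("pipeline runs")
    with contextlib.suppress(StopAsyncIteration):
        await gen.asend(None)      # shutdown: resume past `yield`, cleanup fires

asyncio.run(run_environment())
```

Because `yield` sits inside the `async with`, the pool stays open for the whole pipeline run and closes only after shutdown resumes the generator.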
## Retrieving values
In processing components, use `coco.use_context()` to retrieve provided resources:
```python
@coco.fn
async def process_chunk(chunk: Chunk, table: postgres.TableTarget[DocEmbedding]) -> None:
    # Retrieve the embedder from context
    embedding = await coco.use_context(EMBEDDER).embed(chunk.text)
    table.declare_row(row=DocEmbedding(text=chunk.text, embedding=embedding, ...))
```
Some connectors also accept `ContextKey`s directly as a convenience — for example, `postgres.mount_table_target()` takes a `ContextKey[asyncpg.Pool]` and resolves the connection internally:
```python
@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    # PG_DB is resolved internally by the connector
    table = await postgres.mount_table_target(
        PG_DB,
        table_name="doc_embeddings",
        table_schema=await postgres.TableSchema.from_class(DocEmbedding, primary_key=["id"]),
    )
    # ... mount processing components ...
```
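The "accept a key or a resource" convenience can be pictured with a toy helper (hypothetical names, not the connector's real code):

```python
class Key:
    # Toy stand-in for ContextKey; identity is the key instance itself.
    def __init__(self, name: str) -> None:
        self.name = name

_registry: dict = {}  # toy context: key instance -> provided resource

def mount_target(pool_or_key):
    # A connector helper can accept either the resource itself or a key,
    # looking the key up in the shared context when given one.
    pool = _registry[pool_or_key] if isinstance(pool_or_key, Key) else pool_or_key
    return f"target backed by {pool}"

PG = Key("pg_pool")
_registry[PG] = "pool-1"
by_key = mount_target(PG)          # resolved through the registry
by_value = mount_target("pool-1")  # passed directly
```

Both calls end up with the same pool; passing the key keeps call sites decoupled from how the pool was created.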
## Complete example
Here’s a complete pipeline that uses context to share a database connection and an embedding model across processing components:
```python
from __future__ import annotations

import pathlib
from dataclasses import dataclass
from typing import AsyncIterator, Annotated

import asyncpg
from numpy.typing import NDArray

import cocoindex as coco
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.chunk import Chunk
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
from cocoindex.resources.id import IdGenerator

DATABASE_URL = "postgres://cocoindex:cocoindex@localhost/cocoindex"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# 1. Define context keys at module level
PG_DB = coco.ContextKey[asyncpg.Pool]("text_embedding_db")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder", detect_change=True)

_splitter = RecursiveSplitter()

# 2. Provide values in the lifespan
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await asyncpg.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, pool)
        builder.provide(EMBEDDER, SentenceTransformerEmbedder(EMBED_MODEL))
        yield

# 3. Use EMBEDDER in type annotations (for vector column schema)
@dataclass
class DocEmbedding:
    id: int
    filename: str
    text: str
    embedding: Annotated[NDArray, EMBEDDER]  # dimension resolved from context

# 4. Retrieve values in processing functions
@coco.fn
async def process_chunk(
    chunk: Chunk,
    filename: pathlib.PurePath,
    id_gen: IdGenerator,
    table: postgres.TableTarget[DocEmbedding],
) -> None:
    table.declare_row(
        row=DocEmbedding(
            id=await id_gen.next_id(chunk.text),
            filename=str(filename),
            text=chunk.text,
            embedding=await coco.use_context(EMBEDDER).embed(chunk.text),
        ),
    )

@coco.fn(memo=True)
async def process_file(
    file: FileLike,
    table: postgres.TableTarget[DocEmbedding],
) -> None:
    text = await file.read_text()
    chunks = _splitter.split(text, chunk_size=2000, chunk_overlap=500, language="markdown")
    id_gen = IdGenerator()
    await coco.map(process_chunk, chunks, file.file_path.path, id_gen, table)

# 5. PG_DB used directly by the connector (resolved internally)
@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    table = await postgres.mount_table_target(
        PG_DB,
        table_name="doc_embeddings",
        table_schema=await postgres.TableSchema.from_class(
            DocEmbedding, primary_key=["id"],
        ),
    )
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
    )
    await coco.mount_each(process_file, files.items(), table)

app = coco.App(
    coco.AppConfig(name="TextEmbedding"),
    app_main,
    sourcedir=pathlib.Path("./markdown_files"),
)
```
## Accessing context outside processing components
If you need to access context values outside of CocoIndex processing components — for example, in query/serving logic that shares resources with your indexing pipeline — use `env.get_context()`:
```python
# Sync API
db = coco.default_env().get_context(PG_DB)

# Async API
db = (await coco.default_env()).get_context(PG_DB)
```
This is useful when your application runs both indexing and serving in the same process and you want to initialize shared resources (like database connection pools or configuration) once in the lifespan.
`default_env()` starts the environment if it hasn't been started yet, which runs the lifespan function. If you're using an explicit environment, call `get_context()` directly on that environment instance.