# CocoIndex Docs — full text
> The complete CocoIndex documentation and example walkthroughs concatenated into one file for LLMs and agents. Each section below is one docs page or example page, in reading order. For a lighter index of pages and examples, see /docs/llms.txt.
---
# CocoIndex overview
Source: https://cocoindex.io/docs/getting_started/overview/
CocoIndex is an ultra-performant framework for building data processing pipelines for AI workloads, with built-in incremental processing.
## Programming model
CocoIndex uses a *declarative*, state-driven programming model. You specify *what* your target should look like as a function of your source data — not *how* to incrementally update it. CocoIndex handles change detection and applies only the necessary updates automatically.
If you’ve used React, spreadsheets, or materialized views, this will feel familiar:
- **React**: declare UI as a function of state → React re-renders what changed
- **Spreadsheets**: declare formulas → cells recompute when inputs change
- **CocoIndex**: declare [target states](../programming_guide/target_state) as a function of source → CocoIndex syncs what changed
## CocoIndex features
### High-performance Rust 🦀 engine
CocoIndex executes pipelines on a high-performance Rust engine, delivering resilient and scalable data processing.
### Easy to code
- Write simple transformations in Python without learning new DSLs
- Write batch-style code without worrying about deltas — CocoIndex runs it incrementally in both batch and live mode, continuously updating results. No separate DAGs, operators, or orchestration logic required.
### Incremental & low-latency
CocoIndex tracks fine-grained dependencies and only recomputes what changed in the input data or the code. End-to-end updates drop from hours/days to seconds while keeping full correctness.
### Full lineage & explainability
Every processing step, intermediate result, and execution path is inspectable. This supports transparency obligations such as those in the EU AI Act, as well as enterprise auditability and traceability requirements.
### Open integration model
Sources and targets plug in through a standard, open interface (no vendor lock-in). Leverage the full Python ecosystem for models, functions, and libraries.
### High throughput + controlled concurrency
Pipelines automatically parallelize with managed concurrency and request batching — reducing GPU cost, RPC fanout, and end-to-end latency.
### Fault-tolerant runtime
The engine gracefully retries transient failures and resumes from previous progress after interruptions — eliminating manual backfills and replays.
### Low operational overhead
CocoIndex removes the need for elaborate plumbing: refreshing datasets, maintaining state, handling backfills, ensuring correctness, coordinating GPUs, scaling workers, and managing infra are all handled by the engine.
## Incremental data processing
CocoIndex continuously maintains and tracks state while processing only new or changed data. It is designed to support incremental processing from day zero.
What incremental processing means:
- Avoid unnecessarily recomputing work, based on multi-level change detection:
  - **Component level**: only reprocess source items with changes
  - **Function level**: within an item’s processing, memoize expensive function calls and reuse when possible
  - **Target level**: apply minimum necessary changes (insertions, updates, deletions) to the target
- Support multiple mechanisms to capture source changes (CDC, poll-based) out of the box
You write simple batch-style code — no delta logic, no state handling. CocoIndex automatically runs your pipeline incrementally and keeps the output up to date for serving, training, or feature computation.
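To make component-level change detection concrete, here is a minimal sketch in plain Python. It is not CocoIndex's implementation: it assumes each source item has a stable key (such as a file path) and is fingerprinted with a SHA-256 hash of its content, and `fingerprint` and `plan_components` are hypothetical helpers.

```python
import hashlib


def fingerprint(content: bytes) -> str:
    """Content fingerprint used to detect changes (SHA-256 is an assumption)."""
    return hashlib.sha256(content).hexdigest()


def plan_components(
    previous: dict[str, str], source: dict[str, bytes]
) -> tuple[list[str], list[str], dict[str, str]]:
    """Split source items into work to do, given last run's fingerprints.

    previous: item key -> fingerprint recorded on the last run
    source:   item key -> current content
    Returns (keys to (re)process, keys whose outputs must be removed, new record).
    """
    current = {key: fingerprint(data) for key, data in source.items()}
    # New or changed items: their processing component must run.
    to_process = sorted(key for key, fp in current.items() if previous.get(key) != fp)
    # Items that disappeared: their target states must be deleted.
    to_delete = sorted(key for key in previous if key not in current)
    return to_process, to_delete, current


# First run: everything is new.
_, _, record = plan_components({}, {"a.pdf": b"A", "b.pdf": b"B"})
# Second run: a.pdf unchanged, b.pdf edited, c.pdf added.
process, delete, record = plan_components(
    record, {"a.pdf": b"A", "b.pdf": b"B2", "c.pdf": b"C"}
)
print(process, delete)  # ['b.pdf', 'c.pdf'] []
```

Only `b.pdf` and `c.pdf` would be processed on the second run; function-level memoization and target-level diffing then narrow the work further inside those components.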
## Next steps
- [Install CocoIndex](./installation) and follow the [Quickstart](./quickstart) to build your first pipeline in 5 minutes
- Read [Core Concepts](../programming_guide/core_concepts) for the mental model behind CocoIndex
---
# CocoIndex installation
Source: https://cocoindex.io/docs/getting_started/installation/
## Install Python and pip
To follow the steps in this guide, you'll need to:
1. Install [Python](https://wiki.python.org/moin/BeginnersGuide(2f)Download.html). We support Python 3.11 to 3.13.
2. Install [pip](https://pip.pypa.io/en/stable/installation/) - a Python package installer
## Install CocoIndex
### Using pip
```sh
pip install -U cocoindex
```
### Using uv
```sh
uv add cocoindex
```
### Using Poetry
```sh
poetry add cocoindex
```
Or specify in `pyproject.toml`:
```toml
[tool.poetry.dependencies]
cocoindex = { version = "^1.0" }
```
## System requirements
CocoIndex is supported on the following operating systems:
- **macOS**: 10.12+ on x86_64, 11.0+ on arm64
- **Linux**: x86_64 or arm64, glibc 2.28+ (e.g., Debian 10+, Ubuntu 18.10+, Fedora 29+, CentOS/RHEL 8+)
- **Windows**: 10+ on x86_64
---
# CocoIndex quickstart
Source: https://cocoindex.io/docs/getting_started/quickstart/
In this tutorial, we'll build a simple app that converts PDF files to Markdown and saves them to a local directory.
## Overview
1. Read PDF files from a local directory
2. Convert each file to Markdown using Docling
3. Save the Markdown files to an output directory (as **target states**)
You declare the transformation logic with native Python without worrying about changes.
Think: **target_state = transformation(source_state)**
When your source data is updated, or your processing logic changes (for example, switching parsers or tweaking conversion settings), CocoIndex processes incrementally, redoing only the minimum work needed, so your Markdown files always stay up to date.
## Setup
1. Install CocoIndex (see [Installation](./installation) for other package managers) and the Docling dependency:
```bash
pip install -U cocoindex docling
```
2. Create a new directory for your project:
```bash
mkdir cocoindex-quickstart
cd cocoindex-quickstart
```
3. Create a `pdf_files/` directory and add your PDF files:
```bash
mkdir pdf_files
```
You can download sample PDF files from the [git repo](https://github.com/cocoindex-io/cocoindex/tree/main/examples/pdf_to_markdown).
4. Create a `.env` file to configure the database path:
```bash
echo "COCOINDEX_DB=./cocoindex.db" > .env
```
## Define the app
At a high level, the app has three layers:
1. **App** — binds the pipeline function to concrete input and output paths
2. **Main function** — finds PDF files and mounts one processing component per file
3. **File processing** — converts one PDF to Markdown and declares the output file
Create a new file `main.py`. We'll define the code in the opposite order, so each Python symbol exists before it is referenced: file processing first, then the main function, and finally the App that wires them together.
### Define file processing
This function converts a single PDF to Markdown:
```python title="main.py"
import pathlib

import cocoindex as coco
from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher
from docling.datamodel.accelerator_options import AcceleratorDevice, AcceleratorOptions
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption

_pipeline_options = PdfPipelineOptions(
    accelerator_options=AcceleratorOptions(device=AcceleratorDevice.CPU)
)
_converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(pipeline_options=_pipeline_options)
    }
)


@coco.fn(memo=True)
def process_file(
    file: localfs.File,
    outdir: pathlib.Path,
) -> None:
    markdown = _converter.convert(
        file.file_path.resolve()
    ).document.export_to_markdown()
    outname = file.file_path.path.stem + ".md"
    localfs.declare_file(outdir / outname, markdown, create_parent_dirs=True)
```
- **`localfs.File`** — A file object returned by `localfs.walk_dir()`, implementing the [`FileLike`](../common_resources/data_types#filelike) base class. See the [localfs connector](../connectors/localfs) for full details.
- **`memo=True`** — Caches results; unchanged files are skipped on re-runs
- **`localfs.declare_file()`** — Declares a file [target state](../programming_guide/target_state); auto-deleted if source is removed. See [localfs as target](../connectors/localfs#as-target) for the full API.
### Define the main function
```python title="main.py"
@coco.fn
async def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.pdf"]),
    )
    await coco.mount_each(process_file, files.items(), outdir)
```
`mount_each()` mounts one processing component per file. Each item from `files.items()` is a `(key, file)` pair — the key (the file's relative path) becomes the component subpath automatically.
It's up to you to pick the process granularity — it can be at directory level, at file level, or at page level. In this example, because we want to independently convert each file to Markdown, the file level is the most natural choice.
### Create the App
```python title="main.py"
app = coco.App(
    "PdfToMarkdown",
    app_main,
    sourcedir=pathlib.Path("./pdf_files"),
    outdir=pathlib.Path("./out"),
)
```
This defines a CocoIndex App — the top-level runnable unit in CocoIndex. It binds the main function with its arguments.
## Run the pipeline
Run the pipeline:
```bash
cocoindex update main.py
```
CocoIndex will:
1. Create the `out/` directory
2. Convert each PDF in `pdf_files/` to Markdown in `out/`
Check the output:
```bash
ls out/
# example.md (one .md file for each input PDF)
```
## Incremental updates
The power of CocoIndex is **incremental processing**. Try these:
**Add a new file:**
Add a new PDF to `pdf_files/`, then run:
```bash
cocoindex update main.py
```
Only the new file is processed.
**Modify a file:**
Replace a PDF in `pdf_files/` with an updated version, then run:
```bash
cocoindex update main.py
```
Only the changed file is reprocessed.
**Delete a file:**
```bash
rm pdf_files/example.pdf
cocoindex update main.py
```
The corresponding Markdown file is automatically removed.
## Next steps
- Read [Core Concepts](../programming_guide/core_concepts) to understand the mental model — state-driven programming, processing components, and memoization
- Dive into the [Programming Guide](../programming_guide/app), starting with Apps, to learn how to build more complex pipelines
- Browse more [examples](https://github.com/cocoindex-io/cocoindex/tree/main/examples) for real-world patterns (text embedding, RAG, knowledge graphs)
---
# Use CocoIndex with AI coding agents
Source: https://cocoindex.io/docs/getting_started/ai_coding_agents/
CocoIndex ships an official **agent skill** that teaches AI coding agents how to build CocoIndex v1 pipelines correctly — core concepts, API surface, common patterns, connectors, and best practices, all in one place.
The skill lives in the [`skills/cocoindex/`](https://github.com/cocoindex-io/cocoindex/tree/main/skills/cocoindex) directory of the main repo.
## Why a skill?
CocoIndex v1 is a fundamental redesign from v0. Without context, an LLM trained on older snapshots tends to hallucinate the wrong API (the v0 flow-builder DSL, deprecated decorators, missing `@coco.fn`, unstable component paths). The skill gives the agent a single concise reference covering:
- **Mental model** — `target_state = transform(source_state)`, declarative pipelines, [processing components](../programming_guide/processing_component).
- **Core APIs** — `@coco.fn`, `mount` / `use_mount` / `mount_each`, `ContextKey`, `@coco.lifespan`, target state declarations.
- **Common patterns** — file transformation, vector embedding, LLM extraction, knowledge graphs.
- **Connectors** — PostgreSQL, SQLite, LanceDB, Qdrant, SurrealDB, Apache Doris, LocalFS, S3, Kafka, Google Drive.
- **Best practices** — memoization, stable component paths, vector schema with `Annotated[NDArray, KEY]`.
## Use with Claude Code
Claude Code auto-loads skills from `.claude/skills/` in the current project, or from `~/.claude/skills/` globally.
Project-local install (recommended for repos that build with CocoIndex):
```sh
mkdir -p .claude/skills
git clone --depth=1 https://github.com/cocoindex-io/cocoindex.git /tmp/cocoindex-skill
cp -r /tmp/cocoindex-skill/skills/cocoindex .claude/skills/
```
Global install:
```sh
mkdir -p ~/.claude/skills
git clone --depth=1 https://github.com/cocoindex-io/cocoindex.git /tmp/cocoindex-skill
cp -r /tmp/cocoindex-skill/skills/cocoindex ~/.claude/skills/
```
Once installed, Claude Code picks up the skill automatically when you ask it to build or modify a CocoIndex pipeline.
## Use with other agents
The skill is plain Markdown — `SKILL.md` plus a few reference files. Any agent that accepts file-based context will work:
- **Cursor** — copy `skills/cocoindex/SKILL.md` into `.cursor/rules/cocoindex.md`.
- **Generic AGENTS.md / CLAUDE.md** — concatenate or `@import` `SKILL.md` from your top-level agent instructions file.
- **Custom RAG / agent stack** — index the `skills/cocoindex/` directory like any other documentation source.
## What's inside
```
skills/cocoindex/
├── SKILL.md                 # main entry — concepts, APIs, patterns
└── references/
    ├── api_reference.md     # quick API reference
    ├── connectors.md        # full connector reference
    ├── patterns.md          # detailed pipeline patterns
    ├── setup_project.md     # project setup
    └── setup_database.md    # database setup
```
## Contributing
The skill is versioned alongside the codebase — when the API changes, the skill changes with it. PRs that improve clarity, add examples, or cover new connectors are welcome. See the [contributing guide](../contributing/guide/) for how to get started.
---
# CocoIndex core concepts
Source: https://cocoindex.io/docs/programming_guide/core_concepts/
## Incremental processing
When processing data and storing results in targets (e.g., a database) for knowledge retrieval by AI agents or search systems, both your data and code evolve over time. Reprocessing everything after every change is expensive, slow, and disruptive. Incremental processing solves this by only processing what's changed, and applying those changes to the target.
Implementing incremental processing by hand is complicated because:
- You need to figure out what has changed and what has not.
- You need to think in the time dimension and carefully compute the “delta”, e.g., what needs to be inserted, updated, or deleted in your target.
- You need to track and preserve intermediate states to avoid full recomputation when possible.
- You need to evolve the target schema and backfill data when the code logic changes.
With so many moving parts, when something goes wrong, it is difficult to debug.
## State-driven programming
CocoIndex uses a *declarative* programming model — instead of programming *how* to incrementally process your data and apply changes to your target, you declaratively specify *what* your target should look like, based on the current state of your data source.
**Info**
If you've used React, spreadsheets, or materialized views, this mental model will feel familiar:
- **Spreadsheets**: You declare formulas in cells. When any upstream cell changes, downstream cells automatically recompute to reflect the new state.
- **React**: You declare your UI as a function of state. When state changes, React automatically re-renders the UI to match.
- **Materialized Views**: You declare a query (e.g., SQL) that runs on source tables. When input data changes, the view automatically refreshes to match.
CocoIndex uses the above ideas to formulate a state-driven paradigm for long-running, side-effectful data processing pipelines with the following key concepts:
- **Data transformations**: You read the current state from your source and perform a series of transformations. For example, converting PDFs to markdown files, extracting features or structures, or mapping data to fit a particular schema.
- **Target states**: You output the results to a target such as a relational database, vector database, or file system. Note that the target state is a pure function of the source state: the transformation that produces it has no other side effects. **TargetState = Transform(SourceState)**
- **Incremental processing**: Under the hood, when the source state changes, CocoIndex incrementally processes only the changes needed to update the target, so you don't have to manage it yourself.
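The state-driven loop can be sketched in a few lines of plain Python: derive the full desired target state from the current source, then diff it against what was declared on the previous run. This is an illustrative model only, assuming target states are a `dict` from a stable key to a value; `transform` and `reconcile` are hypothetical names, not CocoIndex APIs.

```python
from typing import Any


def transform(source: dict[str, str]) -> dict[str, Any]:
    """Pure function: desired target states depend only on the source state.

    Each source document `x.md` maps to one output file `x.html`.
    """
    return {
        name.removesuffix(".md") + ".html": f"<p>{text}</p>"
        for name, text in source.items()
    }


def reconcile(previous: dict[str, Any], desired: dict[str, Any]) -> list[tuple[str, str]]:
    """Return the (operation, key) pairs that turn `previous` into `desired`."""
    ops = [("insert", k) for k in desired if k not in previous]
    ops += [("update", k) for k in desired if k in previous and previous[k] != desired[k]]
    # Anything no longer declared is removed from the target.
    ops += [("delete", k) for k in previous if k not in desired]
    return sorted(ops, key=lambda op: op[1])


run1 = transform({"a.md": "hello", "b.md": "world"})
print(reconcile({}, run1))  # [('insert', 'a.html'), ('insert', 'b.html')]
run2 = transform({"a.md": "hello", "c.md": "new"})
print(reconcile(run1, run2))  # [('delete', 'b.html'), ('insert', 'c.html')]
```

The code never computes a delta from the source change itself; the delta falls out of comparing two full declarations, which is what lets you write batch-style code.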
## App
An ***app*** is the top-level executable entity in CocoIndex. In an app, you write code to:
- Read state from sources
- Transform the data
- Declare ***target states***: i.e., what the output should look like
CocoIndex then syncs these target states to external systems (Postgres, vector databases, etc.).
For example, an app can read PDFs from a drive, convert them to markdown, and write the results to a folder.
## Processing Component
In practice, your source often contains many items — files, rows, or entities — each of which can be processed independently. A ***processing component*** groups an item's processing together with its target states. Each processing component runs independently and applies its target states to the external system as soon as it completes, without waiting for the rest of the app.
For example, if you have many files in a drive and want to process them file by file, your processing component would operate at the file level.
Taking this further, suppose you want to split each file into chunks and create embedding vectors for indexing. The processing component can still operate at the file level, but each component now produces multiple target states (one per chunk). CocoIndex applies all target state changes (inserts, updates, and deletes rows in the target database) as a unit for each file — all writes happen after processing completes, and each target backend applies its batch atomically when supported (e.g., within a database transaction).
Let's see what happens when the source state changes in different ways:
When a new file (`c.md`) is added, a new processing component is created for it. Once execution completes, CocoIndex applies the new target states — inserting `vector5` and `vector6` into the vector store.
When file `b.md` is updated — say its content is reduced to just one chunk instead of two — the processing component's target state changes from `vector3` and `vector4` to just `vector5`. CocoIndex deletes `vector3` and `vector4`, then inserts `vector5` into the vector database, all within a single transaction.
When file `b.md` is deleted from the source folder, CocoIndex deletes its associated target states (`vector3` and `vector4`) from the vector database in a single transaction.
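Applying one component's changes as a unit can be illustrated with Python's built-in `sqlite3` module, replaying the `b.md` update scenario above. The `vectors` table and the `apply_component_changes` helper are assumptions for this sketch; CocoIndex's real target connectors handle this for you.

```python
import sqlite3


def apply_component_changes(
    conn: sqlite3.Connection,
    deletes: list[str],
    inserts: list[tuple[str, str, str]],
) -> None:
    """Apply one processing component's deletes and inserts as a single transaction.

    `with conn:` commits if the block succeeds and rolls back on any exception,
    so the table never shows a half-applied component.
    """
    with conn:
        conn.executemany("DELETE FROM vectors WHERE id = ?", [(i,) for i in deletes])
        conn.executemany(
            "INSERT INTO vectors (id, source, chunk) VALUES (?, ?, ?)", inserts
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE vectors (id TEXT PRIMARY KEY, source TEXT, chunk TEXT)")
# Initial run: b.md produces two chunks.
apply_component_changes(conn, [], [("vector3", "b.md", "chunk3"), ("vector4", "b.md", "chunk4")])
# b.md is edited down to one chunk: both removals and the insertion land together.
apply_component_changes(conn, ["vector3", "vector4"], [("vector5", "b.md", "chunk5")])
print(conn.execute("SELECT id FROM vectors ORDER BY id").fetchall())  # [('vector5',)]
```

Because the connection context manager rolls back on error, a failure partway through leaves the previous rows for `b.md` in place instead of a mix of old and new.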
## Function memoization: skip unchanged computations
***Function memoization*** is a technique that allows skipping a function when its input and code are unchanged from a previous run. It is essential for incremental processing — without it, every run would require full recomputation.
In CocoIndex, both processing components and transforms are expressed as functions, so function memoization can be enabled at either level. Using the chunk-embed example:
- *Processing component level*: If a file hasn't changed and the processing logic hasn't changed, the entire processing component is skipped.
- *Transform level*: If the input to the "embed" transform (the chunk text) hasn't changed and the transform logic (e.g., the model) hasn't changed, that specific embedding computation is skipped.
See [Function Memoization](./function#memoization) for more details.
Here's how memoization behaves in different scenarios:
When input file `b.md` changes:
- The input state `a.md` is unchanged, so the 1st processing component is entirely reused without reprocessing.
- The input state `b.md` changed, so the 2nd processing component must be reprocessed. After splitting, suppose we get two chunks: `chunk3` (identical to before) and `chunk5` (new).
- `Embed(chunk3)` was memoized previously, so its cached result is reused.
- `Embed(chunk5)` is new and must be computed.
When the "Split into chunks" logic changes:
- All processing components must be reprocessed since the logic changed.
- For the 1st processing component, the new logic produces the same chunks as before. The memoized `Embed` results are reused without recomputation.
- For the 2nd processing component, the new logic produces different chunks (`chunk5` and `chunk6`), so `Embed` must be invoked on them.
As these examples show, memoization can save expensive computations even when logic changes — as long as the intermediate results remain the same.
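A toy version of this behavior is a cache keyed by both the function's code and its input. The sketch below is not how `memo=True` is implemented; it assumes a function's logic can be fingerprinted from its bytecode, constants, and referenced names, keeps the cache in memory only, and uses hypothetical names (`memoize`, `embed`).

```python
import functools
import hashlib
import pickle
from collections.abc import Callable
from typing import Any

_cache: dict[tuple[str, bytes], Any] = {}
calls: list[str] = []  # records real executions, for the demo below


def code_fingerprint(fn: Callable[..., Any]) -> str:
    """Hash the function's bytecode, constants, and referenced names."""
    code = fn.__code__
    payload = code.co_code + repr((code.co_consts, code.co_names)).encode()
    return hashlib.sha256(payload).hexdigest()


def memoize(fn: Callable[..., Any]) -> Callable[..., Any]:
    fingerprint = code_fingerprint(fn)

    @functools.wraps(fn)
    def wrapper(*args: Any) -> Any:
        # Same code + same input -> same key -> reuse the cached result.
        key = (fingerprint, pickle.dumps(args))
        if key not in _cache:
            _cache[key] = fn(*args)
        return _cache[key]

    return wrapper


@memoize
def embed(chunk: str) -> list[float]:
    calls.append(chunk)  # stand-in for an expensive embedding-model call
    return [float(len(chunk)), float(sum(map(ord, chunk)) % 97)]


embed("chunk3")
embed("chunk3")  # cache hit: input and code unchanged
embed("chunk5")
print(calls)  # ['chunk3', 'chunk5']
```

Editing `embed`'s body would change its fingerprint and force recomputation; a production system also needs to persist the cache and account for code the function calls, which this toy ignores.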
## Next steps
Now that you understand the mental model, dive into the Programming Guide to learn how to use these concepts in code:
- [App](./app) — creating and running pipelines
- [Target State](./target_state) — declaring what should exist in external systems
- [Processing Component](./processing_component) — structuring work and mounting components
- [Function](./function) — the `@coco.fn` decorator, memoization, and change detection
---
# The CocoIndex App
Source: https://cocoindex.io/docs/programming_guide/app/
**Note — Prerequisite**
This page builds on [Core Concepts](./core_concepts), which introduces the App and the source → transform → target-state model **with diagrams**. If the App model feels abstract, start there.
An **App** is the top-level runnable unit in CocoIndex.
It names your pipeline and binds a main function with its parameters. When you call `app.update()`, CocoIndex runs that main function as the root [processing component](./processing_component), which can mount child processing components to do work and declare target states.
## Creating an app
To create an App, provide:
1. **An `AppConfig`** (or just a name string) — identifies the pipeline
2. **A main function** — the entry point for your pipeline
3. **Arguments** — any additional arguments to pass to the main function
```python
import pathlib

import cocoindex as coco


@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    ...  # your pipeline logic


app = coco.App(
    coco.AppConfig(name="MyPipeline"),
    app_main,
    sourcedir=pathlib.Path("./data"),
)
```
You can also pass just a name string instead of `AppConfig`:
```python
app = coco.App("MyPipeline", app_main, sourcedir=pathlib.Path("./data"))
```
**Tip**
The main function is usually async. See [How Sync and Async Work Together](./sdk_overview#how-sync-and-async-work-together) for details.
## Updating an app
Call `update()` to execute the pipeline. It returns an `UpdateHandle`, which is also `Awaitable`, so in the simplest case you await it directly:
```python
# Async — await the result directly (backward-compatible)
result = await app.update()
```
```python
# Sync (blocking) API
result = app.update_blocking()
```
**Parameters:**
- `live` option keeps the app running after the initial scan so live components can continue watching for changes. See [Live Mode](./live_mode).
- `report_to_stdout` option prints periodic progress updates during execution. Pass `True` for the default refresh interval, or a `timedelta` to set it.
- `full_reprocess` option reprocesses everything and invalidates existing caches. This forces all components to re-execute and all target states to be re-applied, even if they haven't changed.
When you update an App, CocoIndex:
1. Runs the lifespan setup (if not already done)
2. Executes the main function (the root processing component), which mounts child processing components
3. Compares the declared target states with the previous run and applies only the necessary changes to external systems
Given the same logic and inputs, updates are repeatable. When logic or inputs change, only the affected parts re-execute.
To watch progress beyond the `report_to_stdout` flag, the `UpdateHandle` returned by `app.update()` also exposes stats programmatically — poll with `handle.stats()` or stream with `handle.watch()`. For those structured APIs, and for splitting a run into separately-reported scopes with `coco.stats_group()`, see [Progress monitoring](../advanced_topics/progress_monitoring).
## How an app runs
An App is the top-level runner and entry point. A **processing component** is the unit of incremental execution *within* an app.
- Your app's main function runs as the **root processing component** at the root path.
- Each call to `mount()` or `use_mount()` declares a **child processing component** at a child path. Sugar APIs like `mount_each()` and `mount_target()` also create child components.
- Each processing component declares a set of target states, and CocoIndex syncs them as a unit when that component finishes — all writes happen after processing completes, and each target backend applies its batch atomically when supported.
This is why `app.update()` does not "run everything from scratch": CocoIndex uses the component path tree to decide what can be reused and what must re-run.
For example, an app that processes files might mount one component per file — the per-file fan-out from [Core Concepts](./core_concepts#processing-component), shown here as a path tree:
```text
(root) ← app_main component
├── "setup" ← declare_dir_target component
└── "process"
├── "hello.md" ← process_file component
└── "world.md" ← process_file component
```
See [Processing Component](./processing_component) for how mounting and component paths define these boundaries.
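Since components are identified by their paths, cleanup after a run can be modeled as a comparison keyed by path. This is a simplified, hypothetical model (paths as tuples, each mapped to the set of target state keys it declared), not CocoIndex's internal structure; `stale_targets` is an illustrative name.

```python
ComponentPath = tuple[str, ...]


def stale_targets(
    previous: dict[ComponentPath, set[str]],
    mounted: dict[ComponentPath, set[str]],
) -> dict[ComponentPath, set[str]]:
    """Target states to remove after a run, grouped by component path.

    A component that is no longer mounted loses all its target states; one that
    is still mounted loses only those it stopped declaring.
    """
    stale: dict[ComponentPath, set[str]] = {}
    for path, old in previous.items():
        gone = old - mounted.get(path, set())
        if gone:
            stale[path] = gone
    return stale


previous = {
    ("setup",): {"dir:out"},
    ("process", "hello.md"): {"out/hello.html"},
    ("process", "world.md"): {"out/world.html"},
}
# world.md was removed from the source, so no component is mounted at its path.
mounted = {("setup",): {"dir:out"}, ("process", "hello.md"): {"out/hello.html"}}
print(stale_targets(previous, mounted))  # {('process', 'world.md'): {'out/world.html'}}
```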
## Database path
CocoIndex needs a database path (`db_path`) to store its internal state. This database tracks target states and memoized results from previous runs, enabling CocoIndex to compute what changed and apply only the necessary updates.
The simplest way to configure the database path is via the `COCOINDEX_DB` environment variable:
```bash
export COCOINDEX_DB=./cocoindex.db
```
With `COCOINDEX_DB` set, you can create and run apps without any additional configuration:
```python
import cocoindex as coco


@coco.fn
def app_main() -> None:
    ...  # your pipeline logic


app = coco.App("MyPipeline", app_main)
app.update_blocking()  # Uses COCOINDEX_DB for storage
```
For details on what the internal database stores and how to tune its LMDB settings (e.g., increasing the maximum database size beyond 4 GiB), see [Internal Storage](../advanced_topics/internal_storage).
## Lifespan (optional)
A **lifespan function** defines the CocoIndex runtime lifecycle: its setup runs when the runtime starts (automatically before the first `app.update()`), and its cleanup runs when the runtime stops. Use it to configure CocoIndex settings programmatically or to initialize shared resources that processing components can reuse.
**Tip**
If you only need to set the database path, using the `COCOINDEX_DB` environment variable is simpler than defining a lifespan function.
### Defining a lifespan
Use the `@coco.lifespan` decorator to register a lifespan function. By default, all apps share the same lifespan (unless you explicitly place an app in a different [*Environment*](../advanced_topics/multiple_environments)). The function receives an `EnvironmentBuilder` for configuration and uses `yield` to separate setup from cleanup:
```python
import pathlib
from typing import AsyncIterator

import cocoindex as coco


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    # Configure CocoIndex's internal database location (overrides COCOINDEX_DB if set)
    builder.settings.db_path = pathlib.Path("./cocoindex.db")
    # Setup: initialize resources here
    yield
    # Cleanup: release resources here; this runs when the runtime stops
```
Setting `db_path` in the lifespan takes precedence over the `COCOINDEX_DB` environment variable. If neither is provided, CocoIndex will raise an error.
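The precedence rule can be written as a small resolution function. This only restates the documented behavior (the lifespan's `db_path` first, then `COCOINDEX_DB`, otherwise an error); `resolve_db_path` is hypothetical, and CocoIndex's actual error type and message may differ.

```python
from __future__ import annotations

import os
import pathlib


def resolve_db_path(
    configured: pathlib.Path | None,
    env: dict[str, str] | None = None,
) -> pathlib.Path:
    """Pick CocoIndex's internal database path using the documented precedence."""
    env = dict(os.environ) if env is None else env
    if configured is not None:  # 1. db_path set in the lifespan wins
        return configured
    if env.get("COCOINDEX_DB"):  # 2. fall back to the environment variable (empty = unset)
        return pathlib.Path(env["COCOINDEX_DB"])
    # 3. neither is provided
    raise RuntimeError("no database path: set builder.settings.db_path or COCOINDEX_DB")


print(resolve_db_path(pathlib.Path("./cocoindex.db"), {"COCOINDEX_DB": "/tmp/other.db"}))
# cocoindex.db (the lifespan setting wins)
```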
The lifespan function can be sync or async:
```python
import pathlib
from typing import Iterator

import cocoindex as coco


@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    builder.settings.db_path = pathlib.Path("./cocoindex.db")
    yield
```
You can also use the lifespan to provide resources (like database connections) that processing components can access. See [Context](./context) for details on sharing resources across your pipeline.
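The `yield`-based shape follows the same pattern as Python's `contextlib.contextmanager`: code before `yield` is setup and code after it is cleanup. The sketch below drives such a generator with the standard library only, to show the ordering; `FakeBuilder` is a hypothetical stand-in for `coco.EnvironmentBuilder`, and the `ExitStack` calls stand in for the runtime starting and stopping. For an async lifespan, `contextlib.asynccontextmanager` and `AsyncExitStack` play the same roles.

```python
import contextlib
from collections.abc import Iterator

events: list[str] = []


class FakeBuilder:
    """Hypothetical stand-in for coco.EnvironmentBuilder."""

    def __init__(self) -> None:
        self.settings: dict[str, str] = {}


def my_lifespan(builder: FakeBuilder) -> Iterator[None]:
    builder.settings["db_path"] = "./cocoindex.db"
    events.append("setup")  # before yield: runs when the runtime starts
    try:
        yield
    finally:
        events.append("cleanup")  # after yield: runs when the runtime stops


builder = FakeBuilder()
runtime = contextlib.ExitStack()
runtime.enter_context(contextlib.contextmanager(my_lifespan)(builder))  # "start"
events.append("app.update()")
runtime.close()  # "stop"
print(events)  # ['setup', 'app.update()', 'cleanup']
```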
### Explicit lifecycle control (optional)
The lifespan runs automatically the first time any App updates — most users don't need to do anything beyond defining the lifespan and calling `app.update()`.
If you need more explicit control — for example, to know when startup completes for health checks, or to explicitly trigger shutdown — you can manage the lifecycle directly:
```python
# Async API
await coco.start() # Run lifespan setup
# ... run apps or other operations ...
await coco.stop() # Run lifespan cleanup
```
```python
# Sync (blocking) API
coco.start_blocking() # Run lifespan setup
# ... run apps or other operations ...
coco.stop_blocking() # Run lifespan cleanup
```
Or use the `runtime()` context manager, which supports both sync and async usage:
```python
# Async
async with coco.runtime():
    await app.update()
```
```python
# Sync (blocking)
with coco.runtime():
    app.update_blocking()
```
## Managing apps with CLI
CocoIndex provides a CLI for managing your apps without writing additional code.
### Update an app
Run your app once to sync all target states:
```bash
cocoindex update main.py
```
This executes your pipeline and applies all declared target states to external systems. Add `--live` (or `-L`) to keep the app running and react to source changes continuously — see [Live Mode](./live_mode).
### Drop an app
Remove an app and revert all its target states:
```bash
cocoindex drop main.py
```
This will delete all target states created by the app (e.g., drop tables, delete rows) and clear its internal state.
`drop` is an explicit, foreground operation — any failure during the recursive delete (root or any descendant) raises rather than being silently logged. The internal tracking record for a component whose delete failed is preserved so the next `drop` (with the underlying problem fixed) can complete the cleanup. See [Error Handling](../advanced_topics/exception_handlers) for the general principle.
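The failure behavior described above can be modeled as follows: delete each component's target states, forget a component's tracking record only once its deletes succeed, and raise if anything failed. This is a hypothetical sketch of the principle, not CocoIndex's code; `drop_all`, the `tracking` mapping, and the `delete` callback are assumptions, and `delete` is assumed to be idempotent so a retry can safely repeat earlier deletes.

```python
from collections.abc import Callable


def drop_all(tracking: dict[str, list[str]], delete: Callable[[str], None]) -> None:
    """Delete every tracked target state, keeping records for components that fail."""
    errors: list[Exception] = []
    for component in list(tracking):
        try:
            for target in tracking[component]:
                delete(target)
        except Exception as exc:
            errors.append(exc)  # keep the record so a later drop can retry
        else:
            del tracking[component]  # fully cleaned up: forget the component
    if errors:
        # Surface the failure instead of logging and moving on.
        raise RuntimeError(f"drop failed for {len(errors)} component(s)") from errors[0]


deleted: set[str] = set()
unreachable = {"table:docs"}


def delete(target: str) -> None:
    if target in unreachable:
        raise ConnectionError(f"cannot reach {target}")
    deleted.add(target)  # idempotent: deleting twice is harmless


tracking = {"file:a.md": ["row:a1", "row:a2"], "root": ["table:docs"]}
try:
    drop_all(tracking, delete)
except RuntimeError as exc:
    print(exc)  # drop failed for 1 component(s)
print(tracking)  # {'root': ['table:docs']}
unreachable.clear()  # fix the underlying problem, then drop again
drop_all(tracking, delete)
print(tracking)  # {}
```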
See [CLI Reference](../cli) for more commands and options.
---
# Declaring target state
Source: https://cocoindex.io/docs/programming_guide/target_state/
**Note — Prerequisite**
This page builds on [Core Concepts](./core_concepts), which introduces target states and the declarative *target state = transform(source state)* model **with diagrams**. If the ideas below feel abstract, start there.
A **target state** represents what you want to exist in an external system. You *declare* target states in your code; CocoIndex keeps them in sync with your intent — creating, updating, or removing them as needed.
**Note — Terminology**
A **target** is the external system you write to — a directory, a database table, a vector store collection, etc. In Python, targets are represented by objects like `DirTarget` and `TableTarget`. A **target state** is what you want to exist *in* that target — a specific file, row, or embedding.
CocoIndex treats your declarations as the source of truth: if you stop declaring a target state, CocoIndex will remove it from the target.
Examples of target states:
- A file in a directory
- A row in a database table
- An embedding vector in a vector store
When your source data changes, CocoIndex compares the newly declared target states with those from the previous run and applies only the necessary changes.
## Declaring target states
CocoIndex connectors provide **targets** with `declare_*` methods:
```python
# Declare a file target state
dir_target.declare_file(filename="output.html", content=html)
# Declare a row target state
table_target.declare_row(row=DocEmbedding(...))
```
### Where do targets come from?
Target states can be nested — a directory contains files, a table contains rows. The container itself is a target state you declare, and once it's ready, you get a target to declare child target states within it.
Container target states (like a directory or table) are typically top-level — you can declare them directly. Child target states (like files or rows) require the container to be ready first.
Connectors provide convenience methods that mount the container and return a ready-to-use target in one step, as in the examples below.
### Example: writing a file to a directory
For simple cases where each processing component writes a single file, you can declare the file directly:
```python
from cocoindex.connectors import localfs
# Declare a single file target state directly
localfs.declare_file(outdir / "output.html", html, create_parent_dirs=True)
```
When you need a `DirTarget` to declare multiple files, use the connector's convenience method:
```python
# Mount a directory target, get a ready-to-use DirTarget
dir_target = await localfs.mount_dir_target(outdir)
# Declare child target states (files)
dir_target.declare_file(filename="output.html", content=html)
```
### Example: writing a row to PostgreSQL
This example uses a [`ContextKey`](./context) to reference the database connection — see [Context](./context) for how keys are defined and provided.
```python
import asyncpg
import cocoindex as coco
from cocoindex.connectors import postgres
# Define a ContextKey for the database connection (provided in lifespan)
TARGET_DB = coco.ContextKey[asyncpg.Pool]("target_db")
# Mount a table target, get a ready-to-use TableTarget
table = await postgres.mount_table_target(
    TARGET_DB,
    table_name="doc_embeddings",
    table_schema=await postgres.TableSchema.from_class(
        DocEmbedding, primary_key=["id"]
    ),
)
# Declare a child target state (a row)
table.declare_row(row=DocEmbedding(...))
```
These convenience methods wrap [`mount_target()`](./processing_component#mount_target), which automatically derives the component path from the target's globally unique key. See [Processing Component](./processing_component) for more on mounting APIs.
**Tip — Type safety**
Targets like `DirTarget` and `TableTarget` have two statuses: **pending** (just created) and **resolved** (after the container target state is ready). The type system tracks this — if you try to use a pending target before it's resolved, type checkers like mypy will flag the error.
## How CocoIndex syncs target states
Under the hood, CocoIndex compares your declared target states with the previous run and applies the minimal changes needed — the same create/update/delete sync the [change-scenario diagrams in Core Concepts](./core_concepts#processing-component) walk through, viewed per target state:
| Target state | On first declaration | When declared differently | When no longer declared |
| --- | --- | --- | --- |
| A database table | Create the table | Alter the table | Drop the table |
| A row in a database table | Insert the row | Update the row | Delete the row |
| A file in a directory | Create the file | Update the file | Delete the file |
CocoIndex ensures containers exist before their contents are added, and properly cleans up contents when the container changes.
### What happens when a target's schema changes
When you change a container target state's declaration (e.g., add a column to a table schema, change a primary key), CocoIndex detects the change and does its best to alter the target in place. If the change is too large to alter (e.g., changing primary keys), the target is dropped and recreated.
When a target is dropped and recreated, CocoIndex automatically reprocesses all affected components to backfill the data — you don't need to manually trigger `--full-reprocess`. This is handled by the target connector's [child invalidation](../advanced_topics/custom_target_connector#child-invalidation) mechanism, which signals to CocoIndex whether the change is destructive (all children lost) or lossy (some data may be lost).
## Generic target state APIs
For cases where connector-specific APIs don't cover your needs, CocoIndex provides generic APIs:
- `declare_target_state()` — declare a leaf target state
- `declare_target_state_with_child()` — declare a target state that provides child target states
These are exported from `cocoindex` and used internally by connectors. For defining custom targets, see [Custom Target States Connector](../advanced_topics/custom_target_connector).
---
# The processing component
Source: https://cocoindex.io/docs/programming_guide/processing_component/
**Note — Prerequisite**
This page builds on [Core Concepts](./core_concepts), which introduces processing components, target states, and incremental sync **with diagrams** — the per-file fan-out and the chunk-embedding pipeline it walks through are referenced throughout this page. If the mounting APIs below feel abstract, start there.
Most apps process many independent source items — files, rows, or entities. A **Processing Component** is the unit of execution for one: it runs that item's transformation logic and declares the set of **target states** produced for it.
## Component path
A **component path** is the stable identifier for a processing component across runs (think of it like a path in a tree). CocoIndex uses it to match a component to its previous run, detect what changed for that item, and sync that component's target states as a unit when it finishes. This sync happens per component; CocoIndex does not wait for other components in the same app to complete.
Component paths are hierarchical and form a tree structure. You specify child paths using `coco.component_subpath()` with stable identifiers like string literals, file names, row keys, or entity IDs:
```python
coco.component_subpath(filename) # e.g., coco.component_subpath("hello.pdf")
coco.component_subpath("user", user_id) # e.g., coco.component_subpath("user", 12345)
```
Choose paths that are stable for the "same" item (e.g., file path, primary key). If an item disappears and its path is no longer present, CocoIndex cleans up the target states owned by that path (and its sub-paths).
Here's an example component path tree for a pipeline that processes files:
```text
(root)               ← app_main component
└── process_file
    ├── "hello.pdf"  ← process_file component
    └── "world.pdf"  ← process_file component
```
The tree is populated dynamically as the app runs — each `mount()` / `mount_each()` call adds a subpath.
See [StableKey](./sdk_overview#stablekey) in the SDK Overview for details on what values can be used in component paths.
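Cleanup by ownership can be sketched with paths as tuples: when a path is no longer mounted, every target state owned by that path or anything beneath it is deleted. This is a hypothetical plain-Python model, not CocoIndex's data structure. Path elements are simplified to strings, and the per-chunk sub-path is invented for illustration.

```python
Path = tuple[str, ...]


def is_under(path: Path, prefix: Path) -> bool:
    """True if `path` equals `prefix` or is nested beneath it."""
    return path[: len(prefix)] == prefix


def cleanup_targets(owned: dict[Path, set[str]], unmounted: set[Path]) -> set[str]:
    """Target states owned by unmounted paths or any of their sub-paths."""
    return {
        state
        for path, states in owned.items()
        if any(is_under(path, gone) for gone in unmounted)
        for state in states
    }


owned = {
    ("process_file", "hello.pdf"): {"out/hello.html"},
    ("process_file", "hello.pdf", "chunk", "0"): {"row hello.pdf#0"},
    ("process_file", "world.pdf"): {"out/world.html"},
}
# hello.pdf was deleted from the source, so its component is no longer mounted.
print(sorted(cleanup_targets(owned, {("process_file", "hello.pdf")})))
# ['out/hello.html', 'row hello.pdf#0']
```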
## Mount
Mounting is how you declare (instantiate) a processing component within an app at a specific path, so CocoIndex knows that component exists, should run, and owns a set of target states — and can match it against its previous run to sync only what changed.
CocoIndex provides two core mounting APIs:
- **`mount()`** — sets up a processing component in a child path without depending on data from it. This allows the component to refresh independently in live mode.
- **`use_mount()`** — returns a value from the component's execution to the caller. The component at that path cannot refresh independently without re-executing the caller.
**Which one you reach for comes down to a single question: does the caller need a value back?** `use_mount()` consumes the child's return value, which *couples* the two — the call blocks until the child finishes and commits its target states, and the child can't refresh on its own without re-running the caller. `mount()` takes nothing back and returns as soon as the child is scheduled, so the child runs on its own and can [refresh independently in live mode](./live_mode) when its inputs change.
And two sugar APIs that simplify common patterns:
- **`mount_each()`** — mounts one component per item in a keyed iterable
- **`mount_target()`** — mounts a target without an explicit subpath
See also [`map()`](#map) for a utility API that operates within a component without creating new ones.
### Automatic subpath derivation
`mount()`, `use_mount()`, and `mount_each()` all accept an optional `ComponentSubpath` as their first argument. When omitted, the subpath is **auto-derived** from the function name using `Symbol(fn.__name__)`.
```python
# These are equivalent:
await coco.mount(process_file, file, target)
await coco.mount(coco.component_subpath(coco.Symbol("process_file")), process_file, file, target)
```
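This means the component path for a `process_file` function is `parent / Symbol("process_file")`. The function must have a `__name__` attribute; if it doesn't (e.g., a `functools.partial` object), provide an explicit subpath. Lambdas need one too in practice: every lambda's `__name__` is `"<lambda>"`, which makes a poor path name.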
Since sibling component paths must not collide, you need an explicit subpath when:
- **The same function is mounted more than once** — auto-derived paths would be identical, so each call needs a distinct path (e.g., `coco.component_subpath("session", youtube_id)`)
- **Different functions happen to share a `__name__`** — rare, but possible with wrappers or closures
- **You want a specific path name** — different from the function name
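The derivation and collision rules can be mimicked in a few lines. `derive_subpath` and `check_siblings` below are hypothetical helpers that model the rules above; CocoIndex itself uses `Symbol` and `ComponentSubpath` rather than plain tuples.

```python
import functools


def derive_subpath(fn, explicit=None) -> tuple:
    """Use the explicit subpath if one is given, else the function's __name__."""
    if explicit is not None:
        return tuple(explicit)
    name = getattr(fn, "__name__", None)
    if name is None:
        raise TypeError(f"{fn!r} has no __name__; pass an explicit subpath")
    return (name,)


def check_siblings(subpaths) -> None:
    """Reject two sibling components that resolve to the same path."""
    seen = set()
    for subpath in subpaths:
        if subpath in seen:
            raise ValueError(f"sibling component paths collide: {subpath}")
        seen.add(subpath)


def process_file(file):
    return file


print(derive_subpath(process_file))                      # ('process_file',)
print(derive_subpath(process_file, ("session", "abc")))  # ('session', 'abc')
# Mounting the same function twice needs distinct explicit subpaths:
check_siblings([("session", "abc"), ("session", "xyz")])
# A functools.partial object has no __name__, so derivation fails:
try:
    derive_subpath(functools.partial(process_file, "a.md"))
except TypeError as err:
    print(err)
```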
### `mount()`
Use `mount()` when you don't need a return value from the processing component. It schedules the processing component to run and returns a handle:
```python
handle = await coco.mount(process_file, file, target)
```
With an explicit subpath, for example when mounting multiple components of the same function:
```python
handle = await coco.mount(
    coco.component_subpath("process", filename),
    process_file,
    file,
    target,
)
```
The handle provides a method you can call if you need to wait until the processing component is fully ***ready*** — meaning all its target states have been applied to external systems and all components in its sub-paths are ready:
```python
await handle.ready()
```
You usually only need to call `ready()` when you have logic that depends on the processing component's target states being applied — for example, querying the latest data from a target table after syncing it.
`mount()` also accepts **LiveComponent classes** — components that process continuously and react to changes incrementally instead of rescanning everything. See [Live Components](../advanced_topics/live_component) for details.
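The difference between getting a handle back and getting a value back mirrors `asyncio.create_task()` versus a plain `await`. The sketch below is a hypothetical stand-in built on the standard library, not CocoIndex's API: `Handle`, `mount`, and `use_mount` here only model the scheduling behavior, not path tracking or target-state sync.

```python
import asyncio


class Handle:
    """Hypothetical stand-in for the handle mount() returns."""

    def __init__(self, task: asyncio.Task):
        self._task = task

    async def ready(self) -> None:
        await self._task  # wait until the child's work has finished


async def mount(fn, *args) -> Handle:
    # Schedule the child and return at once; the caller gets no value back.
    return Handle(asyncio.create_task(fn(*args)))


async def use_mount(fn, *args):
    # Run the child to completion and hand its return value to the caller.
    return await fn(*args)


async def write_table(name: str, applied: list) -> str:
    await asyncio.sleep(0.01)  # simulate writing to a target backend
    applied.append(name)
    return name


async def main():
    applied: list = []
    handle = await mount(write_table, "docs", applied)
    before = list(applied)  # scheduled, not finished yet
    await handle.ready()
    after = list(applied)
    value = await use_mount(write_table, "setup", applied)
    return before, after, value


print(asyncio.run(main()))  # ([], ['docs'], 'setup')
```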
### `use_mount()`
Use `use_mount()` when you need the processing component's return value. It mounts the component, waits until it's ready, and returns the value directly:
```python
table = await coco.use_mount(setup_table, table_name="docs")
```
With an explicit subpath:
```python
table = await coco.use_mount(
    coco.component_subpath("setup"),
    setup_table,
    table_name="docs",
)
```
A common use of `use_mount()` is to obtain a [target](./target_state#where-do-targets-come-from) after its container target state is applied.
### `mount_each()`
`mount_each()` mounts one processing component per item in a keyed iterable.
```python
files = localfs.walk_dir(sourcedir, path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]))
await coco.mount_each(process_file, files.items(), target)
```
Each item in the iterable is a `(key, value)` tuple. The value is passed as the first argument to the function, and any additional arguments are passed through. Items are mounted under an [auto-derived subpath](#automatic-subpath-derivation) (`Symbol(fn.__name__)`), so the component path for each item is `parent / Symbol("process_file") / key`.
In the static case this is just the keyed-loop form of [`mount()`](#mount) — the same per-file fan-out shown in [Core Concepts](./core_concepts#processing-component). The snippet above is equivalent to looping over the items and calling `mount(coco.component_subpath(coco.Symbol("process_file"), key), process_file, value, target)` for each one. The per-item `key` is what keeps each component's path stable across runs, so the engine matches, updates, and cleans up each item individually.
You can provide an explicit subpath as the first argument:
```python
await coco.mount_each(coco.component_subpath("files"), process_file, files.items(), target)
```
Source connectors provide an `items()` method that returns `(StableKey, T)` pairs. For example, `localfs.walk_dir(...).items()` yields `(relative_path, File)` tuples.
It also does one thing a hand-written loop can't: when a source connector supports live watching, its `items()` returns a `LiveMapView` or `LiveMapFeed` instead of a plain iterable, and `mount_each()` detects this and automatically adds, updates, and removes per-item components as the source changes — no changes to `mount_each()` itself are needed. See [Live Mode](./live_mode).
### `mount_target()`
`mount_target()` mounts a target without requiring an explicit subpath.
```python
from cocoindex.connectors import localfs
dir_target = await coco.mount_target(localfs.dir_target(outdir))
```
The component path is derived automatically from the target's globally unique key — you don't need to create a `component_subpath` for it. This is sugar over calling `use_mount()` with a target declaration function.
Connectors also provide convenience methods that wrap `mount_target()`:
```python
# Equivalent to the above
dir_target = await localfs.mount_dir_target(outdir)
# PostgreSQL example
table = await postgres.mount_table_target(
    PG_DB,
    table_name="doc_embeddings",
    table_schema=await postgres.TableSchema.from_class(DocEmbedding, primary_key=["id"]),
)
```
### Using `component_subpath` as a context manager
You can use `component_subpath()` as a context manager to create nested paths without repeating common prefixes:
```python
with coco.component_subpath("process"):
    for f in files:
        await coco.mount(
            coco.component_subpath(str(f.relative_path)),
            process_file,
            f,
            target,
        )
```
This is equivalent to:
```python
for f in files:
    await coco.mount(
        coco.component_subpath("process", str(f.relative_path)),
        process_file,
        f,
        target,
    )
```
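A scoped prefix like this can be built on Python's `contextvars`. The sketch is hypothetical (`subpath_scope` and `full_path` are invented names) and only models how nested prefixes compose; it is not how `component_subpath()` is implemented.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical: the path prefix that nested scopes build up.
_current_prefix = contextvars.ContextVar("current_prefix", default=())


@contextmanager
def subpath_scope(*parts: str):
    """Extend the current prefix for the duration of a with-block."""
    token = _current_prefix.set(_current_prefix.get() + parts)
    try:
        yield
    finally:
        _current_prefix.reset(token)


def full_path(*parts: str) -> tuple:
    """Resolve a child path relative to whichever scope is active."""
    return _current_prefix.get() + parts


with subpath_scope("process"):
    nested = full_path("a.md")

print(nested)                        # ('process', 'a.md')
print(full_path("process", "a.md"))  # ('process', 'a.md')
```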
**Tip**
When iterating over keyed items, prefer [`mount_each()`](#mount_each) — it handles the loop and subpath creation for you.
## How target states sync
The component path tree determines ownership. When a component is no longer mounted at a path (e.g., a source file is deleted), CocoIndex automatically cleans up its target states — and recursively for all its sub-paths.
**Info — Sync Mechanism**
After a processing component finishes, CocoIndex syncs its target states:
1. **Compares** the target states declared in this run against those from the previous run at the same path
2. **Applies changes** as a unit — creating, updating, or deleting target states as needed
3. **Recursively cleans up** sub-paths where components are no longer mounted
All writes happen strictly after processing completes — you never see partial effects from a processing failure or interrupt. Each target backend applies its batch atomically when supported (e.g., within a database transaction), but changes across different target backends are not transactional with each other.
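Within one backend, all-or-nothing application maps onto an ordinary transaction. This hypothetical sketch uses Python's built-in `sqlite3` as a stand-in target; `apply_batch` and the `docs` table are invented for illustration and are not a CocoIndex connector.

```python
import sqlite3


def apply_batch(conn: sqlite3.Connection, statements) -> None:
    """Run every (sql, params) pair in one transaction: all apply, or none do."""
    with conn:  # commits if the block succeeds, rolls back if any statement raises
        for sql, params in statements:
            conn.execute(sql, params)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id TEXT PRIMARY KEY, body TEXT NOT NULL)")

apply_batch(conn, [("INSERT INTO docs VALUES (?, ?)", ("a", "A"))])
try:
    apply_batch(conn, [
        ("INSERT INTO docs VALUES (?, ?)", ("b", "B")),
        ("INSERT INTO docs VALUES (?, ?)", ("c", None)),  # violates NOT NULL
    ])
except sqlite3.IntegrityError:
    pass  # the whole batch was rolled back, including row "b"

print(conn.execute("SELECT id FROM docs ORDER BY id").fetchall())  # [('a',)]
```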
## What happens when a component fails
CocoIndex processes each component in two phases: **processing** (running your function, declaring target states) and **submit** (writing changes to target backends).
### Failure isolation
The framework's general rule: **a call raises on failure iff the failed work was on the critical path for the call to return.**
- **`use_mount()`** — you're awaiting the child's *value*, so the child succeeding is on the critical path. The child's exception propagates to the parent.
- **`mount()` and `mount_each()`** — these return as soon as the work is *scheduled* (you get a handle back). The child's execution runs in the background, off your critical path. A failure in one child does **not** affect the parent or siblings — by default the exception is logged and other components continue. One bad file shouldn't take down the entire pipeline.
To react to background failures, you can:
- Install [exception handlers](../advanced_topics/exception_handlers#exception-handlers) — global or scoped — to send alerts, record metrics, or implement custom logic. A handler that raises propagates the failure through `await handle.ready()` if you choose to await it.
- [Monitor app progress](../advanced_topics/progress_monitoring) — `UpdateStats` exposes per-component stats including error counts, so you can detect failures programmatically.
For the full picture — including the critical-path principle applied to every API, interrupted update recovery, and the exception handler API — see [Error Handling](../advanced_topics/exception_handlers).
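A rough standard-library analogy of the critical-path rule: background children report failures through logging while the parent keeps going, and an awaited child's exception reaches its caller. This is a hypothetical asyncio sketch. Unlike `mount()`, `run_background` waits for all children before returning, and it uses logging in place of CocoIndex's exception handlers.

```python
import asyncio
import logging

log = logging.getLogger("failure_isolation_sketch")


async def process(name: str) -> str:
    """Hypothetical per-item work; one input is deliberately broken."""
    if name == "bad.md":
        raise ValueError(f"cannot parse {name}")
    return f"{name} ok"


async def run_background(names: list[str]) -> list[str]:
    """mount()-style: a child's failure is logged and its siblings still finish."""
    outcomes = await asyncio.gather(*(process(n) for n in names), return_exceptions=True)
    finished = []
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            log.warning("component %s failed: %s", name, outcome)
        else:
            finished.append(outcome)
    return finished


async def run_for_value(name: str) -> str:
    """use_mount()-style: the caller needs the value, so the error propagates."""
    return await process(name)


print(asyncio.run(run_background(["a.md", "bad.md", "c.md"])))  # ['a.md ok', 'c.md ok']
try:
    asyncio.run(run_for_value("bad.md"))
except ValueError as err:
    print("propagated:", err)  # propagated: cannot parse bad.md
```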
### No rollback, convergent roll-forward
CocoIndex does not roll back partial writes. The two-phase design makes this safe:
- **Processing** is side-effect-free — it only declares target states in memory. If processing fails (e.g., a parsing error), no writes were attempted, so there's nothing to undo.
- **Submit** writes changes to target backends. If a submit fails partway through (e.g., a database connection drops), some writes may have been applied. CocoIndex does not attempt to undo them. Instead, on the next run CocoIndex computes the current desired state, and the target connector reconciles against all possible previous states — converging the target to the correct state regardless of what was partially applied. This is why built-in connectors use convergent operations like upserts (`INSERT ... ON CONFLICT DO UPDATE`) rather than plain inserts.
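Why upserts matter can be shown with Python's built-in `sqlite3` (SQLite 3.24+ is needed for `ON CONFLICT ... DO UPDATE`). This hypothetical sketch simulates a submit that failed partway through, then replays the full change set: the upsert overwrites rows that were already written, and deleting an already-missing row is a no-op, so the table converges. A plain `INSERT` would instead fail on the existing `"a"` row. The `docs` table and `submit` helper are invented for illustration, and the sketch does not model reconciling against every possible previous state.

```python
import sqlite3

UPSERT = (
    "INSERT INTO docs(id, body) VALUES (?, ?) "
    "ON CONFLICT(id) DO UPDATE SET body = excluded.body"
)


def submit(conn: sqlite3.Connection, upserts: dict[str, str], deletes: list[str]) -> None:
    """Apply a change set with convergent operations; safe to replay."""
    with conn:
        for key, body in upserts.items():
            conn.execute(UPSERT, (key, body))
        for key in deletes:
            conn.execute("DELETE FROM docs WHERE id = ?", (key,))  # no-op if already gone


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id TEXT PRIMARY KEY, body TEXT)")
conn.executemany("INSERT INTO docs VALUES (?, ?)", [("a", "A1"), ("old", "X")])
conn.commit()

# A submit that died partway: "a" was updated and "old" deleted, "b" never written.
conn.execute(UPSERT, ("a", "A2"))
conn.execute("DELETE FROM docs WHERE id = ?", ("old",))
conn.commit()

# The next run replays the whole change set; re-applying finished writes is harmless.
submit(conn, {"a": "A2", "b": "B2"}, ["old"])
rows = conn.execute("SELECT id, body FROM docs ORDER BY id").fetchall()
print(rows)  # [('a', 'A2'), ('b', 'B2')]
```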
## How big should a processing component be?
When defining processing components, think about granularity — what one path represents — because it determines the sync boundary for target states.
For example, if you're processing files:
- **Coarse**: one component for "all files" (`coco.component_subpath("process")`)
- **Medium**: one component per file (`coco.component_subpath("process", file_path)`)
- **Fine**: one component per chunk (`coco.component_subpath("process", file_path, chunk_id)`)
In general:
- **Coarse-grained** (fewer, larger components): More target states sync together as a unit, but you only see updates after the larger component finishes.
- **Fine-grained** (more, smaller components): Each component syncs its target states as soon as it finishes, but target states owned by different components do not sync together as a unit.
For small datasets, a single processing component that owns all target states is simple and ensures all target states sync together. As data grows, consider breaking it down into one component per source item (e.g., one per file) to reduce latency: you see each item's target states synced as soon as it's processed, without waiting for the full dataset to complete. This also helps isolate failures to that item.
## Explicit context management
CocoIndex automatically propagates component context through Python's `contextvars`, which works for ordinary function calls (both sync and async). However, in situations where context variables are not preserved (for example, when using `concurrent.futures.ThreadPoolExecutor`), you need to explicitly capture and attach the context.
Use `coco.get_component_context()` to capture the current context, and `context.attach()` to restore it:
```python
from concurrent.futures import ThreadPoolExecutor
import cocoindex as coco
@coco.fn
def app_main() -> None:
    # Capture the current context
    ctx = coco.get_component_context()
    def worker(item):
        # Attach the context in the worker thread
        with ctx.attach():
            # Now CocoIndex APIs work correctly
            process_item(item)
    with ThreadPoolExecutor() as executor:
        # Consume the results so exceptions raised in workers surface here
        list(executor.map(worker, items))
```
This pattern ensures that CocoIndex can track component relationships and target state ownership even across thread boundaries.
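The underlying issue is plain Python behavior, shown here with the standard library alone: `contextvars.copy_context()` and `Context.run()` play the role that `coco.get_component_context()` and `context.attach()` play above. This is an analogy, not CocoIndex's implementation; `current_component` is a hypothetical variable.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the component context CocoIndex tracks.
current_component = contextvars.ContextVar("current_component", default=None)


def read_component(_item):
    return current_component.get()


current_component.set("app_main")
with ThreadPoolExecutor(max_workers=2) as pool:
    # On standard CPython builds, worker threads start with an empty context,
    # so this typically yields [None, None].
    lost = list(pool.map(read_component, range(2)))
    # Capture the caller's context and run each call inside its own copy
    # (one Context object can't be entered by two threads at once).
    ctx = contextvars.copy_context()
    kept = list(pool.map(lambda item: ctx.copy().run(read_component, item), range(2)))

print(lost)
print(kept)  # ['app_main', 'app_main']
```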
## Processing helpers
### `map()`
`map()` applies an async function to each item in a collection, running all calls concurrently within the current processing component. Unlike [`mount()`](#mount) and [`mount_each()`](#mount_each), it does **not** create child processing components — it's purely concurrent execution (similar to `asyncio.gather()`).
```python
@coco.fn(memo=True)
async def process_file(file: FileLike, table: postgres.TableTarget[DocEmbedding]) -> None:
    chunks = splitter.split(await file.read_text())
    id_gen = IdGenerator()
    await coco.map(process_chunk, chunks, file.file_path.path, id_gen, table)
```
The first argument to the function receives each item; additional arguments are passed through to every call. `map()` returns a `list` of the results, in the same order as the input items.
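Since `map()` is described as similar to `asyncio.gather()`, its contract can be sketched directly with it. `concurrent_map` and `tag_chunk` are hypothetical names; this models only concurrency and result ordering, not anything CocoIndex tracks inside the current component.

```python
import asyncio


async def concurrent_map(fn, items, *extra_args):
    """Hypothetical analogue of coco.map(): call fn(item, *extra_args) for every
    item concurrently and return the results in input order."""
    return await asyncio.gather(*(fn(item, *extra_args) for item in items))


async def tag_chunk(chunk: str, path: str) -> str:
    # Shorter chunks sleep longer, so completion order differs from input order.
    await asyncio.sleep(0.01 * (3 - len(chunk)))
    return f"{path}#{chunk}"


print(asyncio.run(concurrent_map(tag_chunk, ["a", "bb", "ccc"], "doc.md")))
# ['doc.md#a', 'doc.md#bb', 'doc.md#ccc']
```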
#### When to use `map()` vs `mount_each()`
- Use **`mount_each()`** when each item should be its own processing component — with its own component path, target state ownership, and target states sync boundary.
- Use **`map()`** when you want to process items concurrently *within* the current component, without creating new component boundaries. This is common for sub-item work like processing chunks within a file — the same within-component chunk work the [chunk-embedding pipeline in Core Concepts](./core_concepts#processing-component) walks through.
---
# Declaring functions with @coco.fn
Source: https://cocoindex.io/docs/programming_guide/function/
**Note — Prerequisite**
This page builds on [Core Concepts](./core_concepts), which introduces change detection and function memoization **with diagrams**. If memoization feels abstract, start there.
It's common to factor work into helper functions (for parsing, chunking, embedding, formatting, etc.). In CocoIndex, you can decorate any Python function with `@coco.fn` when you want to add incremental capabilities to it. The decorated function is still a normal Python function: its signature stays the same, and you can call it normally.
```python
@coco.fn
async def process_file(file: FileLike) -> str:
    return await file.read_text()
# Can be called like any normal function
result = await process_file(file)
```
`@coco.fn` preserves the sync/async nature of the underlying function. Decorating a sync function yields a sync function; decorating an async function yields an async function.
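A decorator can preserve sync/async-ness by inspecting the wrapped function and returning a wrapper of the same kind. The sketch below shows that general Python pattern; `passthrough` is hypothetical and performs none of the change tracking `@coco.fn` does.

```python
import asyncio
import functools
import inspect


def passthrough(fn):
    """Hypothetical decorator that, like @coco.fn, keeps sync functions sync and
    async functions async. This one only forwards the call."""
    if inspect.iscoroutinefunction(fn):
        @functools.wraps(fn)
        async def async_wrapper(*args, **kwargs):
            return await fn(*args, **kwargs)
        return async_wrapper

    @functools.wraps(fn)
    def sync_wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return sync_wrapper


@passthrough
def double(x: int) -> int:
    return 2 * x


@passthrough
async def increment(x: int) -> int:
    return x + 1


print(double(3))                               # 6
print(inspect.iscoroutinefunction(double))     # False
print(asyncio.run(increment(1)))               # 2
print(inspect.iscoroutinefunction(increment))  # True
```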
## How to think about `@coco.fn`
Decorating a function tells CocoIndex that calls to it are part of the incremental update engine. You still write normal Python, but CocoIndex can now:
- Detect when inputs or code have changed (change detection)
- Skip work when nothing has changed (memoization)
This is what lets CocoIndex avoid rerunning expensive steps on every `app.update()`. See [Processing Component](./processing_component) for how decorated functions are mounted at component paths.
If you don't need any of the above for a helper, keep it as a plain Python function.
## Change detection and memoization
Every `@coco.fn` function participates in CocoIndex's change detection system. With `memo=True`, the function's results are cached and reused when nothing has changed. These two mechanisms — detecting changes and acting on them — work together to enable incremental updates. For the big-picture view — how memoization skips unchanged work across both input and code changes — see the [Function memoization section in Core Concepts](./core_concepts#function-memoization-skip-unchanged-computations).
**Tip — Mental model**
A function's behavior is determined by its **logic** (source code, [`deps`](#deps), [`version`](#version)), its **inputs** (arguments), and the **context values** it reads via [`use_context()`](./context#retrieving-values). If none of those have changed, a memoized function returns its cached result without re-executing. You don't need to reason about *how* CocoIndex tracks any of this; just trust the contract.
### Change detection
CocoIndex detects three kinds of changes:
**Logic changes** — the function's source code, [`deps`](#deps) values, and explicit [`version`](#version) bumps. Tracked by `@coco.fn`. Logic fingerprints **propagate transitively** up the call chain — but only through `@coco.fn` boundaries. If `foo` (memoized) calls `bar` (also `@coco.fn`-decorated, with or without `memo=True`), and `bar`'s logic changes, `foo`'s memo is invalidated. A bare Python helper that isn't decorated is invisible to change detection: editing it will not invalidate `foo`. This is why `@coco.fn` matters for **any** function in the call chain, not just memoized ones — see [Common patterns](#common-patterns).
**Input changes** — the function's arguments. Tracked by `@coco.fn`. When you call a function with different arguments, the fingerprints change. Input fingerprints do not propagate transitively.
**Context changes** — [context values](./context) with [`detect_change=True`](./context#change-detection). Tracked by [`use_context()`](./context#retrieving-values) at the call site — independent of `@coco.fn`. Context reads propagate transitively: if a memoized `foo` calls `bar` and `bar` reads a change-detected context value, then changing that value invalidates `foo`'s memo too — even though `foo` itself never called `use_context()`. What matters is whether the key was read *anywhere* during the memoized call, not where in the call chain.
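Transitive propagation through decorated functions, and the blind spot for bare helpers, can be modeled as a recursive hash. In this hypothetical sketch, `Fn` objects stand in for functions and their `source` strings stand in for source code; it illustrates the logic-change rule above, not how CocoIndex computes fingerprints.

```python
import hashlib
from dataclasses import dataclass, field


@dataclass
class Fn:
    name: str
    source: str                # stands in for the function's source code
    decorated: bool = True     # False models a bare, undecorated helper
    calls: list["Fn"] = field(default_factory=list)


def logic_fingerprint(fn: Fn) -> str:
    """Hash a function's own source plus its decorated callees' fingerprints.

    Undecorated callees are skipped entirely, so editing them changes nothing.
    """
    digest = hashlib.sha256(fn.source.encode())
    for callee in fn.calls:
        if callee.decorated:
            digest.update(logic_fingerprint(callee).encode())
    return digest.hexdigest()


bar = Fn("bar", "return x + 1")
helper = Fn("helper", "return x.strip()", decorated=False)
foo = Fn("foo", "return bar(helper(x))", calls=[bar, helper])

before = logic_fingerprint(foo)
bar.source = "return x + 2"         # edit a decorated callee
after_bar_edit = logic_fingerprint(foo)
helper.source = "return x.lower()"  # edit the bare helper
after_helper_edit = logic_fingerprint(foo)

print(before != after_bar_edit)             # True: foo's memo would be invalidated
print(after_bar_edit == after_helper_edit)  # True: the helper edit goes unnoticed
```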
### Memoization
With `memo=True`, the function's result is cached. On subsequent calls, if no logic, input, or read context values have changed, the cached result is reused without executing the function body — it carries over [target states](./target_state) declared during the function's previous invocation and returns its previous return value.
```python
@coco.fn(memo=True)
def process_chunk(chunk: Chunk) -> list[float]:
    # This computation is skipped if chunk, logic, and context are unchanged
    return embed(chunk.text)
```
**Info — Type annotations**
Add a **return type annotation** to memoized functions so CocoIndex can properly reconstruct cached values. Without a type annotation, cached values may deserialize as basic Python types (`dict`, `list`, etc.) instead of their original types. See [Serialization](./serialization) for details on supported types.
**Caution — `memo=True` constraints**
A memoized function:
- **Must run inside a [processing component](./processing_component)** for memoization to take effect. Outside a component context (e.g., called directly from a script or test), the function still executes correctly but the cache is bypassed silently — every call runs the body.
- **Cannot mount child components** (`coco.mount(...)` / `coco.use_mount(...)` inside the body). Mounting is a side effect the cache cannot replay; CocoIndex raises an error if a memoized function attempts it. Either drop `memo=True`, or restructure so the mount happens in a non-memoized caller.
#### Cache-hit semantics
When a memoized function's cache hits, its **body does not run** — and neither do any nested `@coco.fn` calls inside it. The cached output is replayed directly: the previous return value is returned, and any target states the function declared on its previous run are carried over.
```python
@coco.fn(memo=True)
async def inner(text: str) -> str:
    print("inner ran")
    return await call_llm(text)
@coco.fn(memo=True)
async def outer(text: str) -> str:
    print("outer ran")
    return await inner(text) + "!"
# First call: both print, both bodies execute
result = await outer("hello")
# Second call with same inputs: nothing prints — outer's cached value is returned
# directly, and inner is not invoked at all.
result = await outer("hello")
```
Propagation (logic, `deps`, context) is recorded during the **previous** invocation and replayed on cache lookup. You don't need to model the internals — change anything that affects behavior and the cache invalidates correctly.
**Note — Exceptions and the cache**
If a memoized function raises, no cache entry is written for that call. The next invocation with the same inputs sees a cache miss and re-executes the body — exceptions never poison the cache, so you don't need to wrap calls defensively.
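The cache-hit and exception rules can be sketched with an ordinary memoizing decorator. This is a hypothetical model, not CocoIndex's cache: it keys only on positional arguments (CocoIndex's key also covers logic and read context values), and it does not carry over target states.

```python
import asyncio
import functools


def memo(fn):
    """Hypothetical memoizer keyed only by positional arguments.

    A hit returns the stored value without running the body (so nested calls
    don't run either); a raising call stores nothing.
    """
    cache = {}

    @functools.wraps(fn)
    async def wrapper(*args):
        if args in cache:
            return cache[args]
        result = await fn(*args)  # if this raises, nothing is cached
        cache[args] = result
        return result

    return wrapper


ran: list[str] = []


@memo
async def inner(text: str) -> str:
    ran.append("inner")
    return text.upper()


@memo
async def outer(text: str) -> str:
    ran.append("outer")
    return await inner(text) + "!"


attempts = 0


@memo
async def flaky(text: str) -> str:
    global attempts
    attempts += 1
    if attempts == 1:
        raise RuntimeError("transient failure")
    return text


async def main():
    first = await outer("hello")
    second = await outer("hello")  # cache hit: neither body runs
    try:
        await flaky("x")
    except RuntimeError:
        pass
    retried = await flaky("x")  # cache miss again: the failure wasn't stored
    return first, second, retried


result = asyncio.run(main())
print(result)  # ('HELLO!', 'HELLO!', 'x')
print(ran)     # ['outer', 'inner']
```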
**Tip — When to memoize**
**Cost:** Function return values must be stored for memoization. Larger return values mean higher storage costs.
**Benefit:** Memoization saves more when:
- The computation is expensive
- The function's caller is reprocessed frequently (due to logic or input changes)
**Examples:**
- ✅ **Embedding functions** — good to memoize. Computation is heavy; return value is fixed-size and not too large.
- ❌ **Splitting text into fixed-size chunks** — usually not worth memoizing. Computation is light; return value can be large.
- ✅ **Processing component for files that are mostly stable between runs** — very beneficial to memoize, since unchanged files are skipped entirely, saving the cost of reading and processing their contents.
- 🤔 **Chunk embedding when file-level memoization is already enabled** — still beneficial, but less so for stable files. The benefit increases for files that change frequently, or when your code evolves (e.g., adding more features per file triggers file-level reprocessing, but unchanged chunks can still skip embedding).
### Controlling change detection scope
Three parameters on `@coco.fn` let you customize how logic changes are detected:
- **`logic_tracking`** — controls the *scope* of automatic logic change detection
- **`version`** — provides explicit manual control over when dependent memos are invalidated
- **`deps`** — declares external values (e.g. a module-level prompt string) as part of the function's logic, so changing them invalidates dependent memos
These parameters control logic fingerprinting. Data fingerprinting (for arguments, `deps` values, and context values) is controlled by the objects themselves (see [Memoization Keys & States](../advanced_topics/memoization_keys)).
#### `logic_tracking`
The `logic_tracking` parameter controls whether and how logic changes are detected:
- **`"full"` (default):** Track this function's logic AND all transitively called `@coco.fn` functions' logic. A change anywhere in the call chain invalidates dependent memos.
- **`"self"`:** Track only this function's own logic. Changes in called functions do not propagate through this function.
- **`None`:** Don't track this function's logic at all. Logic changes to this function are invisible to the change detection system.
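The three modes can be compared on a small call chain. In this hypothetical sketch, a registry of dicts stands in for decorated functions and a `source` string stands in for their code; it models the rules listed above, not CocoIndex's actual fingerprinting.

```python
import hashlib
from typing import Optional


def fingerprint(name: str, registry: dict) -> Optional[str]:
    """Logic fingerprint that callers of `name` observe, or None if untracked."""
    spec = registry[name]
    mode = spec["tracking"]
    if mode is None:
        return None  # logic changes to this function are invisible
    digest = hashlib.sha256(spec["source"].encode())
    if mode == "full":  # "self" stops here and ignores callees
        for callee in spec["calls"]:
            callee_fp = fingerprint(callee, registry)
            if callee_fp is not None:
                digest.update(callee_fp.encode())
    return digest.hexdigest()


registry = {
    "outer": {"source": "o1", "calls": ["middle", "util"], "tracking": "full"},
    "middle": {"source": "m1", "calls": ["leaf"], "tracking": "self"},
    "leaf": {"source": "l1", "calls": [], "tracking": "full"},
    "util": {"source": "u1", "calls": [], "tracking": None},
}
base = fingerprint("outer", registry)

registry["leaf"]["source"] = "l2"    # hidden behind middle's "self" tracking
print(fingerprint("outer", registry) == base)  # True
registry["util"]["source"] = "u2"    # util is not tracked at all
print(fingerprint("outer", registry) == base)  # True
registry["middle"]["source"] = "m2"  # middle's own logic is tracked
print(fingerprint("outer", registry) == base)  # False
```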
#### `version`
The `version` parameter lets you explicitly invalidate dependent memos by bumping an integer:
```python
@coco.fn(version=2)
def process_chunk(chunk: Chunk) -> list[float]:
    # Bumping version invalidates all memoized callers, even if code looks the same
    return embed(chunk.text)
```
#### `deps`
The `deps` parameter declares external value(s) the function logic depends on but that aren't visible in its body — for example a prompt string or a model identifier defined at module scope. When the value changes, the function's logic fingerprint changes and dependent memos are invalidated, exactly as if the function body had been edited.
```python
SYSTEM_PROMPT = "You are a helpful assistant. Be concise."
@coco.fn(memo=True, deps=SYSTEM_PROMPT)
def summarize(text: str) -> str:
    # Editing SYSTEM_PROMPT invalidates this function's memo
    # (and propagates to memoized callers) just like a logic change would.
    return call_llm(SYSTEM_PROMPT, text)
```
For multiple dependencies, pass a tuple or dict:
```python
SYSTEM_PROMPT = "..."
MODEL = "claude-haiku-4-5"
@coco.fn(memo=True, deps={"prompt": SYSTEM_PROMPT, "model": MODEL})
def summarize(text: str) -> str:
    return call_llm(SYSTEM_PROMPT, text, model=MODEL)
```
The value is canonicalized through the [memoization-key pipeline](../advanced_topics/memoization_keys), which honors `__coco_memo_key__()`, registered memo key functions, and the standard handling for primitives, dataclasses, and Pydantic models.
**Caution — Snapshotted at decoration time**
`deps` is evaluated **once** when the decorator is applied (typically at module import), not re-evaluated per call. For per-call or per-instance values — instance attributes in a bound method, request-scoped config, anything that changes at runtime — pass them as regular function arguments instead, so the memoization layer observes each new value.
`deps` requires `logic_tracking` to be enabled; combining `deps=` with `logic_tracking=None` raises `ValueError`.
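Decoration-time snapshotting is ordinary Python decorator behavior, which this hypothetical `fn_sketch` factory makes visible: the value passed as `deps` is captured once, so a later rebinding of the module-level name is seen by the body but not by the snapshot that would feed a memo key. The sketch also models the documented `ValueError` for `deps` combined with `logic_tracking=None`; it is not CocoIndex's implementation.

```python
def fn_sketch(*, deps=None, logic_tracking="full"):
    """Hypothetical decorator factory modeling the documented `deps` rules."""
    if deps is not None and logic_tracking is None:
        raise ValueError("deps requires logic_tracking to be enabled")

    def decorate(func):
        func.deps_snapshot = deps  # captured once, when the decorator is applied
        return func

    return decorate


PROMPT = "Be concise."


@fn_sketch(deps=PROMPT)
def summarize(text: str) -> str:
    return f"[{PROMPT}] {text}"  # the body reads PROMPT at call time


PROMPT = "Be verbose."  # rebinding at runtime, after decoration
print(summarize.deps_snapshot)  # Be concise.
print(summarize("hi"))          # [Be verbose.] hi
```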
#### Common patterns
**Tip — `@coco.fn` on non-memoized helpers**
You can — and often should — decorate helpers with `@coco.fn` even without `memo=True`. The decorator's job is to make the function's logic *visible* to the change detection system. A bare helper is invisible: editing it will not invalidate any memoized caller, leading to silently stale results. Add `@coco.fn` so its fingerprint propagates; only add `memo=True` if caching its return value is worth it.
These parameters can be set on any `@coco.fn` function — not just memoized ones.
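**Fully automatic (default)** — use `logic_tracking="full"` (or omit it) without setting `version`. Any logic change in the function or its callees invalidates dependent memos. This is the safe choice: it never misses a logic change, at the cost of occasionally reprocessing after edits that don't affect output, such as refactors or logging tweaks.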
```python
@coco.fn
async def process_file(file: FileLike) -> list[Chunk]:
    # Any change here or in called @coco.fn functions invalidates dependent memos
    text = await file.read_text()
    return split_and_embed(text)
```
**Manual, precise control** — use `logic_tracking="self"` with `version`. You decide what counts as a behavior change by bumping `version`, without being affected by implementation detail changes (performance optimizations, logging tweaks, refactoring, etc.).
```python
@coco.fn(logic_tracking="self", version=3)
async def process(data: str) -> str:
    # Bump version when behavior changes (e.g., new output format).
    # Internal refactors or logging changes won't trigger reprocessing.
    return await transform(data)
```
**Opt out of tracking** — use `logic_tracking=None` for functions whose output is governed by a stable contract rather than by the details of their code, so that edits to internals (logging, performance hints, refactors) don't change what they return. This prevents unnecessary reprocessing when only internals change.
```python
@coco.fn(logic_tracking=None)
def embed(text: str) -> list[float]:
    # Contract is stable: same input always produces the same embedding.
    # Internal changes (e.g., switching backends) are handled by version bumps.
    return model.encode(text)
```
**Note**
[Context changes](./context#change-detection) are independent of `@coco.fn` and `logic_tracking`. Even with `logic_tracking=None`, a change in a change-detected context value still invalidates dependent memos, because context tracking is done by `use_context()`, not by the decorator.
### Debugging unexpected re-runs
If a memoized function is re-running when you expect a cache hit, walk through the inputs in order:
1. **Logic** — did the function's source code change? Did any `@coco.fn` it transitively calls change? Was a `version` bumped? Was a `deps` value edited?
2. **Inputs** — are the arguments byte-identical to the previous call (after canonicalization)? Custom types with unstable equality are a common source of spurious invalidation — see [Memoization Keys & States](../advanced_topics/memoization_keys).
3. **Context** — did any [`detect_change=True`](./context#change-detection) context value read during the previous invocation change? Remember this propagates transitively through nested `@coco.fn` calls.
If none of those changed and the function still re-runs, the most common cause is a non-stable fingerprint on a custom-typed argument or context value — define `__coco_memo_key__` to make it deterministic.
Conversely, if a memo is hitting when you expect invalidation, common causes are:
- A logic change in a helper that is **not** decorated with `@coco.fn`. Add `@coco.fn` to it (no `memo=` needed) so its logic participates in propagation — see [Common patterns](#common-patterns).
- A `deps` value that you changed at runtime: `deps` is [snapshotted at decoration time](#deps), so per-instance or per-request values must be passed as regular arguments instead.
- The function uses [`logic_tracking=None`](#logic_tracking), which opts it out of code-change detection entirely.
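For the non-stable fingerprint case mentioned earlier in this section, the usual culprit is a type whose default identity depends on memory addresses or that carries a runtime handle. A minimal sketch, assuming `__coco_memo_key__` takes no arguments and returns a plain, canonicalizable value such as a tuple (see Memoization Keys & States for the exact contract); `Document` is a hypothetical type.

```python
from typing import Optional

class Document:
    """Hypothetical argument type with a runtime handle that must not affect memoization."""

    def __init__(self, path: str, content: str, client: Optional[object] = None) -> None:
        self.path = path
        self.content = content
        self.client = client  # e.g. an open connection; differs on every run

    def __coco_memo_key__(self) -> tuple:
        # Only stable, value-like fields: the same data yields the same key on every run.
        return ("Document", self.path, self.content)

a = Document("docs/intro.md", "Hello", client=object())
b = Document("docs/intro.md", "Hello", client=object())

# The default repr embeds a memory address, so it differs between equal documents...
reprs_differ = repr(a) != repr(b)
# ...while the explicit memo key is identical.
keys_match = a.__coco_memo_key__() == b.__coco_memo_key__()
```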
### Customizing data fingerprinting
By default, CocoIndex fingerprints function arguments, `deps` values, and context values automatically for most types — primitives, containers, dataclasses, Pydantic models, and picklable objects. For custom types, or when you need multi-level validation (e.g., check mtime first, then content hash), see [Memoization Keys & States](../advanced_topics/memoization_keys).
For per-function overrides — excluding an argument from the memo key, or transforming it just for this function — pass `memo_key={...}` on `@coco.fn` / `@coco.fn.as_async`; see [Override at the call site with `memo_key=`](../advanced_topics/memoization_keys#override-at-the-call-site-with-memo_key).
## Execution capabilities
The following capabilities control *how* the function executes, independent of change detection and memoization.
### Async adapter
Use `@coco.fn.as_async` when you need an **async** interface for a function that has a sync underlying implementation. This is useful for compute-intensive leaf functions, and is required for features like [batching](#batching) and [runner](#runner).
```python
@coco.fn.as_async
def embed(text: str) -> list[float]:
    return model.encode([text])[0]

# External usage: always async, even though the function body is sync
embedding = await embed("hello world")
```
`@coco.fn.as_async` is equivalent to wrapping the function in `asyncio.to_thread()` — the sync function runs on a thread pool and doesn't block the event loop.
You can also call any `@coco.fn`-decorated function asynchronously via the `.as_async()` method, without changing its primary signature:
```python
@coco.fn
def expensive_fn(data: bytes) -> bytes:
    return process(data)

# Primary call is sync:
result = expensive_fn(data)

# Async call via .as_async():
result = await expensive_fn.as_async(data)
```
### Batching
With `batching=True`, multiple concurrent calls to the function are automatically batched together. This is useful for operations that are more efficient when processing multiple inputs at once, such as embedding models.
Batching requires an async interface. If the underlying function is sync, use `@coco.fn.as_async(batching=True)`. If the underlying function is already `async def`, `@coco.fn(batching=True)` works directly.
When batching is enabled:
- The function implementation receives a `list[T]` and returns a `list[R]`
- The external signature becomes `async T -> R` (single input, single output)
- Concurrent calls are collected and processed together
```python
import asyncio

@coco.fn.as_async(batching=True, max_batch_size=32)
def embed(texts: list[str]) -> list[list[float]]:
    # Called with a batch of texts, returns a batch of embeddings
    return model.encode(texts)

# External usage: async, single input, single output
embedding = await embed("hello world")  # Returns list[float]

# Concurrent calls are automatically batched using asyncio.gather
embeddings = await asyncio.gather(
    embed("text1"),
    embed("text2"),
    embed("text3"),
)
```
The `max_batch_size` parameter limits how many inputs can be processed in a single batch.
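To see how concurrent single-item calls can become one list-in/list-out call, here is a minimal micro-batching sketch. `MicroBatcher` is hypothetical and simplified: it flushes on the next event-loop iteration rather than on a timer, runs `batch_fn` on the event-loop thread, and is not how CocoIndex schedules batches internally.

```python
import asyncio
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")

class MicroBatcher(Generic[T, R]):
    """Hypothetical sketch: coalesce concurrent single-item calls into list calls."""

    def __init__(self, batch_fn: Callable[[list[T]], list[R]], max_batch_size: int) -> None:
        self._batch_fn = batch_fn
        self._max_batch_size = max_batch_size
        self._pending: list[tuple[T, asyncio.Future]] = []
        self._flush_scheduled = False
        self.batch_sizes: list[int] = []  # recorded only for the demo

    async def __call__(self, item: T) -> R:
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self._pending.append((item, future))
        if not self._flush_scheduled:
            self._flush_scheduled = True
            # Flush on the next loop iteration, so calls started in the same
            # tick (e.g. by asyncio.gather) land in the same batch.
            loop.call_soon(self._flush)
        return await future

    def _flush(self) -> None:
        self._flush_scheduled = False
        pending, self._pending = self._pending, []
        for start in range(0, len(pending), self._max_batch_size):
            chunk = pending[start:start + self._max_batch_size]
            self.batch_sizes.append(len(chunk))
            try:
                outputs = self._batch_fn([item for item, _ in chunk])
                if len(outputs) != len(chunk):
                    raise ValueError("batch_fn must return one output per input")
            except Exception as exc:
                for _, future in chunk:
                    future.set_exception(exc)
                continue
            for (_, future), output in zip(chunk, outputs):
                future.set_result(output)

def double_all(xs: list[int]) -> list[int]:
    return [x * 2 for x in xs]

async def main() -> tuple[list[int], list[int]]:
    batcher: MicroBatcher[int, int] = MicroBatcher(double_all, max_batch_size=2)
    results = await asyncio.gather(*(batcher(i) for i in range(5)))
    return results, batcher.batch_sizes

results, batch_sizes = asyncio.run(main())
```

Five concurrent calls with `max_batch_size=2` become three invocations of `double_all` with batch sizes 2, 2, and 1, and each caller still receives only its own result.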
**Tip — When to use batching**
Batching is beneficial when:
- The underlying operation has significant per-call overhead (e.g., GPU kernel launch)
- The operation can process multiple inputs more efficiently than one at a time
- You have concurrent calls from multiple coroutines
Common use cases:
- **Embedding models** — most embedding APIs and models are optimized for batch processing
- **LLM inference** — batch multiple prompts together for better GPU utilization
- **Database operations** — batch inserts or lookups
### Runner
The `runner` parameter allows functions to execute in a specific context, such as a dedicated GPU runner that serializes GPU workloads.
Like batching, a runner requires an async interface. If the underlying function is sync, use `@coco.fn.as_async(runner=...)` to make it async. If the underlying function is already `async def`, `@coco.fn(runner=...)` works directly.
```python
@coco.fn.as_async(runner=coco.GPU)
def gpu_inference(data: bytes) -> bytes:
    # Runs with GPU serialization
    return model.predict(data)

# External usage: async
result = await gpu_inference(data)
```
The `coco.GPU` runner:
- By default, runs in-process with all functions sharing a queue for serial execution
- Sync functions run on a dedicated GPU thread to avoid blocking the event loop
- Set the environment variable `COCOINDEX_RUN_GPU_IN_SUBPROCESS=1` to run in a subprocess for GPU memory isolation
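Serial, queue-like execution on one dedicated thread can be approximated with a single-worker executor. `SerialRunner` is a hypothetical illustration of the in-process behavior described above, not the implementation of `coco.GPU`.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

class SerialRunner:
    """Hypothetical: run sync callables one at a time on a single dedicated thread."""

    def __init__(self) -> None:
        # max_workers=1 gives a FIFO queue drained by one thread.
        self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="gpu")

    async def run(self, fn: Callable[..., Any], *args: Any) -> Any:
        loop = asyncio.get_running_loop()
        # The event loop stays free while the work runs on the worker thread.
        return await loop.run_in_executor(self._executor, fn, *args)

    def shutdown(self) -> None:
        self._executor.shutdown(wait=True)

_lock = threading.Lock()
stats = {"active": 0, "max_active": 0}
threads_used: set[str] = set()

def fake_inference(x: int) -> int:
    with _lock:
        stats["active"] += 1
        stats["max_active"] = max(stats["max_active"], stats["active"])
    threads_used.add(threading.current_thread().name)
    time.sleep(0.01)  # pretend to use the GPU
    with _lock:
        stats["active"] -= 1
    return x + 1

async def main() -> list[int]:
    runner = SerialRunner()
    try:
        return await asyncio.gather(*(runner.run(fake_inference, i) for i in range(4)))
    finally:
        runner.shutdown()

results = asyncio.run(main())
```

Even though four calls are submitted concurrently, at most one runs at a time and all of them run on the same thread.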
You can combine batching with a runner:
```python
import asyncio

@coco.fn.as_async(batching=True, max_batch_size=16, runner=coco.GPU)
def batch_gpu_embed(texts: list[str]) -> list[list[float]]:
    # Batched execution with GPU serialization
    return gpu_model.encode(texts)

# External usage: async
embedding = await batch_gpu_embed("hello world")

# Concurrent calls
embeddings = await asyncio.gather(
    batch_gpu_embed("text1"),
    batch_gpu_embed("text2"),
    batch_gpu_embed("text3"),
)
```
**Note**
By default, `coco.GPU` runs functions in-process, so no pickling is required. When using subprocess mode (`COCOINDEX_RUN_GPU_IN_SUBPROCESS=1`), the function and all its arguments must be picklable since they are serialized for subprocess execution.
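Before switching on subprocess mode, you can spot obviously unpicklable arguments with the standard `pickle` module. `find_unpicklable` is a hypothetical helper, and CocoIndex's subprocess transport may use a different serializer, so treat this as a quick heuristic rather than a guarantee.

```python
import pickle
import threading
from typing import Any

def find_unpicklable(*objs: Any) -> list[str]:
    """Return one description per object that the standard pickler rejects."""
    problems: list[str] = []
    for obj in objs:
        try:
            pickle.dumps(obj)
        except Exception as exc:  # PicklingError, TypeError, AttributeError, ...
            problems.append(f"{type(obj).__name__}: {type(exc).__name__}")
    return problems

ok = find_unpicklable(b"raw bytes", ["text1", "text2"], {"model": "m"})
bad = find_unpicklable(threading.Lock(), lambda x: x)
```

Plain data passes, while locks and lambdas, both common inside model wrappers and closures, are rejected.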
---
# Sharing resources via context
Source: https://cocoindex.io/docs/programming_guide/context/
CocoIndex provides a **context** mechanism for sharing resources across your pipeline. This is useful for database connections, API clients, configuration objects, or any resource that multiple processing components need to access.
## ContextKey
A `ContextKey[T]` is a typed key that identifies a resource. Define keys at module level:
```python
import asyncpg
import cocoindex as coco
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
# Database connection — no change detection (swapping credentials shouldn't reprocess)
PG_DB = coco.ContextKey[asyncpg.Pool]("text_embedding_db")
# Embedding model — with change detection (switching models should reprocess)
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder", detect_change=True)
```
The type parameter (`asyncpg.Pool`, `SentenceTransformerEmbedder`) enables type checking — when you retrieve the value, your editor knows its type.
### Change detection
By default, context keys have **change detection disabled** — changing the provided value between runs does not automatically invalidate memoized functions that consumed it via `use_context()`. To opt in to change detection, pass `detect_change=True`. When enabled, [context changes](./function#change-detection) are their own category — tracked by `use_context()` at the call site, independent of `@coco.fn`. When a fingerprint changes, any memoized function whose execution involved a `use_context()` call on that key is invalidated.
Use `detect_change=True` for resources that affect computation results — models, configuration objects, etc. This ensures memoized functions re-execute when those values change. Resources that don't affect computation results — database connections, loggers, debug flags, monitoring clients — can use the default (`detect_change=False`).
**Tip**
Change detection is transitive: if function `foo` (memoized) calls function `bar`, and `bar` calls `use_context(key)` on a change-detected key, then `foo`'s memo is also invalidated when the context value changes.
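The transitive rule can be modeled by recording, for the duration of a memoized call, every change-detected key read by any code it runs. Everything below (`use_context`, `run_tracked`, `is_stale`, and the `CONTEXT`/`DETECT_CHANGE` tables) is a hypothetical toy built on `contextvars`, not CocoIndex's API.

```python
import contextvars
from typing import Any, Callable, Optional

# Hypothetical toy context store: key -> provided value.
CONTEXT: dict[str, Any] = {"embedder": "model-v1", "db": "pool-1"}
DETECT_CHANGE = {"embedder"}  # keys declared with detect_change=True

# Change-detected keys read while the current tracked call is running.
_reads: contextvars.ContextVar[Optional[set]] = contextvars.ContextVar("reads", default=None)

def use_context(key: str) -> Any:
    reads = _reads.get()
    if reads is not None and key in DETECT_CHANGE:
        reads.add(key)
    return CONTEXT[key]

def run_tracked(fn: Callable[[], Any]) -> tuple[Any, dict[str, Any]]:
    """Run fn and record every change-detected key it read, directly or in callees."""
    token = _reads.set(set())
    try:
        result = fn()
        recorded = {key: CONTEXT[key] for key in _reads.get()}
    finally:
        _reads.reset(token)
    return result, recorded

def is_stale(recorded: dict[str, Any]) -> bool:
    return any(CONTEXT[key] != value for key, value in recorded.items())

def bar() -> str:
    return use_context("embedder") + "@" + use_context("db")

def foo() -> str:
    return bar().upper()  # foo itself never calls use_context

_, recorded = run_tracked(foo)
CONTEXT["db"] = "pool-2"  # not change-detected
db_swap_invalidates = is_stale(recorded)
CONTEXT["embedder"] = "model-v2"  # change-detected and read (via bar) during foo
model_swap_invalidates = is_stale(recorded)
```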
## ContextKey as stable identity
Beyond sharing resources, a `ContextKey` also serves as the **stable identity** of the resource it points to. When you anchor sources or targets to a `ContextKey`, CocoIndex treats *the key itself* — not the underlying value — as the identifier across runs.
This has two consequences:
1. **The underlying value can change without losing tracked state.** Rotating credentials, moving a database, or relocating a directory won't invalidate memoization or managed state, as long as the same `ContextKey` is used.
2. **Renaming a `ContextKey` is a breaking change.** Two different keys are two different resources, even if they point to the same physical backend. Existing tracked state will be treated as orphaned. When migrating code, reuse the previous key name to preserve continuity.
**Tip — Naming convention**
Pick a `ContextKey` name that reflects the *logical* role of the resource, not its current address. The name is what CocoIndex persists.
- **Applications**: use any descriptive name — e.g., `"text_embedding_db"`, `"docs_root"`.
- **Libraries**: prefix with your package name and a `/` to avoid collisions with application keys or other libraries — e.g., `"my_library/db"`, `"cocoindex.connectors.postgres/pool"`.
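A toy model of the two consequences listed above: tracked state is indexed by the key's name, and the resolved value never enters the lookup. `Key`, `sync`, and `tracked_state` are hypothetical; in practice CocoIndex manages tracked state itself.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Key:
    """Hypothetical stand-in for ContextKey: identity is the name, never the value."""
    name: str

# State that survives across runs, indexed by key *name*.
tracked_state: dict[str, set[str]] = {}

def sync(key: Key, value: str, rows: set[str]) -> dict[str, set[str]]:
    """Diff this run's rows against the last run recorded under the same key name."""
    del value  # the resolved value (URL, path, credentials) plays no part in identity
    previous = tracked_state.get(key.name, set())
    tracked_state[key.name] = set(rows)
    return {"insert": rows - previous, "delete": previous - rows}

first = sync(Key("text_embedding_db"), "postgres://host-a/db", {"r1", "r2"})
rotated = sync(Key("text_embedding_db"), "postgres://host-b/db", {"r1", "r2"})
renamed = sync(Key("embedding_db"), "postgres://host-b/db", {"r1", "r2"})
```

Moving to a new host under the same key produces an empty diff, while renaming the key makes the same rows look brand new and leaves the old entry behind as orphaned state.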
## Providing values
In your [lifespan function](./app#defining-a-lifespan), use `builder.provide()` to make resources available:
```python
from typing import AsyncIterator
from cocoindex.connectors import postgres

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await asyncpg.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, pool)
        builder.provide(EMBEDDER, SentenceTransformerEmbedder(EMBED_MODEL))
        yield
```
The resource is available for the lifetime of the environment. When the lifespan exits (after `yield`), cleanup happens automatically if you use a context manager pattern.
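The automatic cleanup is the same behavior `contextlib.asynccontextmanager` gives any async generator with a single `yield`. The sketch below assumes, for illustration, that the lifespan is driven in a similar way; `Builder`, `Pool`, `open_pool`, and `lifespan` are hypothetical stand-ins, not CocoIndex types.

```python
import asyncio
import contextlib
from typing import Any, AsyncIterator

class Pool:
    """Hypothetical resource with async cleanup."""
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

@contextlib.asynccontextmanager
async def open_pool() -> AsyncIterator[Pool]:
    pool = Pool()
    try:
        yield pool
    finally:
        await pool.close()  # runs on normal exit and on errors

class Builder:
    """Hypothetical stand-in for coco.EnvironmentBuilder."""
    def __init__(self) -> None:
        self.provided: dict[str, Any] = {}

    def provide(self, key: str, value: Any) -> None:
        self.provided[key] = value

@contextlib.asynccontextmanager
async def lifespan(builder: Builder) -> AsyncIterator[None]:
    async with open_pool() as pool:
        builder.provide("db", pool)
        yield  # the environment is "running" while suspended here

async def main() -> tuple[Pool, bool]:
    builder = Builder()
    async with lifespan(builder):
        pool = builder.provided["db"]
        open_during_run = not pool.closed
    return pool, open_during_run

pool, open_during_run = asyncio.run(main())
```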
## Retrieving values
In processing components, use `coco.use_context()` to retrieve provided resources:
```python
@coco.fn
async def process_chunk(chunk: Chunk, table: postgres.TableTarget[DocEmbedding]) -> None:
    # Retrieve the embedder from context
    embedding = await coco.use_context(EMBEDDER).embed(chunk.text)
    table.declare_row(row=DocEmbedding(text=chunk.text, embedding=embedding, ...))
```
Some connectors also accept `ContextKey`s directly as a convenience — for example, `postgres.mount_table_target()` takes a `ContextKey[asyncpg.Pool]` and resolves the connection internally:
```python
@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    # PG_DB is resolved internally by the connector
    table = await postgres.mount_table_target(
        PG_DB,
        table_name="doc_embeddings",
        table_schema=await postgres.TableSchema.from_class(DocEmbedding, primary_key=["id"]),
    )
    # ... mount processing components ...
```
## Complete example
Here's a complete pipeline that uses context to share a database connection and an embedding model across processing components:
```python
from __future__ import annotations

import pathlib
from dataclasses import dataclass
from typing import AsyncIterator, Annotated

import asyncpg
from numpy.typing import NDArray

import cocoindex as coco
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.chunk import Chunk
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
from cocoindex.resources.id import IdGenerator

DATABASE_URL = "postgres://cocoindex:cocoindex@localhost/cocoindex"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# 1. Define context keys at module level
PG_DB = coco.ContextKey[asyncpg.Pool]("text_embedding_db")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder", detect_change=True)

_splitter = RecursiveSplitter()

# 2. Provide values in the lifespan
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    async with await asyncpg.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, pool)
        builder.provide(EMBEDDER, SentenceTransformerEmbedder(EMBED_MODEL))
        yield

# 3. Use EMBEDDER in type annotations (for vector column schema)
@dataclass
class DocEmbedding:
    id: int
    filename: str
    text: str
    embedding: Annotated[NDArray, EMBEDDER]  # dimension resolved from context

# 4. Retrieve values in processing functions
@coco.fn
async def process_chunk(
    chunk: Chunk,
    filename: pathlib.PurePath,
    id_gen: IdGenerator,
    table: postgres.TableTarget[DocEmbedding],
) -> None:
    table.declare_row(
        row=DocEmbedding(
            id=await id_gen.next_id(chunk.text),
            filename=str(filename),
            text=chunk.text,
            embedding=await coco.use_context(EMBEDDER).embed(chunk.text),
        ),
    )

@coco.fn(memo=True)
async def process_file(
    file: FileLike,
    table: postgres.TableTarget[DocEmbedding],
) -> None:
    text = await file.read_text()
    chunks = _splitter.split(text, chunk_size=2000, chunk_overlap=500, language="markdown")
    id_gen = IdGenerator()
    await coco.map(process_chunk, chunks, file.file_path.path, id_gen, table)

# 5. PG_DB used directly by the connector (resolved internally)
@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    table = await postgres.mount_table_target(
        PG_DB,
        table_name="doc_embeddings",
        table_schema=await postgres.TableSchema.from_class(
            DocEmbedding, primary_key=["id"],
        ),
    )
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
    )
    await coco.mount_each(process_file, files.items(), table)

app = coco.App(
    coco.AppConfig(name="TextEmbedding"),
    app_main,
    sourcedir=pathlib.Path("./markdown_files"),
)
```
## Accessing context outside processing components
If you need to access context values outside of CocoIndex processing components — for example, in query/serving logic that shares resources with your indexing pipeline — use `env.get_context()`:
```python
# Sync API
db = coco.default_env().get_context(PG_DB)
```
```python
# Async API
db = (await coco.default_env()).get_context(PG_DB)
```
This is useful when your application runs both indexing and serving in the same process and you want to initialize shared resources (like database connection pools or configuration) once in the lifespan.
**Note**
`default_env()` starts the environment if it hasn't been started yet, which runs the lifespan function. If you're using an explicit environment, call `get_context()` directly on that environment instance.
---
# Running in live mode
Source: https://cocoindex.io/docs/programming_guide/live_mode/
By default, calling `app.update()` runs in **catch-up mode**: it scans all sources, processes what changed since the last run (memoized components are skipped, so unchanged work is not redone), syncs target states, and returns. Targets are caught up to the moment the run started, and that's it — to pick up further changes, you call `update()` again.
So catch-up mode is already incremental — but each call still has to scan sources to discover what changed, and changes are only picked up when you trigger a new run.
**Live mode** keeps the app running after catch-up finishes and lets components stream changes continuously from their sources (e.g. a file system watcher or a database change feed), applying them to target states with very low latency. This is useful when:
- You want near-real-time reactions to source changes, instead of waiting for the next `update()` call
- Your sources can push changes more efficiently than a full rescan can discover them
Two things are needed for live mode to work: the app must be **enabled** to stay running, and somewhere in the component tree a component must **react** to changes.
## Enabling live mode
Pass `live=True` when updating the app:
```python
app.update_blocking(live=True)
# Or async
handle = app.update(live=True)
await handle.result()
```
From the CLI:
```bash
cocoindex update --live my_app.py
# or
cocoindex update -L my_app.py
```
The `live` flag propagates top-down through the component tree — both `coco.mount()` and `coco.use_mount()` inherit `live` from the parent, so children are live when the app is live.
Without `live=True` on the app, the app runs in catch-up mode — everything completes after the initial scan, even if a source supports live watching.
## Reacting to changes
Enabling live mode keeps the app running, but something in the component tree needs to actually watch for changes. That something is a [**LiveComponent**](../advanced_topics/live_component) — a component with a long-running `process_live()` method that delivers incremental updates.
You rarely need to write a `LiveComponent` manually. The two most common patterns are:
### Sources with `LiveMapView` or `LiveMapFeed`
Source connectors can provide live capabilities via two [protocols](../advanced_topics/live_component#livemapfeed-and-livemapview):
- **`LiveMapView`** — the source has scannable current state (e.g., a directory or database table). It does a full scan first, then watches for changes. Example: [`localfs.walk_dir(live=True).items()`](../connectors/localfs#live-file-watching).
- **`LiveMapFeed`** — the source only streams changes, with no snapshot to scan (e.g., a Kafka consumer). All data arrives via the change stream. Example: [`kafka.topic_as_map()`](../connectors/kafka#as-source).
When `mount_each()` receives either, it automatically creates a `LiveComponent` internally that:
1. **Scans current state** (if available) — iterates all items and mounts a processing component for each
2. **Signals readiness** — the initial scan is complete (or the stream has caught up), target states are synced
3. **Watches for changes** — the source delivers incremental updates:
   - New or modified items → re-mount the affected component
   - Deleted items → remove the component and its target states
CocoIndex handles change detection, memoization, and target state reconciliation the same way as in catch-up mode.
Without live support on the source, `mount_each()` falls back to catch-up behavior — a one-time iteration over items and that's it.
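The scan-then-watch steps amount to building initial state from a snapshot, then applying keyed upserts and deletes. A minimal sketch under simplifying assumptions: `process` stands in for a processing component, a target state is just a dict entry, and `Change` with `value=None` for deletion is a hypothetical event shape, not the `LiveMapView`/`LiveMapFeed` protocol.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Change:
    key: str
    value: Optional[str]  # None means the item was deleted

def process(value: str) -> str:
    return value.upper()  # hypothetical processing component

def initial_scan(snapshot: dict[str, str]) -> dict[str, str]:
    # Step 1: mount one component per item; each owns one target entry here.
    return {key: process(value) for key, value in snapshot.items()}

def apply_changes(targets: dict[str, str], changes: list[Change]) -> None:
    # Step 3: react to incremental updates after readiness.
    for change in changes:
        if change.value is None:
            targets.pop(change.key, None)  # component removed with its target states
        else:
            targets[change.key] = process(change.value)  # (re-)mount

targets = initial_scan({"a.md": "hello", "b.md": "world"})
apply_changes(targets, [Change("a.md", "hello again"), Change("b.md", None), Change("c.md", "new")])
```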
### Periodic refresh with `coco.auto_refresh`
When the source isn't change-aware but you still want fresh data — say, polling a REST endpoint or re-reading a database table that doesn't emit change events — wrap your processor function in [`coco.auto_refresh`](../advanced_topics/live_component#example-periodic-refresh-with-cocoauto_refresh). It runs your function once, signals readiness, then re-runs it on a fixed delay:
```python
import datetime
import cocoindex as coco

async def sync_users(db, target) -> None:
    rows = await db.fetch_all_users()
    for row in rows:
        target.declare_row(row=UserRow(...))

@coco.fn
async def app_main(db, target) -> None:
    await coco.mount(
        coco.auto_refresh(sync_users, interval=datetime.timedelta(minutes=5)),
        db, target,
    )

app = coco.App(coco.AppConfig(name="UserSync"), app_main, db=..., target=...)
app.update_blocking(live=True)
```
**Catch-up compatibility:** in catch-up mode (the default), `auto_refresh` runs `sync_users` once and exits — observationally identical to mounting `sync_users` directly. The interval is ignored. Same pipeline, choose catch-up or live at run time.
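The run-once-then-repeat contract can be sketched with `asyncio`. `auto_refresh_sketch` is hypothetical; its `on_ready` and `max_cycles` parameters exist only to make the demo observable and finite and are not part of `coco.auto_refresh`.

```python
import asyncio
import datetime
from typing import Awaitable, Callable, Optional

async def auto_refresh_sketch(
    fn: Callable[[], Awaitable[None]],
    interval: datetime.timedelta,
    *,
    live: bool,
    on_ready: Callable[[], None],
    max_cycles: Optional[int] = None,
) -> int:
    """Run fn once, signal readiness, then (live mode only) repeat every interval."""
    cycles = 0
    await fn()
    cycles += 1
    on_ready()
    if not live:
        return cycles  # catch-up mode: the interval is ignored
    while max_cycles is None or cycles < max_cycles:
        await asyncio.sleep(interval.total_seconds())
        await fn()
        cycles += 1
    return cycles

calls: list[int] = []
ready: list[bool] = []

async def poll() -> None:
    calls.append(len(calls))

catch_up = asyncio.run(auto_refresh_sketch(
    poll, datetime.timedelta(minutes=5), live=False, on_ready=lambda: ready.append(True)))
live_run = asyncio.run(auto_refresh_sketch(
    poll, datetime.timedelta(0), live=True, on_ready=lambda: ready.append(True), max_cycles=3))
```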
**Handling deletes:** each cycle's declarations are reconciled against the previous run. If a row disappears from the source table between polls, `sync_users` simply doesn't declare it that cycle — CocoIndex automatically deletes the corresponding target. You don't need to track deletions yourself.
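The delete behavior follows from diffing each cycle's declarations against the previous cycle's. A minimal sketch; `reconcile` is a hypothetical helper operating on plain dicts, not CocoIndex's target-state reconciliation.

```python
def reconcile(previous: dict[str, dict], declared: dict[str, dict]) -> dict[str, list[str]]:
    """Compare two cycles of declared rows, keyed by primary key."""
    return {
        # New keys, or keys whose row content changed.
        "upsert": sorted(k for k, row in declared.items() if previous.get(k) != row),
        # Keys declared last cycle but not this one.
        "delete": sorted(k for k in previous if k not in declared),
    }

cycle_1 = {"u1": {"name": "Ada"}, "u2": {"name": "Bob"}}
cycle_2 = {"u1": {"name": "Ada"}, "u3": {"name": "Cy"}}  # u2 vanished from the source
plan = reconcile(cycle_1, cycle_2)
```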
### Decoupling stages with an in-memory `LiveMap`
When one part of your pipeline produces keyed data and another consumes it — without going through an external system — put an in-memory [`LiveMap`](../common_resources/live_map) between them. Producers declare `(key, value)` entries; the consumer reads it as a `LiveMapView` via `mount_each`, reacting to adds, updates, and deletes just like a live source.
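The producer/consumer shape can be sketched as a dict that pushes each change onto subscriber queues, plus a snapshot for the initial scan. `InMemoryLiveMap` is hypothetical and much simpler than CocoIndex's `LiveMap` (no readiness signaling, no back-pressure); it only shows the snapshot-then-changes pattern.

```python
import asyncio
from typing import Generic, Hashable, Optional, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")

class InMemoryLiveMap(Generic[K, V]):
    """Hypothetical keyed map that streams (key, value) changes; value None means delete."""

    def __init__(self) -> None:
        self._data: dict[K, V] = {}
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> tuple[dict[K, V], asyncio.Queue]:
        """Return the current snapshot plus a queue of subsequent changes."""
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return dict(self._data), queue

    def declare(self, key: K, value: V) -> None:
        if key in self._data and self._data[key] == value:
            return  # unchanged: consumers are not notified
        self._data[key] = value
        self._publish(key, value)

    def delete(self, key: K) -> None:
        if key in self._data:
            del self._data[key]
            self._publish(key, None)

    def _publish(self, key: K, value: Optional[V]) -> None:
        for queue in self._subscribers:
            queue.put_nowait((key, value))

async def main() -> tuple[dict, list]:
    live_map: InMemoryLiveMap[str, int] = InMemoryLiveMap()
    live_map.declare("a", 1)
    snapshot, changes = live_map.subscribe()
    live_map.declare("a", 1)  # no-op
    live_map.declare("a", 2)
    live_map.declare("b", 3)
    live_map.delete("a")
    events = [changes.get_nowait() for _ in range(changes.qsize())]
    return snapshot, events

snapshot, events = asyncio.run(main())
```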
## Examples
### `localfs` — file watching with `LiveMapView`
The [`localfs`](../connectors/localfs) connector supports live mode via `walk_dir(..., live=True)`, which watches for file system changes using `watchfiles`:
```python
@coco.fn
async def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    files = localfs.walk_dir(
        sourcedir, recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
        live=True,  # items() returns a LiveMapView
    )
    await coco.mount_each(process_file, files.items(), outdir)

app = coco.App(coco.AppConfig(name="FilesTransform"), app_main, sourcedir=..., outdir=...)
app.update_blocking(live=True)
```
**Catch-up compatibility:** `LiveMapView` sources also work without `live=True` — they do the initial full scan and exit cleanly. You can write your pipeline once and choose catch-up or live at run time.
For a complete working example, see [`files_transform`](https://github.com/cocoindex-io/cocoindex/tree/main/examples/files_transform).
### `kafka` — consuming a topic with `LiveMapFeed`
The [`kafka`](../connectors/kafka) connector treats a topic as a live keyed map — each message is an upsert or delete for a key. Since there's no snapshot to scan, it returns a `LiveMapFeed`:
```python
from confluent_kafka.aio import AIOConsumer
from cocoindex.connectors import kafka

@coco.fn
async def app_main() -> None:
    consumer = AIOConsumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "enable.auto.commit": "false",
    })
    items = kafka.topic_as_map(consumer, ["my-topic"])
    await coco.mount_each(process_message, items)

app = coco.App(coco.AppConfig(name="KafkaConsumer"), app_main)
app.update_blocking(live=True)
```
## Going deeper
The abstractions behind live mode, from most general to most specific:
- **[LiveComponent](../advanced_topics/live_component)** — the underlying protocol for components that react to changes incrementally. Most flexible — full control over the lifecycle.
- **[LiveMapFeed / LiveMapView](../advanced_topics/live_component#livemapfeed-and-livemapview)** — represents a changing collection of keyed items. `mount_each()` uses it to construct a `LiveComponent` automatically. Connector authors implement this to add live support.
- **[`coco.auto_refresh`](../advanced_topics/live_component#example-periodic-refresh-with-cocoauto_refresh)** — wraps a regular processor function as a `LiveComponent` that re-runs on a fixed interval. Use when there's no change-event source.
- **Source connectors** — provide `LiveMapView` (e.g., [`localfs`](../connectors/localfs)) or `LiveMapFeed` (e.g., [`kafka`](../connectors/kafka)) from their source APIs. Users just pass the result to `mount_each()`.
For custom change feeds, fine-grained lifecycle control, or implementing live map protocols on your own connector, see [Live Components](../advanced_topics/live_component).
---
# Value serialization for memoization
Source: https://cocoindex.io/docs/programming_guide/serialization/
## Overview
CocoIndex serializes and caches the return values of [memoized functions](./function#memoization) so that unchanged work can be skipped on subsequent runs. Most Python types work automatically — the key thing to get right is the **return type annotation**, which tells CocoIndex how to reconstruct your objects:
```python
@coco.fn(memo=True)
async def process_chunk(chunk: Chunk) -> Embedding:  # return type annotation
    return embed(chunk.text)
```
Without annotations, values may deserialize as basic Python types (`dict`, `list`, `str`, etc.) instead of their original types.
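Why the annotation matters: a serialized payload carries field values, not the Python class. A stdlib sketch under stated simplifications: JSON stands in for CocoIndex's real encoding, only flat dataclasses are handled, and `encode`, `decode`, and `Embedding` are hypothetical.

```python
import dataclasses
import json
import typing
from typing import Any, Optional

@dataclasses.dataclass
class Embedding:
    model: str
    vector: list[float]

def encode(value: Any) -> str:
    # Only field values are written; the class itself is not recorded.
    if dataclasses.is_dataclass(value):
        value = dataclasses.asdict(value)
    return json.dumps(value)

def decode(payload: str, target_type: Optional[type] = None) -> Any:
    raw = json.loads(payload)
    if target_type is not None and dataclasses.is_dataclass(target_type):
        return target_type(**raw)  # flat dataclasses only, for brevity
    return raw

def process_chunk(text: str) -> Embedding:
    return Embedding(model="demo", vector=[float(len(text))])

payload = encode(process_chunk("hello"))
without_hint = decode(payload)  # a plain dict
return_type = typing.get_type_hints(process_chunk)["return"]
with_hint = decode(payload, return_type)  # an Embedding again
```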
**Info — Advanced: other places where serialization and type annotations matter**
Serialization also applies to [memo states](../advanced_topics/memoization_keys#memo-state-validation) and [tracking records](../advanced_topics/custom_target_connector#targethandler-you-implement). If you're implementing these, add type annotations to:
- **`__coco_memo_state__` `prev_state` parameter** — annotate with the state type you return in `MemoStateOutcome(state=...)`. See [Memo state validation](../advanced_topics/memoization_keys#memo-state-validation).
- **`reconcile()` `prev_possible_records` parameter** — annotate with `Collection[YourTrackingRecord]`. See [Custom Target Connector](../advanced_topics/custom_target_connector#targethandler-you-implement).
## Supported types
The following types all work out of the box — no registration needed:
| Category | Types |
|----------|-------|
| **Primitives** | `bool`, `int`, `float`, `str`, `bytes`, `None` |
| **Collections** | `list`, `tuple`, `dict`, `set`, `frozenset` |
| **Dataclasses** | Any `@dataclass` (including frozen) |
| **NamedTuples** | Any `NamedTuple` subclass |
| **Pydantic models** | Any `pydantic.BaseModel` subclass |
| **msgspec Structs** | Any `msgspec.Struct` subclass |
| **Date/time** | `datetime.datetime`, `datetime.date`, `datetime.time`, `datetime.timedelta`, `datetime.timezone` |
| **Other stdlib** | `uuid.UUID`, `complex`, `pathlib.Path`, `pathlib.PurePath` |
| **NumPy** | `numpy.ndarray`, `numpy.dtype` (when numpy is installed) |
More generally, all types [supported by msgspec](https://msgspec.dev/supported-types) work automatically. These types also work when nested inside collections or other structured types.
## Custom types
If your type isn't in the list above, register it with `@coco.serialize_by_pickle`:
```python
import cocoindex as coco

@coco.serialize_by_pickle
class MySpecialType:
    def __init__(self, data):
        self.data = data
```
For third-party types, call it as a regular function:
```python
import cocoindex as coco
from some_library import SomeType
coco.serialize_by_pickle(SomeType)
```
**Caution — Not for dataclasses, NamedTuples, or `msgspec.Struct`**
Don't apply `@coco.serialize_by_pickle` to dataclasses, NamedTuples, or `msgspec.Struct` — these are already supported natively. Applying it only works at the top level; when nested inside another supported type, the native encoding takes precedence and the decorator has no effect.
If serialization fails because of a problematic *field* inside a dataclass, register that field's type with `@coco.serialize_by_pickle` instead.
## Union types
Unions of a custom type with `None` work fine (`MyDataclass | None`). However, unions involving multiple custom types or a custom type with other non-`None` types require tagged `msgspec.Struct` variants.
For example, this **won't work**:
```python
from dataclasses import dataclass
from typing import NamedTuple

@dataclass
class Config:
    value: int

class Settings(NamedTuple):
    config: Config | str  # fails at deserialization
```
**Fix** — wrap each variant in a tagged [`msgspec.Struct`](https://msgspec.dev/structs#tagged-unions). The `tag=True` parameter embeds a type tag in the serialized data so that the correct variant can be identified during deserialization:
```python
from typing import NamedTuple

import msgspec

class ConfigValue(msgspec.Struct, tag=True):
    value: int

class StringValue(msgspec.Struct, tag=True):
    value: str

class Settings(NamedTuple):
    config: ConfigValue | StringValue  # works — variants are distinguished by tag
```
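The idea behind `tag=True` can be shown with the standard library: store a discriminator next to the fields so the decoder knows which class to build. This is a hand-rolled illustration (the `"type"` field name and the helpers `encode_tagged`/`decode_tagged` are choices made for this sketch), not msgspec's encoder.

```python
import json
from dataclasses import asdict, dataclass
from typing import Union

@dataclass
class ConfigValue:
    value: int

@dataclass
class StringValue:
    value: str

Variant = Union[ConfigValue, StringValue]
_VARIANTS = {cls.__name__: cls for cls in (ConfigValue, StringValue)}

def encode_tagged(obj: Variant) -> str:
    # Without the tag, {"value": ...} alone would not say which class to rebuild.
    return json.dumps({"type": type(obj).__name__, **asdict(obj)})

def decode_tagged(payload: str) -> Variant:
    raw = json.loads(payload)
    cls = _VARIANTS[raw.pop("type")]  # the tag selects the variant
    return cls(**raw)

round_trips = [decode_tagged(encode_tagged(v)) for v in (ConfigValue(3), StringValue("auto"))]
```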
## Troubleshooting
### `DeserializationError: Cannot build msgspec Decoder`
This typically means an unsupported union type. The error message includes a hint about the cause.
**Fix**: Restructure the union to use tagged `msgspec.Struct` variants. See [Union types](#union-types) above.
### `DeserializationError: Failed to deserialize msgspec payload`
The type annotation doesn't match the serialized data. Common causes:
- **Missing return type annotation** on a memoized function — add `-> YourType` to the function signature.
- **Changed type structure** between runs — if you renamed or restructured a dataclass, the cached data won't match. Rebuild the cache by running [`app.update(full_reprocess=True)`](./app#updating-an-app) or [`cocoindex update --full-reprocess`](../cli).
- **Forward reference not resolved** — if your type annotation uses a string forward reference, ensure the type is defined before the function is first called.
### `UnpicklingError: Forbidden global during unpickling`
```
_pickle.UnpicklingError: Forbidden global during unpickling: myapp.models.Summary
```
CocoIndex restricts which types can be deserialized for security. This error means your type isn't in the allow-list. Fix by either:
1. **Converting to a dataclass or NamedTuple** (recommended — supported natively, no registration needed)
2. **Using `@coco.serialize_by_pickle`** to register the type
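For context, the Python `pickle` documentation describes this kind of allow-list under "Restricting Globals": override `Unpickler.find_class` and reject anything not explicitly permitted. The sketch below follows that pattern; `AllowListUnpickler` and `restricted_loads` are hypothetical names, and CocoIndex's actual allow-list mechanism may differ.

```python
import collections
import io
import pickle

class AllowListUnpickler(pickle.Unpickler):
    """Only resolve globals (classes, functions) that appear in an explicit allow-list."""

    def __init__(self, file: io.BytesIO, allowed: set[tuple[str, str]]) -> None:
        super().__init__(file)
        self._allowed = allowed

    def find_class(self, module: str, name: str):
        if (module, name) in self._allowed:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"Forbidden global during unpickling: {module}.{name}")

def restricted_loads(data: bytes, allowed: set[tuple[str, str]]):
    return AllowListUnpickler(io.BytesIO(data), allowed).load()

data = pickle.dumps(collections.OrderedDict(a=1))
restored = restricted_loads(data, {("collections", "OrderedDict")})

try:
    restricted_loads(data, allowed=set())
    blocked = False
except pickle.UnpicklingError as exc:
    blocked = "collections.OrderedDict" in str(exc)
```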
**Note — Upgrading from older versions**
If you see this error after upgrading, previously cached data may reference types that aren't yet registered. You have two options:
- Add `@coco.serialize_by_pickle` to the type and re-run.
- If the type is already a dataclass or NamedTuple, add `@coco.unpickle_safe` to allow reading the old cached data. Once the cache is rebuilt, the decorator can be removed.
---
# Python SDK overview
Source: https://cocoindex.io/docs/programming_guide/sdk_overview/
This document provides an overview of the CocoIndex Python SDK organization and how async and sync APIs work together.
## Package organization
The CocoIndex SDK is organized into several modules:
### Core package
| Package | Description |
|---------|-------------|
| `cocoindex` | All core APIs — async by default, sync variants have a `_blocking` suffix |
### Sub-packages
| Package | Description |
|---------|-------------|
| `cocoindex.connectors` | Connectors for data sources and targets |
| `cocoindex.resources` | [Common resources](../common_resources/data_types) — shared data types, vector schema annotations, and ID generation utilities |
| `cocoindex.ops` | Built-in operations for common data processing tasks (e.g., text splitting, embedding with SentenceTransformers) |
Import connectors and extras by their specific sub-module:
```python
from cocoindex.connectors import localfs, postgres
from cocoindex.ops.text import RecursiveSplitter
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from cocoindex.resources.file import FileLike, PatternFilePathMatcher
from cocoindex.resources.chunk import Chunk
```
## Common types
### StableKey
`StableKey` is a type alias defining what values can be used when creating component paths via `coco.component_subpath()`:
```python
StableKey = None | bool | int | str | bytes | uuid.UUID | Symbol | tuple[StableKey, ...]
```
Common examples include strings (like `"setup"` or `"table"`), integers, and UUIDs. Tuples allow composite keys when needed.
`Symbol` provides predefined names that will never conflict with strings (which typically come from runtime data).
Each processing component must be mounted at a unique path. See [Processing Component](./processing_component) for how the component path tree affects target states and ownership.
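To make the recursive alias concrete, here is a minimal, stdlib-only checker for whether a value fits the `StableKey` shape. The `Symbol` class below is a hypothetical stand-in (CocoIndex's own `Symbol` is not reproduced), and `is_stable_key` is an illustrative helper, not a CocoIndex API.

```python
import uuid
from typing import Any


class Symbol:
    """Hypothetical stand-in for CocoIndex's Symbol type."""

    def __init__(self, name: str) -> None:
        self.name = name


# Leaf types allowed by the StableKey alias (None is checked separately).
_LEAF_TYPES = (bool, int, str, bytes, uuid.UUID, Symbol)


def is_stable_key(value: Any) -> bool:
    """Return True if `value` matches the StableKey type alias."""
    if value is None or isinstance(value, _LEAF_TYPES):
        return True
    if isinstance(value, tuple):
        # Tuples form composite keys; every element must itself be a StableKey.
        return all(is_stable_key(item) for item in value)
    return False
```

Under this reading, `("file", 3)` is a valid composite key, while floats and lists are not.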
## Async vs sync APIs
CocoIndex's API is **async-first**. The APIs fall into three categories:
### Orchestration APIs (async only)
The APIs that shape your pipeline are async:
`mount()`, `use_mount()`, `mount_each()`, `mount_target()`, `map()`
### Entry-point APIs (async + sync)
APIs for starting and running your pipeline have both async and sync variants. Sync variants use a `_blocking` suffix:
| Async | Sync (blocking) |
|-------|-----------------|
| `await app.update(...)` | `app.update_blocking(...)` |
| `await app.drop(...)` | `app.drop_blocking(...)` |
| `await coco.start()` | `coco.start_blocking()` |
| `await coco.stop()` | `coco.stop_blocking()` |
| `async with coco.runtime():` | `with coco.runtime():` |
`app.update()` returns an `UpdateHandle` that is also awaitable — `await app.update()` returns the result directly, or you can use the handle for [progress monitoring](../advanced_topics/progress_monitoring). Use the `_blocking` variants for scripts and CLI usage. See [App](./app) for details.
### Processing functions (your choice)
The `@coco.fn` decorator preserves the sync/async nature of your function — your processing functions can be sync or async. See [Function](./function) for details.
## How sync and async work together
Like any async Python program, **async functions can call into sync code, but not the other way around**. In practice, this means higher-level functions (orchestration) tend to be async, while leaf functions (the actual computation) can be sync.
CocoIndex provides two ways for async code to call into sync functions:
- **Mounting** — When you mount a processing component, the function is scheduled on CocoIndex's runtime, not called directly. So an async function can mount a sync processing function.
- **`@coco.fn.as_async`** — Wraps a sync function with an async interface (runs on a thread pool). Useful for compute-intensive leaf functions. See [Function](./function) for details.
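The thread-pool idea behind `@coco.fn.as_async` can be illustrated with the standard library alone. The sketch below uses `asyncio.to_thread` (Python 3.9+) purely as an analogy; it is not how CocoIndex implements the decorator, and `as_async_sketch`, `count_words`, and `orchestrate` are hypothetical names.

```python
import asyncio
import functools
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def as_async_sketch(fn: Callable[..., T]) -> Callable[..., Awaitable[T]]:
    """Wrap a sync function so async callers can await it without blocking the event loop."""

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs) -> T:
        # Run the blocking call on the default thread pool executor.
        return await asyncio.to_thread(fn, *args, **kwargs)

    return wrapper


@as_async_sketch
def count_words(text: str) -> int:
    # Stand-in for a compute-heavy sync leaf function.
    return len(text.split())


async def orchestrate(texts: list[str]) -> list[int]:
    # Async code awaits the wrapped sync function; the calls can overlap.
    return await asyncio.gather(*(count_words(t) for t in texts))
```

The reverse direction (plain sync code awaiting `count_words`) is not possible without starting an event loop, which is why orchestration code tends to be async.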
### Example: async orchestration mounting sync leaf functions
A typical pipeline has an async main function that orchestrates the pipeline, while leaf functions that do the actual computation can be sync:
```python
import pathlib

import cocoindex as coco
from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher
from docling.document_converter import DocumentConverter

_converter = DocumentConverter()

@coco.fn(memo=True)
def process_file(file: localfs.File, outdir: pathlib.Path) -> None:
    # Sync leaf function — does the actual computation
    markdown = _converter.convert(
        file.file_path.resolve()
    ).document.export_to_markdown()
    outname = file.file_path.path.stem + ".md"
    localfs.declare_file(outdir / outname, markdown, create_parent_dirs=True)

@coco.fn
async def app_main(sourcedir: pathlib.Path, outdir: pathlib.Path) -> None:
    # Async — orchestrates the pipeline, mounts child components
    files = localfs.walk_dir(
        sourcedir,
        recursive=True,
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.pdf"]),
    )
    await coco.mount_each(process_file, files.items(), outdir)

app = coco.App(
    "PdfToMarkdown",
    app_main,
    sourcedir=pathlib.Path("./pdf_files"),
    outdir=pathlib.Path("./out"),
)
```
Here `app_main` is async because it uses mounting APIs (`mount_each`), while `process_file` is sync because it only does computation. The sync `process_file` is mounted as a child component — mounting schedules it on CocoIndex's runtime, so the async parent can mount a sync child without issues.
## Running an app
Run the app with either an async or sync entry point:
```python
# Async entry point
import asyncio

async def main():
    await app.update(report_to_stdout=True)

asyncio.run(main())
```
```python
# Sync entry point (scripts, CLI)
app.update_blocking(report_to_stdout=True)
```
---
# Common data types
Source: https://cocoindex.io/docs/common_resources/data_types/
The `cocoindex.resources` package provides common data models and abstractions shared across connectors and built-in operations. Connectors provide concrete implementations — for example, `localfs.File` implements `FileLike`, and `localfs.FilePath` extends `FilePath`. See individual [connector docs](../connectors/localfs) for connector-specific details.
## File
The file module (`cocoindex.resources.file`) defines base classes and utilities for working with file-like objects.
### FileLike
`FileLike` is a base class for file objects with async read methods. Each connector provides its own subclass (e.g., `localfs.File`, `amazon_s3.S3File`).
```python
from cocoindex.resources.file import FileLike

async def process_file(file: FileLike) -> str:
    text = await file.read_text()
    ...
    return text
```
**Properties:**
- `file_path` — A `FilePath` object representing the file's path. Access the relative path via `file_path.path` (`PurePath`).
**Methods:**
- `async size()` — Return the file size in bytes.
- `async read(size=-1)` — Read file content as bytes. Pass `size` to limit bytes read.
- `async read_text(encoding=None, errors="replace")` — Read as text. Auto-detects encoding via BOM if not specified.
**Memoization:**
`FileLike` objects provide a memoization key based on `file_path` (file identity). When used as arguments to a [memoized function](../programming_guide/function#memoization), CocoIndex uses a two-level validation: it checks the modification time first (cheap), then computes a content fingerprint only if the modification time has changed. This means touching a file (updating its modification time without changing its content) won't cause unnecessary recomputation, and neither will moving the whole directory tree it lives in, because the memo key uses the path relative to its base directory (see `FilePath` below).
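The two-level check can be modeled with the standard library: compare the cheap modification time first, and hash the content only when the time differs. This sketch mirrors the behavior described above; the `FileState` record, the SHA-256 fingerprint, and the `needs_recompute` name are illustrative assumptions, not CocoIndex internals.

```python
import hashlib
import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileState:
    mtime_ns: int
    fingerprint: str


def _fingerprint(path: str) -> str:
    with open(path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


def needs_recompute(path: str, cached: Optional[FileState]) -> tuple[bool, FileState]:
    """Return (recompute?, new_state), checking mtime before the content fingerprint."""
    mtime_ns = os.stat(path).st_mtime_ns
    if cached is not None and cached.mtime_ns == mtime_ns:
        # Level 1: unchanged mtime, so skip reading the file entirely.
        return False, cached
    # Level 2: mtime differs (or nothing cached), so fingerprint the content.
    state = FileState(mtime_ns, _fingerprint(path))
    changed = cached is None or cached.fingerprint != state.fingerprint
    return changed, state
```

Returning the new state even when nothing changed lets a caller record the new modification time, so the next check after a mere "touch" takes the cheap path again.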
### FilePath
`FilePath` is a base class that combines a **base directory** (with a stable key) and a **relative path**. This enables stable memoization even when the entire directory tree is moved to a different location.
```python
from cocoindex.resources.file import FilePath
```
Each connector provides its own `FilePath` subclass (e.g., `localfs.FilePath`). The base class defines the common interface.
**Properties:**
- `base_dir` — An object that holds the base directory. Its key is used for stable memoization.
- `path` — The path relative to the base directory (`PurePath`).
**Methods:**
- `resolve()` — Resolve to the full path (type depends on the connector, e.g., `pathlib.Path` for local filesystem).
**Path Operations:**
`FilePath` supports most `pathlib.PurePath` operations:
```python
# Join paths with /
config_path = source_dir / "config" / "settings.json"
# Access path properties
config_path.name # "settings.json"
config_path.stem # "settings"
config_path.suffix # ".json"
config_path.parts # ("config", "settings.json")
config_path.parent # FilePath pointing to "config/"
# Modify path components
config_path.with_name("other.json")
config_path.with_suffix(".yaml")
config_path.with_stem("config")
# Pattern matching
config_path.match("*.json") # True
# Convert to POSIX string
config_path.as_posix() # "config/settings.json"
```
**Memoization:**
`FilePath` provides a memoization key based on `(base_dir.key, path)`. This means:
- Two `FilePath` objects with the same base directory key and relative path have the same memo key
- Moving the entire project directory doesn't invalidate memoization, as long as the same base directory key is used
For connector-specific usage (e.g., `register_base_dir`), see the individual connector documentation like [Local File System](../connectors/localfs).
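A small model of the `(base_dir.key, path)` memo key shows why relocating a directory tree keeps memoization intact. `BaseDir` and `SketchFilePath` are hypothetical stand-ins, not the connector classes; the point is that the absolute location participates in `resolve()` but not in the key.

```python
from dataclasses import dataclass
from pathlib import Path, PurePath


@dataclass(frozen=True)
class BaseDir:
    key: str        # stable key registered for this directory
    location: Path  # where the directory currently lives on disk


@dataclass(frozen=True)
class SketchFilePath:
    base_dir: BaseDir
    path: PurePath  # relative to base_dir

    def resolve(self) -> Path:
        return self.base_dir.location / self.path

    def memo_key(self) -> tuple[str, PurePath]:
        # The absolute location is deliberately left out of the key.
        return (self.base_dir.key, self.path)
```

Two instances that differ only in `location` share a memo key, while a different relative path yields a different key.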
### FilePathMatcher
`FilePathMatcher` is a protocol for filtering files and directories during traversal.
```python
from pathlib import PurePath

from cocoindex.resources.file import FilePathMatcher

class MyMatcher(FilePathMatcher):
    def is_dir_included(self, path: PurePath) -> bool:
        """Return True to traverse this directory."""
        return not path.name.startswith(".")

    def is_file_included(self, path: PurePath) -> bool:
        """Return True to include this file."""
        return path.suffix in (".py", ".md")
```
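To see how a walker might consume the two protocol methods, here is a stdlib-only traversal over a local directory that prunes directories rejected by `is_dir_included` and yields relative paths of files accepted by `is_file_included`. `PathMatcher`, `SourceMatcher`, and `walk_matching` are hypothetical names; this only illustrates the contract that excluded directories are never entered, not the connector's actual walker.

```python
from pathlib import Path, PurePath
from typing import Iterator, Protocol


class PathMatcher(Protocol):
    """Same two methods as FilePathMatcher."""

    def is_dir_included(self, path: PurePath) -> bool: ...

    def is_file_included(self, path: PurePath) -> bool: ...


class SourceMatcher:
    def is_dir_included(self, path: PurePath) -> bool:
        return not path.name.startswith(".")

    def is_file_included(self, path: PurePath) -> bool:
        return path.suffix in (".py", ".md")


def walk_matching(root: Path, matcher: PathMatcher) -> Iterator[PurePath]:
    """Yield file paths relative to `root`, skipping pruned directories entirely."""
    pending = [PurePath()]
    while pending:
        rel_dir = pending.pop()
        for entry in sorted((root / rel_dir).iterdir()):
            rel = rel_dir / entry.name
            if entry.is_dir():
                # A rejected directory is never listed, so nothing under it is visited.
                if matcher.is_dir_included(rel):
                    pending.append(rel)
            elif matcher.is_file_included(rel):
                yield rel
```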
#### PatternFilePathMatcher
A built-in `FilePathMatcher` implementation using [globset](https://docs.rs/globset/#syntax) patterns:
```python
from cocoindex.resources.file import PatternFilePathMatcher

# Include only Python and Markdown files, exclude tests and hidden dirs
matcher = PatternFilePathMatcher(
    included_patterns=["**/*.py", "**/*.md"],
    excluded_patterns=["**/test_*", "**/.*"],
)
```
**Parameters:**
- `included_patterns` — Glob patterns ([globset](https://docs.rs/globset) syntax) for files to include. Use `**/*.ext` to match at any depth. If `None`, all files are included.
- `excluded_patterns` — Glob patterns ([globset](https://docs.rs/globset) syntax) for files/directories to exclude. Excluded directories are not traversed. A pattern prefixed with `!` negates the exclusion for matching paths (see below).
**Note**
Patterns use [globset](https://docs.rs/globset) semantics: `*.py` matches only in the root directory; use `**/*.py` to match at any depth.
**Gitignore-style negation.** Within `excluded_patterns`, a pattern beginning with `!` *un-excludes* paths that would otherwise be excluded by a preceding pattern. This lets you exclude broadly and then carve out exceptions, instead of enumerating every directory to exclude:
```python
# Exclude all dot-entries, but keep everything under .github
matcher = PatternFilePathMatcher(
    excluded_patterns=[
        "**/.*",           # exclude all dot-files and dot-directories
        "!**/.github/**",  # …except anything under .github
    ]
)
```
Directory traversal honors negations correctly: a directory excluded by a normal pattern is still traversed when a `!` pattern could match its contents, so the kept paths inside it are reachable.
## Chunk
The chunk module (`cocoindex.resources.chunk`) defines types for representing text chunks produced by [text splitters](../ops/text).
### Chunk
A `Chunk` is a frozen dataclass representing a piece of text with its position information in the original document.
```python
from cocoindex.resources.chunk import Chunk
```
**Fields:**
- `text` (`str`) — The text content of the chunk.
- `start` (`TextPosition`) — Start position in the original text.
- `end` (`TextPosition`) — End position in the original text.
### TextPosition
A frozen dataclass representing a position in text.
**Fields:**
- `byte_offset` (`int`) — Byte offset from the start of the text.
- `char_offset` (`int`) — Character offset from the start of the text.
- `line` (`int`) — 1-based line number.
- `column` (`int`) — 1-based column number.
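The relationship between the four fields can be shown by computing them for a given character offset. This sketch assumes UTF-8 for `byte_offset` and assumes `column` counts characters (not bytes) from the start of the line; `Position` and `position_at` are hypothetical names, not the library's code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Position:
    """Hypothetical stand-in with the same fields as TextPosition."""

    byte_offset: int
    char_offset: int
    line: int
    column: int


def position_at(text: str, char_offset: int) -> Position:
    prefix = text[:char_offset]
    line = prefix.count("\n") + 1               # 1-based line number
    column = char_offset - prefix.rfind("\n")   # 1-based; rfind is -1 on the first line
    byte_offset = len(prefix.encode("utf-8"))   # assumes UTF-8 encoding
    return Position(byte_offset, char_offset, line, column)
```

For non-ASCII text such as `"héllo\nwörld"`, the byte and character offsets diverge, which is why both are recorded.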
**Example:**
```python
from cocoindex.ops.text import RecursiveSplitter

splitter = RecursiveSplitter()
chunks = splitter.split(text, chunk_size=2000, chunk_overlap=500, language="markdown")
for chunk in chunks:
    print(f"[{chunk.start.line}:{chunk.start.column}] {chunk.text[:50]}...")
```
## Embedder
The embedder module (`cocoindex.resources.embedder`) defines a protocol for single-text async embedding.
### Embedder Protocol
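```python
from typing import Protocol

import numpy as np
from numpy.typing import NDArray

from cocoindex.resources.embedder import Embedder

# Shape of the protocol, for reference:
class Embedder(Protocol):
    async def embed(self, text: str) -> NDArray[np.float32]: ...
```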
This is the call-site contract that consumers like [`resolve_entities`](../ops/entity_resolution) rely on. Both [`LiteLLMEmbedder`](../ops/litellm) and [`SentenceTransformerEmbedder`](../ops/sentence_transformers) satisfy this protocol — `await embedder.embed("some text")` returns a single `NDArray[np.float32]`.
The protocol is deliberately narrow: it does not include `dimension()` or `__coco_vector_schema__()`, which are concerns of connectors and table-schema creation, not of embedding consumers.
```python
# Any embedder works with resolve_entities:
from cocoindex.ops.entity_resolution import resolve_entities

result = await resolve_entities(
    entities={"Apple Inc.", "Apple"},
    embedder=my_embedder,  # LiteLLMEmbedder, SentenceTransformerEmbedder, or your own
    resolve_pair=my_resolver,
)
```
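Because the protocol only asks for an async `embed` method, a custom embedder can be very small. The toy `HashingEmbedder` below is hypothetical (useful for tests or offline experiments, not a semantic model): it hashes character trigrams into a fixed number of buckets and returns a unit-length `float32` vector. It assumes NumPy is installed, and the local `Embedder` protocol mirrors the one shown above.

```python
import hashlib
from typing import Protocol

import numpy as np
from numpy.typing import NDArray


class Embedder(Protocol):
    async def embed(self, text: str) -> NDArray[np.float32]: ...


class HashingEmbedder:
    """Toy embedder: hashes character trigrams into `dim` buckets."""

    def __init__(self, dim: int = 16) -> None:
        self.dim = dim

    async def embed(self, text: str) -> NDArray[np.float32]:
        vec = np.zeros(self.dim, dtype=np.float32)
        padded = f"  {text.lower()} "  # padding guarantees at least one trigram
        for i in range(len(padded) - 2):
            digest = hashlib.blake2b(padded[i : i + 3].encode("utf-8"), digest_size=8).digest()
            vec[int.from_bytes(digest, "little") % self.dim] += 1.0
        norm = np.linalg.norm(vec)
        return (vec / norm).astype(np.float32) if norm > 0 else vec


async def cosine(embedder: Embedder, a: str, b: str) -> float:
    # Accepts any object satisfying the protocol; vectors are unit-length, so dot = cosine.
    va, vb = await embedder.embed(a), await embedder.embed(b)
    return float(np.dot(va, vb))
```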
---
# Vector schema annotations
Source: https://cocoindex.io/docs/common_resources/vector_schema/
The schema module (`cocoindex.resources.schema`) defines types that describe vector columns. CocoIndex connectors use these to automatically configure the correct column type (e.g., `vector(384)` in Postgres, `fixed_size_list(384)` in LanceDB).
## VectorSchema
A frozen dataclass that describes a vector column's dtype and dimension.
```python
from cocoindex.resources.schema import VectorSchema
import numpy as np
schema = VectorSchema(dtype=np.dtype(np.float32), size=768)
```
**Fields:**
- `dtype` — NumPy dtype of each element (e.g., `np.float32`)
- `size` — Number of dimensions in the vector (e.g., `384`)
You can construct `VectorSchema` directly when using a custom embedding model that doesn't implement `VectorSchemaProvider`:
```python
import numpy as np
from qdrant_client import QdrantClient

import cocoindex as coco
from cocoindex.connectors import qdrant
from cocoindex.resources.schema import VectorSchema

# For a custom CLIP model with known dimension
schema = VectorSchema(dtype=np.dtype(np.float32), size=768)

# Use it in a Qdrant vector definition (inside an async coco function)
QDRANT_DB = coco.ContextKey[QdrantClient]("my_qdrant_db")
target_collection = await qdrant.mount_collection_target(
    QDRANT_DB,
    collection_name="image_search",
    schema=await qdrant.CollectionSchema.create(
        vectors=qdrant.QdrantVectorDef(schema=schema, distance="cosine")
    ),
)
```
## VectorSchemaProvider
A protocol for objects that can provide vector schema information. The primary use case is as metadata in `Annotated` type annotations — connectors extract vector column configuration from the annotation automatically.
Any object that implements the `__coco_vector_schema__()` method satisfies this protocol. The built-in [`SentenceTransformerEmbedder`](../ops/sentence_transformers) implements it.
There are three ways to specify vector schema in annotations:
### Using a `ContextKey` (recommended)
Define a [`ContextKey`](../programming_guide/context) for the embedder and use it as the annotation. The connector resolves the key at schema creation time. This is the recommended approach because the embedder is configured once in the lifespan and shared across all functions via context.
```python
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Annotated

from numpy.typing import NDArray

import cocoindex as coco
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")

@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, EMBEDDER]  # dimension resolved from context

# In lifespan, provide the embedder:
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(EMBEDDER, SentenceTransformerEmbedder(EMBED_MODEL))
    yield

# In coco functions, access the embedder:
embedding = await coco.use_context(EMBEDDER).embed(text)
```
### Using a `VectorSchemaProvider` instance
Pass an embedder instance directly as the annotation. Simpler for scripts where the embedder is a module-level constant.
```python
from dataclasses import dataclass
from typing import Annotated

from numpy.typing import NDArray

from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")

@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]  # dimension inferred from model (384)
```
### Using a `VectorSchema`
Specify dimension and dtype explicitly. Useful when using a custom embedding model that doesn't implement `VectorSchemaProvider`.
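```python
from dataclasses import dataclass
from typing import Annotated

import numpy as np
from numpy.typing import NDArray

from cocoindex.resources.schema import VectorSchema

@dataclass
class ImageEmbedding:
    id: int
    embedding: Annotated[NDArray, VectorSchema(dtype=np.dtype(np.float32), size=768)]
```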
When a connector's `TableSchema.from_class()` encounters an `Annotated[NDArray, annotation]` field, it resolves the annotation — unwrapping `ContextKey` if needed — and calls `__coco_vector_schema__()` to determine the column's dimension and dtype.
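The resolution step can be sketched with `typing.get_type_hints(..., include_extras=True)`, which keeps `Annotated` metadata. In this stdlib-only model, `Schema`, `FixedEmbedder`, and `vector_columns` are hypothetical stand-ins; `ContextKey` unwrapping is omitted, and giving the schema stand-in its own `__coco_vector_schema__()` is a simplifying assumption.

```python
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Schema:
    """Hypothetical stand-in for VectorSchema."""

    dtype: str
    size: int

    def __coco_vector_schema__(self) -> "Schema":
        return self  # assumption: a schema describes itself


class FixedEmbedder:
    """Hypothetical provider, like an embedder with a known output dimension."""

    def __coco_vector_schema__(self) -> Schema:
        return Schema("float32", 384)


def vector_columns(cls: type) -> dict[str, Schema]:
    """Map each Annotated field with a schema provider to its resolved schema."""
    result: dict[str, Schema] = {}
    for name, hint in get_type_hints(cls, include_extras=True).items():
        if get_origin(hint) is Annotated:
            for meta in get_args(hint)[1:]:
                if hasattr(meta, "__coco_vector_schema__"):
                    result[name] = meta.__coco_vector_schema__()
    return result


@dataclass
class Doc:
    id: int
    embedding: Annotated[list[float], FixedEmbedder()]
    image: Annotated[list[float], Schema("float32", 768)]
```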
## MultiVectorSchema / MultiVectorSchemaProvider
Analogous types for multi-vector columns (e.g., ColBERT-style token-level embeddings). `MultiVectorSchema` wraps a `VectorSchema` describing the individual vectors. Used by connectors like [Qdrant](../connectors/qdrant) that support multi-vector storage.
```python
import numpy as np

from cocoindex.resources.schema import MultiVectorSchema, VectorSchema

multi_schema = MultiVectorSchema(
    vector_schema=VectorSchema(dtype=np.dtype(np.float32), size=128)
)
```
---
# Stable ID generation
Source: https://cocoindex.io/docs/common_resources/id_generation/
The ID module (`cocoindex.resources.id`) provides utilities for generating stable unique IDs and UUIDs that persist across incremental updates.
In an incremental pipeline, using random IDs (like `uuid.uuid4()`) means every reprocessing run generates different IDs for the same data — causing unnecessary churn in your targets (deleting old rows, inserting identical ones with new IDs). CocoIndex's ID utilities produce **stable** IDs: the same inputs produce the same IDs across runs, so unchanged data keeps its identity and targets only see real changes.
## Choosing the right API
| API | Same `dep` produces... | Use when... |
|-----|------------------------|-------------|
| `generate_id(dep)` | **Same** ID every time | Each unique input maps to exactly one ID |
| `IdGenerator.next_id(dep)` | **Distinct** ID each call | You need multiple IDs for potentially non-distinct inputs |
The same distinction applies to `generate_uuid` vs `UuidGenerator`.
## generate_id / generate_uuid
Async functions that return the **same** ID/UUID for the **same** `dep` value. These are idempotent: calling multiple times with identical `dep` yields identical results.
```python
from cocoindex.resources.id import generate_id, generate_uuid

async def process_item(item: Item) -> Row:
    # Same item.key always gets the same ID
    item_id = await generate_id(item.key)
    return Row(id=item_id, data=item.data)

async def process_document(doc: Document) -> Row:
    # Same doc.path always gets the same UUID
    doc_uuid = await generate_uuid(doc.path)
    return Row(id=doc_uuid, content=doc.content)
```
**Parameters:**
- `dep` — Dependency value that determines the ID/UUID. The same `dep` always produces the same result within a component. Defaults to `None`.
**Returns:**
- `generate_id` returns an `int` (IDs start from 1; 0 is reserved)
- `generate_uuid` returns a `uuid.UUID`
## IdGenerator / UuidGenerator
Classes that return a **distinct** ID/UUID on each call, even when called with the same `dep` value. The sequence is stable across runs.
Use these when you need multiple IDs for potentially non-distinct inputs, such as splitting text into chunks where chunks may have identical content but still need unique IDs.
```python
from cocoindex.resources.id import IdGenerator, UuidGenerator
async def process_document(doc: Document) -> list[Row]:
# Use doc.path to distinguish generators within the same processing component
id_gen = IdGenerator(deps=doc.path)
rows = []
for chunk in split_into_chunks(doc.content):
# Each call returns a distinct ID, even if chunks are identical
chunk_id = await id_gen.next_id(chunk.content)
rows.append(Row(id=chunk_id, content=chunk.content))
return rows
async def process_with_uuids(doc: Document) -> list[Row]:
# Use doc.path to distinguish generators within the same processing component
uuid_gen = UuidGenerator(deps=doc.path)
rows = []
for chunk in split_into_chunks(doc.content):
# Each call returns a distinct UUID, even if chunks are identical
chunk_uuid = await uuid_gen.next_uuid(chunk.content)
rows.append(Row(id=chunk_uuid, content=chunk.content))
return rows
```
**Constructor:**
- `IdGenerator(deps=None)` / `UuidGenerator(deps=None)` — Create a generator. The `deps` parameter distinguishes generators within the same [processing component](../programming_guide/processing_component). Use distinct `deps` values for different generator instances.
**Methods:**
- `async IdGenerator.next_id(dep=None)` — Generate the next unique integer ID (distinct on each call)
- `async UuidGenerator.next_uuid(dep=None)` — Generate the next unique UUID (distinct on each call)
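The difference between the two families can be modeled with a persistent table that maps keys to IDs. The sketch below is a synchronous, in-memory illustration of the documented behavior (idempotent `generate_id`, distinct but stable `next_id`); `IdStore`, `sketch_generate_id`, `SketchIdGenerator`, and the occurrence-counter scheme are assumptions, not CocoIndex's implementation.

```python
from collections import defaultdict
from typing import Hashable


class IdStore:
    """Stands in for state that CocoIndex persists between runs."""

    def __init__(self) -> None:
        self._ids: dict[tuple, int] = {}
        self._next = 1  # IDs start from 1; 0 stays reserved

    def lookup(self, key: tuple) -> int:
        if key not in self._ids:
            self._ids[key] = self._next
            self._next += 1
        return self._ids[key]


def sketch_generate_id(store: IdStore, dep: Hashable = None) -> int:
    # Same dep -> same key -> same ID, however many times it is called.
    return store.lookup(("single", dep))


class SketchIdGenerator:
    def __init__(self, store: IdStore, deps: Hashable = None) -> None:
        self._store = store
        self._deps = deps
        self._calls: defaultdict = defaultdict(int)

    def next_id(self, dep: Hashable = None) -> int:
        # The k-th call with a given dep gets its own key: repeats get distinct IDs,
        # and replaying the same sequence of calls in a later run yields the same IDs.
        occurrence = self._calls[dep]
        self._calls[dep] += 1
        return self._store.lookup(("gen", self._deps, dep, occurrence))
```

In the real API these calls are `async`, and the state is scoped to the processing component by CocoIndex rather than passed around explicitly.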
---
# In-memory LiveMap
Source: https://cocoindex.io/docs/common_resources/live_map/
`LiveMap[K, V]` (`cocoindex.resources.live_map`) is an in-memory, keyed collection that sits **between** a producing part of your pipeline and a consuming part. The producing side **declares** `(key, value)` entries; the consuming side reads the map as a [`LiveMapView`](../advanced_topics/live_component#livemapfeed-and-livemapview) via `coco.mount_each`, getting one [processing component](../programming_guide/processing_component) per entry that stays in sync as entries appear, change, and disappear.
Think of it as a connector whose "external system" is an in-process `dict`: entries are declared as [target states](../programming_guide/target_state) — so they participate in CocoIndex's declarative change detection and ownership — and that same `dict` is simultaneously exposed as a live source for downstream components. It lets you split a pipeline into a producer half and a consumer half that are decoupled through one shared, incrementally-maintained collection.
It is designed for [live mode](../programming_guide/live_mode): the producer and consumer run concurrently, and the consumer reacts as the producer updates the map.
## Example
```python
import cocoindex as coco
from cocoindex.resources.live_map import LiveMap

@coco.fn
async def produce_entries(lm: LiveMap[str, str]) -> None:
    # Any component can declare entries — often a live component reading a stream.
    for key, text in fetch_items():
        lm.declare_entry(key, text)

@coco.fn
async def process_entry(value: str) -> None:
    ...  # build something from each entry's value

@coco.fn
async def app_main() -> None:
    lm: LiveMap[str, str] = await LiveMap.create()
    await coco.mount(produce_entries, lm)  # producing side
    await coco.mount_each(process_entry, lm)  # consuming side: one component per entry
```
## Creating a LiveMap
```python
lm: LiveMap[str, str] = await LiveMap.create()
```
`create()` is an async factory and must be called from **inside the app's component tree** (it mounts a backing target). `K` must be a [stable key](../programming_guide/processing_component#component-path); `V` is any value comparable with `==` — no hashability, serialization, or fingerprinting is required, so arbitrary objects (lists, dataclasses, …) work.
## Producing entries
```python
lm.declare_entry(key, value)
```
Call `declare_entry` from inside any component. The entry is a target state **owned by the declaring component**, which gives it normal declarative semantics:
- Declaring a key makes it present, or updates its value.
- The consumer is notified **only when the value actually changes** (compared with `==`) — re-declaring the same value is a no-op for downstream.
- An entry is removed when the component that declared it **stops declaring it** (on a re-run) or disappears. There is no explicit delete verb; deletion follows target-state ownership, exactly like other CocoIndex targets.
Multiple components may declare into the same map, as long as each key has a single owner (two components declaring the same key is a conflict, the same as for any target).
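The change-notification rules above amount to a diff between what an owning component declared on its previous run and what it declares now. The stdlib sketch below (the `diff_declarations` name and the event tuples are hypothetical) emits an upsert only when a value is new or no longer equal to the previous one, and a delete for keys the component stopped declaring. Values are only compared, never hashed, so lists and dicts work.

```python
from typing import Any, Hashable


def diff_declarations(
    previous: dict[Hashable, Any], current: dict[Hashable, Any]
) -> list[tuple[str, Hashable, Any]]:
    events: list[tuple[str, Hashable, Any]] = []
    for key, value in current.items():
        # Re-declaring an equal value produces no event for the consumer.
        if key not in previous or previous[key] != value:
            events.append(("upsert", key, value))
    for key in previous:
        # Keys the owner no longer declares disappear; there is no explicit delete call.
        if key not in current:
            events.append(("delete", key, None))
    return events
```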
## Consuming entries
A `LiveMap` is a [`LiveMapView`](../advanced_topics/live_component#livemapfeed-and-livemapview), so pass it straight to `coco.mount_each` — it behaves just like a live source:
```python
await coco.mount_each(process_entry, lm)
```
`mount_each` mounts one processing component per entry (keyed by the entry key), scans the current entries first, then reacts to incremental changes — re-mounting a component when its value changes and removing it when its entry is deleted. The processor receives the entry **value**.
A LiveMap supports **one active consumer** (`mount_each`) at a time; a second concurrent consumer raises `RuntimeError`.
## Semantics
- **Live-mode first.** LiveMap is built for [`app.update(live=True)`](../programming_guide/live_mode): the consumer subscribes and reacts as producers update the map. In catch-up mode (plain `app.update()`), the consumer scans whatever entries happen to exist when it runs, which can race producers running concurrently in the same session — so for predictable one-shot results, order the producer ahead of the consumer (await the producer's `handle.ready()` before consuming).
- **In-memory and per-session.** The map lives in process memory and is rebuilt from its producers each time the app starts; its contents do not persist across restarts.
## When to use it
Reach for a LiveMap when you want to decouple a producing stage from a consuming stage through a shared, incrementally-maintained collection. For example: one part of your pipeline watches a stream, extracts entities, and declares them into a map keyed by entity ID; another part builds an index or enrichment for each entity, reacting automatically as entities are added, updated, or removed.
---
# Amazon S3 connector
Source: https://cocoindex.io/docs/connectors/amazon_s3/
The `amazon_s3` connector provides utilities for reading objects from Amazon S3 buckets and S3-compatible services (e.g. MinIO, Tigris).
```python
from cocoindex.connectors import amazon_s3
```
**Note — Installation**
This connector requires the `aiobotocore` library. Install with:
```bash
pip install cocoindex[amazon_s3]
```
## As source
The connector provides three ways to read from S3:
- `list_objects()` — List and iterate over objects in a bucket (with optional prefix and filtering)
- `get_object()` — Fetch a single object by its key
- `read()` — Read object content directly by S3 URI
All three require an aiobotocore S3 client, which you create and manage yourself:
```python
import aiobotocore.session

session = aiobotocore.session.get_session()

async with session.create_client("s3") as client:
    # Use client with list_objects(), get_object(), or read()
    ...

# For S3-compatible services:
async with session.create_client("s3", endpoint_url="http://localhost:9000") as client:
    ...

# For Tigris — single global endpoint, no region selection.
# Pass region_name="auto"; botocore requires a region for SigV4 signing
# but Tigris ignores the value.
async with session.create_client(
    "s3",
    endpoint_url="https://t3.storage.dev",
    region_name="auto",
) as client:
    ...
```
### list_objects
List objects in an S3 bucket. Returns an `S3Walker` that supports async iteration.
```python
def list_objects(
    client: AioBaseClient,
    bucket_name: str,
    *,
    prefix: str = "",
    path_matcher: FilePathMatcher | None = None,
    max_file_size: int | None = None,
) -> S3Walker
```
**Parameters:**
- `client` — An aiobotocore S3 client.
- `bucket_name` — The S3 bucket name.
- `prefix` — Only list objects whose key starts with this prefix. The prefix is stripped from relative paths in the returned files.
- `path_matcher` — Optional filter for files. Patterns are matched against the relative path (after prefix stripping). See [PatternFilePathMatcher](../common_resources/data_types#patternfilepathmatcher).
- `max_file_size` — Skip objects larger than this size in bytes.
**Returns:** An `S3Walker` that can be used with `async for` loops.
### Iterating files
`list_objects()` returns an `S3Walker` that yields `S3File` objects (implementing the [`FileLike`](../common_resources/data_types#filelike) base class):
```python
import aiobotocore.session
from cocoindex.connectors import amazon_s3

session = aiobotocore.session.get_session()
async with session.create_client("s3") as client:
    async for file in amazon_s3.list_objects(client, "my-bucket", prefix="data/"):
        text = await file.read_text()
        ...
```
See [`FileLike`](../common_resources/data_types#filelike) for details on the file objects.
### Keyed iteration with `items()`
`S3Walker.items()` yields `(str, S3File)` pairs, useful for associating each file with a stable string key (its relative path):
```python
async for key, file in amazon_s3.list_objects(client, "my-bucket").items():
    content = await file.read()
```
### Filtering files
Use `PatternFilePathMatcher` to filter which objects are included. Patterns are matched against the relative path (after prefix stripping):
```python
from cocoindex.connectors import amazon_s3
from cocoindex.resources.file import PatternFilePathMatcher

matcher = PatternFilePathMatcher(included_patterns=["**/*.json"])
async for file in amazon_s3.list_objects(client, "my-bucket", prefix="data/", path_matcher=matcher):
    process(file)
```
### Limiting file size
Use `max_file_size` to skip objects that exceed a size threshold:
```python
# Skip objects larger than 10 MB
async for file in amazon_s3.list_objects(client, "my-bucket", max_file_size=10 * 1024 * 1024):
    process(file)
```
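How the three `list_objects()` options combine can be modeled on plain `(key, size)` pairs: the prefix selects objects and is stripped from their paths, the matcher sees only the relative path, and oversized objects are skipped. `filter_listing` and its predicate-style `include` argument are hypothetical; the real connector matches with a `FilePathMatcher` (globset patterns) and pages through S3 via aiobotocore.

```python
from pathlib import PurePosixPath
from typing import Callable, Iterable, Iterator, Optional


def filter_listing(
    objects: Iterable[tuple[str, int]],
    prefix: str = "",
    include: Optional[Callable[[PurePosixPath], bool]] = None,
    max_file_size: Optional[int] = None,
) -> Iterator[tuple[PurePosixPath, str]]:
    """Yield (relative_path, full_key) for objects that pass every filter."""
    for key, size in objects:
        if not key.startswith(prefix):
            continue
        relative = PurePosixPath(key[len(prefix):])  # prefix is stripped first
        if include is not None and not include(relative):
            continue
        if max_file_size is not None and size > max_file_size:
            continue
        yield relative, key
```

The pair it yields mirrors `S3FilePath`: the first element corresponds to `file_path.path`, the second to `file_path.resolve()`.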
### get_object
Fetch a single object from an S3 bucket by its key.
```python
async def get_object(
    client: AioBaseClient,
    bucket_name_or_uri: str,
    key: str | None = None,
) -> S3File
```
**Parameters:**
- `client` — An aiobotocore S3 client.
- `bucket_name_or_uri` — Either a full S3 URI (`s3://bucket/key`) or the bucket name when `key` is supplied separately.
- `key` — The full S3 object key. Required when `bucket_name_or_uri` is a bucket name; must be omitted when a URI is given.
**Returns:** An `S3File` (FileLike) for the specified object.
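The two calling conventions of `get_object()` can be illustrated with a small argument-normalizing helper. `split_bucket_and_key` is hypothetical, and the connector's own validation and error messages may differ; plain string partitioning is used instead of URL parsing so that keys containing `?` or `#` stay intact.

```python
from typing import Optional


def split_bucket_and_key(bucket_name_or_uri: str, key: Optional[str] = None) -> tuple[str, str]:
    """Normalize either an s3:// URI or a bucket name plus key into (bucket, key)."""
    scheme = "s3://"
    if bucket_name_or_uri.startswith(scheme):
        if key is not None:
            raise ValueError("key must be omitted when an s3:// URI is given")
        bucket, _, object_key = bucket_name_or_uri[len(scheme):].partition("/")
        return bucket, object_key
    if key is None:
        raise ValueError("key is required when a bucket name is given")
    return bucket_name_or_uri, key
```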
**Example:**
```python
import aiobotocore.session
from cocoindex.connectors import amazon_s3

session = aiobotocore.session.get_session()
async with session.create_client("s3") as client:
    # Via S3 URI:
    f = await amazon_s3.get_object(client, "s3://my-bucket/data/config.json")
    data = await f.read()
    # Via bucket name + key:
    f = await amazon_s3.get_object(client, "my-bucket", "data/config.json")
    data = await f.read()
```
### read
Read object content directly from an S3 URI, without fetching metadata first.
```python
async def read(
    client: AioBaseClient,
    uri: str,
    size: int = -1,
) -> bytes
```
**Parameters:**
- `client` — An aiobotocore S3 client.
- `uri` — An S3 URI (`s3://bucket/key`).
- `size` — Number of bytes to read. If -1 (default), read the entire object.
**Returns:** The object content as bytes.
**Example:**
```python
async with session.create_client("s3") as client:
    data = await amazon_s3.read(client, "s3://my-bucket/data/config.json")
```
### S3FilePath
Each file returned by the connector has an `S3FilePath` — a [`FilePath`](../common_resources/data_types#filepath) specialized for S3:
- **Relative path** (`file.file_path.path`) — The object key relative to the walker prefix (or the full key if no prefix was used).
- **Resolved path** (`file.file_path.resolve()`) — The full S3 object key.
For example, with `prefix="data/"` and an object key `"data/docs/readme.md"`:
- `file.file_path.path` → `PurePath("docs/readme.md")`
- `file.file_path.resolve()` → `"data/docs/readme.md"`
### Example
```python
import aiobotocore.session
import cocoindex as coco
from cocoindex.connectors import amazon_s3
from cocoindex.resources.file import FileLike, PatternFilePathMatcher

@coco.fn
async def app_main(bucket: str) -> None:
    session = aiobotocore.session.get_session()
    async with session.create_client("s3") as client:
        matcher = PatternFilePathMatcher(included_patterns=["**/*.md"])
        walker = amazon_s3.list_objects(
            client, bucket, prefix="docs/", path_matcher=matcher,
        )
        # Mount one processing component per object, keyed by its relative path
        await coco.mount_each(process_file, walker.items())

@coco.fn(memo=True)
async def process_file(file: FileLike) -> None:
    text = await file.read_text()
    # ... process the file content ...
```
---
# Apache Doris connector
Source: https://cocoindex.io/docs/connectors/doris/
The `doris` connector provides utilities for writing rows to Apache Doris databases, with support for vector indexes (HNSW, IVF) and inverted indexes for full-text search.
```python
from cocoindex.connectors import doris
```
**Note — Dependencies**
This connector requires additional dependencies. Install with:
```bash
pip install cocoindex[doris]
```
## Connection setup
### DorisConnectionConfig
Configure the connection to your Doris cluster:
```python
from cocoindex.connectors import doris
config = doris.DorisConnectionConfig(
fe_host="localhost",
database="my_database",
fe_http_port=8080,
query_port=9030,
username="root",
password="",
)
```
**Parameters:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `fe_host` | `str` | *(required)* | Frontend host address |
| `database` | `str` | *(required)* | Database name |
| `fe_http_port` | `int` | `8080` | Frontend HTTP port (for stream load) |
| `query_port` | `int` | `9030` | MySQL-compatible query port |
| `username` | `str` | `"root"` | Username |
| `password` | `str` | `""` | Password |
| `enable_https` | `bool` | `False` | Use HTTPS for stream load |
| `be_load_host` | `str \| None` | `None` | Override backend host for stream load (defaults to `fe_host`) |
| `batch_size` | `int` | `10000` | Max rows per stream load batch |
| `stream_load_timeout` | `int` | `600` | Timeout (seconds) for stream load |
| `replication_num` | `int` | `1` | Replication factor for new tables |
| `buckets` | `int \| str` | `"auto"` | Bucket count for new tables |
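Doris stream load is an HTTP `PUT` to `/api/{db}/{table}/_stream_load`. The sketch below shows how `fe_host`, `fe_http_port`, `enable_https`, `be_load_host`, and `batch_size` could combine into a load URL and row batches. `LoadSettings`, `stream_load_url`, and `batches` are hypothetical names, not the connector's internals; in particular, reusing `fe_http_port` when `be_load_host` is set is an assumption of this sketch.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from dataclasses import dataclass
from itertools import islice


@dataclass
class LoadSettings:
    """Hypothetical mirror of the stream-load fields of DorisConnectionConfig."""
    fe_host: str
    database: str
    fe_http_port: int = 8080
    enable_https: bool = False
    be_load_host: str | None = None
    batch_size: int = 10000


def stream_load_url(cfg: LoadSettings, table: str) -> str:
    scheme = "https" if cfg.enable_https else "http"
    # be_load_host overrides the host; this sketch assumes the port stays fe_http_port.
    host = cfg.be_load_host or cfg.fe_host
    return f"{scheme}://{host}:{cfg.fe_http_port}/api/{cfg.database}/{table}/_stream_load"


def batches(rows: Iterable[dict], batch_size: int) -> Iterator[list[dict]]:
    # Yield consecutive chunks of at most batch_size rows.
    it = iter(rows)
    while chunk := list(islice(it, batch_size)):
        yield chunk
```

With the defaults, 25 rows and `batch_size=10` would go out as three loads of 10, 10, and 5 rows.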
### connect
Create a managed connection:
```python
def connect(config: DorisConnectionConfig) -> ManagedConnection
```
**Example:**
```python
conn = doris.connect(doris.DorisConnectionConfig(
fe_host="localhost",
database="my_database",
))
```
## As target
The `doris` connector provides target state APIs for writing rows to tables. CocoIndex tracks what rows should exist and automatically handles upserts and deletions via Doris stream load.
### Declaring target states
#### Setting up a connection
Create a `ContextKey[doris.ManagedConnection]` to identify your connection, then provide it in your lifespan:
**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.
```python
from collections.abc import Iterator
import cocoindex as coco
from cocoindex.connectors import doris
DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")
@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield
    # conn is cleaned up after yield
```
#### Tables (parent state)
Declares a table as a target state. Returns a `DorisTableTarget` for declaring rows.
```python
def declare_table_target(
db: ContextKey[ManagedConnection],
table_name: str,
table_schema: TableSchema[RowT],
*,
managed_by: Literal["system", "user"] = "system",
vector_indexes: list[VectorIndexDef] | None = None,
inverted_indexes: list[InvertedIndexDef] | None = None,
) -> DorisTableTarget[RowT, coco.PendingS]
```
**Parameters:**
- `db` — A `ContextKey[doris.ManagedConnection]` identifying the connection to use.
- `table_name` — Name of the table.
- `table_schema` — Schema definition including columns and primary key (see [Table schema](#table-schema-from-python-class)).
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).
- `vector_indexes` — Optional list of vector index definitions (see [Vector indexes](#vector-indexes)).
- `inverted_indexes` — Optional list of inverted index definitions (see [Inverted indexes](#inverted-indexes)).
**Returns:** A pending `DorisTableTarget`. Use the convenience wrapper `await doris.mount_table_target(...)` to resolve.
#### Rows (child states)
Once a `DorisTableTarget` is resolved, declare rows to be upserted:
```python
def DorisTableTarget.declare_row(
self,
*,
row: RowT,
) -> None
```
**Parameters:**
- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
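Since `declare_row` accepts several row shapes, it can help to see how such rows reduce to a plain mapping. The minimal sketch below normalizes a row to a `dict` and checks the primary-key columns. `row_to_dict` and `check_primary_key` are hypothetical helpers, not connector API; Pydantic models are detected by their `model_dump()` method (Pydantic v2) so the sketch runs without Pydantic installed.

```python
from __future__ import annotations

import dataclasses
from typing import Any


def row_to_dict(row: Any) -> dict[str, Any]:
    """Hypothetical helper: convert a supported row object to a dict."""
    if isinstance(row, dict):
        return dict(row)
    if dataclasses.is_dataclass(row) and not isinstance(row, type):
        return dataclasses.asdict(row)
    if isinstance(row, tuple) and hasattr(row, "_asdict"):  # NamedTuple
        return dict(row._asdict())
    if hasattr(row, "model_dump"):  # Pydantic v2 model
        return row.model_dump()
    raise TypeError(f"unsupported row type: {type(row).__name__}")


def check_primary_key(row: dict[str, Any], primary_key: list[str]) -> None:
    # Every primary key column must be present for the row to be addressable.
    missing = [col for col in primary_key if col not in row]
    if missing:
        raise ValueError(f"row is missing primary key column(s): {missing}")
```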
### Table schema: from Python class
Define the table structure using a Python class:
```python
@classmethod
async def TableSchema.from_class(
cls,
record_type: type[RowT],
primary_key: list[str],
*,
column_overrides: dict[str, DorisType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```
**Parameters:**
- `record_type` — A record type whose fields define table columns.
- `primary_key` — List of column names forming the primary key.
- `column_overrides` — Optional per-column overrides for type mapping or vector configuration.
**Example:**
```python
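from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray
from cocoindex.connectors import doris
# `embedder` supplies the vector schema for the column; it is defined
# in the full example below (a SentenceTransformerEmbedder).
@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]
schema = await doris.TableSchema.from_class(
    DocEmbedding,
    primary_key=["id"],
)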
```
Python types are automatically mapped to Doris types:
| Python Type | Doris Type |
|-------------|------------|
| `bool` | `BOOLEAN` |
| `int` | `BIGINT` |
| `float` | `DOUBLE` |
| `str` | `STRING` |
| `bytes` | `STRING` (base64) |
| `uuid.UUID` | `VARCHAR(36)` |
| `datetime.datetime` | `DATETIME` |
| `datetime.date` | `DATE` |
| `list`, `dict`, nested structs | `JSON` |
| `NDArray` (with vector schema) | `ARRAY` |
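The scalar rows of this table can be sketched as an ordered lookup. Order matters: `bool` is a subclass of `int`, and `datetime.datetime` is a subclass of `datetime.date`, so the more specific type must be checked first. `doris_type_for` is hypothetical; it leaves out `NDArray` vectors and, as a simplification, falls back to `JSON` for any other type.

```python
import datetime
import uuid

# Ordered (python_type, doris_type) pairs; subclasses come before their bases.
_SCALAR_TYPES = [
    (bool, "BOOLEAN"),
    (int, "BIGINT"),
    (float, "DOUBLE"),
    (str, "STRING"),
    (bytes, "STRING"),  # stored base64-encoded
    (uuid.UUID, "VARCHAR(36)"),
    (datetime.datetime, "DATETIME"),
    (datetime.date, "DATE"),
]


def doris_type_for(py_type: type) -> str:
    """Hypothetical helper: map a scalar Python type to its Doris column type."""
    for base, doris_type in _SCALAR_TYPES:
        if issubclass(py_type, base):
            return doris_type
    # Containers (list, dict, nested structs) map to JSON; this sketch also
    # sends any unrecognized type there.
    return "JSON"
```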
#### DorisType
Use `DorisType` to specify a custom Doris type:
```python
from dataclasses import dataclass
from typing import Annotated
from cocoindex.connectors.doris import DorisType
@dataclass
class MyRow:
    id: Annotated[int, DorisType("INT")]
    value: Annotated[float, DorisType("FLOAT")]
```
### Vector indexes
Doris supports vector similarity search via HNSW and IVF indexes. Define them with `VectorIndexDef`:
```python
from cocoindex.connectors.doris import VectorIndexDef
vector_idx = VectorIndexDef(
field_name="embedding",
index_type="HNSW", # or "IVF"
metric_type="l2_distance", # or "cosine_distance"
)
```
**Parameters:**
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `field_name` | `str` | *(required)* | Column to index |
| `index_type` | `str` | `"HNSW"` | Index type: `"HNSW"` or `"IVF"` |
| `metric_type` | `str` | `"l2_distance"` | Distance metric: `"l2_distance"` or `"cosine_distance"` |
| `max_degree` | `int \| None` | `None` | HNSW max degree |
| `ef_construction` | `int \| None` | `None` | HNSW construction parameter |
| `nlist` | `int \| None` | `None` | IVF number of partitions |
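For intuition, the two `metric_type` values correspond to the standard definitions below. Smaller means more similar for both. This plain-Python sketch only illustrates the math; Doris computes the distances server-side, and the functions here are local helpers, not calls into Doris.

```python
import math
from collections.abc import Sequence


def _check_same_length(a: Sequence[float], b: Sequence[float]) -> None:
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    _check_same_length(a, b)
    # Euclidean distance: square root of the summed squared differences.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    _check_same_length(a, b)
    # 1 - cosine similarity: 0 for the same direction, 1 for orthogonal vectors.
    # Raises ZeroDivisionError for an all-zero vector, where the metric is undefined.
    dot = sum(x * y for x, y in zip(a, b))
    return 1.0 - dot / (math.hypot(*a) * math.hypot(*b))
```

Cosine distance ignores vector length (`[1, 0]` and `[2, 0]` are at distance 0), while L2 distance does not. That is why cosine is the usual choice for embeddings that are not normalized.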
### Inverted indexes
Doris supports inverted indexes for full-text search. Define them with `InvertedIndexDef`:
```python
from cocoindex.connectors.doris import InvertedIndexDef
inverted_idx = InvertedIndexDef(
field_name="text",
parser="unicode", # or "english", "chinese", etc.
)
```
**Parameters:**
- `field_name` — Column to index.
- `parser` — Optional tokenizer for full-text search (e.g., `"unicode"`, `"english"`, `"chinese"`). If `None`, the index supports exact matching only.
### Query helpers
#### build_vector_search_query
Build a vector similarity search SQL query:
```python
def build_vector_search_query(
table: str,
vector_field: str,
query_vector: list[float],
metric: str = "l2_distance",
limit: int = 10,
select_columns: list[str] | None = None,
where_clause: str | None = None,
) -> str
```
**Example:**
```python
sql = doris.build_vector_search_query(
table="doc_embeddings",
vector_field="embedding",
query_vector=query_vec.tolist(),
metric="cosine_distance",
limit=5,
)
```
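One plausible shape for such a query is to compute the distance in the select list, sort ascending, and limit. `sketch_vector_search_sql` is a hypothetical stand-in, not the connector's implementation, and the SQL that `build_vector_search_query` actually emits may differ (for example, in which distance function it uses to take advantage of an ANN index). Table and column names and `where_clause` are interpolated into the SQL text, so pass only trusted values.

```python
from __future__ import annotations


def sketch_vector_search_sql(
    table: str,
    vector_field: str,
    query_vector: list[float],
    metric: str = "l2_distance",
    limit: int = 10,
    select_columns: list[str] | None = None,
    where_clause: str | None = None,
) -> str:
    """Hypothetical stand-in for build_vector_search_query."""
    if metric not in ("l2_distance", "cosine_distance"):
        raise ValueError(f"unsupported metric: {metric}")
    # Array literal such as [0.5, 1.0].
    vector_literal = "[" + ", ".join(repr(float(x)) for x in query_vector) + "]"
    columns = ", ".join(select_columns) if select_columns else "*"
    sql = (
        f"SELECT {columns}, {metric}({vector_field}, {vector_literal}) AS distance "
        f"FROM {table}"
    )
    if where_clause:
        sql += f" WHERE {where_clause}"
    # Smaller distance means closer for both metrics, so sort ascending.
    return sql + f" ORDER BY distance ASC LIMIT {int(limit)}"
```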
#### connect_async
Create an async MySQL connection for running queries:
```python
async def connect_async(
fe_host: str,
query_port: int = 9030,
username: str = "root",
password: str = "",
database: str | None = None,
) -> Any # aiomysql connection
```
### Example
```python
from typing import Annotated, Iterator
from dataclasses import dataclass
from numpy.typing import NDArray
import cocoindex as coco
from cocoindex.connectors import doris
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]
@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield
@coco.fn
async def app_main() -> None:
    table = await doris.mount_table_target(
        DORIS_DB,
        "doc_embeddings",
        await doris.TableSchema.from_class(
            DocEmbedding,
            primary_key=["id"],
        ),
        vector_indexes=[
            doris.VectorIndexDef(
                field_name="embedding",
                index_type="HNSW",
                metric_type="cosine_distance",
            ),
        ],
    )
    # Declare rows. `documents` stands for the DocEmbedding rows produced
    # earlier in your pipeline (not shown here).
    for doc in documents:
        table.declare_row(row=doc)
```
---
# FalkorDB connector
Source: https://cocoindex.io/docs/connectors/falkordb/
The `falkordb` connector writes records to FalkorDB, a Cypher-compatible graph database that runs as a Redis module. It supports node tables (labels), relationship tables (edge types), per-graph multitenancy (one Redis instance, many isolated graphs), and vector indexes.
```python
from cocoindex.connectors import falkordb
```
**Note — Dependencies**
This connector requires additional dependencies. Install with:
```bash
pip install cocoindex[falkordb]
```
## Connection setup
Create a `ConnectionFactory` and provide it via a `ContextKey`. The factory holds the FalkorDB URI plus the target graph name, and yields a graph handle on demand.
**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.
```python
from collections.abc import AsyncIterator
from cocoindex.connectors import falkordb
import cocoindex as coco
KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        falkordb.ConnectionFactory(
            uri="falkor://localhost:6379",
            graph="knowledge_graph",
        ),
    )
    yield
```
### Multitenancy
A single Redis instance can host many fully isolated graphs. Pair each graph with its own `ContextKey` and `ConnectionFactory(graph=...)`:
```python
KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("apis_db")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "falkor://localhost:6379"
    builder.provide(KG_DB, falkordb.ConnectionFactory(uri=uri, graph="knowledge_graph"))
    builder.provide(APIS_DB, falkordb.ConnectionFactory(uri=uri, graph="apis_graph"))
    yield
```
Different `ContextKey`s with different graph names produce fully separate target-state trees — changes to one never spill into the other.
## As target
The `falkordb` connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.
Each `graph.query` call against FalkorDB is its own atomic unit (FalkorDB does not expose multi-statement transactions); the connector orders writes within a batch as **node upserts → relation upserts → relation deletes → node deletes** so dependent edges always see their endpoints.
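The ordering rule can be expressed as a stable sort over a batch of pending writes. The `Write` record and `order_batch` function are hypothetical, not the connector's internals. The point is that every node upsert precedes any relation upsert, so edge MERGEs find their endpoints, and relation deletes run before node deletes, so edges are removed before their endpoints.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal

# Rank of each (kind, op) pair in the documented batch order.
_RANK = {
    ("node", "upsert"): 0,
    ("relation", "upsert"): 1,
    ("relation", "delete"): 2,
    ("node", "delete"): 3,
}


@dataclass(frozen=True)
class Write:
    kind: Literal["node", "relation"]
    op: Literal["upsert", "delete"]
    key: str


def order_batch(writes: list[Write]) -> list[Write]:
    # sorted() is stable, so writes with the same rank keep their original order.
    return sorted(writes, key=lambda w: _RANK[(w.kind, w.op)])
```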
### Declaring target states
#### Node tables (parent state)
Declares a node label as a target state. Returns a `TableTarget` for declaring records.
```python
def declare_table_target(
db: ContextKey,
table_name: str,
table_schema: TableSchema[RowT] | None = None,
*,
primary_key: str = "id",
managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```
**Parameters:**
- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection.
- `table_name` — The Cypher node label (e.g. `"Document"`).
- `table_schema` — Optional schema definition (see [Table Schema](#table-schema-from-python-class)). FalkorDB does not enforce per-property types server-side, so no per-column DDL is emitted; the schema still participates in CocoIndex's fingerprint, so two flows declaring the same label must agree on it.
- `primary_key` — Single property name used as the node's primary key. Defaults to `"id"`. Compound primary keys are not supported in v1.0.
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).
**Returns:** A pending `TableTarget`. Use `await falkordb.mount_table_target(KG_DB, ...)` to get a resolved target.
#### Records (child states)
Once a `TableTarget` is resolved, declare records to be upserted (translated to `MERGE (n:Label {pk: $key_0}) SET n += $props`):
```python
def TableTarget.declare_record(
self,
*,
row: RowT,
) -> None
```
**Parameters:**
- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the `primary_key` field declared above.
`declare_row` is an alias for `declare_record`, for compatibility with Postgres and other RDBMS targets.
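The documented `MERGE (n:Label {pk: $key_0}) SET n += $props` translation can be sketched as a function that returns the Cypher text and its parameters. `build_node_merge` is hypothetical, and putting every non-key field into `$props` is an assumption of this sketch. Labels and property names are spliced into the query text because Cypher parameters cannot stand in for them, so they must come from trusted schema definitions, never from row data.

```python
from __future__ import annotations

from typing import Any


def build_node_merge(
    label: str, primary_key: str, row: dict[str, Any]
) -> tuple[str, dict[str, Any]]:
    """Hypothetical sketch of the upsert issued for one declared record."""
    if primary_key not in row:
        raise ValueError(f"row has no {primary_key!r} field")
    query = f"MERGE (n:{label} {{{primary_key}: $key_0}}) SET n += $props"
    # Non-key fields go into $props; SET n += merges them into existing properties.
    props = {k: v for k, v in row.items() if k != primary_key}
    return query, {"key_0": row[primary_key], "props": props}
```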
#### Relation tables (parent state)
Declares a relationship type as a target state. Returns a `RelationTarget` for declaring edges.
```python
def declare_relation_target(
db: ContextKey,
table_name: str,
from_table: TableTarget,
to_table: TableTarget,
table_schema: TableSchema[RowT] | None = None,
*,
primary_key: str = "id",
managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]
```
**Parameters:**
- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection.
- `table_name` — The Cypher relationship type (e.g. `"MENTION"`).
- `from_table` — The `TableTarget` whose nodes are the *source* endpoints of edges in this relationship.
- `to_table` — The `TableTarget` whose nodes are the *target* endpoints of edges in this relationship.
- `table_schema` — Optional schema for the relationship's own properties (see [Table Schema](#table-schema-from-python-class)). The relationship's `primary_key` field uniquely identifies each edge.
- `primary_key` — Single property name used as the edge's primary key. Defaults to `"id"`.
- `managed_by` — Whether CocoIndex manages the relationship lifecycle (`"system"`) or assumes it exists (`"user"`).
**Returns:** A pending `RelationTarget`. Use `await falkordb.mount_relation_target(KG_DB, ...)` to get a resolved target.
#### Relations (child states)
Once a `RelationTarget` is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship.
```python
def RelationTarget.declare_relation(
self,
*,
from_id: Any,
to_id: Any,
record: RowT | None = None,
) -> None
```
**Parameters:**
- `from_id` — The source node's primary-key value. The connector MERGEs `(s:FromLabel {pk: $from_id})` so endpoints are auto-created if absent.
- `to_id` — The target node's primary-key value. Same MERGE behavior.
- `record` — Optional row object whose fields populate the relationship's properties. Must include the relationship's `primary_key` field if provided.
If `record` is omitted, the connector derives a deterministic edge id from `(from_label, from_id, to_label, to_id)`. This is convenient when an edge has no properties of its own.
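A deterministic id only needs a stable encoding of the four values plus a hash. The sketch below uses a JSON array and SHA-256. This is an illustrative scheme (the name `derive_edge_id` and the choice of hash are assumptions), not the id format the connector actually produces. Because the source values are encoded before the target values, reversing an edge's direction yields a different id.

```python
import hashlib
import json
from typing import Any


def derive_edge_id(from_label: str, from_id: Any, to_label: str, to_id: Any) -> str:
    """Hypothetical deterministic edge id from (from_label, from_id, to_label, to_id)."""
    # A JSON array keeps field boundaries unambiguous ("a", "bc" vs "ab", "c");
    # sort_keys keeps dict-valued ids stable.
    payload = json.dumps([from_label, from_id, to_label, to_id], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```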
#### Vector indexes (attachment)
Declares a vector index on a column of a node table. Vector indexes are an [attachment](../advanced_topics/custom_target_connector#implementing-attachment-providers) to a `TableTarget`:
```python
def TableTarget.declare_vector_index(
self,
*,
name: str | None = None,
field: str,
metric: Literal["cosine", "euclidean", "ip"] = "cosine",
dimension: int,
) -> None
```
**Parameters:**
- `name` — Optional logical name for the index. Defaults to `f"idx_{table_name}__{field}"`.
- `field` — The node property holding the vector.
- `metric` — Similarity metric: `"cosine"`, `"euclidean"`, or `"ip"` (inner product). Translated to FalkorDB's `similarityFunction` option.
- `dimension` — The vector's dimension. Required.
The connector emits `CREATE VECTOR INDEX FOR (e:Label) ON (e.field) OPTIONS {dimension: N, similarityFunction: '...'}`. Vectors are float32 only — wider vector dtypes are not supported.
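The default index name and the emitted DDL can be sketched as string builders. `default_index_name` and `vector_index_ddl` are hypothetical. The sketch only covers `"cosine"` and `"euclidean"`, passing them through unchanged as `similarityFunction`; how the connector translates `"ip"` is not described here, so it is left out.

```python
def default_index_name(table_name: str, field: str) -> str:
    # Documented default: f"idx_{table_name}__{field}".
    return f"idx_{table_name}__{field}"


# This sketch covers only metrics passed through unchanged.
_SIMILARITY = {"cosine": "cosine", "euclidean": "euclidean"}


def vector_index_ddl(label: str, field: str, dimension: int, metric: str = "cosine") -> str:
    """Hypothetical builder for the CREATE VECTOR INDEX statement."""
    if dimension <= 0:
        raise ValueError("dimension must be positive")
    fn = _SIMILARITY[metric]  # KeyError for metrics this sketch does not cover
    return (
        f"CREATE VECTOR INDEX FOR (e:{label}) ON (e.{field}) "
        f"OPTIONS {{dimension: {dimension}, similarityFunction: '{fn}'}}"
    )
```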
### Table schema: from Python class
Build a `TableSchema` by introspecting a record type:
```python
@classmethod
async def TableSchema.from_class(
cls,
record_type: type[RowT],
*,
primary_key: str = "id",
column_overrides: dict[str, FalkorType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```
**Parameters:**
- `record_type` — A dataclass, NamedTuple, or Pydantic model.
- `primary_key` — Field name to use as the table's primary key. Defaults to `"id"`.
- `column_overrides` — Optional dict mapping field names to `FalkorType` or `VectorSchemaProvider` to override the default Python-to-FalkorDB type mapping.
**Returns:** A `TableSchema[RowT]` populated from the class's fields.
#### Default Python → FalkorDB type mapping
| Python type | FalkorDB type | Notes |
|---|---|---|
| `bool` | `boolean` | |
| `int`, NumPy integer scalars | `integer` | |
| `float`, NumPy float scalars | `float` | |
| `decimal.Decimal` | `string` | Encoded via `str()` — FalkorDB has no decimal type. |
| `str` | `string` | |
| `bytes` | `string` | Encoded as base64. |
| `uuid.UUID` | `string` | Encoded via `str()`. |
| `datetime.date` / `datetime.datetime` / `datetime.time` | `string` | Encoded via `.isoformat()`. |
| `datetime.timedelta` | `integer` | Encoded as milliseconds (`int(td.total_seconds() * 1000)`). |
| `numpy.ndarray` (with `VectorSchema` annotation) | `vector` | Encoded as `list[float]`. |
| `dict`, list, nested record, `Any` | `map` / `array` | Passed through native parameter binding. |
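The Notes column can be sketched as a single `encode_value` function. It is hypothetical (the connector's own encoders are not shown here) and follows the table: `Decimal` and `UUID` via `str()`, `bytes` as base64, date and time values via `.isoformat()`, and `timedelta` as integer milliseconds. NumPy scalars and vectors are left out of the sketch.

```python
import base64
import datetime
import decimal
import uuid
from typing import Any


def encode_value(value: Any) -> Any:
    """Hypothetical encoder following the Python -> FalkorDB table above."""
    if isinstance(value, (decimal.Decimal, uuid.UUID)):
        return str(value)
    if isinstance(value, bytes):
        return base64.b64encode(value).decode("ascii")
    if isinstance(value, (datetime.date, datetime.time)):
        # Also covers datetime.datetime, which subclasses datetime.date.
        return value.isoformat()
    if isinstance(value, datetime.timedelta):
        return int(value.total_seconds() * 1000)
    # bool, int, float, str, dict, list: passed through to native parameter binding.
    return value
```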
#### FalkorType
Override the default mapping for a single column with `FalkorType`:
```python
class FalkorType(NamedTuple):
    falkor_type: str
    encoder: ValueEncoder | None = None
```
Use with `typing.Annotated`:
```python
from typing import Annotated
from dataclasses import dataclass
from cocoindex.connectors.falkordb import FalkorType
@dataclass
class Row:
    id: str
    score: Annotated[float, FalkorType("decimal", encoder=str)]
```
The `falkor_type` string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree) but no DDL is emitted from it.
#### VectorSchemaProvider
For NumPy `ndarray` columns, attach a `VectorSchema` annotation to specify dtype + dimension. See [VectorSchema](../common_resources/vector_schema) for details.
### Table schema: explicit column definitions
Build a `TableSchema` directly from a dict of column definitions when the row type is dynamic:
```python
from cocoindex.connectors.falkordb import TableSchema, ColumnDef
schema = TableSchema(
columns={
"filename": ColumnDef(type="string"),
"title": ColumnDef(type="string"),
"summary": ColumnDef(type="string", nullable=True),
},
primary_key="filename",
)
```
`ColumnDef` fields:
- `type` — The FalkorDB type string (metadata only; see table above).
- `nullable` — Whether the column may be `None`. Defaults to `True`.
- `encoder` — Optional `Callable[[Any], Any]` applied to non-`None` values before they're sent to FalkorDB.
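The `nullable` and `encoder` rules combine as follows: `None` is rejected for non-nullable columns and otherwise passed through untouched, and the encoder only ever sees non-`None` values. `Column` and `encode_row` are hypothetical stand-ins for illustration, not the connector's `ColumnDef` class; treating a missing field as `None` is an assumption of this sketch.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import Any


@dataclass
class Column:
    """Hypothetical stand-in mirroring ColumnDef's fields."""
    type: str
    nullable: bool = True
    encoder: Callable[[Any], Any] | None = None


def encode_row(columns: dict[str, Column], row: dict[str, Any]) -> dict[str, Any]:
    out: dict[str, Any] = {}
    for name, col in columns.items():
        value = row.get(name)  # a missing field is treated as None
        if value is None:
            if not col.nullable:
                raise ValueError(f"column {name!r} is not nullable")
            out[name] = None  # the encoder is never called for None
        else:
            out[name] = col.encoder(value) if col.encoder else value
    return out
```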
### DDL: indexes and constraints
For each managed table, the connector creates the supporting Cypher index on the primary key field on first run:
- For node tables: `CREATE INDEX FOR (e:Label) ON (e.<primary_key>)`, where `<primary_key>` is the table's primary-key property.
- For relation tables: `CREATE INDEX FOR ()-[e:RelType]-() ON (e.<primary_key>)`.
It then attempts a uniqueness constraint via the `GRAPH.CONSTRAINT CREATE` Redis command (best-effort — failures are logged but do not abort). Indexes and constraints are dropped on `cocoindex drop` or when the table is no longer declared.
When `managed_by="user"` is set, the connector skips DDL entirely — you're responsible for creating and dropping the schema. Record-level upserts and deletes still work.
### Example: Node tables
```python
from collections.abc import AsyncIterator
from dataclasses import dataclass
import cocoindex as coco
from cocoindex.connectors import falkordb
KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")
@dataclass
class Document:
    filename: str
    title: str
    summary: str
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(KG_DB, falkordb.ConnectionFactory(
        uri="falkor://localhost:6379", graph="knowledge_graph",
    ))
    yield
@coco.fn
async def app_main() -> None:
    schema = await falkordb.TableSchema.from_class(Document, primary_key="filename")
    documents = await falkordb.mount_table_target(
        KG_DB, "Document", schema, primary_key="filename",
    )
    documents.declare_record(
        row=Document(
            filename="overview.md",
            title="Overview",
            summary="An overview of CocoIndex...",
        )
    )
app = coco.App(coco.AppConfig(name="docs_to_falkordb"), app_main)
```
### Example: Relation tables (knowledge graph)
```python
@dataclass
class Entity:
    value: str
@dataclass
class RelationshipRow:
    id: str
    predicate: str
@coco.fn
async def kg_app_main() -> None:
    documents = await falkordb.mount_table_target(
        KG_DB, "Document",
        await falkordb.TableSchema.from_class(Document, primary_key="filename"),
        primary_key="filename",
    )
    entities = await falkordb.mount_table_target(
        KG_DB, "Entity",
        await falkordb.TableSchema.from_class(Entity, primary_key="value"),
        primary_key="value",
    )
    relationships = await falkordb.mount_relation_target(
        KG_DB, "RELATIONSHIP",
        entities, entities,
        await falkordb.TableSchema.from_class(RelationshipRow, primary_key="id"),
        primary_key="id",
    )
    # populate ...
    documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="..."))
    entities.declare_record(row=Entity(value="CocoIndex"))
    entities.declare_record(row=Entity(value="FalkorDB"))
    relationships.declare_relation(
        from_id="CocoIndex",
        to_id="FalkorDB",
        record=RelationshipRow(id="rel-1", predicate="writes_to"),
    )
kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main)
```
The `Entity` table is declared up-front (via `mount_table_target`) so its index and constraint are reconciled before any `RELATIONSHIP` edge MERGEs entity endpoints. The relationship's three-MERGE pattern (source endpoint → target endpoint → edge) means missing endpoints are auto-created — but it's good practice to declare them explicitly so deletion-cascade behavior stays predictable.
---
# Google Drive connector
Source: https://cocoindex.io/docs/connectors/google_drive/
The `google_drive` connector provides utilities for reading files from Google Drive using a service account.
```python
from cocoindex.connectors import google_drive
```
**Note — Dependencies**
This connector requires additional dependencies. Install with:
```bash
pip install cocoindex[google_drive]
```
## As source
The connector provides two ways to read from Google Drive:
- `GoogleDriveSource` — high-level source class with async iteration
- `list_files()` — lower-level function returning a sync iterator
Both require a Google service account with access to the target Drive folders.
### Setting up a service account
1. Create a service account in the [Google Cloud Console](https://console.cloud.google.com/iam-admin/serviceaccounts)
2. Download the JSON credential file
3. Share the target Drive folders with the service account's email address
**Note — Google Workspace CLI**
[`gws`](https://github.com/googleworkspace/cli) is an optional, unofficial Google Workspace CLI. It is actively developed and subject to change, but can be useful for exploring or validating Drive API access before configuring CocoIndex's service-account flow. For example:
```bash
gws auth setup
gws auth login
gws drive files list
```
In headless or agent workflows, `gws` can also read credentials from `GOOGLE_WORKSPACE_CLI_CREDENTIALS_FILE`. CocoIndex still expects the service account JSON path in `service_account_credential_path`; use the `gws` credentials setting for `gws` commands themselves.
### GoogleDriveSource
The primary source class for iterating over Google Drive files.
```python
class GoogleDriveSource(
*,
service_account_credential_path: str,
root_folder_ids: Sequence[str],
mime_types: Sequence[str] | None = None,
)
```
**Parameters:**
- `service_account_credential_path` — Path to the service account JSON credential file.
- `root_folder_ids` — List of Google Drive folder IDs to scan. Subfolders are traversed recursively.
- `mime_types` — Optional list of MIME types to include. If `None`, all file types are included.
### Iterating files
`GoogleDriveSource` provides async iteration via `files()`, yielding `DriveFile` objects (implementing the [`FileLike`](../common_resources/data_types#filelike) base class):
```python
source = google_drive.GoogleDriveSource(
service_account_credential_path="./credentials.json",
root_folder_ids=["1abc...xyz"],
)
async for file in source.files():
    text = await file.read_text()
    ...
```
### Keyed iteration with `items()`
`items()` yields `(str, DriveFile)` pairs, where the key is the file's name path. This is useful with `mount_each()`:
```python
async for key, file in source.items():
    content = await file.read()
```
### Filtering by MIME type
Use `mime_types` to restrict which files are returned:
```python
source = google_drive.GoogleDriveSource(
service_account_credential_path="./credentials.json",
root_folder_ids=["1abc...xyz"],
mime_types=["application/pdf", "text/plain"],
)
```
Google Workspace files (Docs, Sheets, Slides) are automatically exported:
| Google Workspace type | Exported as |
|---|---|
| Google Docs | Plain text |
| Google Sheets | CSV |
| Google Slides | Plain text |
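The export table can be expressed in terms of the Google Drive API's MIME types for Workspace files. `export_mime_type` and `is_included` are hypothetical helpers. The text above does not say whether `mime_types` filters on a file's original type or its exported type, so this sketch assumes the original type.

```python
from __future__ import annotations

from collections.abc import Sequence

# Google Workspace MIME type -> MIME type it is exported as.
WORKSPACE_EXPORTS = {
    "application/vnd.google-apps.document": "text/plain",  # Google Docs
    "application/vnd.google-apps.spreadsheet": "text/csv",  # Google Sheets
    "application/vnd.google-apps.presentation": "text/plain",  # Google Slides
}


def export_mime_type(mime_type: str) -> str | None:
    """Hypothetical helper: export format for a Workspace file, None for regular files."""
    return WORKSPACE_EXPORTS.get(mime_type)


def is_included(mime_type: str, allowed: Sequence[str] | None) -> bool:
    # mime_types=None means every file type is included.
    return allowed is None or mime_type in allowed
```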
### list_files
A lower-level sync iterator for listing files:
```python
def list_files(spec: GoogleDriveSourceSpec) -> Iterator[DriveFile]
```
**Parameters:**
- `spec` — A `GoogleDriveSourceSpec` with the same fields as `GoogleDriveSource` constructor parameters.
**Returns:** A sync iterator of `DriveFile` objects.
### DriveFile
`DriveFile` implements [`FileLike`](../common_resources/data_types#filelike) with Google Drive-specific behavior:
- `file_path` — A `DriveFilePath` where `resolve()` returns the Google Drive file ID.
- `read()` / `read_text()` — Downloads file content via the Google Drive API. Partial reads (`size` parameter) are not supported.
### Example
```python
import cocoindex as coco
from cocoindex.connectors import google_drive
from cocoindex.resources.file import FileLike
@coco.fn(memo=True)
async def process_file(file: FileLike) -> None:
    text = await file.read_text()
    # ... process the file content ...
@coco.fn
async def app_main(credential_path: str, folder_ids: list[str]) -> None:
    source = google_drive.GoogleDriveSource(
        service_account_credential_path=credential_path,
        root_folder_ids=folder_ids,
    )
    with coco.component_subpath("file"):
        async for key, file in source.items():
            await coco.mount(
                coco.component_subpath(key),
                process_file,
                file,
            )
app = coco.App(
"GoogleDriveIngestion",
app_main,
credential_path="./credentials.json",
folder_ids=["1abc...xyz"],
)
```
---
# Apache Iggy connector
Source: https://cocoindex.io/docs/connectors/iggy/
The `iggy` connector supports [Apache Iggy](https://iggy.apache.org/docs/) as
both a **source** and a **target**. Iggy organizes messages as
**streams → topics → partitions**; CocoIndex follows that model and treats an
Iggy topic partition either as a raw live stream or as an application-keyed
live map.
```python
from cocoindex.connectors import iggy
```
Install the optional dependency:
```bash
pip install cocoindex[iggy]
```
The connector expects an `apache_iggy.IggyClient` that you create, connect, and
provide through your app context. Streams and topics are user-managed:
CocoIndex does not create or drop them.
## As source
### As a live stream
Use `topic_as_stream()` when every Iggy message is an event and downstream code
does not need map-style deletion semantics.
```python
def topic_as_stream(
client: IggyClient,
consumer_group: str,
stream: str,
topic: str,
*,
partition_id: int = 0,
batch_length: int = 100,
allow_replay: bool = False,
initial_high_watermark: int | None = None,
) -> TopicStream
```
`TopicStream.payloads()` adapts the stream to `LiveStream[bytes]` when the
processing logic only needs message payload bytes.
```python
events = iggy.topic_as_stream(
client,
consumer_group="cocoindex-worker",
stream="orders",
topic="events",
).payloads()
```
### As a live keyed map
Use `topic_as_map()` when message payloads encode an application-level key and
you want CocoIndex to treat the topic as a live map.
```python
def topic_as_map(
client: IggyClient,
consumer_group: str,
stream: str,
topic: str,
*,
key: KeyFn,
is_deletion: IsDeleteFn | None = None,
partition_id: int = 0,
batch_length: int = 100,
allow_replay: bool = False,
initial_high_watermark: int | None = None,
) -> LiveMapFeed[StableKey, ReceiveMessage]
```
Iggy Python messages do not expose Kafka-style keys or tombstones, so `key` is
required. Return `None` from `key` to skip a message. Pass `is_deletion` if your
payload format has application-level delete events.
```python
import json
def key(message) -> str | None:
    payload = json.loads(message.payload())
    return payload.get("id")
items = iggy.topic_as_map(
client,
consumer_group="cocoindex-worker",
stream="orders",
topic="events",
key=key,
)
```
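The map semantics can be sketched by folding messages into a dict. `key` decides which entry a message belongs to (or skips it by returning `None`), and `is_deletion` turns a message into a removal. `FakeMessage` and `apply_messages` are hypothetical test doubles; only the `payload()` method mirrors the Iggy message usage shown above.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable
from typing import Any


class FakeMessage:
    """Hypothetical stand-in exposing payload() like an Iggy message."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def payload(self) -> bytes:
        return self._payload


def apply_messages(
    messages: Iterable[Any],
    key: Callable[[Any], str | None],
    is_deletion: Callable[[Any], bool] | None = None,
) -> dict[str, Any]:
    state: dict[str, Any] = {}
    for message in messages:
        k = key(message)
        if k is None:
            continue  # key() returned None: skip the message
        if is_deletion is not None and is_deletion(message):
            state.pop(k, None)
        else:
            state[k] = message  # later messages for a key replace earlier ones
    return state
```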
### Readiness and offsets
The source connector disables Iggy auto-commit and stores offsets after the
downstream readiness handle completes. This mirrors Kafka-style back-pressure:
an offset is stored only after CocoIndex has finished processing the message.
For single-partition topics, the connector can infer the initial high watermark
from Iggy's topic details. For multi-partition topics, pass
`initial_high_watermark` for the consumed partition; the current Python SDK does
not expose per-partition high-watermark callbacks.
## As target
The target connector sends bytes or strings to a user-managed Iggy
stream/topic/partition.
```python
IGGY = coco.ContextKey[IggyClient]("iggy")
target = await iggy.mount_iggy_topic_target(
IGGY,
stream="orders",
topic="derived-events",
partition=0,
)
target.declare_target_state(key="order-123", value=b'{"status":"ready"}')
```
Deletes need an application-level delete payload because Iggy does not have
Kafka-style tombstones:
```python
import json
target = await iggy.mount_iggy_topic_target(
    IGGY,
    stream="orders",
    topic="derived-events",
    # json.dumps emits valid JSON; `{key!r}` would produce single-quoted strings.
    deletion_value_fn=lambda key: json.dumps({"id": key, "deleted": True}),
)
```
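Conceptually, each run compares the newly declared `(key, value)` states with the previous run's and sends one message per change. A removed key produces whatever `deletion_value_fn` returns, since Iggy has no tombstones. `plan_messages` is a hypothetical sketch of that diff, not the connector's code; skipping unchanged keys, and skipping deletes when no `deletion_value_fn` is given, are assumptions of the sketch.

```python
from __future__ import annotations

from collections.abc import Callable

Value = bytes | str if False else object  # placeholder alias, see annotations below


def plan_messages(
    previous: dict[str, bytes | str],
    current: dict[str, bytes | str],
    deletion_value_fn: Callable[[str], bytes | str] | None,
) -> list[bytes | str]:
    """Hypothetical diff of declared target states into messages to send."""
    messages: list[bytes | str] = []
    for key, value in current.items():
        if previous.get(key) != value:
            messages.append(value)  # new or changed state
    for key in sorted(previous.keys() - current.keys()):
        if deletion_value_fn is None:
            continue  # this sketch sends nothing without a delete payload
        messages.append(deletion_value_fn(key))
    return messages
```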
### Target APIs
```python
def declare_iggy_topic_target(
client: ContextKey[IggyClient],
stream: str,
topic: str,
*,
partition: int = 0,
deletion_value_fn: DeletionValueFn | None = None,
) -> IggyTopicTarget[PendingS]
```
```python
async def mount_iggy_topic_target(
client: ContextKey[IggyClient],
stream: str,
topic: str,
*,
partition: int = 0,
deletion_value_fn: DeletionValueFn | None = None,
) -> IggyTopicTarget[ResolvedS]
```
---
# Kafka connector
Source: https://cocoindex.io/docs/connectors/kafka/
The `kafka` connector supports Kafka as both a **source** (consuming messages as a live keyed map, or as a raw event stream) and a **target** (producing messages for declared target states).
```python
from cocoindex.connectors import kafka
```
**Note — Dependencies**
This connector requires additional dependencies. Install with:
```bash
pip install cocoindex[kafka]
```
## As source
The `kafka` connector can treat a Kafka topic as a live keyed map — each message is an upsert or delete for a key. It returns a [`LiveMapFeed`](../advanced_topics/live_component#livemapfeed-and-livemapview) for use with `mount_each()`.
### Setting up a consumer
Create an `AIOConsumer` directly — no `ContextKey` needed. The consumer must be **unsubscribed** (the connector handles subscription internally to manage partition rebalance callbacks).
```python
from confluent_kafka.aio import AIOConsumer

consumer = AIOConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "enable.auto.commit": "false",
})
```
### `topic_as_map()`
```python
def topic_as_map(
    consumer: AIOConsumer,
    topics: list[str],
    *,
    is_deletion: IsDeleteFn | None = None,
) -> LiveMapFeed[bytes | str, Message]:
```
**Parameters:**
- `consumer` — An unsubscribed `AIOConsumer`. Auto-commit should be disabled.
- `topics` — Topics to subscribe to.
- `is_deletion` — Optional predicate `(message: Message) -> bool` for custom deletion detection on non-tombstone messages (see [Deletion handling](#deletion-handling)).
**Returns:** A `LiveMapFeed[bytes | str, Message]` where each item is keyed by the message key and the value is the full `confluent_kafka.Message` object.
### Deletion handling
Messages with `None` value (Kafka tombstones) are **always** treated as deletions. The optional `is_deletion` predicate provides additional deletion logic for non-tombstone messages:
```python
# Default: only tombstones are deletions
items = kafka.topic_as_map(consumer, ["my-topic"])
# Custom: also treat messages whose value is the sentinel b"DELETED" as deletions
items = kafka.topic_as_map(
    consumer, ["my-topic"],
    is_deletion=lambda msg: msg.value() == b"DELETED",
)
```
### Offset management
Offsets are committed automatically with at-least-once semantics. Messages are processed in parallel, but an offset is only committed after all earlier messages in the same partition have been fully processed. Messages with `None` keys are logged as errors and skipped.
### Readiness
The feed signals readiness after catching up to the high watermark offsets that existed when consumption started. After that, it continues consuming indefinitely until the app is stopped.
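The readiness test can be sketched as follows. The sketch assumes the startup snapshot holds `(low, high)` watermarks per partition, which is the pair `confluent_kafka`'s `Consumer.get_watermark_offsets()` returns. It also assumes the consumer tracks the next offset it will read. `is_caught_up` is hypothetical, and this page does not document the connector's exact bookkeeping. Kafka's high watermark is one past the last existing message, so a partition is caught up once its next offset reaches that value.

```python
def is_caught_up(
    start_watermarks: dict[int, tuple[int, int]],
    next_offsets: dict[int, int],
) -> bool:
    """Hypothetical readiness check against watermarks captured at startup.

    start_watermarks: partition -> (low, high) recorded when consumption began.
    next_offsets: partition -> next offset the consumer will read.
    """
    for partition, (low, high) in start_watermarks.items():
        if low >= high:
            continue  # partition was empty at startup: nothing to catch up on
        if next_offsets.get(partition, low) < high:
            return False  # still behind the startup high watermark
    return True
```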
### Example
```python
import pathlib

from confluent_kafka import Message
from confluent_kafka.aio import AIOConsumer
from cocoindex.connectors import kafka, localfs
import cocoindex as coco


@coco.fn(memo=True)
async def process_message(msg: Message, target: localfs.DirTarget) -> None:
    key = msg.key()
    value = msg.value()
    if isinstance(key, bytes):
        key = key.decode()
    target.declare_file(filename=f"{key}.bin", content=value)


@coco.fn
async def app_main(outdir: pathlib.Path) -> None:
    target = await localfs.mount_dir_target(outdir)
    consumer = AIOConsumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "enable.auto.commit": "false",
    })
    items = kafka.topic_as_map(consumer, ["my-topic"])
    await coco.mount_each(process_message, items, target)


app = coco.App(
    coco.AppConfig(name="KafkaToFiles"),
    app_main,
    outdir=pathlib.Path("./out"),
)
app.update_blocking(live=True)
```
## As a live stream
Some downstream connectors don't model Kafka messages as a keyed map — they treat each message as an opaque event payload (e.g. the [OCI Object Storage](./oci_object_storage#live-bucket-watching) live mode ingests Object Storage event JSON delivered through OCI Streaming). For these, the `kafka` connector exposes a topic as a [`LiveStream`](../advanced_topics/live_component#livestream) — CocoIndex's keyless counterpart to `LiveMapFeed`, an opaque sequence of messages with the same readiness and back-pressure machinery but no key/delete semantics.
Set up the consumer the same way as for [`topic_as_map()`](#setting-up-a-consumer).
### `topic_as_stream()`
```python
def topic_as_stream(
    consumer: AIOConsumer,
    topics: list[str],
) -> TopicStream:
```
**Parameters:**
- `consumer` — An unsubscribed `AIOConsumer`. Auto-commit should be disabled.
- `topics` — Topics to subscribe to.
**Returns:** A `TopicStream` (a `LiveStream[Message]`) that delivers raw `confluent_kafka.Message` objects to its subscriber. [Offset management](#offset-management) and [readiness](#readiness) behave identically to `topic_as_map()`.
### `TopicStream.payloads()`
```python
def payloads(self) -> LiveStream[bytes]:
```
A view over the same `TopicStream` that yields each message's value (the byte payload) instead of the full `Message`. Null-valued messages (Kafka tombstones) are filtered out of this view; consumers that need tombstone semantics should subscribe to the `TopicStream` directly.
This is the typical input for sources that consume opaque event payloads:
```python
from confluent_kafka.aio import AIOConsumer
from cocoindex.connectors import kafka, oci_object_storage

consumer = AIOConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "enable.auto.commit": "false",
})
topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])
# `client` is an OCI Object Storage client, configured as described in the
# OCI Object Storage connector docs.
walker = oci_object_storage.list_objects(
    client, "my-namespace", "my-bucket",
    live_stream=topic_stream.payloads(),
)
```
For a complete app using this pattern, see the [OCI Object Storage live-mode example](./oci_object_storage#example).
**Caution**
A `TopicStream` (and any `payloads()` view of it) supports at most one active watcher — the underlying consumer can hold only one subscription. A second concurrent `watch()` raises `RuntimeError`.
## As target
The `kafka` connector provides target state APIs for producing messages to Kafka topics. Topics are user-managed (CocoIndex does not create or drop topics) — CocoIndex only produces messages to them.
### Setting up a producer
Create a `ContextKey[AIOProducer]` to identify your producer, then provide it in your lifespan:
**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track target state for topics produced through this key. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.
```python
from collections.abc import AsyncIterator

from confluent_kafka.aio import AIOProducer
import cocoindex as coco

KAFKA_PRODUCER = coco.ContextKey[AIOProducer]("my_kafka_producer")


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})
    builder.provide(KAFKA_PRODUCER, producer)
    yield
```
### Declaring target states
#### Topics (parent state)
Declares a topic as a target state. Returns a `KafkaTopicTarget` for declaring messages.
```python
def declare_kafka_topic_target(
    producer: ContextKey[AIOProducer],
    topic: str,
    *,
    deletion_value_fn: DeletionValueFn | None = None,
) -> KafkaTopicTarget[coco.PendingS]
```
**Parameters:**
- `producer` — A `ContextKey[AIOProducer]` identifying the producer to use.
- `topic` — The Kafka topic name.
- `deletion_value_fn` — Optional callback that produces a deletion value for a given key (see [Deletion handling](#deletion-handling)).
**Returns:** A pending `KafkaTopicTarget`. Use the async convenience wrapper to resolve:
```python
topic_target = await kafka.mount_kafka_topic_target(
    KAFKA_PRODUCER, "my-topic"
)
```
#### Messages (child states)
Once a `KafkaTopicTarget` is resolved, declare target states to produce messages:
```python
def KafkaTopicTarget.declare_target_state(
    self,
    *,
    key: bytes | str,
    value: bytes | str,
) -> None
```
**Parameters:**
- `key` — The message key, used as the stable identity for change detection.
- `value` — The message value.
CocoIndex fingerprints the value and only produces a message when it has changed since the last run.
### Deletion handling
When a previously declared target state is no longer declared, CocoIndex produces a deletion message. The behavior depends on `deletion_value_fn`:
- **Without callback** (default): Produces a message with the key and no value (Kafka tombstone).
- **With callback**: Calls `deletion_value_fn(key)` to produce the deletion value.
```python
# Tombstone on deletion (default)
topic_target = await kafka.mount_kafka_topic_target(
    KAFKA_PRODUCER, "my-topic"
)
# Custom deletion value
topic_target = await kafka.mount_kafka_topic_target(
    KAFKA_PRODUCER, "my-topic",
    deletion_value_fn=lambda key: b'{"deleted": true}',
)
```
### Example
```python
from collections.abc import AsyncIterator

from confluent_kafka.aio import AIOProducer
from cocoindex.connectors import kafka, localfs
import cocoindex as coco

KAFKA_PRODUCER = coco.ContextKey[AIOProducer]("my_kafka_producer")


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})
    builder.provide(KAFKA_PRODUCER, producer)
    yield


@coco.fn(memo=True)
async def process_file(
    file: localfs.File, topic_target: kafka.KafkaTopicTarget
) -> None:
    content = await file.read_bytes()
    topic_target.declare_target_state(
        key=file.file_path.path.as_posix().encode(),
        value=content,
    )


@coco.fn
async def app_main() -> None:
    topic_target = await kafka.mount_kafka_topic_target(
        KAFKA_PRODUCER, "file-contents"
    )
    files = localfs.walk_dir(localfs.FilePath(path="./data"))
    await coco.mount_each(process_file, files.items(), topic_target)


app = coco.App(
    coco.AppConfig(name="FilesToKafka"),
    app_main,
)
app.update_blocking(report_to_stdout=True)
```
---
# LanceDB connector
Source: https://cocoindex.io/docs/connectors/lancedb/
The `lancedb` connector provides utilities for writing rows to LanceDB tables, with automatic schema inference from Python classes and support for declaring vector and full-text search (FTS) indexes. CocoIndex manages the table lifecycle — creating, dropping, and evolving the schema — and keeps rows in sync via incremental upserts and deletions.
```python
from cocoindex.connectors import lancedb
```
**Note — Dependencies**
This connector requires additional dependencies. Install with:
```bash
pip install cocoindex[lancedb]
```
## Connection setup
LanceDB connections come from the LanceDB library itself; CocoIndex exposes thin wrappers around its connect functions:
```python
async def connect_async(uri: str, **options: Any) -> LanceAsyncConnection
def connect(uri: str, **options: Any) -> lancedb.DBConnection
```
**Parameters:**
- `uri` — LanceDB URI (local path like `"./lancedb_data"` or cloud URI like `"s3://bucket/path"`).
- `**options` — Additional options passed directly to `lancedb.connect_async()` / `lancedb.connect()`.
**Returns:** A LanceDB connection.
**Example:**
```python
conn = await lancedb.connect_async("./lancedb_data")
```
## As target
The `lancedb` connector provides target state APIs for writing rows to tables. CocoIndex tracks what rows should exist and automatically handles upserts and deletions.
### Declaring target states
#### Setting up a connection
Create a `ContextKey[lancedb.LanceAsyncConnection]` to identify your LanceDB connection, then provide it in your lifespan:
**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed tables. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.
```python
from collections.abc import AsyncIterator

import cocoindex as coco
from cocoindex.connectors import lancedb

LANCEDB_URI = "./lancedb_data"
LANCE_DB = coco.ContextKey[lancedb.LanceAsyncConnection]("main_db")


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    conn = await lancedb.connect_async(LANCEDB_URI)
    builder.provide(LANCE_DB, conn)
    yield
```
#### Tables (parent state)
Declares a table as a target state. Returns a `TableTarget` for declaring rows.
```python
def declare_table_target(
    db: ContextKey[LanceAsyncConnection],
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    managed_by: Literal["system", "user"] = "system",
    num_transactions_before_optimize: int = 50,
) -> TableTarget[RowT, coco.PendingS]
```
**Parameters:**
- `db` — A `ContextKey[LanceAsyncConnection]` identifying the connection to use.
- `table_name` — Name of the table.
- `table_schema` — Schema definition including columns and primary key (see [Table Schema](#table-schema-from-python-class)).
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).
- `num_transactions_before_optimize` — Number of successful row mutation batches before scheduling a background LanceDB `table.optimize()` call.
**Returns:** A pending `TableTarget`. Use the convenience wrapper `await lancedb.mount_table_target(LANCE_DB, table_name, table_schema)` to resolve.
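The following sketch shows the counting that `num_transactions_before_optimize` describes. `OptimizeScheduler` is hypothetical. It counts successful batches and calls a supplied callback every N batches. The callback stands in for scheduling LanceDB's `table.optimize()` in the background.

```python
from collections.abc import Callable


class OptimizeScheduler:
    """Hypothetical sketch of num_transactions_before_optimize bookkeeping."""

    def __init__(self, schedule_optimize: Callable[[], None], every: int = 50) -> None:
        if every <= 0:
            raise ValueError("every must be positive")
        self._schedule = schedule_optimize  # stands in for a background optimize()
        self._every = every
        self._since_last = 0

    def on_batch_committed(self) -> bool:
        """Record one successful batch; return True if an optimize was scheduled."""
        self._since_last += 1
        if self._since_last >= self._every:
            self._since_last = 0
            self._schedule()
            return True
        return False
```

A smaller value runs `optimize()` more often. A larger one spaces that maintenance work further apart.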
#### Rows (child states)
Once a `TableTarget` is resolved, declare rows to be upserted:
```python
def TableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None
```
**Parameters:**
- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
#### Vector indexes (attachment)
Declare a vector index on a vector column to accelerate similarity search. Vector indexes are an [attachment](../advanced_topics/custom_target_connector#implementing-attachment-providers) to a `TableTarget`:
```python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    column: str,
    metric: Literal["cosine", "l2", "dot"] = "cosine",
    index_type: Literal["ivf_pq", "hnsw_pq"] = "ivf_pq",
    num_partitions: int | None = None,
    num_sub_vectors: int | None = None,
    num_bits: int | None = None,
    m: int | None = None,
    ef_construction: int | None = None,
) -> None
```
**Parameters:**
- `name` — Logical index name (defaults to `column`).
- `column` — Vector column to index.
- `metric` — Distance metric: `"cosine"` (default), `"l2"`, or `"dot"`.
- `index_type` — Index algorithm: `"ivf_pq"` (IVF-PQ, default) or `"hnsw_pq"` (HNSW-PQ).
- `num_partitions` — *(IVF-PQ only)* Number of IVF partitions.
- `num_sub_vectors` — *(IVF-PQ / HNSW-PQ)* Number of PQ sub-vectors.
- `num_bits` — *(IVF-PQ / HNSW-PQ)* Number of bits per PQ code.
- `m` — *(HNSW-PQ only)* Maximum number of HNSW edges per node.
- `ef_construction` — *(HNSW-PQ only)* Size of the HNSW candidate list during build.
Parameters left as `None` fall back to LanceDB's defaults.
**Example:**
```python
table.declare_vector_index(column="embedding", metric="cosine")
```
#### FTS indexes (attachment)
Declare a full-text search (FTS) index on a text column to enable keyword and phrase search. Like vector indexes, FTS indexes are an [attachment](../advanced_topics/custom_target_connector#implementing-attachment-providers) to a `TableTarget`:
```python
def TableTarget.declare_fts_index(
    self,
    *,
    name: str | None = None,
    column: str,
    language: str = "English",
    with_position: bool = True,
) -> None
```
**Parameters:**
- `name` — Logical index name (defaults to `column`).
- `column` — Text column to index.
- `language` — Tokenizer language (e.g. `"English"`, `"Chinese"`).
- `with_position` — Whether to store token positions (enables phrase queries). Defaults to `True`.
**Example:**
```python
table.declare_fts_index(column="content")
```
**Note**
Indexes are reconciled as part of the table's target state: changing a declaration replaces the index in place, removing a declaration drops the index, and dropping the table removes all its indexes.
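The reconciliation rules in the note above amount to a diff between the indexes that exist and the ones currently declared, keyed by index name (which defaults to the column). The sketch below uses a hypothetical `IndexSpec` record and a hypothetical `plan_index_changes` helper. It only plans steps and does not call LanceDB.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IndexSpec:
    """Hypothetical snapshot of one index declaration."""

    column: str
    kind: str  # e.g. "vector" or "fts"
    options: tuple = ()  # hashable (name, value) pairs, e.g. (("metric", "cosine"),)


def plan_index_changes(
    existing: dict[str, IndexSpec], declared: dict[str, IndexSpec]
) -> list[tuple[str, str]]:
    """Return (action, index_name) steps that make `existing` match `declared`."""
    steps: list[tuple[str, str]] = []
    for name, spec in declared.items():
        if name not in existing:
            steps.append(("create", name))
        elif existing[name] != spec:
            steps.append(("replace", name))  # changed declaration: rebuild it
    for name in existing:
        if name not in declared:
            steps.append(("drop", name))  # no longer declared
    return steps
```

Dropping the table itself needs no per-index steps, since its indexes go with it.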
### Table schema: from Python class
Define the table structure using a Python class (dataclass, NamedTuple, or Pydantic model):
```python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    primary_key: list[str],
    *,
    column_specs: dict[str, LanceType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```
**Parameters:**
- `record_type` — A record type whose fields define table columns.
- `primary_key` — List of column names forming the primary key.
- `column_specs` — Optional per-column overrides for type mapping or vector configuration.
**Example:**
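```python
from dataclasses import dataclass
from typing import Annotated

from numpy.typing import NDArray


# `embedder` is assumed to be a VectorSchemaProvider defined elsewhere.
@dataclass
class OutputDocument:
    doc_id: str
    title: str
    content: str
    embedding: Annotated[NDArray, embedder]


schema = await lancedb.TableSchema.from_class(
    OutputDocument,
    primary_key=["doc_id"],
)
```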
Python types are automatically mapped to PyArrow types:
| Python Type | PyArrow Type |
|-------------|--------------|
| `bool` | `bool` |
| `int` | `int64` |
| `float` | `float64` |
| `str` | `string` |
| `bytes` | `binary` |
| `list`, `dict`, nested structs | `string` (JSON encoded) |
| `NDArray` (with vector schema) | `fixed_size_list` |
To override the default mapping, provide a `LanceType` or `VectorSchemaProvider` via:
- **Type annotation** — using `typing.Annotated` on the field
- **`column_specs`** — passing overrides when constructing `TableSchema`
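The resolution order is simple: an `Annotated` override wins, and otherwise the default table applies. The standard `typing` introspection helpers are enough to sketch it. To stay dependency-free, the sketch returns Arrow type names as strings and uses a hypothetical `TypeOverride` marker in place of `LanceType`. It omits vector (`NDArray`) columns and the `column_specs` path.

```python
import dataclasses
import typing
from typing import Annotated, Any


@dataclasses.dataclass(frozen=True)
class TypeOverride:
    """Hypothetical stand-in for LanceType: names an explicit Arrow type."""

    arrow_type: str


# Default mapping from the table above (Arrow type names as strings).
DEFAULTS: dict[type, str] = {
    bool: "bool",
    int: "int64",
    float: "float64",
    str: "string",
    bytes: "binary",
}


def arrow_type_for(annotation: Any) -> str:
    """Resolve one field annotation to an Arrow type name."""
    if typing.get_origin(annotation) is Annotated:
        base, *metadata = typing.get_args(annotation)
        for item in metadata:
            if isinstance(item, TypeOverride):
                return item.arrow_type  # an explicit override wins
        annotation = base
    origin = typing.get_origin(annotation) or annotation
    if origin in (list, dict) or dataclasses.is_dataclass(origin):
        return "string"  # nested values are stored JSON-encoded
    return DEFAULTS[annotation]


def arrow_schema_for(record_type: type) -> dict[str, str]:
    """Map every field of a record type to an Arrow type name."""
    hints = typing.get_type_hints(record_type, include_extras=True)
    return {name: arrow_type_for(hint) for name, hint in hints.items()}
```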
#### LanceType
Use `LanceType` to specify a custom PyArrow type or encoder:
```python
from dataclasses import dataclass
from typing import Annotated

import pyarrow as pa
from cocoindex.connectors.lancedb import LanceType


@dataclass
class MyRow:
    id: Annotated[int, LanceType(pa.int32())]
    value: Annotated[float, LanceType(pa.float32())]
```
#### VectorSchemaProvider
For `NDArray` fields, a `VectorSchemaProvider` annotation specifies the vector dimension and dtype. The annotation accepts a `VectorSchemaProvider`, a `ContextKey`, or an explicit `VectorSchema`. See [Vector Schema](../common_resources/vector_schema#vectorschemaprovider) for details.
### Table schema: explicit column definitions
Define columns directly using `ColumnDef`:
```python
def TableSchema.__init__(
    self,
    columns: dict[str, ColumnDef],
    primary_key: list[str],
) -> None
```
**Example:**
```python
import pyarrow as pa
from cocoindex.connectors import lancedb

schema = lancedb.TableSchema(
    {
        "doc_id": lancedb.ColumnDef(type=pa.string(), nullable=False),
        "title": lancedb.ColumnDef(type=pa.string()),
        "content": lancedb.ColumnDef(type=pa.string()),
        # list_size makes this a fixed_size_list of 384 float32 values
        "embedding": lancedb.ColumnDef(type=pa.list_(pa.float32(), list_size=384)),
    },
    primary_key=["doc_id"],
)
```
### Example
```python
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Annotated

from numpy.typing import NDArray

import cocoindex as coco
from cocoindex.connectors import lancedb

LANCEDB_URI = "./lancedb_data"
LANCE_DB = coco.ContextKey[lancedb.LanceAsyncConnection]("main_db")


# `embedder` is assumed to be a VectorSchemaProvider defined elsewhere.
@dataclass
class OutputDocument:
    doc_id: str
    title: str
    content: str
    embedding: Annotated[NDArray, embedder]


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    conn = await lancedb.connect_async(LANCEDB_URI)
    builder.provide(LANCE_DB, conn)
    yield


@coco.fn
async def app_main() -> None:
    # Declare table target state
    table = await lancedb.mount_table_target(
        LANCE_DB,
        "documents",
        await lancedb.TableSchema.from_class(
            OutputDocument,
            primary_key=["doc_id"],
        ),
    )
    # Declare a vector index for similarity search
    table.declare_vector_index(column="embedding", metric="cosine")
    # Declare rows (`documents` is an iterable of OutputDocument built elsewhere)
    for doc in documents:
        table.declare_row(row=doc)
```
---
# Local filesystem connector
Source: https://cocoindex.io/docs/connectors/localfs/
The `localfs` connector provides utilities for reading files from and writing files to the local file system.
```python
from cocoindex.connectors import localfs
```
## Stable memoization with FilePath
A key feature of the `localfs` connector is [**stable memoization**](../programming_guide/function) through `FilePath`. When you move your entire project directory, memoization keys remain stable as long as you use the same `ContextKey` key string for the base directory.
### Using ContextKey for stable base directories
Define a `ContextKey[pathlib.Path]` with a stable string identifier. Provide the actual path in your app's lifespan, then pass the key directly to `walk_dir()`, `declare_dir_target()`, and related functions.
```python
import pathlib
from collections.abc import AsyncIterator

import cocoindex as coco
from cocoindex.connectors import localfs

# Define a stable key (the string "source_dir" is the stable memo identifier)
SOURCE_DIR = coco.ContextKey[pathlib.Path]("source_dir")


@coco.fn
async def app_main() -> None:
    async for file in localfs.walk_dir(SOURCE_DIR, recursive=True):
        # file.file_path has a stable memo key based on "source_dir"
        await process(file)  # `process` is your own processing function


# In your lifespan, provide the actual path:
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(SOURCE_DIR, pathlib.Path("./data"))
    yield
```
When you move your project to a different location, just update the path in `builder.provide()` — memoization keys remain the same because they're based on the stable key string (`"source_dir"`), not the filesystem path.
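The idea can be sketched in a few lines. The memo identity is built from the key string and the path relative to the base directory, while the absolute directory is looked up separately at run time. `memo_key` and `resolve` are hypothetical illustrations, not CocoIndex's actual key derivation.

```python
import hashlib
import pathlib


def memo_key(base_dir_key: str, relative_path: str) -> str:
    """Hypothetical memo key: ContextKey name plus the path relative to it.

    The absolute directory the key resolves to is deliberately left out, so
    moving the project does not change the key.
    """
    rel = pathlib.PurePosixPath(relative_path).as_posix()
    return hashlib.sha256(f"{base_dir_key}\0{rel}".encode()).hexdigest()


def resolve(provided: dict[str, pathlib.Path], base_dir_key: str, relative_path: str) -> pathlib.Path:
    """Look up whatever directory the lifespan currently provides for the key."""
    return provided[base_dir_key] / relative_path
```

Moving the project changes only what `resolve` returns. `memo_key` depends on neither absolute path, so memoized results stay valid.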
### FilePath
`FilePath` combines an optional base directory key and a relative path. Passing a `ContextKey[Path]` directly to any localfs function is equivalent to constructing `FilePath(base_dir=key)`.
`FilePath` supports all `pathlib.PurePath` operations:
```python
SOURCE_DIR = coco.ContextKey[pathlib.Path]("source_dir")
source = localfs.FilePath(base_dir=SOURCE_DIR)
# Create paths using the / operator
config_path = source / "config" / "settings.json"
# Access path properties
print(config_path.name) # "settings.json"
print(config_path.suffix) # ".json"
print(config_path.parent) # FilePath pointing to "config/"
# Resolve to absolute path (requires active component context)
abs_path = config_path.resolve() # pathlib.Path
```
See [FilePath](../common_resources/data_types#filepath) in Resource Types for full details.
## As source
Use `walk_dir()` to iterate over files in a directory. It returns a `DirWalker` that supports both synchronous and asynchronous iteration.
```python
def walk_dir(
path: FilePath | Path | ContextKey[Path],
*,
live: bool = False,
recursive: bool = False,
path_matcher: FilePathMatcher | None = None,
) -> DirWalker
```
**Parameters:**
- `path` — The root directory path to walk through. Can be a `FilePath`, a `pathlib.Path`, or a `ContextKey[Path]` (equivalent to `FilePath(base_dir=path)`).
- `live` — If `True`, `items()` returns a [`LiveMapView`](../advanced_topics/live_component#livemapfeed-and-livemapview) that supports live file watching via `mount_each()`.
- `recursive` — If `True`, recursively walk subdirectories.
- `path_matcher` — Optional filter for files and directories. See [PatternFilePathMatcher](../common_resources/data_types#patternfilepathmatcher).
**Returns:** A `DirWalker` that supports async iteration via `async for`.
### Iterating files
`walk_dir()` returns a `DirWalker` that supports async iteration, yielding `File` objects (implementing the [`FileLike`](../common_resources/data_types#filelike) base class):
```python
async for file in localfs.walk_dir(pathlib.Path("/path/to/documents"), recursive=True):
    text = await file.read_text()
    ...
```
File I/O runs in a thread pool, keeping the event loop responsive.
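You can apply the same pattern in your own async code with `asyncio.to_thread`, which runs a blocking call in the default executor. The sketch below illustrates the technique. It does not claim that CocoIndex uses `asyncio.to_thread` specifically.

```python
import asyncio
import pathlib
import tempfile


async def read_text_off_loop(path: pathlib.Path) -> str:
    """Run the blocking read in the default thread pool, not on the event loop."""
    return await asyncio.to_thread(path.read_text, encoding="utf-8")


async def read_many(paths: list[pathlib.Path]) -> list[str]:
    # The reads overlap in worker threads; gather keeps results in input order.
    return list(await asyncio.gather(*(read_text_off_loop(p) for p in paths)))


async def demo() -> list[str]:
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for i in range(3):
            path = pathlib.Path(tmp) / f"doc{i}.txt"
            path.write_text(f"document {i}", encoding="utf-8")  # sync setup, for brevity
            paths.append(path)
        return await read_many(paths)
```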
### Keyed iteration with `items()`
`DirWalker.items()` yields keyed `(str, File)` pairs, useful for associating each file with a stable string key (its relative path):
```python
async for key, file in localfs.walk_dir(pathlib.Path("/path/to/dir"), recursive=True).items():
    content = await file.read()
```
### Filtering files
Use `PatternFilePathMatcher` to filter which files and directories are included:
```python
import pathlib

from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher

# Include only .py and .md files, exclude hidden directories and test files
matcher = PatternFilePathMatcher(
    included_patterns=["**/*.py", "**/*.md"],
    excluded_patterns=["**/.*", "**/test_*", "**/__pycache__"],
)
async for file in localfs.walk_dir(
    pathlib.Path("/path/to/project"), recursive=True, path_matcher=matcher
):
    await process(file)
```
### Live file watching
When `live=True`, `items()` returns a [`LiveMapView`](../advanced_topics/live_component#livemapfeed-and-livemapview) instead of a plain `AsyncIterable`. Combined with [`mount_each()`](../programming_guide/processing_component#mount_each), this enables automatic incremental file watching — new, modified, and deleted files are processed without a full rescan:
```python
files = localfs.walk_dir(
    sourcedir, recursive=True,
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
    live=True,
)
await coco.mount_each(process_file, files.items(), target)
```
See [Live Mode](../programming_guide/live_mode) for how this works and how to enable it on the app.
### Example
```python
import pathlib

import cocoindex as coco
from cocoindex.connectors import localfs
from cocoindex.resources.file import FileLike, PatternFilePathMatcher

SOURCE_DIR = coco.ContextKey[pathlib.Path]("source_dir")


@coco.fn
async def app_main() -> None:
    matcher = PatternFilePathMatcher(included_patterns=["**/*.md"])
    async for file in localfs.walk_dir(SOURCE_DIR, recursive=True, path_matcher=matcher):
        await coco.mount(
            coco.component_subpath("file", str(file.file_path.path)),
            process_file,
            file,
        )


@coco.fn(memo=True)
async def process_file(file: FileLike) -> None:
    text = await file.read_text()
    # ... process the file content ...
```
## As target
The `localfs` connector provides target state APIs for writing files. CocoIndex tracks what files should exist and automatically handles creation, updates, and deletion.
### declare_file
Declare a single file target. This is the simplest way to write a file.
```python
@coco.fn
def declare_file(
    path: FilePath | Path | ContextKey[Path],
    content: bytes | str,
    *,
    create_parent_dirs: bool = False,
) -> None
```
**Parameters:**
- `path` — The filesystem path for the file. Can be a `FilePath`, a `pathlib.Path`, or a `ContextKey[Path]`.
- `content` — The file content (bytes or str).
- `create_parent_dirs` — If `True`, create parent directories if they don't exist.
**Example:**
```python
OUTPUT_DIR = coco.ContextKey[pathlib.Path]("output_dir")


@coco.fn
def app_main() -> None:
    coco.mount(
        localfs.declare_file,
        localfs.FilePath("readme.txt", base_dir=OUTPUT_DIR),
        content="Hello, world!",
        create_parent_dirs=True,
    )
```
### declare_dir_target
Declare a directory target for writing multiple files. Returns a `DirTarget` for declaring files within.
```python
@coco.fn
def declare_dir_target(
    path: FilePath | Path | ContextKey[Path],
    *,
    create_parent_dirs: bool = True,
) -> DirTarget[coco.PendingS]
```
**Parameters:**
- `path` — The filesystem path for the directory. Can be a `FilePath`, a `pathlib.Path`, or a `ContextKey[Path]`.
- `create_parent_dirs` — If `True`, create parent directories if they don't exist. Defaults to `True`.
**Returns:** A pending `DirTarget`. Use `await coco.mount_target(...)` or the convenience wrapper `await localfs.mount_dir_target(path)` to resolve.
### DirTarget.declare_file
Declares a file to be written within the directory.
```python
def declare_file(
    self,
    filename: str | PurePath,
    content: bytes | str,
    *,
    create_parent_dirs: bool = False,
) -> None
```
**Parameters:**
- `filename` — The name of the file (can include subdirectory path).
- `content` — The file content (bytes or str).
- `create_parent_dirs` — If `True`, create parent directories within the target directory.
### DirTarget.declare_dir_target
Declares a subdirectory target within the directory.
```python
def declare_dir_target(
    self,
    path: str | PurePath,
    *,
    create_parent_dirs: bool = False,
) -> DirTarget[coco.PendingS]
```
**Parameters:**
- `path` — The path of the subdirectory (relative to this directory).
- `create_parent_dirs` — If `True`, create parent directories.
**Returns:** A pending `DirTarget` for the subdirectory.
### Target example
```python
import pathlib

import cocoindex as coco
from cocoindex.connectors import localfs
from cocoindex.resources.file import FileLike

SOURCE_DIR = coco.ContextKey[pathlib.Path]("source_dir")
OUTPUT_DIR = coco.ContextKey[pathlib.Path]("output_dir")


@coco.fn
async def app_main() -> None:
    # Declare output directory target using context key
    target = await localfs.mount_dir_target(OUTPUT_DIR)
    # Process files and write outputs
    await coco.mount_each(
        process_file, localfs.walk_dir(SOURCE_DIR, recursive=True).items(), target
    )


@coco.fn(memo=True)
async def process_file(file: FileLike, target: localfs.DirTarget) -> None:
    # Transform the file
    content = (await file.read_text()).upper()
    # Write to output with same relative path
    target.declare_file(
        filename=file.file_path.path,
        content=content,
        create_parent_dirs=True,
    )
```
---
# Neo4j connector
Source: https://cocoindex.io/docs/connectors/neo4j/
The `neo4j` connector writes records to [Neo4j](https://neo4j.com), a property graph database. It supports node tables (labels), relationship tables (edge types), per-database multitenancy (one Neo4j cluster, many isolated databases), real Cypher uniqueness constraints, and vector indexes via the `CREATE VECTOR INDEX` DDL form.
```python
from cocoindex.connectors import neo4j
```
**Note — Dependencies**
This connector requires additional dependencies. Install with:
```bash
pip install cocoindex[neo4j]
```
Targets Neo4j 5.18+. Vector-index DDL (`CREATE VECTOR INDEX … OPTIONS { indexConfig: { … } }`) shipped in 5.18 — older 5.x servers will reject the DDL the connector emits.
## Connection setup
Create a `ConnectionFactory` and provide it via a `ContextKey`. The factory holds the Bolt URI, optional auth, and the target database name; it lazily opens a Neo4j async driver and returns a graph handle on demand.
**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.
```python
from collections.abc import AsyncIterator

from cocoindex.connectors import neo4j
import cocoindex as coco

KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        neo4j.ConnectionFactory(
            uri="bolt://localhost:7687",
            auth=("neo4j", "cocoindex"),
            database="neo4j",
        ),
    )
    yield
```
`auth` is optional — omit it for unauthenticated dev instances. `database` defaults to `"neo4j"` (the default db that ships with every Neo4j 5 installation).
### Multitenancy
A single Neo4j cluster can host many isolated databases. Pair each database with its own `ContextKey` and `ConnectionFactory(database=...)`:
```python
KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("apis_db")


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "bolt://localhost:7687"
    auth = ("neo4j", "cocoindex")
    builder.provide(KG_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="kg"))
    builder.provide(APIS_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="apis"))
    yield
```
Different `ContextKey`s with different database names produce fully separate target-state trees — changes to one never spill into the other.
## As target
The `neo4j` connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.
Each apply batch is wrapped in a single Neo4j transaction (`tx.commit()` on success, rollback on exception), so partial writes never leak into the database. Within a batch, writes are ordered as **node upserts → relation upserts → relation deletes → node deletes** so dependent edges always see their endpoints.
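The ordering rule can be sketched as a stable sort over one batch. Node upserts run first, so relationship writes can rely on endpoints already written in the same transaction. Relation deletes come before node deletes because Neo4j refuses to `DELETE` a node that still has relationships unless `DETACH DELETE` is used. `Mutation` and `order_batch` are hypothetical.

```python
from dataclasses import dataclass
from typing import Literal

# Lower rank runs first: node upserts, relation upserts, relation deletes, node deletes.
_RANK = {
    ("node", "upsert"): 0,
    ("relation", "upsert"): 1,
    ("relation", "delete"): 2,
    ("node", "delete"): 3,
}


@dataclass(frozen=True)
class Mutation:
    """Hypothetical record of one pending write in an apply batch."""

    kind: Literal["node", "relation"]
    op: Literal["upsert", "delete"]
    key: str


def order_batch(mutations: list[Mutation]) -> list[Mutation]:
    """Order one batch so edges never point at missing or deleted nodes.

    sorted() is stable, so declaration order is kept within each group.
    """
    return sorted(mutations, key=lambda m: _RANK[(m.kind, m.op)])
```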
### Declaring target states
#### Node tables (parent state)
Declares a node label as a target state. Returns a `TableTarget` for declaring records.
```python
def declare_table_target(
    db: ContextKey,
    table_name: str,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```
**Parameters:**
- `db` — A `ContextKey[neo4j.ConnectionFactory]` for the Neo4j connection.
- `table_name` — The Cypher node label (e.g. `"Document"`).
- `table_schema` — Optional schema definition (see [Table Schema](#table-schema-from-python-class)). The schema participates in CocoIndex's fingerprint (so two flows declaring the same label must agree); per-property type DDL is not emitted in v1.
- `primary_key` — Single property name used as the node's primary key. Defaults to `"id"`. Compound primary keys are not supported in v1.0.
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).
**Returns:** A pending `TableTarget`. Use `await neo4j.mount_table_target(KG_DB, ...)` to get a resolved target.
#### Records (child states)
Once a `TableTarget` is resolved, declare records to be upserted (translated to `MERGE (n:Label {pk: $key_0}) SET n += $props`):
```python
def TableTarget.declare_record(
    self,
    *,
    row: RowT,
) -> None
```
**Parameters:**
- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the `primary_key` field declared above.
`declare_row` is an alias for `declare_record`, for compatibility with Postgres and other RDBMS targets.
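The translation from a row object to `MERGE (n:Label {pk: $key_0}) SET n += $props` can be approximated with a minimal sketch. It makes three assumptions: the whole row, including the primary key, is sent as `$props`; identifiers are backtick-quoted; and "Pydantic model" means a v2 model exposing `model_dump()`. `quote_ident`, `row_to_props`, and `build_node_merge` are hypothetical helpers, not connector functions.

```python
import dataclasses
from typing import Any


def quote_ident(name: str) -> str:
    """Backtick-quote a Cypher identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def row_to_props(row: Any) -> dict[str, Any]:
    """Normalize a dict, dataclass, NamedTuple, or Pydantic v2 model into a dict."""
    if isinstance(row, dict):
        return dict(row)
    if dataclasses.is_dataclass(row) and not isinstance(row, type):
        return dataclasses.asdict(row)
    if isinstance(row, tuple) and hasattr(row, "_asdict"):  # NamedTuple
        return dict(row._asdict())
    if hasattr(row, "model_dump"):  # assumed Pydantic v2 model
        return row.model_dump()
    raise TypeError(f"unsupported row type: {type(row).__name__}")


def build_node_merge(
    label: str, row: Any, primary_key: str = "id"
) -> tuple[str, dict[str, Any]]:
    """Build a parameterized node upsert in the MERGE ... SET n += $props shape."""
    props = row_to_props(row)
    if primary_key not in props:
        raise ValueError(f"row is missing primary key field {primary_key!r}")
    query = (
        f"MERGE (n:{quote_ident(label)} {{{quote_ident(primary_key)}: $key_0}}) "
        "SET n += $props"
    )
    return query, {"key_0": props[primary_key], "props": props}
```

For a dataclass row `Doc(id="d1", title="Hello")` and label `"Document"`, `$key_0` is bound to `"d1"` and `$props` to the full row. Keeping values in parameters rather than interpolating them into the query lets Neo4j cache the query plan.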
#### Relation tables (parent state)
Declares a relationship type as a target state. Returns a `RelationTarget` for declaring edges.
```python
def declare_relation_target(
    db: ContextKey,
    table_name: str,
    from_table: TableTarget,
    to_table: TableTarget,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]
```
**Parameters:**
- `db` — A `ContextKey[neo4j.ConnectionFactory]` for the Neo4j connection.
- `table_name` — The Cypher relationship type (e.g. `"MENTION"`).
- `from_table` — The `TableTarget` whose nodes are the *source* endpoints of edges in this relationship.
- `to_table` — The `TableTarget` whose nodes are the *target* endpoints of edges in this relationship.
- `table_schema` — Optional schema for the relationship's own properties. The relationship's `primary_key` field uniquely identifies each edge.
- `primary_key` — Single property name used as the edge's primary key. Defaults to `"id"`.
- `managed_by` — Whether CocoIndex manages the relationship lifecycle (`"system"`) or assumes it exists (`"user"`).
**Returns:** A pending `RelationTarget`. Use `await neo4j.mount_relation_target(KG_DB, ...)` to get a resolved target.
#### Relations (child states)
Once a `RelationTarget` is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship.
```python
def RelationTarget.declare_relation(
    self,
    *,
    from_id: Any,
    to_id: Any,
    record: RowT | None = None,
) -> None
```
**Parameters:**
- `from_id` — The source node's primary-key value. The connector MERGEs `(s:FromLabel {pk: $from_id})` so endpoints are auto-created if absent.
- `to_id` — The target node's primary-key value. Same MERGE behavior.
- `record` — Optional row object whose fields populate the relationship's properties. Must include the relationship's `primary_key` field if provided.
If `record` is omitted, the connector derives a deterministic edge id of the form `{from_label}_{from_id}_{to_label}_{to_id}`. This is convenient when an edge has no properties of its own.
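The triple-MERGE and the default edge id can be sketched roughly as follows. The sketch assumes `record` is a plain dict and that the endpoint labels and primary-key property names are passed in explicitly. `quote_ident`, `default_edge_id`, and `build_relation_merge` are hypothetical helpers. Only the id format `{from_label}_{from_id}_{to_label}_{to_id}` and the three-step MERGE order come from the description above.

```python
from typing import Any, Optional


def quote_ident(name: str) -> str:
    """Backtick-quote a Cypher identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def default_edge_id(from_label: str, from_id: Any, to_label: str, to_id: Any) -> str:
    """Deterministic id: the same endpoints always yield the same edge id."""
    return f"{from_label}_{from_id}_{to_label}_{to_id}"


def build_relation_merge(
    rel_type: str,
    from_label: str,
    from_pk: str,
    from_id: Any,
    to_label: str,
    to_pk: str,
    to_id: Any,
    record: Optional[dict[str, Any]] = None,
    primary_key: str = "id",
) -> tuple[str, dict[str, Any]]:
    if record is None:
        # No properties of its own: derive the edge id from the two endpoints.
        props = {primary_key: default_edge_id(from_label, from_id, to_label, to_id)}
    else:
        props = dict(record)
        if primary_key not in props:
            raise ValueError(f"record is missing primary key field {primary_key!r}")
    query = (
        # 1) source endpoint, auto-created if absent
        f"MERGE (s:{quote_ident(from_label)} {{{quote_ident(from_pk)}: $from_id}}) "
        # 2) target endpoint, auto-created if absent
        f"MERGE (t:{quote_ident(to_label)} {{{quote_ident(to_pk)}: $to_id}}) "
        # 3) the relationship itself, keyed by its own primary key
        f"MERGE (s)-[r:{quote_ident(rel_type)} {{{quote_ident(primary_key)}: $rel_id}}]->(t) "
        "SET r += $props"
    )
    params = {"from_id": from_id, "to_id": to_id, "rel_id": props[primary_key], "props": props}
    return query, params
```

Because the derived id depends only on the endpoints, declaring the same property-less edge again on a later run matches the existing relationship instead of creating a duplicate.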
#### Vector indexes (attachment)
Declares a vector index on a column of a node table. Vector indexes are an [attachment](../advanced_topics/custom_target_connector#implementing-attachment-providers) to a `TableTarget`:
```python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    field: str,
    metric: Literal["cosine", "euclidean"] = "cosine",
    dimension: int,
) -> None
```
**Parameters:**
- `name` — Optional logical name for the index. Defaults to `f"vec_{table_name}__{field}"`.
- `field` — The node property holding the vector.
- `metric` — Similarity metric: `"cosine"` or `"euclidean"`. Translated to Neo4j's `vector.similarity_function` option.
- `dimension` — The vector's dimension. Required.
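The parameter handling above can be sketched as a small helper. It computes the default logical name and the `indexConfig` map that Neo4j 5 accepts in `CREATE VECTOR INDEX ... OPTIONS {indexConfig: {...}}`. `vector_index_spec` is hypothetical. It deliberately leaves out the physical index name, of which only the `coco_vec_` prefix is documented here.

```python
from typing import Any, Optional

# Values accepted by Neo4j's `vector.similarity_function` index option.
SIMILARITY_FUNCTIONS = {"cosine": "cosine", "euclidean": "euclidean"}


def vector_index_spec(
    table_name: str,
    field: str,
    dimension: int,
    metric: str = "cosine",
    name: Optional[str] = None,
) -> dict[str, Any]:
    """Validate inputs and build the logical name plus Neo4j index options."""
    if metric not in SIMILARITY_FUNCTIONS:
        raise ValueError(f"unsupported metric: {metric!r}")
    if not isinstance(dimension, int) or isinstance(dimension, bool) or dimension <= 0:
        raise ValueError("dimension must be a positive integer")
    return {
        # Default logical name follows f"vec_{table_name}__{field}".
        "name": name if name is not None else f"vec_{table_name}__{field}",
        "field": field,
        "options": {
            "indexConfig": {
                "vector.dimensions": dimension,
                "vector.similarity_function": SIMILARITY_FUNCTIONS[metric],
            }
        },
    }
```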
The connector emits:
```cypher
CREATE VECTOR INDEX `coco_vec_