The processing component
Processing components are the units of incremental execution and the sync boundaries for target states. Covers mounting APIs (mount, use_mount, mount_each, mount_target), component paths, and granularity tradeoffs.
Most apps process many independent source items, such as files, rows, or entities. A processing component is the unit of execution for one such item: it runs that item’s transformation logic and declares the set of target states produced for it.
Component path
A component path is the stable identifier for a processing component across runs (think of it like a path in a tree). CocoIndex uses it to match a component to its previous run, detect what changed for that item, and sync that component’s target states as a unit when it finishes. This sync happens per component; CocoIndex does not wait for other components in the same app to complete.
Component paths are hierarchical and form a tree structure. You specify child paths using coco.component_subpath() with stable identifiers like string literals, file names, row keys, or entity IDs:
coco.component_subpath(filename) # e.g., coco.component_subpath("hello.pdf")
coco.component_subpath("user", user_id) # e.g., coco.component_subpath("user", 12345)
Choose paths that are stable for the “same” item (e.g., file path, primary key). If an item disappears and its path is no longer present, CocoIndex cleans up the target states owned by that path (and its sub-paths).
Here’s an example component path tree for a pipeline that processes files:
(root)                   ← app_main component
└── process_file
    ├── "hello.pdf"      ← process_file component
    └── "world.pdf"      ← process_file component
The tree is populated dynamically as the app runs — each mount() / mount_each() call adds a subpath.
See StableKey in the SDK Overview for details on what values can be used in component paths.
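The cleanup rule for disappearing paths can be illustrated with a small toy model. This is only a sketch in plain Python, not CocoIndex's implementation: paths are modeled as tuples, and the function name paths_to_clean_up and the target-state identifiers are hypothetical.

```python
# Toy model only: this is not how CocoIndex stores state internally.
# A component path is modeled as a tuple of stable keys, and each path
# owns a set of target-state identifiers (hypothetical names).


def paths_to_clean_up(previous, current):
    """Return {path: owned target states} for paths absent from the current run."""
    return {
        path: states
        for path, states in previous.items()
        if path not in current
    }


previous_run = {
    ("process_file", "hello.pdf"): {"row:hello-0", "row:hello-1"},
    ("process_file", "world.pdf"): {"row:world-0"},
}
# world.pdf was deleted from the source, so it is not mounted in this run.
current_run = {("process_file", "hello.pdf")}

stale = paths_to_clean_up(previous_run, current_run)
print(stale)  # {('process_file', 'world.pdf'): {'row:world-0'}}
```

Because the path is the only thing matched across runs, an unstable key (for example, a random ID generated on each run) would make every item look deleted and newly created each time.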
Mount
Mounting is how you declare (instantiate) a processing component within an app at a specific path, so CocoIndex knows that component exists, should run, and owns a set of target states.
CocoIndex provides two core mounting APIs:
- mount(): sets up a processing component in a child path without depending on data from it. This allows the component to refresh independently in live mode.
- use_mount(): returns a value from the component’s execution to the caller. The component at that path cannot refresh independently without re-executing the caller.
And two sugar APIs that simplify common patterns:
- mount_each(): mounts one component per item in a keyed iterable
- mount_target(): mounts a target without an explicit subpath
See also map() for a utility API that operates within a component without creating new ones.
Automatic subpath derivation
mount(), use_mount(), and mount_each() all accept an optional ComponentSubpath as their first argument. When omitted, the subpath is auto-derived from the function name using Symbol(fn.__name__).
# These are equivalent:
await coco.mount(process_file, file, target)
await coco.mount(coco.component_subpath(coco.Symbol("process_file")), process_file, file, target)
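This means the component path for a process_file function is parent / Symbol("process_file"). The function must have a usable __name__ attribute; if it doesn’t (e.g., a functools.partial object, which has no __name__, or a lambda, whose __name__ is the generic "<lambda>"), provide an explicit subpath.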
Since sibling component paths must not collide, you need an explicit subpath when:
- The same function is mounted more than once: auto-derived paths would be identical, so each call needs a distinct path (e.g., coco.component_subpath("session", youtube_id))
- Different functions happen to share a __name__: rare, but possible with wrappers or closures
- You want a specific path name that differs from the function name
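The cases above follow from how Python assigns __name__. The following sketch uses only the standard library; derived_name and logged are hypothetical helpers for illustration, and the sketch does not show how CocoIndex itself reads the name.

```python
import functools


def derived_name(fn):
    """Return the __name__ an auto-derived subpath could be based on, or None."""
    return getattr(fn, "__name__", None)


def process_file(file): ...
def process_row(row): ...


def logged(fn):
    # A wrapper defined without functools.wraps hides the wrapped function's name.
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)
    return wrapper


print(derived_name(process_file))                     # process_file
print(derived_name(lambda f: f))                      # <lambda>
print(derived_name(functools.partial(process_file)))  # None
# Two different functions end up sharing one name:
print(derived_name(logged(process_file)), derived_name(logged(process_row)))  # wrapper wrapper
```

Applying functools.wraps(fn) inside logged would preserve the original names, which avoids the last collision without an explicit subpath.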
mount()
Use mount() when you don’t need a return value from the processing component. It schedules the processing component to run and returns a handle:
handle = await coco.mount(process_file, file, target)
With an explicit subpath, for example when mounting multiple components of the same function:
handle = await coco.mount(
    coco.component_subpath("process", filename),
    process_file,
    file,
    target,
)
The handle provides a method you can call if you need to wait until the processing component is fully ready — meaning all its target states have been applied to external systems and all components in its sub-paths are ready:
await handle.ready()
You usually only need to call ready() when you have logic that depends on the processing component’s target states being applied — for example, querying the latest data from a target table after syncing it.
mount() also accepts LiveComponent classes — components that process continuously and react to changes incrementally instead of rescanning everything. See Live Components for details.
use_mount()
Use use_mount() when you need the processing component’s return value. It mounts the component, waits until it’s ready, and returns the value directly:
table = await coco.use_mount(setup_table, table_name="docs")
With an explicit subpath:
table = await coco.use_mount(
    coco.component_subpath("setup"),
    setup_table,
    table_name="docs",
)
A common use of use_mount() is to obtain a target after its container target state is applied.
mount_each()
mount_each() mounts one processing component per item in a keyed iterable.
files = localfs.walk_dir(sourcedir, path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]))
await coco.mount_each(process_file, files.items(), target)
Each item in the iterable is a (key, value) tuple. The value is passed as the first argument to the function, and any additional arguments are passed through. Items are mounted under an auto-derived subpath (Symbol(fn.__name__)), so the component path for each item is parent / Symbol("process_file") / key.
You can provide an explicit subpath as the first argument:
await coco.mount_each(coco.component_subpath("files"), process_file, files.items(), target)
Source connectors provide an items() method that returns (StableKey, T) pairs. For example, localfs.walk_dir(...).items() yields (relative_path, File) tuples.
When a source connector supports live watching, its items() returns a LiveMapView or LiveMapFeed instead of a plain iterable. mount_each() detects this and automatically handles incremental updates — no changes to mount_each() itself are needed. See Live Mode.
mount_target()
mount_target() mounts a target without requiring an explicit subpath.
from cocoindex.connectors import localfs
dir_target = await coco.mount_target(localfs.dir_target(outdir))
The component path is derived automatically from the target’s globally unique key — you don’t need to create a component_subpath for it. This is sugar over calling use_mount() with a target declaration function.
Connectors also provide convenience methods that wrap mount_target():
# Equivalent to the above
dir_target = await localfs.mount_dir_target(outdir)
# PostgreSQL example
table = await postgres.mount_table_target(
    PG_DB,
    table_name="doc_embeddings",
    table_schema=await postgres.TableSchema.from_class(DocEmbedding, primary_key=["id"]),
)
Using component_subpath as a context manager
You can use component_subpath() as a context manager to create nested paths without repeating common prefixes:
with coco.component_subpath("process"):
    for f in files:
        await coco.mount(
            coco.component_subpath(str(f.relative_path)),
            process_file,
            f,
            target,
        )
This is equivalent to:
for f in files:
    await coco.mount(
        coco.component_subpath("process", str(f.relative_path)),
        process_file,
        f,
        target,
    )
When iterating over keyed items, prefer mount_each() — it handles the loop and subpath creation for you.
How target states sync
The component path tree determines ownership. When a component is no longer mounted at a path (e.g., a source file is deleted), CocoIndex automatically cleans up its target states — and recursively for all its sub-paths.
After a processing component finishes, CocoIndex syncs its target states:
- Compares the target states declared in this run against those from the previous run at the same path
- Applies changes as a unit — creating, updating, or deleting target states as needed
- Recursively cleans up sub-paths where components are no longer mounted
All writes happen strictly after processing completes — you never see partial effects from a processing failure or interrupt. Each target backend applies its batch atomically when supported (e.g., within a database transaction), but changes across different target backends are not transactional with each other.
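The comparison step can be sketched as a diff between two mappings. This is a minimal illustration under stated assumptions, not the CocoIndex implementation: target states are modeled as a dict from a key to a content fingerprint, and plan_sync is a hypothetical name.

```python
def plan_sync(previous, declared):
    """Diff the declared target states against the previous run at the same path."""
    return {
        "create": sorted(k for k in declared if k not in previous),
        "update": sorted(
            k for k in declared if k in previous and previous[k] != declared[k]
        ),
        "delete": sorted(k for k in previous if k not in declared),
    }


# Hypothetical target states for one file: chunk key -> content fingerprint.
previous = {"chunk-0": "aaa", "chunk-1": "bbb", "chunk-2": "ccc"}
declared = {"chunk-0": "aaa", "chunk-1": "bbb-edited", "chunk-3": "ddd"}

plan = plan_sync(previous, declared)
print(plan)  # {'create': ['chunk-3'], 'update': ['chunk-1'], 'delete': ['chunk-2']}
```

Unchanged entries (chunk-0 here) appear in no list, which is what keeps repeated runs over mostly unchanged data cheap.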
What happens when a component fails
CocoIndex processes each component in two phases: processing (running your function, declaring target states) and submit (writing changes to target backends). Failure behavior depends on how the component was mounted and which phase fails.
Failure isolation
- use_mount(): the parent has a data dependency on the child’s result, so the child’s exception propagates directly to the parent. The parent must handle it or it will fail too.
- mount() and mount_each(): the child runs in the background. A failure in one child does not affect the parent or siblings; by default the exception is logged and other components continue. This isolation is intentional: one bad file shouldn’t take down the entire pipeline.
To react to background failures, you can:
- Install exception handlers — global or scoped — to send alerts, record metrics, or implement custom logic.
- Monitor app progress: UpdateStats exposes per-component stats including error counts, so you can detect failures programmatically.
For the full picture — including interrupted update recovery and the exception handler API — see Error Handling.
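The difference between the two failure modes resembles a familiar asyncio pattern. The sketch below is an analogy built on plain asyncio, not CocoIndex code; process, run_in_background, and run_with_dependency are hypothetical names.

```python
import asyncio
import logging

logger = logging.getLogger("toy_pipeline")


async def process(name):
    if name == "bad.pdf":
        raise ValueError(f"cannot parse {name}")
    return f"processed {name}"


async def run_in_background(names):
    """Analogous to mount(): each child fails on its own; failures are logged."""
    results = await asyncio.gather(
        *(process(n) for n in names), return_exceptions=True
    )
    succeeded = []
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            logger.error("component %s failed: %s", name, result)
        else:
            succeeded.append(result)
    return succeeded


async def run_with_dependency(name):
    """Analogous to use_mount(): the caller awaits the value, so errors propagate."""
    return await process(name)


async def main():
    ok = await run_in_background(["a.pdf", "bad.pdf", "b.pdf"])
    try:
        await run_with_dependency("bad.pdf")
        propagated = False
    except ValueError:
        propagated = True
    return ok, propagated


ok, propagated = asyncio.run(main())
print(ok)          # ['processed a.pdf', 'processed b.pdf']
print(propagated)  # True
```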
No rollback, convergent roll-forward
CocoIndex does not roll back partial writes. The two-phase design makes this safe:
- Processing is side-effect-free — it only declares target states in memory. If processing fails (e.g., a parsing error), no writes were attempted, so there’s nothing to undo.
- Submit writes changes to target backends. If a submit fails partway through (e.g., a database connection drops), some writes may have been applied. CocoIndex does not attempt to undo them. Instead, on the next run CocoIndex computes the current desired state, and the target connector reconciles against all possible previous states, converging the target to the correct state regardless of what was partially applied. This is why built-in connectors use convergent operations like upserts (INSERT ... ON CONFLICT DO UPDATE) rather than plain inserts.
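To see why an upsert converges where a plain insert does not, consider this small experiment with Python's built-in sqlite3 module. It is a sketch, not connector code: the docs table and its rows are made up for illustration, and SQLite stands in for whatever backend a real connector targets.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (id TEXT PRIMARY KEY, body TEXT)")

UPSERT = (
    "INSERT INTO docs (id, body) VALUES (?, ?) "
    "ON CONFLICT (id) DO UPDATE SET body = excluded.body"
)
desired = [("a", "alpha"), ("b", "beta"), ("c", "gamma")]

# Simulate a submit that was interrupted after writing only the first row.
conn.execute(UPSERT, desired[0])
conn.commit()

# Re-applying with a plain INSERT fails on the row that already exists.
try:
    conn.execute("INSERT INTO docs (id, body) VALUES (?, ?)", desired[0])
    plain_insert_failed = False
except sqlite3.IntegrityError:
    plain_insert_failed = True

# Re-applying the full desired state with upserts converges regardless of
# which rows the interrupted run had already written.
conn.executemany(UPSERT, desired)
conn.commit()

rows = conn.execute("SELECT id, body FROM docs ORDER BY id").fetchall()
print(plain_insert_failed)  # True
print(rows)  # [('a', 'alpha'), ('b', 'beta'), ('c', 'gamma')]
```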
How big should a processing component be?
When defining processing components, think about granularity — what one path represents — because it determines the sync boundary for target states.
For example, if you’re processing files:
- Coarse: one component for “all files” (coco.component_subpath("process"))
- Medium: one component per file (coco.component_subpath("process", file_path))
- Fine: one component per chunk (coco.component_subpath("process", file_path, chunk_id))
In general:
- Coarse-grained (fewer, larger components): More target states sync together as a unit, but you only see updates after the larger component finishes.
- Fine-grained (more, smaller components): Each component syncs its target states as soon as it finishes, but target states owned by different components do not sync together as a unit.
For small datasets, a single processing component that owns all target states is simple and ensures all target states sync together. As data grows, consider breaking it down into one component per source item (e.g., one per file) to reduce latency: you see each item’s target states synced as soon as it’s processed, without waiting for the full dataset to complete. This also helps isolate failures to that item.
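The latency tradeoff can be made concrete with a little arithmetic. The numbers below are invented, and the sketch assumes items are processed strictly one after another, which is a simplification chosen only to keep the calculation easy to follow.

```python
from itertools import accumulate

# Hypothetical per-file processing times, in seconds.
processing_seconds = {"a.md": 2.0, "b.md": 5.0, "c.md": 1.0}

# One component per file: each file's target states sync when that file finishes.
per_file_visible_at = dict(
    zip(processing_seconds, accumulate(processing_seconds.values()))
)

# One component for all files: nothing syncs until the whole component finishes.
total = sum(processing_seconds.values())
coarse_visible_at = {name: total for name in processing_seconds}

print(per_file_visible_at)  # {'a.md': 2.0, 'b.md': 7.0, 'c.md': 8.0}
print(coarse_visible_at)    # {'a.md': 8.0, 'b.md': 8.0, 'c.md': 8.0}
```

Under these assumptions the last item becomes visible at the same time either way; the per-file layout only improves how early the other items appear.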
Explicit context management
CocoIndex automatically propagates component context through Python’s contextvars, which works for ordinary function calls (both sync and async). However, in situations where context variables are not preserved (for example, when using concurrent.futures.ThreadPoolExecutor), you need to explicitly capture and attach the context.
Use coco.get_component_context() to capture the current context, and context.attach() to restore it:
from concurrent.futures import ThreadPoolExecutor

@coco.fn
def app_main() -> None:
    # Capture the current context
    ctx = coco.get_component_context()

    def worker(item):
        # Attach the context in the worker thread
        with ctx.attach():
            # Now CocoIndex APIs work correctly
            process_item(item)

    with ThreadPoolExecutor() as executor:
        # Consume the results so exceptions raised in workers surface here
        list(executor.map(worker, items))
This pattern ensures that CocoIndex can track component relationships and target state ownership even across thread boundaries.
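The underlying Python behavior can be reproduced with the standard library alone. The sketch below uses contextvars directly rather than any CocoIndex API; current_component and read_component are hypothetical names, and contextvars.copy_context().run plays the role that ctx.attach() plays above.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

current_component = contextvars.ContextVar("current_component", default="<unset>")


def read_component(_item):
    return current_component.get()


current_component.set("process_file/hello.pdf")

with ThreadPoolExecutor(max_workers=2) as executor:
    # Plain submission: worker threads generally run with their own context,
    # so the value set above is usually not visible in them.
    plain = list(executor.map(read_component, range(3)))

    # Explicit propagation: copy the caller's context once per task and run
    # the worker inside that copy. A separate copy per task is needed because
    # a single Context object cannot be entered by two threads at once.
    futures = [
        executor.submit(contextvars.copy_context().run, read_component, i)
        for i in range(3)
    ]
    propagated = [f.result() for f in futures]

print(plain)
print(propagated)
```

With explicit propagation every task reports process_file/hello.pdf; the plain submissions typically report the default, although this depends on the interpreter build and its thread context settings.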
Processing helpers
map()
map() applies an async function to each item in a collection, running all calls concurrently within the current processing component. Unlike mount() and mount_each(), it does not create child processing components — it’s purely concurrent execution (similar to asyncio.gather()).
@coco.fn(memo=True)
async def process_file(file: FileLike, table: postgres.TableTarget[DocEmbedding]) -> None:
    chunks = splitter.split(await file.read_text())
    id_gen = IdGenerator()
    await coco.map(process_chunk, chunks, file.file_path.path, id_gen, table)
The first argument to the function receives each item; additional arguments are passed through to every call. map() returns a list of the results, in the same order as the input items.
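The calling convention described above (item first, extra arguments passed through, results in input order) can be mimicked with asyncio.gather(). This is an analogy rather than the implementation of map(); concurrent_map and embed_chunk are hypothetical names.

```python
import asyncio


async def concurrent_map(fn, items, *args):
    """Call fn(item, *args) for every item concurrently; results keep input order."""
    return await asyncio.gather(*(fn(item, *args) for item in items))


async def embed_chunk(chunk, prefix):
    # Shorter chunks finish first, to show that completion order does not
    # affect the order of the returned list.
    await asyncio.sleep(0.01 * len(chunk))
    return f"{prefix}:{chunk.upper()}"


results = asyncio.run(concurrent_map(embed_chunk, ["ccc", "a", "bb"], "doc"))
print(results)  # ['doc:CCC', 'doc:A', 'doc:BB']
```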
When to use map() vs mount_each()
- Use mount_each() when each item should be its own processing component, with its own component path, target state ownership, and target state sync boundary.
- Use map() when you want to process items concurrently within the current component, without creating new component boundaries. This is common for sub-item work like processing chunks within a file.