Building live components
Implement the LiveComponent protocol with process() for full scans and process_live(operator) for incremental updates — covers LiveComponentOperator (update_full, update, delete, mark_ready) and LiveMapFeed / LiveMapView abstractions over event sources.
By default, a processing component runs in catch-up mode — on each app.update(), it declares all target states and mounts all sub-components from scratch. CocoIndex handles incremental updates by skipping memoized sub-components and reconciling target states at the end, then the component exits. This works well when the dataset is small enough to scan fully each cycle.
When the dataset is large or you need to react to changes continuously (e.g., watching a file system), you want the component itself to stay running and react incrementally. A live component does an initial full scan (same as catch-up mode), then keeps running and reacts to individual changes without rescanning everything.
The LiveComponent protocol
A live component is a class with three methods:
class MyLiveComponent:
    def __init__(self, folder: pathlib.Path, target: localfs.DirTarget) -> None:
        """Receive arguments from the mount() call."""
        self.folder = folder
        self.target = target

    async def process(self) -> None:
        """Full processing: mount all children, declare all target states."""
        ...

    async def process_live(self, operator: coco.LiveComponentOperator) -> None:
        """Continuous processing: orchestrate full and incremental updates."""
        ...
- __init__ receives the arguments passed to coco.mount().
- process() does a full scan: it mounts children via coco.mount() and declares target states, just like a traditional component function. It is called indirectly via operator.update_full().
- process_live(operator) is the long-running entry point. It orchestrates full and incremental updates using the operator.
CocoIndex detects a live component by checking if the class has both process and process_live methods.
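The check described above can be approximated with a small helper. This is only a sketch of the idea: is_live_component is a hypothetical name, and CocoIndex's actual detection logic may differ in detail.

```python
def is_live_component(cls: type) -> bool:
    """Return True if cls defines callable process and process_live attributes."""
    return all(
        callable(getattr(cls, name, None)) for name in ("process", "process_live")
    )


class Live:
    async def process(self) -> None: ...

    async def process_live(self, operator) -> None: ...


class OnlyProcess:
    async def process(self) -> None: ...


live_detected = is_live_component(Live)          # has both methods
plain_detected = is_live_component(OnlyProcess)  # missing process_live
```

A class that defines only one of the two methods is not treated as a live component under this check.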
LiveComponentOperator
The operator passed to process_live() provides four methods:
| Method | Description |
|---|---|
| await operator.update_full() | Run process() with a full submission phase (GCs removed children). Blocks until fully ready. |
| await operator.update(subpath, fn, *args, **kwargs) | Mount a child component incrementally. |
| await operator.delete(subpath) | Delete a child component. |
| await operator.mark_ready() | Signal that processing has caught up to the time process_live() was called. |
update_full()
Triggers a full processing cycle: calls process(), submits target states, waits for all children to be ready, and garbage-collects children that are no longer mounted. This is the same mechanism as a traditional component’s update cycle.
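One way to picture the garbage-collection step is as a set difference between the children mounted in the previous cycle and those mounted by the current process() run. The sketch below is a toy model under that assumption; children_to_gc is a hypothetical helper, not a CocoIndex API.

```python
def children_to_gc(previous: set[str], current: set[str]) -> set[str]:
    """Children that existed before but were not mounted in this cycle."""
    return previous - current


previous_children = {"a.txt", "b.txt", "c.txt"}
current_children = {"a.txt", "c.txt", "d.txt"}  # b.txt disappeared, d.txt is new
removed = children_to_gc(previous_children, current_children)
```

Here only b.txt would be collected; a.txt and c.txt are kept, and d.txt is a newly mounted child.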
update() and delete()
Mount or delete individual child components without a full scan. These are concurrent with each other but serialized with update_full() — if update_full() is running, incremental operations wait until it finishes.
When multiple operations target the same subpath, only the latest one (by invocation order) takes effect.
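The latest-wins rule can be modeled with a map keyed by subpath, where each new operation overwrites the previous one. This is an illustrative toy, not CocoIndex's scheduler; PendingOps and its methods are hypothetical names.

```python
class PendingOps:
    """Toy model: remember only the most recent operation per subpath."""

    def __init__(self) -> None:
        self._seq = 0
        self._latest: dict[str, tuple[int, str]] = {}

    def submit(self, subpath: str, op: str) -> None:
        # A later invocation replaces any earlier one for the same subpath.
        self._seq += 1
        self._latest[subpath] = (self._seq, op)

    def effective(self) -> dict[str, str]:
        """Operation that takes effect for each subpath."""
        return {subpath: op for subpath, (_, op) in self._latest.items()}


ops = PendingOps()
ops.submit("a.txt", "update")
ops.submit("b.txt", "update")
ops.submit("a.txt", "delete")  # supersedes the earlier update of a.txt
effective_ops = ops.effective()
```

In this model a.txt ends up deleted and b.txt updated, regardless of how the earlier update of a.txt would have turned out.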
mark_ready()
Signals to the parent that the live component has caught up. The parent’s await handle.ready() returns when mark_ready() is called. If process_live() returns without calling mark_ready(), it is called automatically.
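The readiness behavior can be modeled with an asyncio.Event. The sketch below is a toy built on that assumption: ToyOperator, wait_ready, and run_component are hypothetical stand-ins, not the real LiveComponentOperator or CocoIndex runtime.

```python
import asyncio


class ToyOperator:
    """Hypothetical stand-in for LiveComponentOperator (readiness only)."""

    def __init__(self) -> None:
        self._ready = asyncio.Event()

    async def mark_ready(self) -> None:
        self._ready.set()

    async def wait_ready(self) -> None:
        await self._ready.wait()

    @property
    def is_ready(self) -> bool:
        return self._ready.is_set()


class ReturnsEarly:
    """A component whose process_live() never calls mark_ready() itself."""

    async def process_live(self, operator: ToyOperator) -> None:
        return None


async def run_component(component, operator: ToyOperator) -> None:
    """Toy runner: mark ready if process_live() returned without doing so."""
    await component.process_live(operator)
    if not operator.is_ready:
        await operator.mark_ready()


async def demo() -> bool:
    operator = ToyOperator()  # created inside the running event loop
    task = asyncio.create_task(run_component(ReturnsEarly(), operator))
    # The parent's wait resolves although mark_ready() was never called explicitly.
    await asyncio.wait_for(operator.wait_ready(), timeout=1.0)
    await task
    return operator.is_ready


parent_saw_ready = asyncio.run(demo())
```

The same event would also resolve the parent's wait while process_live() is still running, since setting it does not depend on the coroutine returning.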
Example: file system watcher
A component that watches a local folder and processes each file:
import pathlib

import cocoindex as coco


class FolderWatcher:
    def __init__(self, folder: pathlib.Path, target) -> None:
        self.folder = folder
        self.target = target

    async def process(self) -> None:
        """Full scan: mount a child for every file in the folder."""
        for path in self.folder.iterdir():
            if path.is_file():
                await coco.mount(
                    coco.component_subpath(path.name),
                    process_file,
                    path,
                    self.target,
                )

    async def process_live(self, operator: coco.LiveComponentOperator) -> None:
        # 1. Set up the file watcher before the full scan so no events are missed.
        watcher = setup_watchdog(self.folder)
        # 2. Full scan.
        await operator.update_full()
        # 3. Signal readiness so the parent can proceed.
        await operator.mark_ready()
        # 4. React to changes.
        async for event in watcher.events():
            subpath = coco.component_subpath(event.filename)
            if event.is_update:
                await operator.update(subpath, process_file, event.path, self.target)
            elif event.is_delete:
                await operator.delete(subpath)
The parent mounts it like any other component:
@coco.fn
async def app_main(folder: pathlib.Path, outdir: pathlib.Path) -> None:
    # Set up the target in the parent (use_mount is not allowed inside process()).
    target = await localfs.mount_dir_target(outdir)
    # Mount the live component.
    await coco.mount(FolderWatcher, folder, target)
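The FolderWatcher example uses a setup_watchdog helper that is not defined here. As a rough illustration of where its update and delete events could come from, the sketch below diffs two directory snapshots using only the standard library. FileEvent, snapshot, and diff_snapshots are hypothetical names, and a production watcher would more likely rely on OS-level notifications than on polling.

```python
import pathlib
import tempfile
from dataclasses import dataclass


@dataclass(frozen=True)
class FileEvent:
    """Hypothetical event shape matching the attributes FolderWatcher reads."""

    path: pathlib.Path
    is_update: bool
    is_delete: bool

    @property
    def filename(self) -> str:
        return self.path.name


def snapshot(folder: pathlib.Path) -> dict[str, int]:
    """Map file name to modification time (ns) for regular files in folder."""
    return {p.name: p.stat().st_mtime_ns for p in folder.iterdir() if p.is_file()}


def diff_snapshots(
    folder: pathlib.Path, old: dict[str, int], new: dict[str, int]
) -> list[FileEvent]:
    """Turn the difference between two snapshots into update/delete events."""
    events = []
    for name, mtime in sorted(new.items()):
        if old.get(name) != mtime:  # new file, or modification time changed
            events.append(FileEvent(folder / name, is_update=True, is_delete=False))
    for name in sorted(old.keys() - new.keys()):  # present before, gone now
        events.append(FileEvent(folder / name, is_update=False, is_delete=True))
    return events


with tempfile.TemporaryDirectory() as tmp:
    folder = pathlib.Path(tmp)
    (folder / "a.txt").write_text("a")
    (folder / "b.txt").write_text("b")
    before = snapshot(folder)
    (folder / "b.txt").unlink()
    (folder / "c.txt").write_text("c")
    after = snapshot(folder)
    events = diff_snapshots(folder, before, after)

updated = sorted(e.filename for e in events if e.is_update)
deleted = sorted(e.filename for e in events if e.is_delete)
```

Each update event would map to operator.update() and each delete event to operator.delete(), keyed by the file name as in FolderWatcher.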
Example: traditional component equivalent
A traditional single-function component:
@coco.fn
async def process_all(data) -> None:
    for key, value in data.items():
        coco.declare_target_state(target.target_state(key, value))
is equivalent to this LiveComponent:
class ProcessAll:
    def __init__(self, data):
        self.data = data

    async def process(self) -> None:
        for key, value in self.data.items():
            coco.declare_target_state(target.target_state(key, value))

    async def process_live(self, operator: coco.LiveComponentOperator) -> None:
        await operator.update_full()
        # mark_ready() is called automatically on return.
The key difference: the LiveComponent version can later be extended to handle incremental changes in process_live() without changing process().
LiveMapFeed and LiveMapView
For a user-facing introduction to live mode and when to use it, see Live Mode. This section covers the protocol details for connector authors and advanced use cases.
Two protocols represent live keyed collections that mount_each() can consume. Choose based on whether your source can enumerate its current state:
- LiveMapView: the source has enumerable current state that can be scanned (e.g., a directory listing or a database table). Supports both full scans via __aiter__() and incremental changes via watch().
- LiveMapFeed: the source only streams changes, with no “current state” to scan (e.g., a Kafka consumer or a webhook event stream). Provides only watch().
class LiveMapFeed(Protocol[K, V]):
    async def watch(self, subscriber: LiveMapSubscriber[K, V]) -> None: ...


class LiveMapView(LiveMapFeed[K, V], Protocol[K, V]):
    def __aiter__(self) -> AsyncIterator[tuple[K, V]]: ...
LiveMapView extends LiveMapFeed — any LiveMapView is also a valid LiveMapFeed. When mount_each() receives either protocol as its items argument, it automatically creates an internal LiveComponent:
- LiveMapView: __aiter__() yields all (key, value) pairs for full scans (called inside the internal process()). watch() handles the full lifecycle.
- LiveMapFeed: process() is a no-op (no snapshot to scan). All work happens in watch().
LiveMapSubscriber
The subscriber passed to watch() mirrors LiveComponentOperator:
| LiveMapSubscriber | LiveComponentOperator | Description |
|---|---|---|
| await subscriber.update_all() | await operator.update_full() | Full re-scan of all items |
| await subscriber.mark_ready() | await operator.mark_ready() | Signal readiness |
| await subscriber.update(key, value) | await operator.update(subpath, fn, ...) | Incremental update; returns ComponentMountHandle |
| await subscriber.delete(key) | await operator.delete(subpath) | Incremental delete; returns ComponentMountHandle |
A typical watch() implementation for a LiveMapView:
async def watch(self, subscriber: LiveMapSubscriber[K, V]) -> None:
    await subscriber.update_all()  # initial full scan
    await subscriber.mark_ready()  # signal readiness
    # ... watch for changes and call subscriber.update()/delete() ...
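To make the shape of the two protocol methods concrete, here is a self-contained toy that does not import CocoIndex. InMemoryView and RecordingSubscriber are hypothetical: the subscriber only records the calls it receives, whereas the real update_all() would trigger a full scan through __aiter__(), and a real source would watch indefinitely instead of replaying a fixed list of changes.

```python
import asyncio
from typing import AsyncIterator, Optional


class RecordingSubscriber:
    """Hypothetical stand-in for LiveMapSubscriber that records each call."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    async def update_all(self) -> None:
        self.calls.append(("update_all",))

    async def mark_ready(self) -> None:
        self.calls.append(("mark_ready",))

    async def update(self, key: str, value: str) -> None:
        self.calls.append(("update", key, value))

    async def delete(self, key: str) -> None:
        self.calls.append(("delete", key))


class InMemoryView:
    """Toy LiveMapView-shaped source: a dict plus a scripted list of changes."""

    def __init__(
        self, initial: dict[str, str], changes: list[tuple[str, Optional[str]]]
    ) -> None:
        self._items = dict(initial)
        self._changes = changes  # (key, value); value None means "deleted"

    def __aiter__(self) -> AsyncIterator[tuple[str, str]]:
        return self._scan()

    async def _scan(self) -> AsyncIterator[tuple[str, str]]:
        for key, value in list(self._items.items()):
            yield key, value

    async def watch(self, subscriber: RecordingSubscriber) -> None:
        await subscriber.update_all()  # initial full scan
        await subscriber.mark_ready()  # signal readiness
        for key, value in self._changes:  # a real source would loop indefinitely
            if value is None:
                self._items.pop(key, None)
                await subscriber.delete(key)
            else:
                self._items[key] = value
                await subscriber.update(key, value)


async def demo() -> tuple[list[tuple], list[tuple[str, str]]]:
    view = InMemoryView({"a": "1"}, [("b", "2"), ("a", None)])
    subscriber = RecordingSubscriber()
    await view.watch(subscriber)
    scanned = [pair async for pair in view]  # current state after the changes
    return subscriber.calls, scanned


calls, scanned = asyncio.run(demo())
```

Dropping __aiter__() from this class would leave something shaped like a LiveMapFeed, where all work happens in watch().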
Implementing live support for a connector
To add live support to a source connector, make your source return an object that implements LiveMapView (if the source has scannable state) or LiveMapFeed (if it only streams changes):
- LiveMapView example: the localfs connector’s DirWalker. walk_dir(..., live=True).items() returns a LiveMapView backed by watchfiles.
- LiveMapFeed example: the kafka connector. topic_as_map() returns a LiveMapFeed that consumes messages from Kafka topics.
Live mode vs catch-up mode
See Live Mode for how the two modes are enabled at the app level and an overview of how they work.
For a manual LiveComponent, the mode controls whether process_live() continues running after mark_ready():
- Live mode (live=True): process_live() continues after mark_ready(), and the component keeps watching for changes.
- Catch-up mode (live=False, default): process_live() terminates as soon as mark_ready() is awaited. No code after await operator.mark_ready() executes, so the component behaves like a traditional one-shot processor.
This lets you use the same LiveComponent class in both modes without code changes.
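The difference between the two modes can be illustrated with a toy operator whose mark_ready() ends the coroutine in catch-up mode. The mechanism used here (raising a private exception that the runner swallows) is an assumption chosen for simplicity; the source does not say how CocoIndex stops process_live(), and ToyOperator, _CatchUpDone, and run are hypothetical names.

```python
import asyncio


class _CatchUpDone(Exception):
    """Raised by the toy operator to end process_live() in catch-up mode."""


class ToyOperator:
    """Hypothetical stand-in for LiveComponentOperator."""

    def __init__(self, live: bool, trace: list[str]) -> None:
        self.live = live
        self.trace = trace

    async def update_full(self) -> None:
        self.trace.append("update_full")

    async def mark_ready(self) -> None:
        self.trace.append("mark_ready")
        if not self.live:
            raise _CatchUpDone  # nothing after mark_ready() runs in catch-up mode


class Component:
    async def process_live(self, operator: ToyOperator) -> None:
        await operator.update_full()
        await operator.mark_ready()
        operator.trace.append("watching")  # reached only in live mode


async def run(live: bool) -> list[str]:
    trace: list[str] = []
    try:
        await Component().process_live(ToyOperator(live, trace))
    except _CatchUpDone:
        pass  # normal termination for catch-up mode in this toy model
    return trace


live_trace = asyncio.run(run(live=True))
catch_up_trace = asyncio.run(run(live=False))
```

The same Component class produces both traces; only the flag passed to the operator changes.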
Restrictions
No use_mount() inside process()
process() may only call coco.mount() (background child mounts). Any setup that requires use_mount() — such as declaring target tables — must be done in the parent component before mounting the LiveComponent. This keeps the controller’s provider set stable across full and incremental updates.
Not allowed in use_mount()
LiveComponent classes can only be used with coco.mount() and operator.update(). Passing a LiveComponent class to coco.use_mount() raises a TypeError.
While LiveComponent classes cannot be passed to mount_each(), you can get live watching behavior more easily using a LiveMapFeed or LiveMapView — mount_each() automatically creates an internal LiveComponent when it detects one.
Readiness
The parent’s await handle.ready() returns when mark_ready() is called inside process_live(), regardless of whether process_live() is still running.
sequenceDiagram
participant Parent
participant LC as LiveComponent
participant Op as Operator
participant Children
Parent->>LC: mount()
activate LC
Note over LC: process_live(operator)
LC->>Op: update_full()
activate Op
Op->>LC: process()
activate LC
LC->>Children: mount(A), mount(B), ...
deactivate LC
Note over Op: submit + GC<br/>wait children ready
Op-->>LC: done
deactivate Op
LC->>Op: mark_ready()
Op-->>Parent: readiness resolved
rect rgb(240, 248, 255)
Note over LC,Children: Live mode only
LC->>Op: update(A)
Op->>Children: run A
LC->>Op: delete(B)
Op->>Children: delete B
Note over LC: continues...
end
deactivate LC
If process_live() returns without calling mark_ready(), it is called automatically — the parent will not hang.