Building live components

Implement the LiveComponent protocol with process() for full scans and process_live(operator) for incremental updates — covers LiveComponentOperator (update_full, update, delete, mark_ready) and LiveMapFeed / LiveMapView abstractions over event sources.

Version: v1.0.0-alpha48
Last reviewed: Apr 19, 2026

By default, a processing component runs in catch-up mode: on each app.update(), it declares all target states and mounts all sub-components from scratch. CocoIndex keeps this incremental by skipping memoized sub-components and reconciling target states at the end of the run, after which the component exits. This works well when the dataset is small enough to scan fully on every cycle.
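
To make the cost of catch-up mode concrete, here is a minimal stand-alone sketch of a memoized full scan. It is a model, not CocoIndex's implementation: fingerprint, catch_up, and the memo dictionary are hypothetical names, and content hashing is only one plausible way to decide that a sub-component is unchanged.

```python
import hashlib


def fingerprint(content: bytes) -> str:
    """Hash the input so unchanged items can be recognised (an assumption)."""
    return hashlib.sha256(content).hexdigest()


def catch_up(files: dict[str, bytes], memo: dict[str, str]) -> tuple[list[str], list[str]]:
    """Visit every item; reprocess only those whose fingerprint changed."""
    processed: list[str] = []
    skipped: list[str] = []
    for name, content in files.items():
        digest = fingerprint(content)
        if memo.get(name) == digest:
            skipped.append(name)      # memoized: nothing to redo
        else:
            memo[name] = digest       # new or changed: (re)process
            processed.append(name)
    return processed, skipped


memo: dict[str, str] = {}
first = catch_up({"a.txt": b"one", "b.txt": b"two"}, memo)
second = catch_up({"a.txt": b"one", "b.txt": b"TWO"}, memo)
print(first)   # (['a.txt', 'b.txt'], [])
print(second)  # (['b.txt'], ['a.txt'])
```

Even when most items are skipped, the loop still touches every item on every cycle. That per-cycle scan is the cost a live component avoids after its initial pass.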

When the dataset is large or you need to react to changes continuously (e.g., watching a file system), you want the component itself to stay running and react incrementally. A live component does an initial full scan (same as catch-up mode), then keeps running and reacts to individual changes without rescanning everything.

The LiveComponent protocol

A live component is a class with three methods:

python
class MyLiveComponent:
    def __init__(self, folder: pathlib.Path, target: localfs.DirTarget) -> None:
        """Receive arguments from the mount() call."""
        self.folder = folder
        self.target = target

    async def process(self) -> None:
        """Full processing — mount all children, declare all target states."""
        ...

    async def process_live(self, operator: coco.LiveComponentOperator) -> None:
        """Continuous processing — orchestrate full and incremental updates."""
        ...

  • __init__ receives arguments passed to coco.mount().
  • process() does a full scan — mounts children via coco.mount() and declares target states, just like a traditional component function. Called indirectly via operator.update_full().
  • process_live(operator) is the long-running entry point. It orchestrates full and incremental updates using the operator.

CocoIndex detects a live component by checking if the class has both process and process_live methods.
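
The detection rule above amounts to a structural check. A minimal sketch of such a check, assuming nothing beyond "a class with callable process and process_live attributes" (the helper name is_live_component is hypothetical and is not a CocoIndex API):

```python
import inspect


def is_live_component(obj: object) -> bool:
    """Return True for a class that exposes both process and process_live."""
    return inspect.isclass(obj) and all(
        callable(getattr(obj, name, None)) for name in ("process", "process_live")
    )


class Live:
    async def process(self) -> None: ...
    async def process_live(self, operator) -> None: ...


class OnlyProcess:
    async def process(self) -> None: ...


async def plain_function() -> None: ...


print(is_live_component(Live))            # True
print(is_live_component(OnlyProcess))     # False
print(is_live_component(plain_function))  # False
```

A practical consequence: giving an ordinary helper class both method names by accident would make it look like a live component, so it is worth reserving those names for this protocol.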

LiveComponentOperator

The operator passed to process_live() provides four methods:

| Method | Description |
| --- | --- |
| await operator.update_full() | Run process() with a full submission phase (GCs removed children). Blocks until fully ready. |
| await operator.update(subpath, fn, *args, **kwargs) | Mount a child component incrementally. |
| await operator.delete(subpath) | Delete a child component. |
| await operator.mark_ready() | Signal that processing has caught up to the time process_live() was called. |

update_full()

Triggers a full processing cycle: calls process(), submits target states, waits for all children to be ready, and garbage-collects children that are no longer mounted. This is the same mechanism as a traditional component’s update cycle.
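
The garbage-collection step can be pictured as a set difference between the children known from the previous cycle and the children mounted during this call to process(). The sketch below is a stand-alone illustration under that assumption; plan_full_update and its result keys are hypothetical names.

```python
def plan_full_update(previous: set[str], mounted_now: set[str]) -> dict[str, set[str]]:
    """Classify child subpaths after a full scan."""
    return {
        "create": mounted_now - previous,           # newly mounted this cycle
        "keep_or_update": previous & mounted_now,   # mounted before and now
        "garbage_collect": previous - mounted_now,  # no longer mounted
    }


plan = plan_full_update(previous={"a.txt", "b.txt"}, mounted_now={"b.txt", "c.txt"})
print(plan["garbage_collect"])  # {'a.txt'}
```

This is also why only a full cycle can garbage-collect: an incremental update sees a single subpath and cannot know which other children are missing.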

update() and delete()

Mount or delete individual child components without a full scan. These are concurrent with each other but serialized with update_full() — if update_full() is running, incremental operations wait until it finishes.
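
One way to picture this scheduling rule is a readers-writer gate: incremental operations share the gate, while a full update holds it exclusively. The sketch below is a stand-alone asyncio model, not CocoIndex's implementation. FullUpdateGate is a hypothetical name, and having the full update wait for in-flight incremental operations is an assumption the text above does not state.

```python
import asyncio
from contextlib import asynccontextmanager


class FullUpdateGate:
    """Incremental operations run together; a full update runs alone."""

    def __init__(self) -> None:
        self._cond = asyncio.Condition()
        self._incremental = 0
        self._full = False

    @asynccontextmanager
    async def incremental(self):
        async with self._cond:
            await self._cond.wait_for(lambda: not self._full)
            self._incremental += 1
        try:
            yield
        finally:
            async with self._cond:
                self._incremental -= 1
                self._cond.notify_all()

    @asynccontextmanager
    async def full(self):
        async with self._cond:
            # Assumption: wait for in-flight incremental operations as well.
            await self._cond.wait_for(lambda: not self._full and self._incremental == 0)
            self._full = True
        try:
            yield
        finally:
            async with self._cond:
                self._full = False
                self._cond.notify_all()


async def demo() -> list[str]:
    gate = FullUpdateGate()
    log: list[str] = []

    async def full_scan() -> None:
        async with gate.full():
            log.append("full:start")
            await asyncio.sleep(0.01)
            log.append("full:end")

    async def incremental(name: str) -> None:
        async with gate.incremental():
            log.append(f"{name}:start")
            await asyncio.sleep(0.01)
            log.append(f"{name}:end")

    await asyncio.gather(full_scan(), incremental("a"), incremental("b"))
    return log


gate_log = asyncio.run(demo())
print(gate_log)
```

In the recorded log, both incremental operations start only after full:end, and both start before either one finishes.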

When multiple operations target the same subpath, only the latest one (by invocation order) takes effect.
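
A minimal sketch of latest-wins semantics, assuming each operation takes a ticket at invocation time and is dropped if a newer ticket exists for the same subpath by the time it would apply. LatestWins and its methods are hypothetical; the text above specifies only the observable outcome, not this mechanism.

```python
import itertools
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import Any


@dataclass
class LatestWins:
    """Track the newest operation ticket per subpath."""

    _counter: Iterator[int] = field(default_factory=itertools.count)
    _latest: dict[str, int] = field(default_factory=dict)
    state: dict[str, Any] = field(default_factory=dict)

    def submit(self, subpath: str) -> int:
        """Record invocation order; later tickets supersede earlier ones."""
        ticket = next(self._counter)
        self._latest[subpath] = ticket
        return ticket

    def apply_update(self, subpath: str, ticket: int, value: Any) -> bool:
        if self._latest.get(subpath) != ticket:
            return False              # superseded by a later operation
        self.state[subpath] = value
        return True

    def apply_delete(self, subpath: str, ticket: int) -> bool:
        if self._latest.get(subpath) != ticket:
            return False              # superseded by a later operation
        self.state.pop(subpath, None)
        return True


ops = LatestWins()
t1 = ops.submit("a.txt")   # update to "v1"
t2 = ops.submit("a.txt")   # update to "v2", invoked later
t3 = ops.submit("b.txt")

# Completion order differs from invocation order on purpose.
results = [
    ops.apply_update("a.txt", t2, "v2"),
    ops.apply_update("a.txt", t1, "v1"),
    ops.apply_update("b.txt", t3, "b1"),
]
print(results)    # [True, False, True]
print(ops.state)  # {'a.txt': 'v2', 'b.txt': 'b1'}
```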

mark_ready()

Signals to the parent that the live component has caught up. The parent’s await handle.ready() returns when mark_ready() is called. If process_live() returns without calling mark_ready(), it is called automatically.
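
The readiness contract can be modeled with an asyncio.Event. The sketch below is stand-alone and illustrative: ReadySignal and run_live are hypothetical names standing in for the operator, the mount handle, and the runtime that drives process_live().

```python
import asyncio
from collections.abc import Awaitable, Callable


class ReadySignal:
    """Stand-in for the readiness link between parent and live component."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    async def mark_ready(self) -> None:
        self._event.set()          # idempotent: extra calls are harmless

    async def ready(self) -> None:
        await self._event.wait()


async def run_live(body: Callable[[ReadySignal], Awaitable[None]], signal: ReadySignal) -> None:
    await body(signal)
    await signal.mark_ready()      # automatic call if the body returned without one


async def demo() -> list[str]:
    log: list[str] = []
    stop = asyncio.Event()

    async def explicit(signal: ReadySignal) -> None:
        await signal.mark_ready()
        await stop.wait()          # keeps running, like a watcher loop
        log.append("explicit body finished")

    async def implicit(signal: ReadySignal) -> None:
        log.append("implicit body returned")

    signal = ReadySignal()
    task = asyncio.create_task(run_live(explicit, signal))
    await signal.ready()
    log.append(f"parent ready, child still running: {not task.done()}")
    stop.set()
    await task

    signal = ReadySignal()
    await run_live(implicit, signal)
    await asyncio.wait_for(signal.ready(), timeout=1)
    log.append("parent ready after implicit return")
    return log


ready_log = asyncio.run(demo())
print(ready_log)
```

The first case shows the parent unblocking while the component is still running; the second shows that a body which never calls mark_ready() does not leave the parent waiting.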

Example: file system watcher

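A component that watches a local folder and processes each file. Here process_file and setup_watchdog are placeholders for your own per-file processing function and file-watcher setup: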

python
import pathlib
import cocoindex as coco

class FolderWatcher:
    def __init__(self, folder: pathlib.Path, target) -> None:
        self.folder = folder
        self.target = target

    async def process(self) -> None:
        """Full scan — mount a child for every file in the folder."""
        for path in self.folder.iterdir():
            if path.is_file():
                await coco.mount(
                    coco.component_subpath(path.name),
                    process_file,
                    path,
                    self.target,
                )

    async def process_live(self, operator: coco.LiveComponentOperator) -> None:
        # 1. Set up the file watcher before the full scan so no events are missed.
        watcher = setup_watchdog(self.folder)

        # 2. Full scan.
        await operator.update_full()

        # 3. Signal readiness — parent can proceed.
        await operator.mark_ready()

        # 4. React to changes.
        async for event in watcher.events():
            subpath = coco.component_subpath(event.filename)
            if event.is_update:
                await operator.update(subpath, process_file, event.path, self.target)
            elif event.is_delete:
                await operator.delete(subpath)

The parent mounts it like any other component:

python
@coco.fn
async def app_main(folder: pathlib.Path, outdir: pathlib.Path) -> None:
    # Set up the target in the parent (use_mount is not allowed inside process()).
    target = await localfs.mount_dir_target(outdir)
    # Mount the live component.
    await coco.mount(FolderWatcher, folder, target)
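
The watcher example leaves setup_watchdog undefined. As one possible stand-in, here is a stdlib-only polling sketch that produces events with the attributes the example reads (filename, path, is_update, is_delete). Everything in it is an assumption made for illustration: FileEvent, PollingWatcher, the mtime comparison, and the polling interval. A production watcher would more likely use an OS-level notification library.

```python
import asyncio
import pathlib
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass(frozen=True)
class FileEvent:
    path: pathlib.Path
    is_update: bool
    is_delete: bool

    @property
    def filename(self) -> str:
        return self.path.name


def snapshot(folder: pathlib.Path) -> dict[str, int]:
    """Map file name to modification time for regular files in folder."""
    seen: dict[str, int] = {}
    for path in folder.iterdir():
        try:
            if path.is_file():
                seen[path.name] = path.stat().st_mtime_ns
        except FileNotFoundError:
            continue  # file vanished between listing and stat
    return seen


def diff_snapshots(folder: pathlib.Path, before: dict[str, int], after: dict[str, int]) -> list[FileEvent]:
    """Created or modified files become updates; missing files become deletes."""
    events = [FileEvent(folder / n, True, False) for n in sorted(after) if before.get(n) != after[n]]
    events += [FileEvent(folder / n, False, True) for n in sorted(before.keys() - after.keys())]
    return events


class PollingWatcher:
    def __init__(self, folder: pathlib.Path, interval: float = 1.0) -> None:
        self.folder = folder
        self.interval = interval
        self._last = snapshot(folder)  # baseline taken at setup, before the full scan

    async def events(self) -> AsyncIterator[FileEvent]:
        while True:
            await asyncio.sleep(self.interval)
            current = snapshot(self.folder)
            for event in diff_snapshots(self.folder, self._last, current):
                yield event
            self._last = current


def setup_watchdog(folder: pathlib.Path) -> PollingWatcher:
    return PollingWatcher(folder)


demo_events = diff_snapshots(pathlib.Path("docs"), {"a": 1, "b": 1}, {"a": 2, "c": 1})
print([(e.filename, e.is_update, e.is_delete) for e in demo_events])
# [('a', True, False), ('c', True, False), ('b', False, True)]
```

Because the baseline snapshot is taken in the constructor, a file changed during the full scan still shows up as an event afterwards. It may then be processed twice, which is harmless if processing is idempotent.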

Example: traditional component equivalent

A traditional single-function component (target is assumed to be a target object available in the enclosing scope):

python
@coco.fn
async def process_all(data) -> None:
    for key, value in data.items():
        coco.declare_target_state(target.target_state(key, value))

is equivalent to this LiveComponent:

python
class ProcessAll:
    def __init__(self, data):
        self.data = data

    async def process(self) -> None:
        for key, value in self.data.items():
            coco.declare_target_state(target.target_state(key, value))

    async def process_live(self, operator: coco.LiveComponentOperator) -> None:
        await operator.update_full()
        # mark_ready() is called automatically on return.

The key difference: the LiveComponent version can later be extended to handle incremental changes in process_live() without changing process().
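
As an illustration of that extension path, the sketch below adds an incremental loop to process_live() while leaving process() as it was. It runs without CocoIndex: RecordingOperator is a hypothetical test double for the operator, process_one is a hypothetical per-item function, and a plain dictionary stands in for declaring target states.

```python
import asyncio


class RecordingOperator:
    """Test double that records operator calls instead of mounting anything."""

    def __init__(self, component) -> None:
        self.component = component
        self.calls: list[tuple] = []

    async def update_full(self) -> None:
        self.calls.append(("update_full",))
        await self.component.process()

    async def update(self, subpath, fn, *args, **kwargs) -> None:
        self.calls.append(("update", subpath, args))

    async def delete(self, subpath) -> None:
        self.calls.append(("delete", subpath))

    async def mark_ready(self) -> None:
        self.calls.append(("mark_ready",))


async def process_one(key, value) -> None:
    """Hypothetical per-item component function."""


class ProcessAll:
    def __init__(self, data: dict, changes: asyncio.Queue) -> None:
        self.data = data
        self.changes = changes
        self.declared: dict = {}

    async def process(self) -> None:
        # Unchanged full scan; the dict stands in for declared target states.
        for key, value in self.data.items():
            self.declared[key] = value

    async def process_live(self, operator) -> None:
        await operator.update_full()
        await operator.mark_ready()
        # New: react to (key, value) changes; None ends the stream in this sketch.
        while (change := await self.changes.get()) is not None:
            key, value = change
            if value is None:
                await operator.delete(key)
            else:
                await operator.update(key, process_one, key, value)


async def demo() -> list[tuple]:
    changes: asyncio.Queue = asyncio.Queue()
    for item in [("b", 2), ("a", None), None]:
        changes.put_nowait(item)
    component = ProcessAll({"a": 1}, changes)
    operator = RecordingOperator(component)
    await component.process_live(operator)
    return operator.calls


calls = asyncio.run(demo())
print(calls)
```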

LiveMapFeed and LiveMapView

Tip

For a user-facing introduction to live mode and when to use it, see Live Mode. This section covers the protocol details for connector authors and advanced use cases.

Two protocols represent live keyed collections that mount_each() can consume. Choose based on whether your source can enumerate its current state:

  • LiveMapView — the source has enumerable current state that can be scanned (e.g., a directory listing, database table). Supports both full scans via __aiter__() and incremental changes via watch().
  • LiveMapFeed — the source only streams changes, with no “current state” to scan (e.g., a Kafka consumer, a webhook event stream). Provides only watch().

python
class LiveMapFeed(Protocol[K, V]):
    async def watch(self, subscriber: LiveMapSubscriber[K, V]) -> None: ...

class LiveMapView(LiveMapFeed[K, V], Protocol[K, V]):
    def __aiter__(self) -> AsyncIterator[tuple[K, V]]: ...

LiveMapView extends LiveMapFeed — any LiveMapView is also a valid LiveMapFeed. When mount_each() receives either protocol as its items argument, it automatically creates an internal LiveComponent:

  • LiveMapView: __aiter__() yields all (key, value) pairs for full scans (called inside the internal process()). watch() drives the full lifecycle: the initial scan, the readiness signal, and incremental changes.
  • LiveMapFeed: process() is a no-op (no snapshot to scan). All work happens in watch().
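
Since the two protocols differ only in whether __aiter__() is present, a consumer can tell them apart structurally. The sketch below shows one way to do that with runtime-checkable protocols. FeedLike, ViewLike, and classify are hypothetical names, and how mount_each() actually distinguishes its inputs is not specified here.

```python
from collections.abc import AsyncIterator
from typing import Protocol, runtime_checkable


@runtime_checkable
class FeedLike(Protocol):
    async def watch(self, subscriber) -> None: ...


@runtime_checkable
class ViewLike(FeedLike, Protocol):
    def __aiter__(self) -> AsyncIterator: ...


def classify(items: object) -> str:
    """Check the more specific protocol first: every view is also a feed."""
    if isinstance(items, ViewLike):
        return "view"
    if isinstance(items, FeedLike):
        return "feed"
    return "plain"


class InMemoryView:
    async def watch(self, subscriber) -> None: ...

    async def __aiter__(self):
        yield ("key", "value")


class StreamFeed:
    async def watch(self, subscriber) -> None: ...


print(classify(InMemoryView()))  # view
print(classify(StreamFeed()))    # feed
print(classify({"a": 1}))        # plain
```

Runtime protocol checks only verify that the attributes exist, not their signatures, so this kind of test is a convenience rather than a guarantee.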

LiveMapSubscriber

The subscriber passed to watch() mirrors LiveComponentOperator:

| LiveMapSubscriber | LiveComponentOperator | Description |
| --- | --- | --- |
| await subscriber.update_all() | await operator.update_full() | Full re-scan of all items |
| await subscriber.mark_ready() | await operator.mark_ready() | Signal readiness |
| await subscriber.update(key, value) | await operator.update(subpath, fn, ...) | Incremental update; returns ComponentMountHandle |
| await subscriber.delete(key) | await operator.delete(subpath) | Incremental delete; returns ComponentMountHandle |

A typical watch() implementation for a LiveMapView:

python
async def watch(self, subscriber: LiveMapSubscriber[K, V]) -> None:
    await subscriber.update_all()     # initial full scan
    await subscriber.mark_ready()     # signal readiness
    # ... watch for changes and call subscriber.update()/delete() ...

Implementing live support for a connector

To add live support to a source connector, make your source return an object that implements LiveMapView (if the source has scannable state) or LiveMapFeed (if it only streams changes):

  • LiveMapView example: The localfs connector’s DirWalker, where walk_dir(..., live=True).items() returns a LiveMapView backed by watchfiles.
  • LiveMapFeed example: The kafka connector — topic_as_map() returns a LiveMapFeed that consumes messages from Kafka topics.
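
For connector authors, the sketch below shows the shape of a LiveMapView over an in-memory dictionary, exercised with a recording subscriber. It is self-contained and does not import CocoIndex. InMemoryLiveMap and RecordingSubscriber are hypothetical, and the subscriber here returns None rather than a ComponentMountHandle.

```python
import asyncio


class InMemoryLiveMap:
    """LiveMapView-shaped source: a dict snapshot plus a queue of changes."""

    def __init__(self, initial: dict[str, str]) -> None:
        self._items = dict(initial)
        self._changes: asyncio.Queue = asyncio.Queue()

    def put(self, key: str, value: str) -> None:
        self._items[key] = value
        self._changes.put_nowait((key, value))

    def remove(self, key: str) -> None:
        self._items.pop(key, None)
        self._changes.put_nowait((key, None))

    def close(self) -> None:
        self._changes.put_nowait(None)   # sentinel: end of stream in this sketch

    async def __aiter__(self):
        # Full scan: yield the current state as (key, value) pairs.
        for key, value in list(self._items.items()):
            yield key, value

    async def watch(self, subscriber) -> None:
        await subscriber.update_all()    # initial full scan
        await subscriber.mark_ready()    # signal readiness
        while (change := await self._changes.get()) is not None:
            key, value = change
            if value is None:
                await subscriber.delete(key)
            else:
                await subscriber.update(key, value)


class RecordingSubscriber:
    """Test double that records what the source asked for."""

    def __init__(self, view: InMemoryLiveMap) -> None:
        self.view = view
        self.calls: list[tuple] = []

    async def update_all(self) -> None:
        self.calls.append(("update_all", [item async for item in self.view]))

    async def mark_ready(self) -> None:
        self.calls.append(("mark_ready",))

    async def update(self, key, value) -> None:
        self.calls.append(("update", key, value))

    async def delete(self, key) -> None:
        self.calls.append(("delete", key))


async def demo() -> list[tuple]:
    view = InMemoryLiveMap({"a": "1"})
    subscriber = RecordingSubscriber(view)
    watcher = asyncio.create_task(view.watch(subscriber))
    await asyncio.sleep(0)               # let the initial scan run first
    view.put("b", "2")
    view.remove("a")
    view.close()
    await watcher
    return subscriber.calls


view_calls = asyncio.run(demo())
print(view_calls)
```

A LiveMapFeed would look the same minus __aiter__() and the update_all() call, since it has no snapshot to scan.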

Live mode vs catch-up mode

See Live Mode for how the two modes are enabled at the app level and an overview of how they work.

For a manual LiveComponent, the mode controls whether process_live() continues running after mark_ready():

  • Live mode (live=True): process_live() continues after mark_ready() — the component keeps watching for changes.
  • Catch-up mode (live=False, default): process_live() terminates as soon as mark_ready() is awaited. No code after await operator.mark_ready() executes, so the component behaves like a traditional one-shot processor.

This lets you use the same LiveComponent class in both modes without code changes.
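
The difference between the two modes can be modeled in a few lines. In this stand-alone sketch, catch-up mode ends process_live() by raising a private exception from mark_ready(), which the runner swallows. That mechanism is purely an assumption chosen to reproduce the documented behavior; SketchOperator, run_component, and _StopAfterReady are hypothetical names.

```python
import asyncio


class _StopAfterReady(Exception):
    """Control-flow signal used only inside this sketch."""


class SketchOperator:
    def __init__(self, live: bool, log: list[str]) -> None:
        self.live = live
        self.log = log

    async def update_full(self) -> None:
        self.log.append("update_full")

    async def mark_ready(self) -> None:
        self.log.append("mark_ready")
        if not self.live:
            raise _StopAfterReady   # catch-up mode: nothing after this runs


async def process_live(operator: SketchOperator) -> None:
    await operator.update_full()
    await operator.mark_ready()
    operator.log.append("watching for changes")  # reached in live mode only


async def run_component(*, live: bool) -> list[str]:
    log: list[str] = []
    try:
        await process_live(SketchOperator(live, log))
    except _StopAfterReady:
        pass                        # normal termination in catch-up mode
    return log


catch_up_log = asyncio.run(run_component(live=False))
live_log = asyncio.run(run_component(live=True))
print(catch_up_log)  # ['update_full', 'mark_ready']
print(live_log)      # ['update_full', 'mark_ready', 'watching for changes']
```

Whatever the real mechanism is, the practical rule is the same: do not place cleanup that must always run after mark_ready() without protecting it, for example with try/finally.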

Restrictions

No use_mount() inside process()

process() may only call coco.mount() (background child mounts). Any setup that requires use_mount() — such as declaring target tables — must be done in the parent component before mounting the LiveComponent. This keeps the controller’s provider set stable across full and incremental updates.

Not allowed in use_mount()

LiveComponent classes can only be used with coco.mount() and operator.update(). Passing a LiveComponent class to coco.use_mount() raises a TypeError.

Tip

While LiveComponent classes cannot be passed to mount_each(), you can get live watching behavior more easily with a LiveMapFeed or LiveMapView, because mount_each() automatically creates an internal LiveComponent when it detects one.

Readiness

The parent’s await handle.ready() returns when mark_ready() is called inside process_live(), regardless of whether process_live() is still running.

mermaid
sequenceDiagram
    participant Parent
    participant LC as LiveComponent
    participant Op as Operator
    participant Children

    Parent->>LC: mount()
    activate LC
    Note over LC: process_live(operator)

    LC->>Op: update_full()
    activate Op
    Op->>LC: process()
    activate LC
    LC->>Children: mount(A), mount(B), ...
    deactivate LC
    Note over Op: submit + GC<br/>wait children ready
    Op-->>LC: done
    deactivate Op

    LC->>Op: mark_ready()
    Op-->>Parent: readiness resolved

    rect rgb(240, 248, 255)
        Note over LC,Children: Live mode only
        LC->>Op: update(A)
        Op->>Children: run A
        LC->>Op: delete(B)
        Op->>Children: delete B
        Note over LC: continues...
    end
    deactivate LC

If process_live() returns without calling mark_ready(), it is called automatically — the parent will not hang.
