App
An App is the top-level runnable unit in CocoIndex.
It names your pipeline and binds a main function with its parameters. When you call app.update(), CocoIndex runs that main function as the root processing component, which can mount child processing components to do work and declare target states.
Creating an app
To create an App, provide:
- An AppConfig (or just a name string): identifies the pipeline
- A main function: the entry point for your pipeline
- Arguments: any additional arguments to pass to the main function
import pathlib

import cocoindex as coco

@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    # ... your pipeline logic ...
    ...

app = coco.App(
    coco.AppConfig(name="MyPipeline"),
    app_main,
    sourcedir=pathlib.Path("./data"),
)
You can also pass just a name string instead of AppConfig:
app = coco.App("MyPipeline", app_main, sourcedir=pathlib.Path("./data"))
The main function can be sync or async. See How Sync and Async Work Together for details.
Updating an app
Call update() to execute the pipeline. It returns an UpdateHandle that is also Awaitable, so the simplest usage is to await the call directly; update_blocking() is the blocking counterpart:
# Async — await the result directly (backward-compatible)
result = await app.update()
# Sync (blocking) API
result = app.update_blocking()
Parameters:
- The report_to_stdout option prints periodic progress updates during execution.
- The full_reprocess option reprocesses everything and invalidates existing caches. This forces all components to re-execute and all target states to be re-applied, even if they haven't changed.
Monitoring progress
app.update() returns an UpdateHandle that provides structured access to processing stats while the update is running:
handle = app.update()
# Poll current stats at any time
stats = handle.stats() # returns UpdateStats or None if not yet started
# Await the final result
result = await handle.result()
# stats() still works after completion
final_stats = handle.stats()
For streaming progress, use watch() which yields UpdateSnapshot objects as stats are updated:
handle = app.update()
async for snapshot in handle.watch():
    print(snapshot.stats.total.num_finished, "items processed")
    # snapshot.status is UpdateStatus.RUNNING or UpdateStatus.DONE
    # snapshot.result is set when status is DONE
When you update an App, CocoIndex:
- Runs the lifespan setup (if not already done)
- Executes the main function (the root processing component), which mounts child processing components
- Syncs all declared target states to external systems
- Compares with the previous run and applies only necessary changes
Given the same code and inputs, updates are repeatable. When data or code changes, only the affected parts re-execute.
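One way to picture the "applies only necessary changes" step is as a diff between the target states recorded on the previous run and those declared on the current one. The following is a minimal sketch of that idea in plain Python, not CocoIndex's implementation; the function diff_target_states and the dict-based representation of target states are hypothetical.

```python
from typing import Any


def diff_target_states(
    previous: dict[str, Any], declared: dict[str, Any]
) -> dict[str, list[str]]:
    """Classify target-state keys by the action needed to reach `declared`."""
    return {
        # Declared now but absent from the previous run: must be created.
        "create": sorted(k for k in declared if k not in previous),
        # Present in both runs with a different value: must be updated.
        "update": sorted(
            k for k in declared if k in previous and previous[k] != declared[k]
        ),
        # Recorded previously but no longer declared: must be removed.
        "delete": sorted(k for k in previous if k not in declared),
    }


# Hypothetical target states keyed by name, with a version string as the value.
previous_run = {"hello.md": "v1", "world.md": "v1"}
current_run = {"hello.md": "v2", "world.md": "v1", "new.md": "v1"}
changes = diff_target_states(previous_run, current_run)
print(changes)
```

In this toy model, world.md is untouched because its value is identical across runs, which mirrors the idea that unchanged target states need no work.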
How an app runs
An App is the top-level runner and entry point. A processing component is the unit of incremental execution within an app.
- Your app's main function runs as the root processing component at the root path.
- Each call to mount() or use_mount() declares a child processing component at a child path. Sugar APIs like mount_each() and mount_target() also create child components.
- Each processing component declares a set of target states, and CocoIndex syncs them atomically when that component finishes.
This is why app.update() does not "run everything from scratch": CocoIndex uses the component path tree to decide what can be reused and what must re-run.
For example, an app that processes files might mount one component per file:
(root)                 ← app_main component
├── "setup"            ← declare_dir_target component
└── "process"
    ├── "hello.md"     ← process_file component
    └── "world.md"     ← process_file component
See Processing Component for how mounting and component paths define these boundaries.
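To make the reuse idea concrete, here is a toy model in plain Python. It assumes each component is identified by its path and re-runs only when a fingerprint of its input changes. The memo dictionary, the fingerprint function, and run_update are all hypothetical stand-ins for illustration; CocoIndex's actual change detection is not shown in this section and may differ.

```python
import hashlib

# Hypothetical in-memory stand-in for state kept between runs:
# component path -> (input fingerprint, memoized result).
memo: dict[str, tuple[str, str]] = {}


def fingerprint(content: str) -> str:
    """Return a stable digest of a component's input."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def run_update(files: dict[str, str]) -> list[str]:
    """Process every file and return the paths that actually re-executed."""
    executed = []
    for name, content in sorted(files.items()):
        path = f"process/{name}"
        fp = fingerprint(content)
        cached = memo.get(path)
        if cached is not None and cached[0] == fp:
            continue  # Same input as last run: reuse the memoized result.
        memo[path] = (fp, content.upper())  # Stand-in for real processing.
        executed.append(path)
    return executed


first = run_update({"hello.md": "hello", "world.md": "world"})
second = run_update({"hello.md": "hello", "world.md": "world!"})
print(first)
print(second)
```

The first call executes both file components; the second executes only the component whose input changed.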
Concurrency control
By default, CocoIndex limits the number of concurrently executing processing components to 1024 per app. When components perform resource-intensive work (e.g., calling external APIs, running ML models), you may want to lower this limit.
Set max_inflight_components in AppConfig to control the limit:
app = coco.App(
    coco.AppConfig(name="MyPipeline", max_inflight_components=4),
    app_main,
    sourcedir=pathlib.Path("./data"),
)
With max_inflight_components=4, at most 4 processing components execute at the same time. When a component finishes, the next pending one starts.
Setting max_inflight_components=1 serializes all components — only one runs at a time.
You can also set the limit via the COCOINDEX_MAX_INFLIGHT_COMPONENTS environment variable:
export COCOINDEX_MAX_INFLIGHT_COMPONENTS=4
Precedence: AppConfig value > environment variable > default (1024).
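The precedence rule can be written out as a small resolution function. This is only a sketch of the rule as stated above, using a hypothetical helper named resolve_max_inflight; how CocoIndex parses or validates the environment variable internally is an assumption here.

```python
import os
from typing import Mapping, Optional

DEFAULT_MAX_INFLIGHT = 1024  # Default limit stated in this section.


def resolve_max_inflight(
    config_value: Optional[int], env: Mapping[str, str] = os.environ
) -> int:
    """Return the effective limit: config value, else env var, else default."""
    if config_value is not None:
        return config_value  # An explicit AppConfig value wins.
    raw = env.get("COCOINDEX_MAX_INFLIGHT_COMPONENTS")
    if raw is not None:
        return int(raw)  # Assumes the variable holds a plain integer.
    return DEFAULT_MAX_INFLIGHT


print(resolve_max_inflight(4, {"COCOINDEX_MAX_INFLIGHT_COMPONENTS": "8"}))
print(resolve_max_inflight(None, {"COCOINDEX_MAX_INFLIGHT_COMPONENTS": "8"}))
print(resolve_max_inflight(None, {}))
```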
When a parent component mounts a child, the parent releases its concurrency slot so the child can make progress. This prevents deadlocks in nested mount scenarios — even with max_inflight_components=1, a parent mounting a child will not block forever.
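The slot-release behavior can be illustrated with an asyncio.Semaphore. The sketch below is an analogy built on the standard library, not CocoIndex's scheduler: the parent and child coroutines and the bookkeeping helpers are hypothetical, and whether a parent later re-acquires a slot is not modeled.

```python
import asyncio

MAX_INFLIGHT = 1  # Strictest setting: one component at a time.
order: list[str] = []  # Order in which components obtained a slot.
running = 0
peak = 0  # Highest number of components holding a slot at once.


def enter(name: str) -> None:
    global running, peak
    running += 1
    peak = max(peak, running)
    order.append(name)


def leave() -> None:
    global running
    running -= 1


async def child(slots: asyncio.Semaphore, name: str) -> None:
    async with slots:
        enter(name)
        await asyncio.sleep(0)  # Stand-in for real work.
        leave()


async def parent(slots: asyncio.Semaphore) -> None:
    await slots.acquire()
    enter("parent")
    # Hand the slot back before waiting on children so they can acquire it.
    leave()
    slots.release()
    await asyncio.gather(child(slots, "child-a"), child(slots, "child-b"))


async def main() -> None:
    slots = asyncio.Semaphore(MAX_INFLIGHT)
    # If parent() kept its slot, the children could never start and this
    # call would time out instead of completing.
    await asyncio.wait_for(parent(slots), timeout=5)


asyncio.run(main())
print(order, peak)
```

Even with a single slot, the parent and both children complete, and no more than one of them holds the slot at any moment.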
Database path
CocoIndex needs a database path (db_path) to store its internal state. This database tracks target states and memoized results from previous runs, enabling CocoIndex to compute what changed and apply only the necessary updates.
The simplest way to configure the database path is via the COCOINDEX_DB environment variable:
export COCOINDEX_DB=./cocoindex.db
With COCOINDEX_DB set, you can create and run apps without any additional configuration:
import cocoindex as coco

@coco.fn
def app_main() -> None:
    # ... your pipeline logic ...
    ...

app = coco.App("MyPipeline", app_main)
app.update_blocking()  # Uses COCOINDEX_DB for storage
For details on what the internal database stores and how to tune its LMDB settings (e.g., increasing the maximum database size beyond 4 GiB), see Internal Storage.
Lifespan (optional)
A lifespan function defines the CocoIndex runtime lifecycle: its setup runs when the runtime starts (automatically before the first app.update()), and its cleanup runs when the runtime stops. Use it to configure CocoIndex settings programmatically or to initialize shared resources that processing components can reuse.
If you only need to set the database path, using the COCOINDEX_DB environment variable is simpler than defining a lifespan function.
Defining a lifespan
Use the @lifespan decorator to register a lifespan function. By default, all apps share the same lifespan (unless you explicitly specify an app in a different Environment). The function receives an EnvironmentBuilder for configuration and uses yield to separate setup from cleanup:
import pathlib
from typing import AsyncIterator

import cocoindex as coco

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    # Configure CocoIndex's internal database location (overrides COCOINDEX_DB if set)
    builder.settings.db_path = pathlib.Path("./cocoindex.db")
    # Setup: initialize resources here
    yield
    # Cleanup happens automatically when the context exits
Setting db_path in the lifespan takes precedence over the COCOINDEX_DB environment variable. If neither is provided, CocoIndex will raise an error.
The lifespan function can be sync or async:
import pathlib
from typing import Iterator

import cocoindex as coco

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    builder.settings.db_path = pathlib.Path("./cocoindex.db")
    yield
You can also use the lifespan to provide resources (like database connections) that processing components can access. See Context for details on sharing resources across your pipeline.
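For readers unfamiliar with yield-based setup and cleanup, the pattern can be sketched with the standard library's contextlib.contextmanager. This illustrates the general technique only and says nothing about how @coco.lifespan is implemented; lifespan_sketch and the events list are hypothetical.

```python
from contextlib import contextmanager
from typing import Iterator

events: list[str] = []


@contextmanager
def lifespan_sketch() -> Iterator[None]:
    events.append("setup")  # Everything before `yield` runs on entry.
    try:
        yield  # The body of the `with` block runs here.
    finally:
        events.append("cleanup")  # Runs on exit, even after an exception.


with lifespan_sketch():
    events.append("running")

print(events)
```

Wrapping the yield in try/finally is what guarantees that cleanup runs even if the code between setup and cleanup raises.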
Explicit lifecycle control (optional)
The lifespan runs automatically the first time any App updates — most users don't need to do anything beyond defining the lifespan and calling app.update().
If you need more explicit control — for example, to know when startup completes for health checks, or to explicitly trigger shutdown — you can manage the lifecycle directly:
# Async API
await coco.start() # Run lifespan setup
# ... run apps or other operations ...
await coco.stop() # Run lifespan cleanup
# Sync (blocking) API
coco.start_blocking() # Run lifespan setup
# ... run apps or other operations ...
coco.stop_blocking() # Run lifespan cleanup
Or use the runtime() context manager, which supports both sync and async usage:
# Async
async with coco.runtime():
    await app.update()
# Sync (blocking)
with coco.runtime():
    app.update_blocking()
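A single object can support both with and async with by implementing both context-manager protocols. The class below is a hypothetical illustration of that general Python technique; it is not the implementation of coco.runtime(), and the log entries are placeholders for real startup and shutdown work.

```python
import asyncio


class RuntimeSketch:
    """Hypothetical context manager usable with `with` and `async with`."""

    def __init__(self) -> None:
        self.log: list[str] = []

    def __enter__(self) -> "RuntimeSketch":
        self.log.append("start")  # Stand-in for lifespan setup.
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.log.append("stop")  # Stand-in for lifespan cleanup.

    async def __aenter__(self) -> "RuntimeSketch":
        return self.__enter__()

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.__exit__(exc_type, exc, tb)


# Sync usage.
with RuntimeSketch() as rt_sync:
    rt_sync.log.append("update (blocking)")


# Async usage.
async def main() -> RuntimeSketch:
    async with RuntimeSketch() as rt:
        rt.log.append("update (async)")
    return rt


rt_async = asyncio.run(main())
print(rt_sync.log)
print(rt_async.log)
```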
Managing apps with CLI
CocoIndex provides a CLI for managing your apps without writing additional code.
Update an app
Run your app once to sync all target states:
cocoindex update main.py
This executes your pipeline and applies all declared target states to external systems.
Drop an app
Remove an app and revert all its target states:
cocoindex drop main.py
This will delete all target states created by the app (e.g., drop tables, delete rows) and clear its internal state.
See CLI Reference for more commands and options.