# Progress monitoring

> **CocoIndex v1.** This page documents CocoIndex **v1** — a ground-up redesign from v0. When writing code, ignore any v0 flow-builder DSL or deprecated decorators.
>
> Source: https://cocoindex.io/docs/advanced_topics/progress_monitoring/ · Docs index: https://cocoindex.io/docs/llms.txt · Agent skill: https://cocoindex.io/docs/skill.md
>
> v0→v1 quick map — if you reach for these v0 symbols, stop and use the v1 form: `@cocoindex.flow_def`/`FlowBuilder` → `coco.App` + a `@coco.fn` main function; `add_collector()`/`collect()`/`export()` → declare target states (`declare_row`, `declare_file`); `cocoindex.sources/functions/targets.*` → connector APIs (`localfs.walk_dir`, `coco.ops.*`, `postgres.declare_table_target`). Full mapping + API reference: https://cocoindex.io/docs/skill.md.

Most runs only need the basics covered in [App](../programming_guide/app#updating-an-app): awaiting `app.update()` for the result, or the built-in stdout progress display — `report_to_stdout=True` on `app.update_blocking()` (or the CLI), or `await coco.show_progress(handle)` with the async API. `report_to_stdout` accepts a `timedelta` to set the refresh interval (`True` uses the default); `show_progress` takes a `refresh_interval` keyword.

This page covers the **structured** progress APIs for cases that need more: a daemon streaming progress to a client, a custom dashboard, or splitting a large pipeline into independently-reported phases.

## Structured update stats

`app.update()` returns an `UpdateHandle`. Besides being awaitable, it exposes the same stats that drive the terminal display — as Python objects you can read while the update runs.

### Polling

`handle.stats()` returns a snapshot of the current counters as an `UpdateStats` — or `None` before the handle has started running. It keeps working after completion, returning the final stats:

```python
handle = app.update()
result = await handle.result()   # run to completion
final_stats = handle.stats()     # UpdateStats with the final counters
```

`app.update()` starts lazily — the handle begins running on the first `await` (or `watch()`), so `stats()` returns `None` until then. To read stats *while* an update is in flight, poll `handle.stats()` from a separate task, or use `watch()` (below) to receive each change as it happens.
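The polling approach can be sketched as follows. This is a minimal illustration, not part of the CocoIndex API: `poll_stats`, `run_with_polling`, and `DemoHandle` are hypothetical names, and `DemoHandle` is a stand-in that only mimics the `stats()` / `result()` behavior described above so the snippet runs without CocoIndex. With a real app you would pass the handle returned by `app.update()` instead.

```python
import asyncio
import contextlib
from types import SimpleNamespace


async def poll_stats(handle, on_stats, interval=0.5):
    """Call on_stats(stats) every `interval` seconds until cancelled."""
    while True:
        stats = handle.stats()
        if stats is not None:  # None until the handle has started running
            on_stats(stats)
        await asyncio.sleep(interval)


async def run_with_polling(handle, on_stats, interval=0.5):
    """Await the update while a background task polls its stats."""
    poller = asyncio.create_task(poll_stats(handle, on_stats, interval))
    try:
        return await handle.result()  # awaiting starts the update
    finally:
        # Stop the poller once the update has finished (or failed).
        poller.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await poller


class DemoHandle:
    """Hypothetical stand-in for UpdateHandle (assumption, not the real class)."""

    def __init__(self):
        self._finished = None  # no stats before the handle starts

    def stats(self):
        if self._finished is None:
            return None
        total = SimpleNamespace(num_finished=self._finished)
        return SimpleNamespace(total=total)

    async def result(self):
        for n in range(1, 4):  # pretend three items finish one by one
            self._finished = n
            await asyncio.sleep(0.02)
        return "done"


if __name__ == "__main__":
    outcome = asyncio.run(
        run_with_polling(
            DemoHandle(),
            lambda s: print(s.total.num_finished, "items finished"),
            interval=0.01,
        )
    )
    print(outcome)
```

Polling on a timer may skip intermediate states between ticks; when every change matters, `watch()` is the better fit.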

### Streaming

`handle.watch()` is an async iterator that yields an `UpdateSnapshot` whenever the stats change — `RUNNING` while processing, then a final `READY` snapshot carrying the result:

```python
handle = app.update()
async for snapshot in handle.watch():
    print(snapshot.stats.total.num_finished, "items processed")
    # snapshot.status is UpdateStatus.RUNNING or UpdateStatus.READY
    # snapshot.result is set on the final snapshot, when the iterator ends
```

On a processing error, `watch()` raises the exception directly — handle it with a normal `try`/`except` around the loop.
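A sketch of that error-handling pattern, under the assumption that the handle behaves as described above. `follow_update` and `DemoHandle` are hypothetical names; `DemoHandle` only imitates `watch()` so the example runs without CocoIndex, and its snapshots carry just the `stats` and `result` attributes used here.

```python
import asyncio
from types import SimpleNamespace


async def follow_update(handle, on_progress):
    """Drain handle.watch(); return (result, None) or (None, error)."""
    result = None
    try:
        async for snapshot in handle.watch():
            on_progress(snapshot.stats.total.num_finished)
            result = snapshot.result  # set only on the final snapshot
    except Exception as exc:  # watch() raises the processing error directly
        return None, exc
    return result, None


def _snapshot(num_finished, result=None):
    """Build a fake snapshot with only the attributes this sketch reads."""
    total = SimpleNamespace(num_finished=num_finished)
    return SimpleNamespace(stats=SimpleNamespace(total=total), result=result)


class DemoHandle:
    """Hypothetical stand-in for UpdateHandle (assumption, not the real class)."""

    def __init__(self, fail):
        self._fail = fail

    async def watch(self):
        for n in (1, 2):
            yield _snapshot(n)
        if self._fail:
            raise RuntimeError("processing failed")
        yield _snapshot(3, result="done")


if __name__ == "__main__":
    print(asyncio.run(follow_update(DemoHandle(fail=False), print)))
    print(asyncio.run(follow_update(DemoHandle(fail=True), print)))
```

Returning the exception instead of re-raising it is one design choice among several; a daemon might instead log it and forward an error event to its client.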

In [live mode](../programming_guide/live_mode), `watch()` does not stop at the first `READY`: it keeps yielding `RUNNING` snapshots as live components deliver incremental updates, ending only when the app stops.

### Stats types

```python
class UpdateStats:
    by_component: dict[str, ComponentStats]  # keyed by processor name
    total: ComponentStats                    # summed across all processors

class ComponentStats:
    num_execution_starts: int
    num_unchanged: int
    num_adds: int
    num_deletes: int
    num_reprocesses: int
    num_errors: int
    # derived helpers:
    num_processed: int    # unchanged + adds + deletes + reprocesses
    num_finished: int     # num_processed + num_errors
    num_in_progress: int  # max(0, num_execution_starts - num_finished)

class UpdateSnapshot:
    stats: UpdateStats
    status: UpdateStatus      # RUNNING | READY
    result: R | None          # set only on the final snapshot
```
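To make the derived helpers concrete, here is a small worked example. `ComponentStatsSketch` is a hypothetical dataclass that mirrors the formulas in the comments above; it is not the library's implementation, only an illustration of the arithmetic.

```python
from dataclasses import dataclass


@dataclass
class ComponentStatsSketch:
    """Illustrative mirror of ComponentStats (assumption, not the real class)."""

    num_execution_starts: int = 0
    num_unchanged: int = 0
    num_adds: int = 0
    num_deletes: int = 0
    num_reprocesses: int = 0
    num_errors: int = 0

    @property
    def num_processed(self) -> int:
        # Items that completed without error, whatever the outcome.
        return (
            self.num_unchanged
            + self.num_adds
            + self.num_deletes
            + self.num_reprocesses
        )

    @property
    def num_finished(self) -> int:
        # Everything that is done, successfully or not.
        return self.num_processed + self.num_errors

    @property
    def num_in_progress(self) -> int:
        # Started but not yet finished; clamped so it never goes negative.
        return max(0, self.num_execution_starts - self.num_finished)


stats = ComponentStatsSketch(
    num_execution_starts=10,
    num_unchanged=4,
    num_adds=2,
    num_deletes=1,
    num_reprocesses=1,
    num_errors=1,
)
print(stats.num_processed)    # 8
print(stats.num_finished)     # 9
print(stats.num_in_progress)  # 1
```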

## Scoped reports with `stats_group`

By default every component's stats roll up into one report for the whole `app.update()`. For a large pipeline you often want to watch a *part* of it on its own — "indexing the docs tree" separately from "indexing the code tree", or one expensive phase by itself.

`coco.stats_group(title)` opens a scope: everything mounted inside the block aggregates into a **separate** report under `title`, **split out** of the parent (the parent report no longer counts that work, so there is no double counting). It's a plain `with` block that you use inside a processing component, in the same way as [`coco.component_subpath`](../programming_guide/processing_component#using-component_subpath-as-a-context-manager):

```python
@coco.fn
async def app_main(docs_dir, code_dir, target):
    with coco.stats_group("Indexing docs", report_to_stdout=True):
        files = localfs.walk_dir(docs_dir, ...)
        await coco.mount_each(process_doc, files.items(), target)

    with coco.stats_group("Indexing code", report_to_stdout=True):
        files = localfs.walk_dir(code_dir, ...)
        await coco.mount_each(process_code, files.items(), target)
```

With `report_to_stdout=True`, the group's progress is also printed to stdout, labeled by its title, alongside the main `report_to_stdout` display without disrupting it. Pass a `timedelta` instead of `True` to set the group's refresh interval.

The block yields a `StatsGroupHandle` with the same `stats()` and `watch()` as `UpdateHandle` (a group has no return value, so there's no `result()`):

```python
with coco.stats_group("Indexing docs") as sg:
    await coco.mount_each(process_doc, files.items(), target)

# Exit is non-blocking, so the mounted work keeps running after the block —
# `sg` stays valid; drain its watch() to follow that work to completion:
async for snapshot in sg.watch():
    send_to_dashboard(snapshot.stats)
```

### Semantics

- **Synchronous and non-blocking.** It's a regular `with` (not `async with`), even though the body `await`s mounts. Leaving the block does **not** wait for the work to finish — it only marks where member registration stops. The group becomes `READY` asynchronously once the block has exited *and* every component mounted inside it is ready; observe that via `sg.watch()` / `sg.stats()`.
- **Identity is unchanged.** Grouping only redirects where stats are *reported*. Component paths, change detection, target-state ownership, and the run's overall completion are unaffected — the grouped components are still ordinary children of the surrounding component.
- **Nesting.** Groups may nest. The **innermost** group owns a mount's stats; an outer group's readiness still waits for the inner group's work to finish.
- **Live members.** A group containing [live components](./live_component) becomes `READY` after their initial catch-up and reports their ongoing incremental stats, just like the root.
