Progress monitoring

Beyond report_to_stdout — read structured update stats with UpdateHandle.stats() and watch(), and split a run into separately-reported scopes with stats_group().

Version
v 1.0.2

Most runs only need the basics covered in App: await app.update() for the result, or use the built-in stdout progress display. To get the display, pass report_to_stdout=True to app.update_blocking() (or the CLI), or call await coco.show_progress(handle) with the async API. report_to_stdout accepts a timedelta to set the refresh interval (True uses the default); show_progress takes a refresh_interval keyword.

This page covers the structured progress APIs for cases that need more: a daemon streaming progress to a client, a custom dashboard, or splitting a large pipeline into independently-reported phases.

Structured update stats

app.update() returns an UpdateHandle. Besides being awaitable, it exposes the same stats that drive the terminal display — as Python objects you can read while the update runs.

Polling

handle.stats() returns a snapshot of the current counters as an UpdateStats — or None before the handle has started running. It keeps working after completion, returning the final stats:

python
handle = app.update()
result = await handle.result()   # run to completion
final_stats = handle.stats()     # UpdateStats with the final counters

app.update() starts lazily — the handle begins running on the first await (or watch()), so stats() returns None until then. To read stats while an update is in flight, poll handle.stats() from a separate task, or use watch() (below) to receive each change as it happens.
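
The "poll from a separate task" pattern can be sketched with plain asyncio. This is only an illustration under stated assumptions: FakeHandle is a hypothetical stand-in that mimics the lazy start described above (stats() is None until the first await) and returns a bare counter instead of a real UpdateStats; poll_stats and run_with_polling are illustrative helper names, not part of the library.

```python
import asyncio


class FakeHandle:
    """Hypothetical stand-in for UpdateHandle; models only stats() and result()."""

    def __init__(self) -> None:
        self._finished = None  # None until the run has started

    def stats(self):
        # The real API returns an UpdateStats; a plain counter is used here.
        return self._finished

    async def result(self) -> str:
        self._finished = 0  # the run starts on the first await
        for _ in range(5):
            await asyncio.sleep(0.01)  # pretend to process one item
            self._finished += 1
        return "done"


async def poll_stats(handle, interval: float, seen: list) -> None:
    """Record a stats snapshot every `interval` seconds until cancelled."""
    while True:
        snapshot = handle.stats()
        if snapshot is not None:  # None means the update has not started yet
            seen.append(snapshot)
        await asyncio.sleep(interval)


async def run_with_polling(handle, interval: float = 0.005):
    seen: list = []
    poller = asyncio.create_task(poll_stats(handle, interval, seen))
    try:
        result = await handle.result()  # drives the update to completion
    finally:
        poller.cancel()  # stop polling once the update is done or has failed
        try:
            await poller
        except asyncio.CancelledError:
            pass
    seen.append(handle.stats())  # stats() keeps working after completion
    return result, seen
```

With a real handle, the same shape should apply: start the poller task first, then await the handle, and cancel the poller in a finally block so it does not outlive the update.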

Streaming

handle.watch() is an async iterator that yields an UpdateSnapshot whenever the stats change — RUNNING while processing, then a final READY snapshot carrying the result:

python
handle = app.update()
async for snapshot in handle.watch():
    print(snapshot.stats.total.num_finished, "items processed")
    # snapshot.status is UpdateStatus.RUNNING or UpdateStatus.READY
    # snapshot.result is set on the final snapshot, when the iterator ends

On a processing error, watch() raises the exception directly — handle it with a normal try/except around the loop.
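
A minimal sketch of that try/except, keeping the last snapshot seen before the failure so it can still be reported. The names here are assumptions for illustration: fake_watch is a hypothetical async generator standing in for handle.watch(), FakeSnapshot carries only two made-up fields, and RuntimeError is a placeholder for whatever exception the failed update actually raises.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeSnapshot:
    """Hypothetical stand-in for UpdateSnapshot with just the fields used here."""
    num_finished: int
    status: str  # "RUNNING" or "READY"


async def fake_watch(fail_after: int):
    """Stand-in for handle.watch(): yields snapshots, then raises like a failed update."""
    for n in range(1, fail_after + 1):
        await asyncio.sleep(0)
        yield FakeSnapshot(num_finished=n, status="RUNNING")
    raise RuntimeError("processing failed")


async def follow(watch_iter):
    """Consume a watch() stream; return (last snapshot seen, error or None)."""
    last = None
    try:
        async for snapshot in watch_iter:
            last = snapshot  # remember progress in case the run fails later
    except Exception as exc:
        return last, exc  # the error surfaces here, outside the loop body
    return last, None
```

Returning the last snapshot together with the exception lets a dashboard show how far the run got before it failed.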

In live mode, watch() does not stop at the first READY: it keeps yielding RUNNING snapshots as live components deliver incremental updates, ending only when the app stops.

Stats types

python
class UpdateStats:
    by_component: dict[str, ComponentStats]  # keyed by processor name
    total: ComponentStats                    # summed across all processors

class ComponentStats:
    num_execution_starts: int
    num_unchanged: int
    num_adds: int
    num_deletes: int
    num_reprocesses: int
    num_errors: int
    # derived helpers:
    num_processed: int    # unchanged + adds + deletes + reprocesses
    num_finished: int     # num_processed + num_errors
    num_in_progress: int  # max(0, num_execution_starts - num_finished)

class UpdateSnapshot:
    stats: UpdateStats
    status: UpdateStatus      # RUNNING | READY
    result: R | None          # set only on the final snapshot
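
To make the derived helpers concrete, the sketch below re-implements the formulas from the comments above on a standalone dataclass and formats one progress line from them. ComponentStatsSketch and progress_line are illustrative names, not library types; the real ComponentStats may compute these values differently internally.

```python
from dataclasses import dataclass


@dataclass
class ComponentStatsSketch:
    """Illustrative mirror of the ComponentStats fields listed above."""
    num_execution_starts: int = 0
    num_unchanged: int = 0
    num_adds: int = 0
    num_deletes: int = 0
    num_reprocesses: int = 0
    num_errors: int = 0

    @property
    def num_processed(self) -> int:
        # unchanged + adds + deletes + reprocesses
        return (self.num_unchanged + self.num_adds
                + self.num_deletes + self.num_reprocesses)

    @property
    def num_finished(self) -> int:
        # processed items plus items that ended in an error
        return self.num_processed + self.num_errors

    @property
    def num_in_progress(self) -> int:
        # started but not yet finished; clamped so it never goes negative
        return max(0, self.num_execution_starts - self.num_finished)


def progress_line(name: str, s: ComponentStatsSketch) -> str:
    """Format one line of a custom progress display."""
    return (f"{name}: {s.num_finished} finished, "
            f"{s.num_in_progress} in progress, {s.num_errors} errors")


stats = ComponentStatsSketch(
    num_execution_starts=10, num_unchanged=3, num_adds=4,
    num_reprocesses=1, num_errors=1,
)
line = progress_line("process_doc", stats)
# line == "process_doc: 9 finished, 1 in progress, 1 errors"
```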

Scoped reports with stats_group

By default every component’s stats roll up into one report for the whole app.update(). For a large pipeline you often want to watch a part of it on its own — “indexing the docs tree” separately from “indexing the code tree”, or one expensive phase by itself.

coco.stats_group(title) opens a scope: everything mounted inside the block aggregates into a separate report under title, split out of the parent. The parent report no longer counts that work, so nothing is double counted. Like coco.component_subpath, it is a plain with block used inside a processing component:

python
@coco.fn
async def app_main(docs_dir, code_dir, target):
    with coco.stats_group("Indexing docs", report_to_stdout=True):
        files = localfs.walk_dir(docs_dir, ...)
        await coco.mount_each(process_doc, files.items(), target)

    with coco.stats_group("Indexing code", report_to_stdout=True):
        files = localfs.walk_dir(code_dir, ...)
        await coco.mount_each(process_code, files.items(), target)

With report_to_stdout=True, the group’s progress is also printed to stdout, labeled by its title, alongside the main report_to_stdout display without disrupting it. Pass a timedelta instead of True to set the group’s refresh interval.

The block yields a StatsGroupHandle with the same stats() and watch() as UpdateHandle (a group has no return value, so there’s no result()):

python
with coco.stats_group("Indexing docs") as sg:
    await coco.mount_each(process_doc, files.items(), target)

# Exit is non-blocking, so the mounted work keeps running after the block —
# `sg` stays valid; drain its watch() to follow that work to completion:
async for snapshot in sg.watch():
    send_to_dashboard(snapshot.stats)

Semantics

  • Synchronous and non-blocking. It’s a regular with (not async with), even though the body awaits mounts. Leaving the block does not wait for the work to finish — it only marks where member registration stops. The group becomes READY asynchronously once the block has exited and every component mounted inside it is ready; observe that via sg.watch() / sg.stats().
  • Identity is unchanged. Grouping only redirects where stats are reported. Component paths, change detection, target-state ownership, and the run’s overall completion are unaffected — the grouped components are still ordinary children of the surrounding component.
  • Nesting. Groups may nest. The innermost group owns a mount’s stats; an outer group’s readiness still waits for the inner group’s work to finish.
  • Live members. A group containing live components becomes READY after their initial catch-up and reports their ongoing incremental stats, just like the root.