Progress monitoring
Beyond report_to_stdout — read structured update stats with UpdateHandle.stats() and watch(), and split a run into separately-reported scopes with stats_group().
Most runs only need the basics covered in App: awaiting app.update() for the result, or the built-in stdout progress display — report_to_stdout=True on app.update_blocking() (or the CLI), or await coco.show_progress(handle) with the async API. report_to_stdout accepts a timedelta to set the refresh interval (True uses the default); show_progress takes a refresh_interval keyword.
This page covers the structured progress APIs for cases that need more: a daemon streaming progress to a client, a custom dashboard, or splitting a large pipeline into independently-reported phases.
Structured update stats
app.update() returns an UpdateHandle. Besides being awaitable, it exposes the same stats that drive the terminal display — as Python objects you can read while the update runs.
Polling
handle.stats() returns a snapshot of the current counters as an UpdateStats — or None before the handle has started running. It keeps working after completion, returning the final stats:
handle = app.update()
result = await handle.result() # run to completion
final_stats = handle.stats() # UpdateStats with the final counters
app.update() starts lazily — the handle begins running on the first await (or watch()), so stats() returns None until then. To read stats while an update is in flight, poll handle.stats() from a separate task, or use watch() (below) to receive each change as it happens.
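The separate-task approach can be sketched as follows. This is a minimal illustration that assumes only what this page states: stats() returns None until the handle starts, and awaiting handle.result() drives the run. The helper names poll_stats and run_with_polling, the on_stats callback, and the interval value are hypothetical, not part of the library.

```python
import asyncio
from typing import Any, Callable


async def poll_stats(
    handle: Any,
    on_stats: Callable[[Any], None],
    interval: float = 0.5,
) -> None:
    """Hypothetical helper: report handle.stats() every `interval` seconds."""
    while True:
        stats = handle.stats()
        if stats is not None:  # None means the handle has not started yet
            on_stats(stats)
        await asyncio.sleep(interval)


async def run_with_polling(
    handle: Any,
    on_stats: Callable[[Any], None],
    interval: float = 0.5,
) -> Any:
    """Hypothetical helper: await the update while a background task polls."""
    poller = asyncio.create_task(poll_stats(handle, on_stats, interval))
    try:
        return await handle.result()
    finally:
        # Stop the poller and wait for it to unwind.
        poller.cancel()
        await asyncio.gather(poller, return_exceptions=True)
        # stats() keeps working after completion, so report the final counters.
        final = handle.stats()
        if final is not None:
            on_stats(final)
```

With a real handle this would be called as `result = await run_with_polling(app.update(), print)`. Polling trades precision for simplicity: changes that happen between two ticks are only seen in aggregate.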
Streaming
handle.watch() is an async iterator that yields an UpdateSnapshot whenever the stats change — RUNNING while processing, then a final READY snapshot carrying the result:
handle = app.update()
async for snapshot in handle.watch():
    print(snapshot.stats.total.num_finished, "items processed")
    # snapshot.status is UpdateStatus.RUNNING or UpdateStatus.READY
    # snapshot.result is set on the final snapshot, when the iterator ends
On a processing error, watch() raises the exception directly — handle it with a normal try/except around the loop.
In live mode, watch() does not stop at the first READY: it keeps yielding RUNNING snapshots as live components deliver incremental updates, ending only when the app stops.
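A sketch of the error-handling pattern for a non-live run, assuming only the behavior described above: watch() yields snapshots, raises on a processing error, and the last snapshot carries the result. The helper name follow_update and the on_snapshot callback are hypothetical.

```python
from typing import Any, Callable


async def follow_update(handle: Any, on_snapshot: Callable[[Any], None]) -> Any:
    """Hypothetical helper: drain handle.watch() and return the final result."""
    last = None
    try:
        async for snapshot in handle.watch():
            on_snapshot(snapshot)
            last = snapshot
    except Exception as exc:
        # watch() raises processing errors directly; report, then propagate.
        print(f"update failed: {exc!r}")
        raise
    # The result is set only on the final snapshot.
    return None if last is None else last.result
```

In live mode the loop would keep running until the app stops, so a helper like this returns only at shutdown; a live consumer would more likely act on each snapshot inside the loop.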
Stats types
class UpdateStats:
    by_component: dict[str, ComponentStats]  # keyed by processor name
    total: ComponentStats  # summed across all processors

class ComponentStats:
    num_execution_starts: int
    num_unchanged: int
    num_adds: int
    num_deletes: int
    num_reprocesses: int
    num_errors: int
    # derived helpers:
    num_processed: int  # unchanged + adds + deletes + reprocesses
    num_finished: int  # num_processed + num_errors
    num_in_progress: int  # max(0, execution_starts - num_finished)

class UpdateSnapshot:
    stats: UpdateStats
    status: UpdateStatus  # RUNNING | READY
    result: R | None  # set only on the final snapshot
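The derived counters follow directly from the comments in the listing. The stand-alone dataclass below is a sketch for checking that arithmetic. It mirrors the field names, but ComponentStatsSketch is a made-up name and not the library's class; modelling the helpers as properties is likewise an assumption.

```python
from dataclasses import dataclass


@dataclass
class ComponentStatsSketch:
    """Illustrative stand-in for ComponentStats (not the library class)."""

    num_execution_starts: int = 0
    num_unchanged: int = 0
    num_adds: int = 0
    num_deletes: int = 0
    num_reprocesses: int = 0
    num_errors: int = 0

    @property
    def num_processed(self) -> int:
        # Items that completed without an error.
        return (
            self.num_unchanged
            + self.num_adds
            + self.num_deletes
            + self.num_reprocesses
        )

    @property
    def num_finished(self) -> int:
        # Everything that is no longer running, errors included.
        return self.num_processed + self.num_errors

    @property
    def num_in_progress(self) -> int:
        # Clamped at zero so a momentary counter skew never goes negative.
        return max(0, self.num_execution_starts - self.num_finished)


s = ComponentStatsSketch(
    num_execution_starts=10,
    num_unchanged=3,
    num_adds=2,
    num_reprocesses=1,
    num_errors=1,
)
print(s.num_processed, s.num_finished, s.num_in_progress)  # prints: 6 7 3
```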
Scoped reports with stats_group
By default every component’s stats roll up into one report for the whole app.update(). For a large pipeline you often want to watch a part of it on its own — “indexing the docs tree” separately from “indexing the code tree”, or one expensive phase by itself.
coco.stats_group(title) opens a scope: everything mounted inside the block aggregates into a separate report under title, split out of the parent (the parent report no longer counts that work, so there is no double counting). It’s a plain with block, used inside a processing component in the same way as coco.component_subpath:
@coco.fn
async def app_main(docs_dir, code_dir, target):
    with coco.stats_group("Indexing docs", report_to_stdout=True):
        files = localfs.walk_dir(docs_dir, ...)
        await coco.mount_each(process_doc, files.items(), target)
    with coco.stats_group("Indexing code", report_to_stdout=True):
        files = localfs.walk_dir(code_dir, ...)
        await coco.mount_each(process_code, files.items(), target)
With report_to_stdout=True, the group’s progress is also printed to stdout, labeled by its title, alongside the main report_to_stdout display without disrupting it. Pass a timedelta instead of True to set the group’s refresh interval.
The block yields a StatsGroupHandle with the same stats() and watch() as UpdateHandle (a group has no return value, so there’s no result()):
with coco.stats_group("Indexing docs") as sg:
    await coco.mount_each(process_doc, files.items(), target)
# Exit is non-blocking, so the mounted work keeps running after the block.
# `sg` stays valid; drain its watch() to follow that work to completion:
async for snapshot in sg.watch():
    send_to_dashboard(snapshot.stats)
Semantics
- Synchronous and non-blocking. It’s a regular with (not async with), even though the body awaits mounts. Leaving the block does not wait for the work to finish; it only marks where member registration stops. The group becomes READY asynchronously once the block has exited and every component mounted inside it is ready; observe that via sg.watch() / sg.stats().
- Identity is unchanged. Grouping only redirects where stats are reported. Component paths, change detection, target-state ownership, and the run’s overall completion are unaffected; the grouped components are still ordinary children of the surrounding component.
- Nesting. Groups may nest. The innermost group owns a mount’s stats; an outer group’s readiness still waits for the inner group’s work to finish.
- Live members. A group containing live components becomes READY after their initial catch-up and reports their ongoing incremental stats, just like the root.