
Run a CocoIndex Flow

After a flow is defined as discussed in Flow Definition, you can start to transform data with it.

You can do this in two ways:

  • Use the CocoIndex CLI.

  • Use APIs provided by the library. You have a cocoindex.Flow object after defining the flow in your code, and you can interact with it later.

The following sections assume you have a flow demo_flow:

main.py
import cocoindex

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...

This creates a demo_flow object of type cocoindex.Flow.

Build / update target data

The main goal of a flow is to transform source data and build / update data in the target storage (the index). This action has two modes:

  • One time update. It builds/updates the target data based on the source data as of the current moment. It finishes once the target data is at least as fresh as the source data was when the update started. It fits situations where you need fresh target data at specific points in time.

  • Live update. It continuously captures changes from the source data and updates the target data accordingly. It is long-running and only stops when aborted explicitly. It fits situations where you need fresh target data continuously, most of the time.

info

For both modes, CocoIndex performs incremental processing: it only performs computations and storage mutations for source data that has changed, or when the flow itself has changed. This keeps updates efficient.
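To make the idea of incremental processing concrete, here is a minimal sketch in plain Python. It is not how CocoIndex is implemented; the names fingerprint, incremental_update, seen, and logic_version are hypothetical and exist only for illustration. The sketch recomputes a row only when its source content or the transformation logic has changed, and drops target rows whose source rows have disappeared.

```python
import hashlib


def fingerprint(content: str, logic_version: str) -> str:
    """Hash the source content together with a tag for the transformation logic."""
    return hashlib.sha256(f"{logic_version}\x00{content}".encode("utf-8")).hexdigest()


def incremental_update(source, target, seen, transform, logic_version):
    """Sync `target` with `source`, recomputing only rows whose fingerprint changed.

    Returns the sorted list of keys that were recomputed in this pass.
    """
    recomputed = []
    for key, content in source.items():
        fp = fingerprint(content, logic_version)
        if seen.get(key) != fp:  # new row, changed content, or changed logic
            target[key] = transform(content)
            seen[key] = fp
            recomputed.append(key)
    # Remove target rows whose source rows no longer exist.
    for key in list(target.keys() - source.keys()):
        del target[key]
        seen.pop(key, None)
    return sorted(recomputed)


# Hypothetical in-memory "source" and "target" stores.
source = {"a.txt": "hello", "b.txt": "world"}
target, seen = {}, {}

first = incremental_update(source, target, seen, str.upper, "v1")   # everything is new
source["b.txt"] = "there"
second = incremental_update(source, target, seen, str.upper, "v1")  # only b.txt changed
third = incremental_update(source, target, seen, str.upper, "v2")   # logic changed

print(first, second, third)
```

In this sketch, the first pass processes every row, the second pass touches only the modified row, and the third pass reprocesses everything because the logic version changed.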

One time update

CLI

The cocoindex update subcommand creates/updates data in the target storage.

Once it's done, the target data is fresh up to the moment when the command was invoked.

cocoindex update main.py

Library API

The update() async method creates/updates data in the target storage.

Once the function returns, the target data is fresh up to the moment when the function is called.

stats = await demo_flow.update()
print(stats)
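Because update() is an async method, the await above must run inside a coroutine. The sketch below shows one way to drive it from a regular script with asyncio.run. To keep the example runnable without a configured CocoIndex environment, it uses a hypothetical StandInFlow class in place of a real cocoindex.Flow; the returned dictionary is a placeholder and does not reflect the actual shape of the stats.

```python
import asyncio


class StandInFlow:
    """Hypothetical stand-in for a cocoindex.Flow; it only mimics an async update()."""

    async def update(self):
        await asyncio.sleep(0)  # pretend to do asynchronous work
        return {"placeholder": "stats"}  # not the real stats shape


async def run_one_time_update(flow):
    # `await` is only valid inside a coroutine, so the call is wrapped in one.
    return await flow.update()


# asyncio.run() creates an event loop, runs the coroutine to completion, and closes the loop.
stats = asyncio.run(run_one_time_update(StandInFlow()))
print(stats)
```

With a real flow, you would pass demo_flow instead of StandInFlow().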

Live update

A data source may enable one or multiple change capture mechanisms:

  • Configured with a refresh interval, which is generally applicable to all data sources.

  • Some data sources also provide their own change capture mechanisms. For example, the AmazonS3 source watches the S3 bucket's change events, and the GoogleDrive source allows polling recently modified files. See the documentation for each data source.

Change capture mechanisms enable CocoIndex to continuously capture changes from the source data and update the target data accordingly, under live update mode.

CLI

To perform live update, run the cocoindex update subcommand with -L option:

cocoindex update main.py -L

If at least one data source has a change capture mechanism enabled, the command keeps running until it is aborted (e.g. by Ctrl-C). Otherwise, it falls back to the same behavior as a one time update and finishes once that update is done.

Library API

To perform live update, you need to create a cocoindex.FlowLiveUpdater object using the cocoindex.Flow object. It takes an optional cocoindex.FlowLiveUpdaterOptions option, with the following fields:

  • live_mode (type: bool, default: True): Whether to perform live update for data sources with change capture mechanisms. It has no effect for data sources without any change capture mechanism.

  • print_stats (type: bool, default: False): Whether to print stats during update.

Note that cocoindex.FlowLiveUpdater provides a unified interface for both one-time update and live update. It only performs live update when live_mode is True, and only for sources with change capture mechanisms enabled. If a source has multiple change capture mechanisms enabled, all will take effect to trigger updates.
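The rule described above can be summarized as a small decision function. This is only a sketch of the documented behavior, not library code: SourceInfo and keeps_running are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass
class SourceInfo:
    """Hypothetical description of a data source in a flow."""
    name: str
    has_change_capture: bool  # e.g. a refresh interval or source-specific mechanism


def keeps_running(live_mode: bool, sources: list[SourceInfo]) -> bool:
    """Return True if the updater would stay alive until aborted.

    Live update happens only when live_mode is True AND at least one source
    has a change capture mechanism enabled. Otherwise the updater behaves
    like a one time update and finishes on its own.
    """
    return live_mode and any(s.has_change_capture for s in sources)


static_only = [SourceInfo("local_files", has_change_capture=False)]
mixed = static_only + [SourceInfo("s3_bucket", has_change_capture=True)]

print(keeps_running(True, static_only))  # False
print(keeps_running(True, mixed))        # True
print(keeps_running(False, mixed))       # False
```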

The following creates a cocoindex.FlowLiveUpdater object, with an optional cocoindex.FlowLiveUpdaterOptions option:

my_updater = cocoindex.FlowLiveUpdater(
    demo_flow, cocoindex.FlowLiveUpdaterOptions(print_stats=True))

A FlowLiveUpdater object supports the following methods:

  • start(): Start the updater. CocoIndex will continuously capture changes from the source data and update the target data accordingly in background threads managed by the engine.
  • abort(): Abort the updater.
  • wait(): Wait for the updater to finish. It only unblocks in one of the following cases:
    • The updater was aborted.
    • A one time update is done, and live update is not enabled: either live_mode is False, or all data sources have no change capture mechanisms enabled.
  • update_stats(): Returns the stats of the updater.

Example:

my_updater = cocoindex.FlowLiveUpdater(demo_flow)
# Start the updater.
my_updater.start()

# Perform your own logic (e.g. a query loop).
...

# Print the update stats.
print(my_updater.update_stats())
# Abort the updater.
my_updater.abort()
# Wait for the updater to finish.
my_updater.wait()

The Python SDK also allows you to use the updater as a context manager. It automatically starts the updater on context entry, and aborts the updater and waits for it to finish when the context is exited. The following code is equivalent to the code above (if no early return happens):

with cocoindex.FlowLiveUpdater(demo_flow) as my_updater:
    # Perform your own logic (e.g. a query loop).
    ...
    print(my_updater.update_stats())
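To illustrate what the context manager form does on entry and exit, here is a minimal sketch using a hypothetical RecordingUpdater class. It is a stand-in that only records the order of calls; it is not the FlowLiveUpdater implementation. It also shows one practical difference from the explicit form: the context manager still aborts and waits when the body exits early, for example because of an exception.

```python
class RecordingUpdater:
    """Hypothetical stand-in that records the order of lifecycle calls."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def abort(self):
        self.calls.append("abort")

    def wait(self):
        self.calls.append("wait")

    def __enter__(self):
        self.start()  # started on context entry
        return self

    def __exit__(self, exc_type, exc, tb):
        self.abort()  # aborted on context exit...
        self.wait()   # ...then waited on until finished
        return False  # do not swallow exceptions from the body


# Explicit form: start, user logic, abort, wait.
explicit = RecordingUpdater()
explicit.start()
explicit.calls.append("user logic")
explicit.abort()
explicit.wait()

# Context manager form: the same sequence, without the manual calls.
with RecordingUpdater() as managed:
    managed.calls.append("user logic")

# If the body raises, cleanup still runs before the exception propagates.
failed = RecordingUpdater()
try:
    with failed:
        raise RuntimeError("simulated failure in user logic")
except RuntimeError:
    pass

print(explicit.calls == managed.calls)
print(failed.calls)
```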

CocoIndex also provides asynchronous versions of APIs for blocking operations, including:

  • start_async() and wait_async(), e.g.

    my_updater = cocoindex.FlowLiveUpdater(demo_flow)
    # Start the updater.
    await my_updater.start_async()

    # Perform your own logic (e.g. a query loop).
    ...

    # Print the update stats.
    print(my_updater.update_stats())
    # Abort the updater.
    my_updater.abort()
    # Wait for the updater to finish.
    await my_updater.wait_async()

  • Async context manager, e.g.

    async with cocoindex.FlowLiveUpdater(demo_flow) as my_updater:
        # Perform your own logic (e.g. a query loop).
        ...
        print(my_updater.update_stats())

Evaluate the flow

CocoIndex allows you to run the transformations defined by the flow without updating the target storage.

CLI

The cocoindex evaluate subcommand runs the transformation and dumps flow outputs. It takes the following options:

  • --output-dir (optional): The directory to dump the result to. If not provided, it will use eval_{flow_name}_{timestamp}.
  • --no-cache (optional): By default, we use already-cached intermediate data if available. This flag will turn it off. Note that we only read existing cached data without updating the cache, even if it's turned on.

Example:

cocoindex evaluate main.py --output-dir ./eval_output

Library API

The evaluate_and_dump() method runs the transformation and dumps flow outputs to files.

It takes an EvaluateAndDumpOptions dataclass as input, with the following fields:

  • output_dir (type: str, required): The directory to dump the result to.
  • use_cache (type: bool, default: True): Use already-cached intermediate data if available. Note that we only read existing cached data without updating the cache, even if it's turned on.

Example:

demo_flow.evaluate_and_dump(cocoindex.EvaluateAndDumpOptions(output_dir="./eval_output"))