Flow methods
Once a flow is defined, use the CLI or Python API to set up backends, run one-shot updates, stream live updates, or evaluate the flow without writing to the target.
After a flow is defined as discussed in Flow Definition, you can start to transform data with it.
It can be achieved in two ways:
- Use the CocoIndex CLI.
- Use the APIs provided by the library. After defining the flow in your code, you have a cocoindex.Flow object that you can interact with later.
The following sections assume you have a flow named demo_flow.
Setup/drop flow
For a flow, its persistent backends need to be ready before it can run, including:
- Internal storage for CocoIndex.
- Backend resources for targets exported by the flow, e.g. a table (in relational databases), a collection (in some vector databases), etc.
The desired state of the backends for a flow is derived from the flow definition itself. CocoIndex supports two types of actions to manage the persistent backends automatically:
- Set up a flow, which changes the backends owned by the flow to the desired state, e.g. create new tables for a new flow, drop an existing table if the corresponding target is gone, add a new column to a target table if a new field is collected, etc. It’s a no-op if the backends are already in the desired state.
- Drop a flow, which drops all backends owned by the flow. It’s a no-op if there are no existing backends owned by the flow (e.g. never set up or already dropped).
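The setup and drop semantics above can be pictured as reconciling a current backend state against a desired one. The following is a minimal sketch of that idea only, not CocoIndex's implementation: backends are modeled as a plain dict of table name to column set, and the names plan_setup and plan_drop are hypothetical.

```python
# Toy model of declarative backend management. This is NOT CocoIndex code;
# backends are modeled as {table_name: set_of_columns}.

def plan_setup(current, desired):
    """Return the actions that move `current` to `desired` (empty list = no-op)."""
    actions = []
    # Tables required by the flow definition that do not exist yet.
    for table in sorted(desired.keys() - current.keys()):
        actions.append(("create_table", table))
    # Tables whose target is gone from the flow definition.
    for table in sorted(current.keys() - desired.keys()):
        actions.append(("drop_table", table))
    # Existing tables that need columns for newly collected fields.
    for table in sorted(desired.keys() & current.keys()):
        for column in sorted(desired[table] - current[table]):
            actions.append(("add_column", table, column))
    return actions


def plan_drop(current):
    """Dropping a flow is setup towards an empty desired state."""
    return plan_setup(current, {})


existing = {"doc_embeddings": {"id", "text"}, "old_target": {"id"}}
desired = {"doc_embeddings": {"id", "text", "embedding"}}

print(plan_setup(existing, desired))  # drop old_target, add the new column
print(plan_setup(desired, desired))   # already in the desired state: []
print(plan_drop({}))                  # nothing to drop: []
```

Because the plan is computed from the difference between the two states, running setup twice in a row, or dropping a flow that was never set up, yields an empty plan.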
CLI
The cocoindex setup subcommand sets up all flows.
cocoindex update and cocoindex server also set up the flow if needed before performing their main action of updating or starting the server, with a confirmation prompt.
The cocoindex drop subcommand drops all flows.
Library API
After dropping the flow, the in-memory cocoindex.Flow instance is still valid, and you can call setup methods on it again.
If you want to remove the flow from the current process, you can call demo_flow.close() to do so (see related doc).
Build/update target data
The major goal of a flow is to perform the transformations on source data and build/update data in the target. This action has two modes:
- One time update. It builds/updates the target data based on source data up to the current moment. It is done once the target data is at least as fresh as the source data was when the update started. It fits situations where you need to access fresh target data at certain points in time.
- Live update. During a live update, a one time update is performed first, then CocoIndex continuously captures changes from the source data and updates the target data accordingly. It’s long-running and only stops when aborted explicitly. It fits situations where you need to access fresh target data continuously, most of the time.
In both modes, CocoIndex processes data incrementally, i.e. it only performs computations and target mutations for source data that has changed, or for data affected by a change to the flow. This keeps updates efficient.
Besides the major update modes, CocoIndex also supports the following option:
- Reexport targets. When this is enabled, CocoIndex reprocesses and reexports the targets even if neither the source data nor the flow definition has changed. It’s helpful when you want to reload the target data, e.g. after some data loss. Note that when this is enabled in live update mode, reexport only happens for the initial one time update.
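To make incremental processing and the reexport option concrete, here is a small sketch under stated assumptions. It is not how CocoIndex tracks state internally: the fingerprint scheme, the seen bookkeeping dict, the logic_version string standing in for the flow definition, and the one_time_update function are all hypothetical.

```python
import hashlib


def fingerprint(content, logic_version):
    """Hash of the source content together with the version of the flow logic."""
    return hashlib.sha256(f"{logic_version}\x00{content}".encode()).hexdigest()


def one_time_update(source, target, seen, logic_version, reexport=False):
    """Bring `target` up to date with `source`; return the keys that were processed."""
    processed = []
    for key, content in source.items():
        fp = fingerprint(content, logic_version)
        # Skip rows whose content and flow logic are both unchanged,
        # unless a reexport was requested.
        if reexport or seen.get(key) != fp:
            target[key] = content.upper()  # stand-in for the real transformation
            seen[key] = fp
            processed.append(key)
    # Rows that disappeared from the source are removed from the target.
    for key in list(seen.keys() - source.keys()):
        target.pop(key, None)
        del seen[key]
    return processed


source = {"a.md": "hello", "b.md": "world"}
target, seen = {}, {}
print(one_time_update(source, target, seen, "v1"))  # everything is new
source["b.md"] = "world!"
print(one_time_update(source, target, seen, "v1"))  # only the changed row
print(one_time_update(source, target, seen, "v1"))  # nothing to do
print(one_time_update(source, target, seen, "v1", reexport=True))  # everything again
```

Passing a different logic_version plays the role of a changed flow definition: every fingerprint differs, so every row is reprocessed.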
One time update
CLI
The cocoindex update subcommand creates/updates data in the target.
Once it’s done, the target data is fresh up to the moment when the command is called.
cocoindex update main
With the --setup option, it also sets up the flow first if needed.
cocoindex update --setup main
With a --reexport option, it will reexport the targets even if there’s no change.
cocoindex update --reexport main.py
Library API
Live update
A data source may enable one or multiple change capture mechanisms:
- Configured with a refresh interval, which is generally applicable to all data sources.
- Specific data sources also provide their own change capture mechanisms. For example, the Postgres source listens to PostgreSQL’s change notifications, the AmazonS3 source watches the S3 bucket’s change events, and the GoogleDrive source allows polling recently modified files. See the documentation for specific data sources.
Change capture mechanisms enable CocoIndex to continuously capture changes from the source data and update the target data accordingly, under live update mode.
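As a rough illustration of the refresh-interval mechanism only (source-specific mechanisms such as notifications are not modeled), the sketch below performs an initial full pass and then rescans the source on a fixed interval. It is a toy model, not CocoIndex's implementation; live_update, read_source, apply_change, and max_rounds are hypothetical names, and max_rounds exists only so the example terminates.

```python
import time


def live_update(read_source, apply_change, refresh_interval,
                max_rounds=None, sleep=time.sleep):
    """Initial full pass, then rescan the source every `refresh_interval` seconds.

    A real live update runs until it is aborted; `max_rounds` is only here
    so that this sketch can stop on its own.
    """
    snapshot = {}
    rounds = 0
    while True:
        latest = dict(read_source())
        for key in sorted(latest.keys() | snapshot.keys()):
            if latest.get(key) != snapshot.get(key):
                # A value of None signals that the row was deleted.
                apply_change(key, latest.get(key))
        snapshot = latest
        rounds += 1
        if max_rounds is not None and rounds >= max_rounds:
            return
        sleep(refresh_interval)


source = {"a": 1}
target = {}


def apply_change(key, value):
    if value is None:
        target.pop(key, None)
    else:
        target[key] = value


def fake_sleep(_seconds):
    # Simulate the source changing while the updater is waiting.
    source["b"] = 2
    source.pop("a", None)


live_update(lambda: source, apply_change, refresh_interval=10,
            max_rounds=2, sleep=fake_sleep)
print(target)  # {'b': 2}
```

The first round corresponds to the initial one time update; later rounds only touch rows that differ from the previous snapshot.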
CLI
To perform a live update, run the cocoindex update subcommand with the -L option:
cocoindex update main -L
If there’s at least one data source with a change capture mechanism enabled, it will keep running until aborted (e.g. by Ctrl-C).
Otherwise, it falls back to the same behavior as one time update, and will finish after a one-time update is done.
--setup and --reexport options are also available for live update mode.
Library API
Evaluate the flow
CocoIndex allows you to run the transformations defined by the flow without updating the target.
CLI
The cocoindex evaluate subcommand runs the transformation and dumps flow outputs.
It takes the following options:
- --output-dir (optional): The directory to dump the result to. If not provided, it will use eval_{flow_name}_{timestamp}.
- --no-cache (optional): By default, we use already-cached intermediate data if available. This flag turns that off. Note that even when the cache is used, we only read existing cached data and do not update the cache.
Example:
cocoindex evaluate main.py --output-dir ./eval_output
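Conceptually, evaluation runs the same transformation but writes the results to files instead of the target. The sketch below illustrates that idea under assumptions: it is not the CocoIndex API, the function name evaluate_and_dump_sketch is hypothetical, the one-JSON-file-per-row layout and the timestamp format are made up, and source keys are assumed to be safe file names.

```python
import json
import tempfile
import time
from pathlib import Path


def evaluate_and_dump_sketch(flow_name, source, transform, output_dir=None):
    """Run `transform` on every source row and dump the results as JSON files.

    No target is written; the only side effect is the output directory.
    """
    if output_dir is None:
        # Default directory name in the eval_{flow_name}_{timestamp} shape;
        # the exact timestamp format here is an assumption.
        output_dir = f"eval_{flow_name}_{time.strftime('%y%m%d_%H%M%S')}"
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    for key, content in source.items():
        (out / f"{key}.json").write_text(json.dumps(transform(content), indent=2))
    return out


with tempfile.TemporaryDirectory() as tmp:
    out = evaluate_and_dump_sketch(
        "demo_flow",
        {"a": "hello world"},
        lambda text: {"words": text.split()},
        output_dir=Path(tmp) / "eval_output",
    )
    print(sorted(p.name for p in out.iterdir()))  # ['a.json']
```

Inspecting the dumped files is a convenient way to check a transformation before letting it mutate a real target.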