Operate a CocoIndex Flow
After a flow is defined as discussed in Flow Definition, you can start to transform data with it.
It can be achieved in two ways:
-
Use CocoIndex CLI.
-
Use APIs provided by the library. You have a
cocoindex.Flow
object after defining the flow in your code, and you can interact with it later.
The following sections assume you have a flow demo_flow
:
- Python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
...
It creates a demo_flow
object in cocoindex.Flow
type.
Setup / drop flow
For a flow, its persistent backends need to be ready before it can run, including:
- Internal storage for CocoIndex.
- Backend resources for targets exported by the flow, e.g. a table (in relational databases), a collection (in some vector databases), etc.
The desired state of the backends for a flow is derived based on the flow definition itself. CocoIndex supports two types of actions to manage the persistent backends automatically:
-
Setup a flow, which will change the backends owned by the flow to the desired state, e.g. create new tables for new flow, drop an existing table if the corresponding target is gone, add new column to a target table if a new field is collected, etc. It's no-op if the backend states are already in the desired state.
-
Drop a flow, which will drop all backends owned by the flow. It's no-op if there are no existing backends owned by the flow (e.g. never setup or already dropped).
CLI
cocoindex setup
subcommand will setup all flows.
cocoindex update
and cocoindex server
also provide a --setup
option to setup the flow if needed before performing the main action of updating or starting the server.
cocoindex drop
subcommand will drop all flows.
Library API
- Python
Flow
provides the following APIs to setup / drop individual flows:
setup(report_to_stdout: bool = False)
: Setup the flow.drop(report_to_stdout: bool = False)
: Drop the flow.
For example:
demo_flow.setup(report_to_stdout=True)
demo_flow.drop(report_to_stdout=True)
We also provide the following asynchronous versions of the APIs:
setup_async(report_to_stdout: bool = False)
: Setup the flow asynchronously.drop_async(report_to_stdout: bool = False)
: Drop the flow asynchronously.
For example:
await demo_flow.setup_async(report_to_stdout=True)
await demo_flow.drop_async(report_to_stdout=True)
Besides, CocoIndex also provides APIs to setup / drop all flows at once:
setup_all_flows(report_to_stdout: bool = False)
: Setup all flows.drop_all_flows(report_to_stdout: bool = False)
: Drop all flows.
For example:
cocoindex.setup_all_flows(report_to_stdout=True)
cocoindex.drop_all_flows(report_to_stdout=True)
After dropping the flow, the in-memory cocoindex.Flow
instance is still valid, and you can call setup methods on it again.
If you want to remove the flow from the current process, you can call demo_flow.close()
to do so (see related doc).
Build / update target data
The major goal of a flow is to perform the transformations on source data and build / update data in the target. This action has two modes:
-
One time update. It builds/update the target data based on source data up to the current moment. After the target data is at least as fresh as the source data when update starts, it's done. It fits into situations that you need to access the fresh target data at certain time points.
-
Live update. It continuously captures changes from the source data and updates the target data accordingly. It's long-running and only stops when being aborted explicitly. It fits into situations that you need to access the fresh target data continuously in most of the time.
For both modes, CocoIndex is performing incremental processing, i.e. we only perform computations and target mutations on source data that are changed, or the flow has changed. This is to achieve best efficiency.
One time update
CLI
The cocoindex update
subcommand creates/updates data in the target.
Once it's done, the target data is fresh up to the moment when the command is called.
cocoindex update main.py
With a --setup
option, it will also setup the flow first if needed.
cocoindex update --setup main.py
Library API
- Python
The Flow.update()
method creates/updates data in the target.
stats = demo_flow.update()
print(stats)
update_async()
is the asynchronous version of update()
.
stats = await demo_flow.update_async()
print(stats)
Once the function finishes, the target data is fresh up to the moment when the function is called.
update()
and update_async()
can be called simultaneously, even if a previous call is not finished yet.
It's quite cheap to do so, as CocoIndex will automatically combine multiple calls into a single update, as long as we hold the guarantee that the target data is fresh up to the moment when the last call is initiated.
Live update
A data source may enable one or multiple change capture mechanisms:
-
Configured with a refresh interval, which is generally applicable to all data sources.
-
Specific data sources also provide their specific change capture mechanisms. For example,
AmazonS3
source watches S3 bucket's change events, andGoogleDrive
source allows polling recent modified files. See documentations for specific data sources.
Change capture mechanisms enable CocoIndex to continuously capture changes from the source data and update the target data accordingly, under live update mode.
CLI
To perform live update, run the cocoindex update
subcommand with -L
option:
cocoindex update main.py -L
If there's at least one data source with change capture mechanism enabled, it will keep running until aborted (e.g. by Ctrl-C
).
Otherwise, it falls back to the same behavior as one time update, and will finish after a one-time update is done.
With a --setup
option, it will also setup the flow first if needed.
cocoindex update main.py -L --setup
Library API
- Python
To perform live update, you need to create a cocoindex.FlowLiveUpdater
object using the cocoindex.Flow
object.
It takes an optional cocoindex.FlowLiveUpdaterOptions
option, with the following fields:
-
live_mode
(type:bool
, default:True
): Whether to perform live update for data sources with change capture mechanisms. It has no effect for data sources without any change capture mechanism. -
print_stats
(type:bool
, default:False
): Whether to print stats during update.
Note that cocoindex.FlowLiveUpdater
provides a unified interface for both one-time update and live update.
It only performs live update when live_mode
is True
, and only for sources with change capture mechanisms enabled.
If a source has multiple change capture mechanisms enabled, all will take effect to trigger updates.
This creates a cocoindex.FlowLiveUpdater
object, with an optional cocoindex.FlowLiveUpdaterOptions
option:
my_updater = cocoindex.FlowLiveUpdater(
demo_flow, cocoindex.FlowLiveUpdaterOptions(print_stats=True))
A FlowLiveUpdater
object supports the following methods:
-
start()
: Start the updater. CocoIndex will continuously capture changes from the source data and update the target data accordingly in background threads managed by the engine. -
abort()
: Abort the updater. -
wait()
: Wait for the updater to finish. It only unblocks in one of the following cases:- The updater was aborted.
- A one time update is done, and live update is not enabled:
either
live_mode
isFalse
, or all data sources have no change capture mechanisms enabled.
-
next_status_updates()
: Get the next status updates. It blocks until there's a new status updates, including the processing finishes for a bunch of source updates, and live updater stops (aborted, or no more sources to process). You can continuously call this method in a loop to get the latest status updates and react accordingly.It returns a
cocoindex.FlowUpdaterStatusUpdates
object, with the following properties:active_sources
: Names of sources that are still active, i.e. not stopped processing. If it's empty, it means the updater is stopped.updated_sources
: Names of sources with updates since last time. You can check this to see which sources have recent updates and get processed.
-
update_stats()
: It returns the stats of the updater.
This snippets shows the lifecycle of a live updater:
my_updater = cocoindex.FlowLiveUpdater(demo_flow)
# Start the updater.
my_updater.start()
# Perform your own logic (e.g. a query loop).
...
...
# Wait for the updater to finish.
my_updater.wait()
# Print the update stats.
print(my_updater.update_stats())
Somewhere (in the same or other threads) you can also continuously call next_status_updates()
to get the latest status updates and react accordingly, e.g.
while True:
updates = my_updater.next_status_updates()
for source in updates.updated_sources:
# Perform downstream operations on the target of the source.
run_your_downstream_operations_for(source)
# Break the loop if there's no more active sources.
if not updates.active_sources:
break
next_status_updates()
automatically combines multiple status updates if more than one arrives between two calls,
e.g. your downstream operations may take more time, or you don't need to process too frequently (in which case you can explicitly sleep for a while).
So you don't need to worry about the status updates piling up.
Python SDK also allows you to use the updater as a context manager. It will automatically start the updater during the context entry, and abort and wait for the updater to finish automatically when the context is exited. The following code is equivalent to the code above (if no early return happens):
with cocoindex.FlowLiveUpdater(demo_flow) as my_updater:
# Perform your own logic (e.g. a query loop).
...
CocoIndex also provides asynchronous versions of APIs for blocking operations, including:
-
start_async()
andwait_async()
, e.g.my_updater = cocoindex.FlowLiveUpdater(demo_flow)
# Start the updater.
await my_updater.start_async()
# Perform your own logic (e.g. a query loop).
...
# Wait for the updater to finish.
await my_updater.wait_async()
# Print the update stats.
print(my_updater.update_stats()) -
next_status_updates_async()
, e.g.while True:
updates = await my_updater.next_status_updates_async()
... -
Async context manager, e.g.
async with cocoindex.FlowLiveUpdater(demo_flow) as my_updater:
# Perform your own logic (e.g. a query loop).
...
Evaluate the flow
CocoIndex allows you to run the transformations defined by the flow without updating the target.
CLI
The cocoindex evaluate
subcommand runs the transformation and dumps flow outputs.
It takes the following options:
--output-dir
(optional): The directory to dump the result to. If not provided, it will useeval_{flow_name}_{timestamp}
.--no-cache
(optional): By default, we use already-cached intermediate data if available. This flag will turn it off. Note that we only read existing cached data without updating the cache, even if it's turned on.
Example:
cocoindex evaluate main.py --output-dir ./eval_output
Library API
- Python
The evaluate_and_dump()
method runs the transformation and dumps flow outputs to files.
It takes a EvaluateAndDumpOptions
dataclass as input to configure, with the following fields:
output_dir
(type:str
, required): The directory to dump the result to.use_cache
(type:bool
, default:True
): Use already-cached intermediate data if available. Note that we only read existing cached data without updating the cache, even if it's turned on.
Example:
demo_flow.evaluate_and_dump(EvaluateAndDumpOptions(output_dir="./eval_output"))