CocoIndex Flow Definition
To define an indexing flow in CocoIndex, you provide a function that constructs the flow by adding operations and connecting them through fields.
Entry Point
A CocoIndex flow is defined by a function:
- Python
The easiest way is to use the @cocoindex.flow_def decorator:
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
The @cocoindex.flow_def decorator declares this function as a CocoIndex flow definition.
The function takes two arguments:
- flow_builder: a FlowBuilder object to help build the flow.
- data_scope: a DataScope object, representing the top-level data scope. Any data created by the flow should be added to it.
Alternatively, for more flexibility (e.g. you want to register the flow conditionally or generate its name dynamically), you can explicitly call the cocoindex.add_flow_def() method:
def demo_flow_def(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...

# Add the flow definition to the flow registry.
demo_flow = cocoindex.flow.add_flow_def("DemoFlow", demo_flow_def)
In both cases, demo_flow will be an object of the cocoindex.Flow class.
See Flow Methods for more details on it.
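The relationship between the two registration forms can be pictured with a small self-contained sketch. It does not use CocoIndex: ToyFlow, _registry, tenant_flow_def, and the tenant names are hypothetical stand-ins, and the real registry may behave differently. The sketch only shows why an explicit call is handy when the flow name is computed at runtime.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class ToyFlow:
    """Hypothetical stand-in for the object a registration returns."""
    name: str
    definition: Callable


# Hypothetical registry keyed by flow name (an assumption, not CocoIndex's).
_registry: Dict[str, ToyFlow] = {}


def add_flow_def(name: str, definition: Callable) -> ToyFlow:
    """Explicit form: register a definition under a name chosen at call time."""
    flow = ToyFlow(name, definition)
    _registry[name] = flow
    return flow


def flow_def(name: str) -> Callable[[Callable], ToyFlow]:
    """Decorator form: sugar for add_flow_def(name, fn)."""
    return lambda fn: add_flow_def(name, fn)


@flow_def(name="DemoFlow")
def demo_flow(flow_builder, data_scope):
    ...


def tenant_flow_def(flow_builder, data_scope):
    ...


# The explicit call lets the name be computed, e.g. one flow per tenant.
tenant_flows = [
    add_flow_def(f"DemoFlow_{tenant}", tenant_flow_def)
    for tenant in ("acme", "globex")
]
registered_names = sorted(_registry)
```

In this toy model both forms end with the same kind of object bound to a name, which mirrors the statement above that demo_flow is a flow object in both cases.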
Flow Builder
The FlowBuilder object is the starting point to construct a flow.
Import From Source
FlowBuilder provides a from_source() method to import data from external sources.
A source spec needs to be provided for any import operation, to describe the source and parameters related to the source.
Import must happen at the top level, and the field created by import must be in the top-level struct.
- Python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.from_source(DemoSourceSpec(...))
    ...
from_source() returns a DataSlice. Once external data sources are imported, you can further transform them using methods exposed by these data objects, as discussed in the following sections.
We'll describe the different data objects in the next few sections. Note that the actual value of the data is not available when the flow is defined: it's only available at runtime. In a flow definition, you can use a data representation as input for operations, but you cannot access the actual value.
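The split between definition time and runtime can be illustrated with a toy model. This is a minimal sketch, not how CocoIndex is implemented: LazySlice and its evaluate() method are hypothetical names introduced here. A slice records which steps to apply, and values only appear when rows are supplied later.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class LazySlice:
    """Hypothetical stand-in for a data slice: a recipe, not a value."""
    source_field: str
    fn: Optional[Callable[[Any], Any]] = None
    parent: Optional["LazySlice"] = None

    def transform(self, fn: Callable[[Any], Any]) -> "LazySlice":
        # Definition time: only record the step; nothing is computed.
        return LazySlice(self.source_field, fn, self)

    def evaluate(self, row: dict) -> Any:
        # Run time: walk back to the source field, then apply each step.
        if self.parent is None:
            return row[self.source_field]
        return self.fn(self.parent.evaluate(row))


# Definition time: no document exists yet, only a description of the work.
content = LazySlice("content")
word_count = content.transform(str.split).transform(len)

# Run time: actual rows arrive and the recorded steps execute.
counts = [
    word_count.evaluate(row)
    for row in ({"content": "hello world"}, {"content": "a b c"})
]
```

Here word_count carries no number while the flow is being described; the counts are produced only once rows are passed to evaluate().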
Data Scope
A data scope represents data for a certain unit, e.g. the top-level scope (covering all data for a flow), a document, or a chunk. A data scope has a set of fields and collectors, and users can add new fields and collectors to it.
Get or Add a Field
Get or add a field of a data scope (which is a data slice). Note that you cannot override an existing field.
- Python
Getting and setting a field of a data scope is done by the [] operator with a field name:
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Add "documents" to the top-level data scope.
    data_scope["documents"] = flow_builder.from_source(DemoSourceSpec(...))

    # Each row of "documents" is a child scope.
    with data_scope["documents"].row() as document:
        # Get "content" from the document scope, transform, and add "summary" to scope.
        document["summary"] = document["content"].transform(DemoFunctionSpec(...))
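The rule that an existing field cannot be overridden can be modeled as a write-once mapping. The sketch below is a toy under stated assumptions: ToyScope is a hypothetical class, and raising ValueError is an illustrative choice, since the error CocoIndex actually reports is not specified here.

```python
class ToyScope:
    """Hypothetical stand-in for a data scope that holds named fields."""

    def __init__(self):
        self._fields = {}

    def __getitem__(self, name):
        return self._fields[name]

    def __setitem__(self, name, data_slice):
        # Fields are write-once: a second assignment to the same name fails.
        if name in self._fields:
            raise ValueError(f"field {name!r} already exists in this scope")
        self._fields[name] = data_slice


scope = ToyScope()
scope["documents"] = "slice-1"

try:
    scope["documents"] = "slice-2"
    override_error = None
except ValueError as exc:
    override_error = str(exc)
```

Under this model, derived data always goes into a new field (such as "summary" above) rather than replacing the field it was computed from.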
Add a Collector
See Data Collector below for more details.
Data Slice
A data slice references a subset of data belonging to a data scope, e.g. a specific field from a data scope. A data slice has a certain data type, and it's the input for most operations.
Transform
The transform() method applies a function to the data slice and creates another data slice.
A function spec needs to be provided for any transform operation, to describe the function and parameters related to the function.
- Python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    data_scope["field1"] = data_scope["documents"].transform(DemoFunctionSpec(...))
    ...
For Each Row
If the data slice has Table type, you can call the row() method to obtain a child scope representing each row, and apply operations to each row.
- Python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    with data_scope["table1"].row() as table1_row:
        # Children operations
        table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...))
Get a Sub Field
If the data slice has Struct type, you can obtain a data slice on a specific sub field of it, similar to getting a field of a data scope.
Data Collector
A data collector can be added to a specific data scope. It collects multiple entries of data, represented by data slices in the same scope or in child scopes.
Call its collect() method to collect a specific entry, which can have multiple fields.
For example,
- Python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    demo_collector = data_scope.add_collector()
    with data_scope["documents"].row() as document:
        ...
        demo_collector.collect(filename=document["filename"], summary=document["summary"])
    ...
Here the collector is in the top-level data scope, and it collects filename and summary fields from each row of the documents field.
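To make the shape of the collected data concrete, here is a toy model that runs eagerly on sample rows. It is only a sketch: ToyCollector, the sample documents, and the upper-casing "summary" step are hypothetical, and a real flow describes these steps at definition time instead of looping over values.

```python
class ToyCollector:
    """Hypothetical stand-in: gathers entries, each a dict of named fields."""

    def __init__(self):
        self.entries = []

    def collect(self, **fields):
        # One call adds one entry; keyword names become the entry's fields.
        self.entries.append(fields)


# Made-up sample rows standing in for the "documents" table.
documents = [
    {"filename": "a.md", "content": "alpha beta"},
    {"filename": "b.md", "content": "gamma"},
]

collector = ToyCollector()  # owned by the top-level scope
for document in documents:  # plays the role of row()
    summary = document["content"].upper()  # placeholder for a real transform
    collector.collect(filename=document["filename"], summary=summary)
```

The collector ends up with one entry per row, each holding the filename and summary fields, even though collect() is called from inside the per-row scope.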
To specify what to do with the collected data, call a specific method on the collector.
Export
The export() method exports the collected data to an external storage.
A storage spec needs to be provided for any export operation, to describe the storage and parameters related to the storage.
Export must happen at the top level of a flow, i.e. not within any child scopes created by "for each row". It takes the following arguments:
- name: the name to identify the export target.
- target_spec: the storage spec as the export target.
- primary_key_fields (optional): the fields to be used as primary key. Types of the fields must be supported as key fields. See Key Types for more details.
- vector_index (optional): the fields to create vector index. Each item is a tuple of a field name and a similarity metric. See Vector Type for more details about supported similarity metrics.
- Python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    demo_collector = data_scope.add_collector()
    ...
    demo_collector.export(
        "demo_storage", DemoStorageSpec(...),
        primary_key_fields=["field1"],
        vector_index=[("field2", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])
The target storage is managed by CocoIndex: it is created by the CocoIndex CLI when you run cocoindex setup, and its data is updated automatically (including removal of stale data) when the index is updated.
The name for the same storage should remain stable across runs.
If it changes, CocoIndex treats the change as an old storage being removed and a new one being created, and performs setup changes and reindexing accordingly.
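Why a renamed export target triggers both a removal and a creation can be seen by diffing target names between two runs. The function below is a hypothetical illustration, not CocoIndex's setup logic; plan_setup_changes and the names "other" and "demo_storage_v2" are invented for the example.

```python
def plan_setup_changes(previous: set, current: set) -> dict:
    """Diff export target names between two runs (toy model)."""
    return {
        "drop": sorted(previous - current),    # names that disappeared
        "create": sorted(current - previous),  # names that are new
        "keep": sorted(previous & current),    # stable names, updated in place
    }


# "demo_storage" was renamed to "demo_storage_v2" between two runs.
plan = plan_setup_changes(
    previous={"demo_storage", "other"},
    current={"demo_storage_v2", "other"},
)
```

Because names are the only identity in this model, the rename is indistinguishable from dropping one storage and creating another, which is why a stable name avoids unnecessary reindexing.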