
CocoIndex Flow Definition

In CocoIndex, an indexing flow is defined by a function that imports data from sources, transforms it, and puts it into target storage (sinks). You connect the inputs and outputs of these operations through fields of data scopes.

Entry Point

A CocoIndex flow is defined by a function:

The easiest way is to use the @cocoindex.flow_def decorator:

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...

The @cocoindex.flow_def decorator declares the function as a CocoIndex flow definition.

It takes two arguments:

  • flow_builder: a FlowBuilder object to help build the flow.
  • data_scope: a DataScope object, representing the top-level data scope. Any data created by the flow should be added to it.

Alternatively, for more flexibility (e.g. if you want to define the flow conditionally or generate its name dynamically), you can explicitly call the cocoindex.flow.add_flow_def() function:

def demo_flow_def(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...

# Add the flow definition to the flow registry.
demo_flow = cocoindex.flow.add_flow_def("DemoFlow", demo_flow_def)

In both cases, demo_flow will be an instance of the cocoindex.Flow class. See Flow Running for more details.

Data Scope

A data scope represents data for a certain unit, e.g. the top-level scope (covering all data for a flow), a document, or a chunk. A data scope holds a set of fields and collectors, and users can add new fields and collectors to it.

Get or Add a Field

You can get or add a field of a data scope; each field is a data slice.

note

You cannot override an existing field.

Getting and setting a field of a data scope is done by the [] operator with a field name:

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    # Add "documents" to the top-level data scope.
    data_scope["documents"] = flow_builder.add_source(DemoSourceSpec(...))

    # Each row of "documents" is a child scope.
    with data_scope["documents"].row() as document:
        # Get "content" from the document scope, transform it, and add "summary" to the scope.
        document["summary"] = document["content"].transform(DemoFunctionSpec(...))

Add a collector

See Data Collector below for more details.

Data Slice

A data slice references a subset of data belonging to a data scope, e.g. a specific field from a data scope. A data slice has a certain data type, and it's the input for most operations.

Import from source

To obtain the initial data slice, start by importing data from a source. FlowBuilder provides an add_source() method to import data from external sources. A source spec must be provided for any import operation, describing the source and its parameters. Imports must happen at the top level of a flow, and the field created by an import must be in the top-level struct.

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(DemoSourceSpec(...))
    ...
note

The actual data values are not available when the flow is defined; they are only available at runtime. In a flow definition, you can use a data representation as the input for operations, but you cannot access its actual value.

Refresh interval

You can provide a refresh_interval argument. When present, in live update mode the data source will be refreshed at the specified interval.

The refresh_interval argument is of type datetime.timedelta. For example, this refreshes the data source every 1 minute:

import datetime

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["documents"] = flow_builder.add_source(
        DemoSourceSpec(...), refresh_interval=datetime.timedelta(minutes=1))
    ...
info

In live update mode, on each refresh CocoIndex traverses the data source to detect changes, and only performs transformations on changed source keys.

Transform

The transform() method applies a function to a data slice, producing another data slice. A function spec must be provided for any transform operation, describing the function and its parameters.

The function takes one or multiple data arguments. The first argument is the data slice to be transformed, on which the transform() method is called. The other arguments can be passed in as positional or keyword arguments after the function spec.

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    data_scope["field2"] = data_scope["field1"].transform(
        DemoFunctionSpec(...),
        arg1, arg2, ..., key0=kwarg0, key1=kwarg1, ...)
    ...

For each row

If the data slice has a Table type, you can call the row() method to obtain a child scope representing each row, and apply operations within it.

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    with data_scope["table1"].row() as table1_row:
        # Operations applied to each row
        table1_row["field2"] = table1_row["field1"].transform(DemoFunctionSpec(...))

Get a sub field

If the data slice has a Struct type, you can obtain a data slice on a specific sub field of it, similar to getting a field of a data scope.
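
As a sketch (the metadata and title field names here are hypothetical, and DemoFunctionSpec is a placeholder spec like elsewhere on this page), assuming each row of documents has a Struct field metadata with a sub field title:

```python
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    with data_scope["documents"].row() as document:
        # "metadata" is assumed to be a Struct-typed field; indexing into it
        # yields a data slice for its "title" sub field, which can then be
        # used as the input of a transform like any other data slice.
        document["title_summary"] = document["metadata"]["title"].transform(
            DemoFunctionSpec(...))
```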

Data Collector

A data collector is added to a specific data scope, and it collects multiple entries of data from that scope or its child scopes.

Collect

Call its collect() method to collect a specific entry, which can have multiple fields. Each field's name is given by the keyword argument name, and its value takes one of the following forms:

  • A DataSlice.

  • An enum value cocoindex.GeneratedField.UUID, indicating that its value is a UUID automatically generated by the engine. The UUID remains stable as long as the other collected input values are unchanged.

    note

    An automatically generated UUID field is allowed to appear at most once.

For example,

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    demo_collector = data_scope.add_collector()
    with data_scope["documents"].row() as document:
        ...
        demo_collector.collect(id=cocoindex.GeneratedField.UUID,
                               filename=document["filename"],
                               summary=document["summary"])
    ...

Here the collector is in the top-level data scope. It collects the filename and summary fields from each row of documents, and generates an id field with a UUID that remains stable as long as filename and summary are unchanged.

Export

The export() method exports the collected data to an external storage.

A storage spec needs to be provided for any export operation, to describe the storage and parameters related to the storage.

Export must happen at the top level of a flow, i.e. not within any child scopes created by "for each row". It takes the following arguments:

  • name: the name to identify the export target.
  • target_spec: the storage spec as the export target.
  • setup_by_user (optional): whether the export target is set up by the user. By default, CocoIndex manages the target setup (via the cocoindex setup CLI subcommand), e.g. creating related tables/collections/etc. with a compatible schema and updating them upon change. If True, the export target is managed by the user, who is responsible for creating the target and updating it upon change.
  • Fields to configure storage indexes. primary_key_fields is required, and all others are optional.
@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    ...
    demo_collector = data_scope.add_collector()
    ...
    demo_collector.export(
        "demo_storage", DemoStorageSpec(...),
        primary_key_fields=["field1"],
        vector_indexes=[cocoindex.VectorIndexDef("field2", cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY)])

The target storage is managed by CocoIndex: it'll be created by the CocoIndex CLI when you run cocoindex setup, and the data will be automatically updated (including stale-data removal) when the index is updated. The name for the same storage should remain stable across runs. If it changes, CocoIndex treats it as the old storage being removed and a new one created, and performs setup changes and reindexing accordingly.

Storage Indexes

Many storages support indexes to make data retrieval more efficient. CocoIndex provides a common way to configure indexes for various storages.

  • Primary key. primary_key_fields (Sequence[str]): the fields to use as the primary key. The types of these fields must be supported as key fields. See Key Types for more details.
  • Vector index. vector_indexes (Sequence[VectorIndexDef]): the fields to create vector indexes on. VectorIndexDef has the following fields:
    • field_name: the field to create a vector index on.
    • metric: the similarity metric to use. See Vector Type for more details about supported similarity metrics.

Miscellaneous

Target Declarations

Most of the time, a target storage is created by calling the export() method on a collector, and that export() call carries the configurations needed for the target storage, e.g. options for storage indexes. Occasionally, you may need to specify configurations for a target storage outside the context of any specific data collector.

For example, for graph database targets like Neo4j, a data collector may export data to Neo4j relationships, which in turn creates the nodes referenced by those relationships. These nodes don't directly come from any specific data collector (relationships from different data collectors may share the same nodes). To specify configurations for these nodes, you can declare a spec for the related node labels.

FlowBuilder provides declare() method for this purpose, which takes the spec to declare, as provided by various target types.

flow_builder.declare(
    cocoindex.storages.Neo4jDeclarations(...)
)

Auth Registry

CocoIndex manages an auth registry. It's an in-memory key-value store, mainly to store authentication information for a backend.

An operation spec is the default way to configure a backend, but it has the following limitations:

  • A spec isn't supposed to contain secret information, and it's frequently shown in various places, e.g. by cocoindex show.
  • Once an operation is removed after a flow definition code change, its spec is gone too, but we still need to be able to drop the backend (e.g. a table) via cocoindex setup or cocoindex drop.

The auth registry is introduced to solve these problems. It works as follows:

  • You can create a new auth entry with a key and a value.
  • You can reference the entry by its key, and pass it as part of the spec for certain operations, e.g. Neo4j takes a connection field in the form of an auth entry reference.

You can add an auth entry with the cocoindex.add_auth_entry() function, which returns a cocoindex.AuthEntryReference:

my_graph_conn = cocoindex.add_auth_entry(
    "my_graph_conn",
    cocoindex.storages.Neo4jConnectionSpec(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="cocoindex",
    ))

Then reference it when building a spec that takes an auth entry:

  • You can reference the entry by the AuthEntryReference object directly:

    demo_collector.export(
        "MyGraph",
        cocoindex.storages.Neo4jRelationship(connection=my_graph_conn, ...))

  • You can also reference it by the key string, using the cocoindex.ref_auth_entry() function:

    demo_collector.export(
        "MyGraph",
        cocoindex.storages.Neo4jRelationship(connection=cocoindex.ref_auth_entry("my_graph_conn"), ...))

Note that CocoIndex backends use the key of an auth entry to identify the backend.

  • Keep the key stable. As long as the key doesn't change, the entry is considered to refer to the same backend (even if the underlying connection or authentication details change).

  • If a key is no longer referenced in any operation spec, keep it around until the next cocoindex setup or cocoindex drop, so that CocoIndex can still perform the necessary cleanups.