Skip to main content

Manage Flows Dynamically

You write a function, a.k.a. flow definition, to define indexing logic. Sometimes you want to reuse the same flow definition for multiple flow instances (a.k.a. flow), e.g. each takes input from different sources, exports to different targets, and even with slightly different parameters for transformation logic.

States of a flow instance

A flow instance has states from two aspects:

  • In-process object, of type cocoindex.Flow.
  • Persistent resource, including states in the internal storage and backend resources that are owned by the flow instance.

A flow instance is ultimately a persistent resource. Its in-process object is a handle to operate on it. Consider file handles and database connections. CocoIndex provides APIs to open and close flow instances, and setup and drop the persistent resource.

Parameterize the flow definition

In the example from the Quickstart Guide, we decorate the flow definition function with a @cocoindex.flow_def(name="DemoFlow") decorator:

Example in Quickstart Guide
@cocoindex.flow_def(name="TextEmbedding")
def text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
...

This immediately creates the in-process object of the flow instance, using the given function as the flow definition. This is a shortcut of:

def _text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
...

text_embedding_flow = cocoindex.open_flow("TextEmbedding", _text_embedding_flow)

Here, cocoindex.open_flow() is the function that creates the in-process object of the flow instance, with the given name and flow definition function. You can directly call it dynamically with flow name created programmatically.

Oftentimes, you also want to parameterize the flow definition function. For example, we may have a dataclass like this to hold the parameters of the flow:

@dataclass
class TextEmbeddingFlowParameters:
source_path: str
target_table_name: str

And consider we have a registry of parameters for all flow instances somewhere. For simplicity, we use a hardcoded dict here, and provide a simple function to get the parameters for a given flow name. In reality, the source of truth may come from a configuration file, a database, etc., and the function can be replaced by your own implementation.

FLOW_PARAMETERS: dict[str, TextEmbeddingFlowParameters] = {
"foo": TextEmbeddingFlowParameters(source_path="/path/to/foo", target_table_name="foo_embeddings"),
"bar": TextEmbeddingFlowParameters(source_path="/path/to/bar", target_table_name="bar_embeddings"),
}

def get_flow_parameters(name: str) -> TextEmbeddingFlowParameters:
return FLOW_PARAMETERS[name]

Then you can have a function that returns the flow definition function for the given parameters:

def text_embedding_flow_def(params: TextEmbeddingFlowParameters):
def _flow_def(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# Add a data source to read files from the specified directory
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.LocalFile(path=params.source_path))

doc_embeddings = data_scope.add_collector()
...

# Export the collected data to a Postgres table, with the specified table name.
doc_embeddings.export(
"doc_embeddings",
cocoindex.targets.Postgres(table_name=params.target_table_name),
primary_key_fields=["filename", "location"],
)

return _flow_def

With this, you can open flow instances dynamically with its parameters:

text_embedding_flows: dict[str, cocoindex.Flow] = {}

def get_text_embedding_flow(name: str) -> cocoindex.Flow:
flow = text_embedding_flows.get(name)

if flow is None:
params = get_flow_parameters(name)
flow = text_embedding_flows[name] = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params))

return flow

Operate on the flow instances

Setup the persistent resource

After you instantiated and open flow instances dynamically, before you can perform any data updates, you need to make sure the persistent resource is ready. You can use the setup() method, e.g. modify the above code to:

text_embedding_flows: dict[str, cocoindex.Flow] = {}

def get_text_embedding_flow(name: str) -> cocoindex.Flow:
flow = text_embedding_flows.get(name)

if flow is None:
params = get_flow_parameters(name)
flow = text_embedding_flows[name] = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params))
flow.setup(report_to_stdout=True)

return flow

setup() method synchronizes the persistent resource to a state that is consistent with the in-process object. For example,

  • If the persistent resource is not there yet, it will create the backend resources for new targets.
  • If your flow definition changed and a new target has been added since the last time of setup, it will create the backend resources for new targets.
  • If an existing target is removed from the flow definition, it will drop the backend resources for the removed target.
  • If nothing changed since the last time of setup, it will be a no-op. i.e. the setup() method is idempotent.

setup() takes a report_to_stdout parameter to control whether to print the setup progress to the standard output.

setup() takes care of all scenarios and makes sure the persistent resource is in the right state. It's generally safe to call it after you open a flow instance, even if you don't know whether the persistent resource already exists.

Perform data updates

After you make sure the persistent resource is ready, you can perform data updates using the flow.

The update() method updates the target defined by the flow.

flow.update()

This performs a one-time data update. After the function returns, the target is up-to-date as of the moment when the function is called. For example, we can call update() to update the target after the flow is setup:

def update_text_embedding_index(name: str):
flow = get_text_embedding_flow(name)
flow.update()

You can also do a live update. See the Live Updates tutorial for more details.

Close the flow object

Sometimes you don't want to hold the in-process object forever. You can free up the memory resources by closing the flow instances with the close() method.

For example, the dict we managed above behaves like a cache to hold the flow instances. If a specific flow isn't used for a while, we may close it. The TTLCache from cachetools package provides exactly this functionality. We can rewrite the above code a little bit. First, we bring in necessary imports:

from cachetools import cached, TTLCache

Then we define our own version of TTLCache to make it call the close() method when the flow instance is evicted from the cache:

class MyTTLCache(TTLCache):
def popitem(self):
# Close the flow instance when it is evicted from the cache
key, flow = super().popitem()
flow.close()
return key, flow

With this, we can modify our get_text_embedding_flow() function to use MyTTLCache to cache the flow instances, instead of managing our own dict:

@cached(cache=MyTTLCache(maxsize=20, ttl=600))
def get_text_embedding_flow(name: str) -> cocoindex.Flow:
params = get_flow_parameters(name)
flow = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params))
flow.setup(report_to_stdout=True)
return flow

The @cached() decorator from cachetools package automatically manages the cache for us (and it also offers thread safety!). Once a flow is not touched for 10 minutes, it will call the popitem() method, which will close the in-memory flow object.

Drop the persistent resource

Occasionally, you may want to drop the persistent resource of a flow. The drop() method is for this purpose.

def drop_text_embedding_index(name: str):
flow = get_text_embedding_flow(name)
flow.drop()

This will drop the persistent resource of the flow. The in-memory flow object is still alive, and can be reused until it's closed. For example, you can still call setup() again.

Put it all together

import cocoindex
from cachetools import cached, TTLCache
from dataclasses import dataclass

@dataclass
class TextEmbeddingFlowParameters:
source_path: str
target_table_name: str

FLOW_PARAMETERS: dict[str, TextEmbeddingFlowParameters] = {
"foo": TextEmbeddingFlowParameters(source_path="/path/to/foo", target_table_name="foo_embeddings"),
"bar": TextEmbeddingFlowParameters(source_path="/path/to/bar", target_table_name="bar_embeddings"),
}

# Placeholder to get the parameters for a given flow name. You can replace this with your own implementation.
def get_flow_parameters(name: str) -> TextEmbeddingFlowParameters:
return FLOW_PARAMETERS[name]


def text_embedding_flow_def(params: TextEmbeddingFlowParameters):
def _flow_def(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
# Add a data source to read files from a directory
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.LocalFile(path=params.source_path))

doc_embeddings = data_scope.add_collector()
...

# Export the collected data to a Postgres table, with the specified table name.
doc_embeddings.export(
"doc_embeddings",
cocoindex.targets.Postgres(table_name=params.target_table_name),
primary_key_fields=["filename", "location"],
)

return _flow_def

class MyTTLCache(TTLCache):
def popitem(self):
# Close the flow instance when it is evicted from the cache
key, flow = super().popitem()
flow.close()
return key, flow

@cached(cache=MyTTLCache(maxsize=20, ttl=600))
def get_text_embedding_flow(name: str) -> cocoindex.Flow:
params = get_flow_parameters(name)
flow = cocoindex.open_flow(f"TextEmbedding_{name}", text_embedding_flow_def(params))
flow.setup(report_to_stdout=True)
return flow

def update_text_embedding_index(name: str):
flow = get_text_embedding_flow(name)
flow.update()

def drop_text_embedding_index(name: str):
flow = get_text_embedding_flow(name)
flow.drop()

This provides a skeleton. With this, you can trigger update_text_embedding_index() and drop_text_embedding_index() from your application, e.g. a web server API.

Takeaways

From this tutorial, we walked through major flow management / operation APIs provided by CocoIndex. These APIs can be categorized into three aspects:

AspectAPIsDescription
Life of in-process flow objectopen_flow(), Flow.close()Create and destroy the in-memory handle to operate on flow instances
Life of persistent resourceFlow.setup(), Flow.drop()Create and destroy the backend resources and internal storage
Data updatesFlow.update(), FlowLiveUpdaterExecute the indexing logic to update targets. Requires persistent resource to be up-to-date first.

For simplicity, we use an in-memory dict as source of truth for the flow parameters. You can replace it with your own mechanism, e.g. table from a database, a configuration file, etc. You can trigger these APIs from your applications specific to your use case, e.g. from a specific API endpoint of a web server.

Further readings

You can see the following documents for more details: