Custom Targets
A custom target lets you export data to any destination, such as a database, cloud storage, a file system, an API, or another external system. You can either continuously update the destination to keep it in sync with the latest exported data, or publish the changes to it as a changelog.
Overview
Custom targets are defined by two components:
- A target spec that configures the behavior and connection parameters for the target.
- A target connector that handles the actual data export operations.
When you define a flow in CocoIndex, you define how data is transformed, collected, and exported, without worrying about how to handle data changes (inserts, updates, deletes). CocoIndex handles that for you. A target, however, connects a CocoIndex flow to an external system, and must synchronize data changes from the flow to that system. A target connector implementation therefore needs to deal with two kinds of changes:
- Setup changes. These cover the basic setup of a target's corresponding infrastructure (e.g. a table, a directory), independent of specific data. When users add a new target, delete an existing target, or change the target spec, the framework triggers the connector to apply these setup changes by calling apply_setup_change(). The connector needs to apply the corresponding setup changes to the external system, e.g. create/delete a table, update/delete a directory, etc.
- Data changes. These are changes to specific data exported to the target. While the flow is running, when new rows to export appear, or existing ones are updated or deleted in the CocoIndex flow, the framework triggers the connector to apply these data changes by calling mutate(), e.g. insert/update/delete a row in a table, write/delete a file, etc.
Target Spec
The target spec defines the configuration parameters for your custom target. When you use this target in a flow (typically by calling export()), you instantiate this target spec with specific parameter values.
- Python
A target spec is defined as a class that inherits from cocoindex.op.TargetSpec.

class CustomTarget(cocoindex.op.TargetSpec):
    """
    Documentation for the target.
    """
    param1: str
    param2: int | None = None
    ...

Notes:
- All fields of the spec must have a type that the json module can serialize and deserialize.
- All subclasses of TargetSpec can be instantiated like a dataclass, i.e. ClassName(param1=value1, param2=value2, ...).
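As a rough illustration of the first note, the check below round-trips a spec's field values through the json module. FileTargetSpecFields is a hypothetical plain dataclass standing in for a TargetSpec subclass so that the sketch runs without CocoIndex installed; it does not show how CocoIndex itself serializes specs.

```python
import dataclasses
import json
from typing import Optional


# Hypothetical stand-in for a TargetSpec subclass; a real spec would inherit
# from cocoindex.op.TargetSpec instead of being a plain dataclass.
@dataclasses.dataclass
class FileTargetSpecFields:
    location: str
    max_files: Optional[int] = None


def is_json_round_trippable(spec) -> bool:
    """Return True if the field values survive json.dumps / json.loads unchanged."""
    fields = dataclasses.asdict(spec)
    try:
        return json.loads(json.dumps(fields)) == fields
    except TypeError:
        # json.dumps raises TypeError for values it cannot encode (e.g. a set).
        return False


spec = FileTargetSpecFields(location="/tmp/out", max_files=10)
print(is_json_round_trippable(spec))
```

Strings, numbers, booleans, None, and lists or dicts of those pass this check; values such as sets or tuples do not, because they are either rejected by json.dumps or come back as a different type.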
Target Connector
A target connector handles the actual data export operations for your custom target. It defines how data should be written to your target destination.
Target connectors implement two categories of methods: setup methods to deal with setup changes, and data methods to deal with data changes.
- Python
A target connector is defined as a class decorated by @cocoindex.op.target_connector(spec_cls=CustomTarget).

@cocoindex.op.target_connector(spec_cls=CustomTarget)
class CustomTargetConnector:
    # Setup methods

    @staticmethod
    def get_persistent_key(spec: CustomTarget, target_name: str) -> PersistentKey:
        """Required. Return a persistent key that uniquely identifies this target instance."""
        ...

    @staticmethod
    def apply_setup_change(
        key: PersistentKey, previous: CustomTarget | None, current: CustomTarget | None
    ) -> None:
        """Required. Apply setup changes to the target."""
        ...

    @staticmethod
    def describe(key: PersistentKey) -> str:
        """Optional. Return a human-readable description of the target."""
        ...

    # Data methods

    @staticmethod
    def prepare(spec: CustomTarget) -> PreparedCustomTarget:
        """Optional. Prepare for execution before applying mutations."""
        ...

    @staticmethod
    def mutate(
        *all_mutations: tuple[PreparedCustomTarget, dict[DataKeyType, DataValueType | None]],
    ) -> None:
        """Required. Apply data mutations to the target."""
        ...

The method definitions above involve the following data types: CustomTarget, PersistentKey, PreparedCustomTarget, DataKeyType, DataValueType. Replace them with the actual types in your implementation. Each of them is explained below.
Setup Methods
Setup methods manage the target infrastructure - creating, configuring, and cleaning up target resources.
get_persistent_key(spec, target_name) -> PersistentKey (Required)
This method returns a unique identifier for the target instance. This key is used by CocoIndex to keep track of target state and drive target spec changes.
The key should be stable across different runs. If a previously existing key no longer exists, CocoIndex will assume the target is gone, and will drop it by calling apply_setup_change with current set to None.
The return type of this method should be serializable by the json module. It will be passed to other setup methods.
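A minimal sketch of a stable, JSON-serializable key, assuming a directory-based target. DirTargetSpec is a hypothetical stand-in for a TargetSpec subclass, and normalizing the path is an illustrative design choice rather than something CocoIndex requires.

```python
import json
import posixpath
from dataclasses import dataclass


# Hypothetical stand-in for a TargetSpec subclass describing a directory target.
@dataclass
class DirTargetSpec:
    location: str


def get_persistent_key(spec: DirTargetSpec, target_name: str) -> str:
    """Derive the key from the normalized location only.

    target_name is ignored in this sketch, so renaming the target in the flow
    does not change the key. The key must not depend on run-specific state
    such as the current working directory, a timestamp, or a random ID.
    """
    return posixpath.normpath(spec.location)


key_a = get_persistent_key(DirTargetSpec(location="out/html/"), "OutputData")
key_b = get_persistent_key(DirTargetSpec(location="./out/html"), "RenamedOutput")
print(key_a == key_b)     # both spellings map to the same key
print(json.dumps(key_a))  # a plain str is serializable by the json module
```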
apply_setup_change(key, previous, current) -> None (Required)
This method is called when the target configuration changes. It receives:
- key: The persistent key for this target
- previous: The previous target spec (or None if this is a new target)
- current: The current target spec (or None if the target is being removed)
This method should be implemented to:
- Create resources when a target is first added (previous is None)
- Update configuration when a target spec changes
- Clean up resources when a target is removed (current is None)
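The three cases above could be handled as in the following sketch for a directory-based target. DirTargetSpec is a hypothetical stand-in for a TargetSpec subclass, and the function is shown as a plain function rather than a static method on a decorated connector class, so that it runs without CocoIndex installed.

```python
import os
import shutil
import tempfile
from dataclasses import dataclass
from typing import Optional


# Hypothetical stand-in for a TargetSpec subclass describing a directory target.
@dataclass
class DirTargetSpec:
    location: str


def apply_setup_change(
    key: str, previous: Optional[DirTargetSpec], current: Optional[DirTargetSpec]
) -> None:
    if current is None:
        # Target removed: clean up the directory that the previous spec used.
        if previous is not None:
            shutil.rmtree(previous.location, ignore_errors=True)
        return
    # Target added or its spec changed: make sure the directory exists.
    os.makedirs(current.location, exist_ok=True)


with tempfile.TemporaryDirectory() as root:
    spec = DirTargetSpec(location=os.path.join(root, "out"))
    apply_setup_change(spec.location, None, spec)  # create
    print(os.path.isdir(spec.location))
    apply_setup_change(spec.location, spec, None)  # remove
    print(os.path.isdir(spec.location))
```

A target with more configuration than a single directory would compare previous and current in the update case and apply only the differences.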
describe(key) -> str (Optional)
Returns a human-readable description of the target for logging and debugging purposes.
Data Methods
Data methods handle the actual data operations - inserting, updating, and deleting records in the target.
mutate(*all_mutations) -> None (Required)
This method applies data changes to the target. It receives multiple mutation batches, where each batch is a tuple containing:
- The target spec (PreparedCustomTarget, or CustomTarget if prepare is not provided).
- A dictionary of mutations (dict[DataKeyType, DataValueType | None]). Each entry represents a mutation for a single row. When the value is None, it represents a deletion of the row; otherwise it is an upsert.
  The dictionary is represented in the same way as a KTable, except that the value can be None. In particular:
  - Since both DataKeyType and DataValueType can have multiple columns, they are Struct types. DataKeyType can be represented by a frozen dataclass (i.e. @dataclass(frozen=True)) or a NamedTuple, as it needs to be immutable. DataValueType can be represented by a dataclass, a NamedTuple, or a dict[str, Any].
  - For simplicity, when there is a single primary key column with a basic type, you can use the type of that column (e.g. str, int) as the key type and omit the wrapper Struct type. You can still use a @dataclass(frozen=True) or a NamedTuple to represent the key in this case if you want to handle both cases consistently.
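The mutation shape described above can be sketched against an in-memory SQLite table with a two-column key. PageKey, PageValue, and the pages table are hypothetical names chosen for illustration; the sqlite3.Connection plays the role of the prepared target (what prepare would return), and the function is written as a plain function so the sketch runs without CocoIndex installed.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)  # frozen, so instances are hashable and usable as dict keys
class PageKey:
    filename: str
    page: int


@dataclass
class PageValue:
    author: str
    html: str


def mutate(
    *all_mutations: tuple[sqlite3.Connection, dict[PageKey, Optional[PageValue]]],
) -> None:
    for conn, mutations in all_mutations:
        with conn:  # commit the batch on success, roll back on error
            for key, value in mutations.items():
                if value is None:
                    # A None value means the row was deleted in the flow.
                    conn.execute(
                        "DELETE FROM pages WHERE filename = ? AND page = ?",
                        (key.filename, key.page),
                    )
                else:
                    # Any other value is an upsert keyed on (filename, page).
                    conn.execute(
                        "INSERT OR REPLACE INTO pages (filename, page, author, html) "
                        "VALUES (?, ?, ?, ?)",
                        (key.filename, key.page, value.author, value.html),
                    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pages (filename TEXT, page INTEGER, author TEXT, html TEXT, "
    "PRIMARY KEY (filename, page))"
)
mutate((conn, {PageKey("a.md", 1): PageValue("Ann", "<p>hi</p>"),
               PageKey("b.md", 1): PageValue("Bob", "<p>yo</p>")}))
mutate((conn, {PageKey("a.md", 1): PageValue("Ann", "<p>hello</p>"),
               PageKey("b.md", 1): None}))
print(conn.execute("SELECT filename, page, author, html FROM pages").fetchall())
```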
prepare(spec) -> PreparedCustomTarget (Optional)
Prepares for execution by performing common operations before applying mutations. The returned value will be passed as the first element of tuples in the mutate method instead of the original spec.

@staticmethod
def prepare(spec: CustomTarget) -> PreparedCustomTarget:
    """
    Prepare for execution. Called once before mutations.
    """
    # Initialize connections, validate configuration, etc.
    return PreparedCustomTarget(...)

If not provided, the original spec will be passed directly to mutate.
Best Practices
Idempotency of Methods with Side Effects
apply_setup_change() and mutate() are the two methods that are expected to produce side effects.
We expect them to be idempotent, i.e. calling them multiple times with the same arguments should have the same effect as calling them once.
For example:
- For apply_setup_change(), if the target is a directory, it should be a no-op if we try to create it (previous is None) when the directory already exists, and also a no-op if we try to delete it (current is None) when the directory does not exist.
- For mutate(), if a mutation is a deletion, it should be a no-op if the row does not exist.
This ensures that when the system is left in an intermediate state, e.g. interrupted after a change is made but before CocoIndex records that the change is complete, the targets can still be gracefully rolled forward to the desired state once the system resumes.
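A minimal sketch of an idempotent mutate() for a file-based target, assuming each mutation maps a filename to its content (or to None for a deletion). It takes a pathlib.Path in place of a spec, which is an illustrative simplification, and replays the same batch twice to mimic a retry after an interruption.

```python
import tempfile
from pathlib import Path
from typing import Optional


def mutate(*all_mutations: tuple[Path, dict[str, Optional[str]]]) -> None:
    for directory, mutations in all_mutations:
        for filename, content in mutations.items():
            path = directory / filename
            if content is None:
                # Deleting a file that is already gone is a no-op.
                path.unlink(missing_ok=True)
            else:
                # Overwriting with the same content leaves the same end state.
                path.write_text(content, encoding="utf-8")


with tempfile.TemporaryDirectory() as root:
    batch = {"a.html": "<p>hi</p>", "gone.html": None}
    mutate((Path(root), batch))
    mutate((Path(root), batch))  # replaying the same batch changes nothing
    print(sorted(p.name for p in Path(root).iterdir()))
```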
Example
In this example, we define a custom target that accepts data with the following fields:
- filename (key field)
- author (value field)
- html (value field)
- Python
import dataclasses

import cocoindex


# 1. Define the target spec
class MyCustomTarget(cocoindex.op.TargetSpec):
    """Spec of the custom target, to configure the target location etc."""
    location: str


# 2. Define the value dataclass for exported data
@dataclasses.dataclass
class LocalFileTargetValues:
    """Represents value fields of exported data."""
    author: str
    html: str


# 3. Define the target connector
@cocoindex.op.target_connector(spec_cls=MyCustomTarget)
class LocalFileTargetConnector:
    @staticmethod
    def get_persistent_key(spec: MyCustomTarget, target_name: str) -> str:
        return spec.location

    @staticmethod
    def apply_setup_change(
        key: str, previous: MyCustomTarget | None, current: MyCustomTarget | None
    ) -> None:
        # Setup/teardown logic here
        ...

    @staticmethod
    def mutate(
        *all_mutations: tuple[MyCustomTarget, dict[str, LocalFileTargetValues | None]],
    ) -> None:
        """Apply data mutations to the target."""
        for spec, mutations in all_mutations:
            for filename, mutation in mutations.items():
                if mutation is None:
                    # Delete the file
                    ...
                else:
                    # Write the file with author and html content
                    ...


# 4. Usage in a flow
@cocoindex.flow_def(name="ExampleFlow")
def example_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope) -> None:
    # Add data source
    data_scope["documents"] = flow_builder.add_source(...)

    # Create collector
    output_data = data_scope.add_collector()

    # Collect data
    with data_scope["documents"].row() as doc:
        # Create the "author" and "transformed_html" fields
        ...
        # Collect the data
        output_data.collect(filename=doc["filename"], author=doc["author"], html=doc["transformed_html"])

    # Export to custom target
    output_data.export(
        "OutputData",
        MyCustomTarget(location=...),
        primary_key_fields=["filename"],
    )

In this example, the type for data in all_mutations is dict[str, LocalFileTargetValues | None]:
- str is the DataKeyType (the filename)
- LocalFileTargetValues is the DataValueType (containing html and author fields)
- The mutate() method receives tuples of (MyCustomTarget, dict[str, LocalFileTargetValues | None])
For simplicity, the type hints can be omitted. In that case, each value is passed as a dict instead of a dataclass instance, with author and html as its keys.
See custom_output_files for an end-to-end example.