Custom Targets
A custom target allows you to export data to any destination you want, such as databases, cloud storage, file systems, APIs, or other external systems.
Custom targets are defined by two components:
- A target spec that configures the behavior and connection parameters for the target.
- A target connector that handles the actual data export operations.
Target Spec
The target spec defines the configuration parameters for your custom target. When you use this target in a flow (typically by calling export()
), you instantiate this target spec with specific parameter values.
- Python
A target spec is defined as a class that inherits from cocoindex.op.TargetSpec
.
class CustomTarget(cocoindex.op.TargetSpec):
"""
Documentation for the target.
"""
param1: str
param2: int | None = None
...
Notes:
- All fields of the spec must have a type serializable / deserializable by the
json
module. - All subclasses of
TargetSpec
can be instantiated similar to a dataclass, i.e.ClassName(param1=value1, param2=value2, ...)
.
Target Connector
A target connector handles the actual data export operations for your custom target. It defines how data should be written to your target destination.
Target connectors implement two categories of methods: setup methods for managing target infrastructure (similar to DDL operations in databases), and data methods for handling specific data operations (similar to DML operations).
- Python
A target connector is defined as a class decorated by @cocoindex.op.target_connector(spec_cls=CustomTarget)
.
@cocoindex.op.target_connector(spec_cls=CustomTarget)
class CustomTargetConnector:
# Setup methods
@staticmethod
def get_persistent_key(spec: CustomTarget, target_name: str) -> PersistentKey:
"""Required. Return a persistent key that uniquely identifies this target instance."""
...
@staticmethod
def apply_setup_change(
key: PersistentKey, previous: CustomTarget | None, current: CustomTarget | None
) -> None:
"""Required. Apply setup changes to the target."""
...
@staticmethod
def describe(key: PersistentKey) -> str:
"""Optional. Return a human-readable description of the target."""
...
# Data methods
@staticmethod
def prepare(spec: CustomTarget) -> PreparedCustomTarget:
"""Optional. Prepare for execution before applying mutations."""
...
@staticmethod
def mutate(
*all_mutations: tuple[PreparedCustomTarget, dict[DataKeyType, DataValueType | None]],
) -> None:
"""Required. Apply data mutations to the target."""
...
The following data types are involved in the method definitions above: CustomTarget
, PersistentKey
, PreparedCustomTarget
, DataKeyType
, DataValueType
. They should be replaced with the actual types in your implementation. We will explain each of them below.
Setup Methods
Setup methods manage the target infrastructure - creating, configuring, and cleaning up target resources.
get_persistent_key(spec, target_name) -> PersistentKey
(Required)
This method returns a unique identifier for the target instance. This key is used by CocoIndex to keep track of target state and drive target spec changes.
The key should be stable across different runs. If a previously existing key no longer exists, CocoIndex will assume the target is gone, and will drop it by calling apply_setup_change
with current
set to None
.
The return type of this method should be serializable by the json
module. It will be passed to other setup methods.
apply_setup_change(key, previous, current) -> None
(Required)
This method is called when the target configuration changes. It receives:
key
: The persistent key for this targetprevious
: The previous target spec (orNone
if this is a new target)current
: The current target spec (orNone
if the target is being removed)
This method should be implemented to:
- Create resources when a target is first added (
previous
isNone
) - Update configuration when a target spec changes
- Clean up resources when a target is removed (
current
isNone
)
describe(key) -> str
(Optional)
Returns a human-readable description of the target for logging and debugging purposes.
Data Methods
Data methods handle the actual data operations - inserting, updating, and deleting records in the target.
mutate(*all_mutations) -> None
(Required)
This method applies data changes to the target. It receives multiple mutation batches, where each batch is a tuple containing:
-
The target spec (
PreparedCustomTarget
, orCustomTarget
ifprepare
is not provided). -
A dictionary of mutations (
dict[DataKeyType, DataValueType | None]
). Each entry represents a mutation for a single row. When the value isNone
, it represents a deletion for the row, otherwise it's an upsert.It represented in the same way as KTable, except the value can be
None
. In particular:-
Since both
DataKeyType
andDataValueType
can have multiple columns, they're Struct.DataKeyType
can be represented by a frozen dataclass (i.e.@dataclass(frozen=True)
) or aNamedTuple
, as it needs to be immutable.DataValueType
can be represented by adataclass
, aNamedTuple
or adict[str, Any]
.
-
For simplicity, when there're a single primary key column with basic type, we allow using type of this column (e.g.
str
,int
etc.) as the key type, and a wrapper Struct type can be omitted. You can still use a@dataclass(frozen=True)
or aNamedTuple
to represent the key for this case though, if you want to handle both cases consistently.
-
prepare(spec) -> PreparedCustomTarget
(Optional)
Prepares for execution by performing common operations before applying mutations. The returned value will be passed as the first element of tuples in the mutate
method instead of the original spec.
@staticmethod
def prepare(spec: CustomTarget) -> PreparedCustomTarget:
"""
Prepare for execution. Called once before mutations.
"""
# Initialize connections, validate configuration, etc.
return PreparedCustomTarget(...)
If not provided, the original spec will be passed directly to mutate
.
Best Practices
Idempotency of Methods with Side Effects
apply_setup_change()
and mutate()
are the two methods that are expected to produce side effects.
We expect them to be idempotent, i.e. when calling them with the same arguments multiple times, the effect should remain the same.
For example,
- For
apply_setup_change()
, if the target is a directory, it should be a no-op if we try to create it (previous
isNone
) when the directory already exists, and also a no-op if we try to delete it (current
isNone
) when the directory does not exist. - For
mutate()
, if a mutation is a deletion, it should be a no-op if the row does not exist.
This is to make sure when the system if left in an intermediate state, e.g. interrupted in the middle between a change is made and CocoIndex notes down the change is completed, the targets can still be gracefully rolled forward to the desired states after the system is resumed.
Examples
The cocoindex repository contains the following examples of custom targets:
- In the custom_output_files example,
LocalFileTarget
exports data to local HTML files.