Custom Target Connector
A custom target connector is the mechanism that connects CocoIndex's declarative target state system to external systems. When you call methods like dir_target.declare_file() or table_target.declare_row(), a target connector handles the actual synchronization — determining what changed and applying those changes to the external system.
When to create a custom target connector
Most users will use built-in connectors (like localfs or postgres) and never need to create their own. Consider creating a custom target connector when:
- You need to integrate with an external system not covered by existing connectors
- You need custom change detection logic (e.g., content-based fingerprinting)
- You need to manage hierarchical target states (containers with children)
For simple use cases where you just need to write data to an external system without sophisticated change tracking, consider using a regular function with memoization instead. Target state providers are most valuable when you need CocoIndex to track and clean up target states automatically.
Key data types
This section introduces the key data types. Each is labeled either "you implement" or "CocoIndex provides" to clarify responsibilities.
TargetHandler (you implement)
A TargetHandler implements the reconciliation logic. It's a protocol with one required method and one optional method:
```python
class TargetHandler(Protocol[KeyT, ValueT, TrackingRecordT, OptChildHandlerT]):
    def reconcile(
        self,
        key: KeyT,
        desired_state: ValueT | NonExistenceType,
        prev_possible_states: Collection[TrackingRecordT],
        prev_may_be_missing: bool,
        /,
    ) -> TargetReconcileOutput[ActionT, TrackingRecordT, OptChildHandlerT] | None:
        ...

    # Optional: override to support attachment types (see "Implementing attachment providers")
    def attachment(self, att_type: str) -> TargetHandler | None:
        return None  # Default: no attachments
```
Type parameters:
- KeyT: The type used to identify target states (e.g., filename, primary key tuple)
- ValueT: The specification for the target state (e.g., file content, row data)
- TrackingRecordT: What's stored to detect changes on future runs
- OptChildHandlerT: The child handler type, or None for leaf targets
- ActionT (in the return type): The action type your handler produces, executed by its TargetActionSink
Parameters:
- key: Unique identifier for this target state
- desired_state: What the user declared, or NON_EXISTENCE if no longer declared
- prev_possible_states: Tracking records from previous runs (may have multiple due to interrupted updates)
- prev_may_be_missing: If True, the target state might not exist in the external system
Returns:
- TargetReconcileOutput if an action is needed
- None if no changes are required
The reconcile() method must be non-blocking. It should only compare states and return an action — actual I/O operations happen later in the TargetActionSink.
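The split between deciding and doing can be illustrated without CocoIndex itself. The following is a minimal sketch in plain Python, assuming a file target reconciled by comparing SHA-256 digests: plan_file_action plays the role of reconcile() and only compares states, while apply_file_actions plays the role of the sink function and is the only code that touches the filesystem. All names here (NON_EXISTENCE, FileAction, plan_file_action, apply_file_actions) are local stand-ins, not CocoIndex APIs.

```python
import hashlib
import pathlib
import tempfile
from typing import Collection, NamedTuple, Optional, Sequence

# Stand-in for coco.NON_EXISTENCE; this sketch does not import cocoindex.
NON_EXISTENCE = object()


class FileAction(NamedTuple):
    path: pathlib.Path
    content: Optional[bytes]  # None means delete


def plan_file_action(
    path: pathlib.Path,
    desired: object,
    prev_fps: Collection[bytes],
    prev_may_be_missing: bool,
) -> Optional[FileAction]:
    """Plays the role of reconcile(): compares states, performs no I/O."""
    if desired is NON_EXISTENCE:
        if not prev_fps and not prev_may_be_missing:
            return None  # nothing to delete
        return FileAction(path, None)
    fp = hashlib.sha256(desired).digest()
    if not prev_may_be_missing and all(p == fp for p in prev_fps):
        return None  # unchanged in every possible previous state
    return FileAction(path, desired)


def apply_file_actions(actions: Sequence[FileAction]) -> None:
    """Plays the role of the sink function: the only place with side effects."""
    for action in actions:
        if action.content is None:
            action.path.unlink(missing_ok=True)
        else:
            action.path.write_bytes(action.content)


# Usage: plan first (pure), then apply the collected actions.
with tempfile.TemporaryDirectory() as tmp:
    target = pathlib.Path(tmp) / "a.txt"
    planned = plan_file_action(target, b"hello", [], True)
    apply_file_actions([planned])
    written = target.read_bytes()
```

Because the planning step is pure, it can run for many target states and its outputs can be batched into a single sink call.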
Tracking record (you define)
A tracking record captures the essential information needed to detect changes. Good tracking records:
- Are minimal: Only include what's needed for change detection
- Are deterministic: Same input always produces the same record
- Are serializable: Must be persistable (typically a NamedTuple or dataclass)
```python
# Example: File tracking record
from dataclasses import dataclass

@dataclass(frozen=True, slots=True)
class _FileTrackingRecord:
    fingerprint: bytes  # Content hash for change detection
```
For content-based change detection, use the connectorkits.fingerprint utilities. This lets you detect changes without storing the full content:
```python
from cocoindex.connectorkits.fingerprint import fingerprint_bytes, fingerprint_str, fingerprint_object

# For raw bytes
fp = fingerprint_bytes(content)

# For strings
fp = fingerprint_str(text)

# For arbitrary objects (uses memo key mechanism)
fp = fingerprint_object(obj)
```
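The hashing scheme behind the connectorkits.fingerprint helpers isn't described here, so the sketch below uses hashlib.sha256 as a stand-in to show the two properties that matter for tracking records: the record has a small, fixed size regardless of content size, and equal inputs always produce equal fingerprints. fingerprint_bytes_stub and fingerprint_row_stub are hypothetical names; in a real connector, use the connectorkits functions shown above.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Any, Dict


def fingerprint_bytes_stub(content: bytes) -> bytes:
    """Stand-in for fingerprint_bytes: a fixed-size SHA-256 digest (algorithm assumed)."""
    return hashlib.sha256(content).digest()


def fingerprint_row_stub(row: Dict[str, Any]) -> bytes:
    """Stand-in for fingerprint_object, for JSON-serializable dicts only.

    Canonical JSON (sorted keys, fixed separators) makes the result deterministic:
    dicts with equal contents hash identically regardless of insertion order.
    """
    canonical = json.dumps(row, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).digest()


@dataclass(frozen=True)
class FileTrackingRecord:
    fingerprint: bytes  # 32 bytes for SHA-256, however large the content is
```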
Action and TargetActionSink (you implement)
An action (you define) describes what operation to perform on the external system:
```python
# Example: File action
import pathlib
from typing import NamedTuple

class _FileAction(NamedTuple):
    path: pathlib.Path
    content: bytes | None  # None means delete
```
A TargetActionSink batches and executes actions:
```python
# Sync sink
sink = coco.TargetActionSink.from_fn(apply_actions)

# Async sink
sink = coco.TargetActionSink.from_async_fn(apply_actions_async)
```
The sink function receives context_provider as its first argument (for looking up connections from the environment), followed by a sequence of actions. For container targets, it returns a list of child handler definitions with one entry per action (None where an action has no child handler):
```python
from typing import Sequence

import cocoindex as coco
from cocoindex._internal.context_keys import ContextProvider

def apply_actions(
    context_provider: ContextProvider,
    actions: Sequence[_FileAction],
) -> list[coco.ChildTargetDef[_ChildHandler] | None]:
    outputs = []  # One entry per action, in the same order
    for action in actions:
        if action.content is None:
            action.path.unlink(missing_ok=True)
            outputs.append(None)
        elif action.is_directory:  # Assumes an is_directory field not shown in _FileAction above
            action.path.mkdir(parents=True, exist_ok=True)
            # Return a child handler bound to the created directory
            outputs.append(coco.ChildTargetDef(handler=_ChildHandler(action.path)))
        else:
            action.path.write_bytes(action.content)
            outputs.append(None)
    return outputs
```
TargetReconcileOutput (you return)
TargetReconcileOutput bundles what reconcile() returns when an action is needed:
```python
class TargetReconcileOutput(NamedTuple):
    action: ActionT  # What to do
    sink: TargetActionSink[ActionT, ...]  # How to execute it
    tracking_record: TrackingRecordT | NonExistenceType  # What to remember
    child_invalidation: Literal["destructive", "lossy"] | None = None  # For container targets
```
The child_invalidation field is only relevant for container targets (those with children). See Child invalidation for details.
TargetStatesProvider (CocoIndex provides)
A TargetStatesProvider is a factory that creates TargetState objects. You don't implement this class — CocoIndex gives you one when you register a handler or declare a target state with children.
```python
# You get a provider from registration
provider = coco.register_root_target_states_provider("my.target", handler)

# Or from declaring a parent target state
child_provider = coco.declare_target_state_with_child(parent_target_state)

# Or from an attachment on a resolved child provider (see "Implementing attachment providers")
att_provider = child_provider.attachment("vector_index")
```
TargetState (CocoIndex provides)
A TargetState wraps a key and spec. You create these using the provider, then declare them:
```python
# Create a target state
target_state = provider.target_state(key, spec)

# Declare it for reconciliation
coco.declare_target_state(target_state)
```
Implementing root target states
This section covers root target states — those not nested inside another target.
Life of a root target state
Here is what happens at runtime:
- Registration: You define a TargetHandler and call register_root_target_states_provider(). CocoIndex returns a TargetStatesProvider, a factory for creating target states associated with your handler.
- Declaration: During execution, user code calls provider.target_state(key, spec) to create TargetState objects, then declare_target_state() to declare them. CocoIndex collects all declared target states.
- Reconciliation: When the processing unit finishes, CocoIndex calls your handler's reconcile() method for each target state. For declared target states, desired_state contains the spec; for previously declared but now missing states, desired_state is NON_EXISTENCE (triggering cleanup). Your reconcile() compares the desired state with previous records and returns TargetReconcileOutput if an action is needed, or None if no changes are required.
- Action execution: CocoIndex batches actions by their TargetActionSink and executes them. The sink applies changes to the external system (database writes, file operations, API calls, etc.).
- Tracking persistence: After successful execution, CocoIndex persists the new tracking records. On the next run, these become the prev_possible_states for change detection.
Due to interrupted updates, prev_possible_states may contain multiple records. CocoIndex tracks all possible states until a successful update confirms the current state. Your reconciliation logic should handle this by generating actions that work correctly regardless of which previous state is actual.
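A toy model may help show how multiple previous states arise. ToyEngine below is a hypothetical, single-target simplification written for illustration, not CocoIndex's implementation: before running an action it adds the new fingerprint to the possible states, and only after the action succeeds does it collapse the set to that one record. An interrupted run therefore leaves both the old and the new fingerprint as possible states.

```python
import hashlib
from typing import List, Optional


def fp(content: bytes) -> bytes:
    return hashlib.sha256(content).digest()


class ToyEngine:
    """Toy bookkeeping for one target state (not CocoIndex's implementation)."""

    def __init__(self) -> None:
        self.possible: List[bytes] = []  # plays the role of prev_possible_states
        self.may_be_missing = True  # nothing has been written yet
        self.store: Optional[bytes] = None  # the "external system"

    def run(self, desired: bytes, fail: bool = False) -> bool:
        """Reconcile and apply; return True if an action was executed."""
        target = fp(desired)
        # Conservative skip: only if every possible previous state matches.
        if not self.may_be_missing and all(p == target for p in self.possible):
            return False
        # Until the write is confirmed, the new state joins the possible states.
        if target not in self.possible:
            self.possible.append(target)
        if fail:
            raise RuntimeError("interrupted before the update was confirmed")
        self.store = desired  # the sink applies the action
        self.possible = [target]  # success confirms the current state
        self.may_be_missing = False
        return True


engine = ToyEngine()
engine.run(b"v1")  # run 1: v1 written and confirmed
try:
    engine.run(b"v2", fail=True)  # run 2: interrupted mid-update
except RuntimeError:
    pass
states_after_crash = len(engine.possible)  # v1 and v2 are both possible
reapplied = engine.run(b"v2")  # run 3: v1 is still possible, so v2 is re-applied
skipped_next = not engine.run(b"v2")  # run 4: confirmed, nothing to do
```

The all(...) check is what makes run 3 re-apply v2: one possible previous state (v1) doesn't match, so skipping would be unsafe.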
Step 1: Define your types
Start by defining the types for your provider:
```python
from dataclasses import dataclass
from typing import Any, Collection, NamedTuple

import cocoindex as coco

# Key: How to identify a target state
_RowKey = tuple[str, ...]  # Primary key values

# Value: What the user declares
@dataclass
class _RowSpec:
    data: dict[str, Any]

# Tracking record: What to persist for change detection
@dataclass(frozen=True, slots=True)
class _RowTrackingRecord:
    fingerprint: bytes

# Action: What operation to perform
class _RowAction(NamedTuple):
    key: _RowKey
    data: dict[str, Any] | None  # None = delete
```
Step 2: Implement the handler
```python
from typing import Any, Collection, Sequence

import cocoindex as coco
from cocoindex._internal.context_keys import ContextProvider


class _RowHandler(coco.TargetHandler[_RowKey, _RowSpec, _RowTrackingRecord]):
    """Handler for database rows."""

    def __init__(self, connection: DatabaseConnection, table: str):
        # DatabaseConnection is a placeholder for your database client type
        self._conn = connection
        self._table = table
        self._sink = coco.TargetActionSink.from_async_fn(self._apply_actions)

    async def _apply_actions(
        self, context_provider: ContextProvider, actions: Sequence[_RowAction]
    ) -> None:
        # The connection was passed in __init__, so context_provider is not needed
        # here, but it must be accepted per the sink protocol.
        for action in actions:
            if action.data is None:
                await self._conn.delete(self._table, action.key)
            else:
                await self._conn.upsert(self._table, action.key, action.data)

    def _compute_fingerprint(self, data: dict[str, Any]) -> bytes:
        from cocoindex.connectorkits.fingerprint import fingerprint_object

        return fingerprint_object(data)

    def reconcile(
        self,
        key: _RowKey,
        desired_state: _RowSpec | coco.NonExistenceType,
        prev_possible_states: Collection[_RowTrackingRecord],
        prev_may_be_missing: bool,
        /,
    ) -> coco.TargetReconcileOutput[_RowAction, _RowTrackingRecord] | None:
        # Handle deletion
        if coco.is_non_existence(desired_state):
            if not prev_possible_states and not prev_may_be_missing:
                return None  # Nothing to delete
            return coco.TargetReconcileOutput(
                action=_RowAction(key=key, data=None),
                sink=self._sink,
                tracking_record=coco.NON_EXISTENCE,
            )

        # Handle upsert
        target_fp = self._compute_fingerprint(desired_state.data)

        # Skip if unchanged in every possible previous state
        if not prev_may_be_missing and all(
            prev.fingerprint == target_fp for prev in prev_possible_states
        ):
            return None

        return coco.TargetReconcileOutput(
            action=_RowAction(key=key, data=desired_state.data),
            sink=self._sink,
            tracking_record=_RowTrackingRecord(fingerprint=target_fp),
        )
```
Step 3: Register the provider
For root-level target states (not nested within another target), register a provider:
```python
_row_provider = coco.register_root_target_states_provider(
    "mycompany.io/mydb/row",  # Unique provider name
    _RowHandler(connection, table),
)
```
Step 4: Create user-facing APIs
Wrap the provider in a user-friendly API:
```python
from typing import Any

import cocoindex as coco

class TableTarget:
    """User-facing API for declaring rows."""

    def __init__(self, provider: coco.TargetStatesProvider[_RowKey, _RowSpec, None]):
        self._provider = provider

    def declare_row(self, *, row: dict[str, Any], key: tuple[str, ...]) -> None:
        spec = _RowSpec(data=row)
        target_state = self._provider.target_state(key, spec)
        coco.declare_target_state(target_state)
```
Implementing container targets
Container targets (directories, tables) have children (files, rows). This section covers how non-root target states work and how to implement them.
Non-root target states
For targets nested inside another target (e.g., files inside a directory), the lifecycle is similar to that of root targets, but the provider is obtained differently.
For root targets, you call register_root_target_states_provider() and immediately get a provider with your handler. For non-root targets, the handler comes from the parent's sink execution:
- Declaration: Call declare_target_state_with_child(parent_ts). It returns an unresolved child provider immediately.
- Resolution: When the parent reconciles and its sink executes, the sink returns ChildTargetDef(handler=...). CocoIndex resolves the child provider with this handler.
- Usage: The child provider can now create child target states, which follow the same reconciliation → execution → tracking flow as root targets.
The child handler often needs context from the parent's action execution. For example, a file handler needs to know the directory path that was created. By returning the handler from the parent's sink, the handler has access to this runtime context.
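The ordering can be sketched with a toy model in plain Python. PendingChildProvider, parent_sink, and EntryHandler are hypothetical stand-ins, not CocoIndex classes. In this model, declarations are recorded while the provider is still pending (as declare_dir_target does when it hands the pending provider to user code), but child paths can only be computed once the parent sink has supplied a handler carrying the directory path.

```python
import pathlib
from typing import List, Optional, Tuple


class EntryHandler:
    """Child handler that needs runtime context (the directory path) from the parent sink."""

    def __init__(self, base_path: pathlib.Path) -> None:
        self.base_path = base_path


class PendingChildProvider:
    """Returned at declaration time; it has no handler until the parent sink runs."""

    def __init__(self) -> None:
        self.handler: Optional[EntryHandler] = None
        self.declared: List[Tuple[str, bytes]] = []

    def target_state(self, key: str, spec: bytes) -> None:
        self.declared.append((key, spec))  # recorded while still pending

    def resolve(self, handler: EntryHandler) -> None:
        self.handler = handler

    def child_paths(self) -> List[pathlib.Path]:
        # Children can only be reconciled once a handler is available.
        if self.handler is None:
            raise RuntimeError("child provider is not resolved yet")
        return [self.handler.base_path / key for key, _ in self.declared]


def parent_sink(dir_paths: List[pathlib.Path]) -> List[EntryHandler]:
    # A real sink would create each directory here; the handler carries its path.
    return [EntryHandler(p) for p in dir_paths]


child = PendingChildProvider()
child.target_state("a.txt", b"hello")
was_pending = child.handler is None
(handler,) = parent_sink([pathlib.Path("/data/out")])
child.resolve(handler)
resolved_paths = child.child_paths()
```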
Child invalidation
When a container target undergoes certain changes, the child target states may be affected. The child_invalidation field in TargetReconcileOutput lets you signal this to CocoIndex:
- "destructive": The container change destroys all existing children (e.g., a primary key change that requires dropping and recreating a table). CocoIndex will ignore all previous tracking records for children under this container and treat them as new.
- "lossy": The container change may cause data loss for existing children (e.g., a schema change that removes columns). CocoIndex will force an upsert for all children by setting prev_may_be_missing=True, even if their data appears unchanged.
- None (default): No impact on children. Normal change detection applies.
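The documented effect of each setting on what a child's reconcile() receives can be summarized as a small function. child_reconcile_inputs and would_skip are hypothetical helpers for illustration; in particular, representing a "treated as new" child as having no previous records and prev_may_be_missing=True is an assumption of this sketch, not something stated above.

```python
from typing import Collection, Literal, Optional, Tuple

Invalidation = Optional[Literal["destructive", "lossy"]]


def child_reconcile_inputs(
    invalidation: Invalidation,
    prev_possible_states: Collection[bytes],
    prev_may_be_missing: bool,
) -> Tuple[Tuple[bytes, ...], bool]:
    """Return the (prev_possible_states, prev_may_be_missing) a child would see."""
    if invalidation == "destructive":
        # Previous child records are ignored; the child is treated as new
        # (modeled here as: no records, and it may be missing).
        return (), True
    if invalidation == "lossy":
        # Records are kept, but an upsert is forced even if data looks unchanged.
        return tuple(prev_possible_states), True
    # None: normal change detection.
    return tuple(prev_possible_states), prev_may_be_missing


def would_skip(target_fp: bytes, prev: Collection[bytes], may_be_missing: bool) -> bool:
    """The same skip check used by the handlers on this page."""
    return not may_be_missing and all(p == target_fp for p in prev)
```

With None, an unchanged child is skipped; with "lossy" or "destructive", the same child is written again.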
Set child_invalidation in the parent handler's reconcile() method when you detect that the container itself has changed in a way that affects its children:
```python
class _DirHandler(coco.TargetHandler[_DirKey, _DirSpec, _DirTrackingRecord]):
    def reconcile(self, key, desired_state, prev_possible_states, prev_may_be_missing, /):
        # Detect whether the container change is destructive or lossy.
        # _is_destructive_change and _is_lossy_change are your own helper methods.
        invalidation = None
        if self._is_destructive_change(desired_state, prev_possible_states):
            invalidation = "destructive"
        elif self._is_lossy_change(desired_state, prev_possible_states):
            invalidation = "lossy"

        return coco.TargetReconcileOutput(
            action=_DirAction(...),
            sink=self._sink,
            tracking_record=_DirTrackingRecord(...),
            child_invalidation=invalidation,
        )
```
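The handler above leaves _is_destructive_change and _is_lossy_change to you. One possible implementation for a table-like container, following the two examples given for child_invalidation (a primary key change is destructive, removing columns is lossy), is sketched below. TableSchemaRecord is a hypothetical container tracking record that stores the schema definition itself, and the helpers are written as free functions so they can be tested in isolation. They use any(...) over the previous states so that a change relative to any possible previous state triggers invalidation, mirroring the conservative approach used for skipping.

```python
from dataclasses import dataclass
from typing import Collection, FrozenSet, Literal, Optional, Tuple


@dataclass(frozen=True)
class TableSchemaRecord:
    """Hypothetical container tracking record: the schema definition itself."""

    primary_key: Tuple[str, ...]
    columns: FrozenSet[str]


def is_destructive_change(desired: TableSchemaRecord, prev: Collection[TableSchemaRecord]) -> bool:
    # A different primary key requires dropping and recreating the table.
    return any(p.primary_key != desired.primary_key for p in prev)


def is_lossy_change(desired: TableSchemaRecord, prev: Collection[TableSchemaRecord]) -> bool:
    # Dropping any previously present column may lose data in existing rows.
    return any(not p.columns <= desired.columns for p in prev)


def child_invalidation_for(
    desired: TableSchemaRecord, prev: Collection[TableSchemaRecord]
) -> Optional[Literal["destructive", "lossy"]]:
    if is_destructive_change(desired, prev):
        return "destructive"
    if is_lossy_change(desired, prev):
        return "lossy"
    return None
```

Adding a column is neither destructive nor lossy here, so existing rows keep normal change detection.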
Step 1: Define parent and child handlers
The parent handler reconciles the container itself. The child handler reconciles entries within it:
```python
# Parent handler for directory
class _DirHandler(coco.TargetHandler[_DirKey, _DirSpec, _DirTrackingRecord]):
    def reconcile(self, key, desired_state, prev_possible_states, prev_may_be_missing, /):
        # Reconcile the directory itself
        ...


# Child handler for entries within a directory
class _EntryHandler(coco.TargetHandler[str, _EntrySpec, _EntryTrackingRecord]):
    def __init__(self, base_path: pathlib.Path):
        self._base_path = base_path

    def reconcile(self, key, desired_state, prev_possible_states, prev_may_be_missing, /):
        # Reconcile files/subdirs within the directory
        path = self._base_path / key
        ...
```
Step 2: Return child handlers from the sink
The parent's sink creates the container and returns child handlers:
```python
import shutil
from typing import Sequence

import cocoindex as coco
from cocoindex._internal.context_keys import ContextProvider


def _apply_dir_actions(
    context_provider: ContextProvider,
    actions: Sequence[_DirAction],
) -> list[coco.ChildTargetDef[_EntryHandler] | None]:
    outputs = []
    for action in actions:
        if action.should_delete:
            shutil.rmtree(action.path, ignore_errors=True)
            outputs.append(None)  # No child handler for deleted directories
        else:
            action.path.mkdir(parents=True, exist_ok=True)
            # Return a child handler bound to the created path
            outputs.append(coco.ChildTargetDef(handler=_EntryHandler(action.path)))
    return outputs
```
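A runnable stand-in for this pattern, using only the standard library, is sketched below. DirAction, EntryHandler.write, and apply_dir_actions are hypothetical simplifications (no CocoIndex imports and no ChildTargetDef wrapper). The point is that the sink returns exactly one output per action, in order, and that each returned handler is bound to the path the sink just created.

```python
import pathlib
import shutil
import tempfile
from typing import List, NamedTuple, Optional, Sequence


class DirAction(NamedTuple):
    path: pathlib.Path
    should_delete: bool


class EntryHandler:
    """Toy child handler bound to the directory the parent sink created."""

    def __init__(self, base_path: pathlib.Path) -> None:
        self.base_path = base_path

    def write(self, name: str, content: bytes) -> pathlib.Path:
        path = self.base_path / name
        path.write_bytes(content)
        return path


def apply_dir_actions(actions: Sequence[DirAction]) -> List[Optional[EntryHandler]]:
    outputs: List[Optional[EntryHandler]] = []
    for action in actions:
        if action.should_delete:
            shutil.rmtree(action.path, ignore_errors=True)
            outputs.append(None)  # deleted directory: no child handler
        else:
            action.path.mkdir(parents=True, exist_ok=True)
            outputs.append(EntryHandler(action.path))  # carries the created path
    return outputs  # one output per action, in the same order


# Usage: create one directory and remove another in a single batch.
with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    (root / "old").mkdir()
    outputs = apply_dir_actions(
        [DirAction(root / "docs", should_delete=False), DirAction(root / "old", should_delete=True)]
    )
    written = outputs[0].write("a.txt", b"hi").read_bytes()
    old_removed = not (root / "old").exists()
```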
Step 3: Create user-facing API
The user-facing API uses declare_target_state_with_child() and exposes methods for declaring children:
```python
import pathlib
from typing import cast

import cocoindex as coco


class DirTarget:
    """User-facing API for declaring files in a directory."""

    def __init__(self, provider: coco.TargetStatesProvider[str, _EntrySpec, None]):
        self._provider = provider

    def declare_file(self, filename: str, content: bytes) -> None:
        spec = _EntrySpec(content=content)
        target_state = cast(
            coco.TargetState[None],
            self._provider.target_state(filename, spec),
        )
        coco.declare_target_state(target_state)


@coco.fn
def declare_dir_target(path: pathlib.Path) -> DirTarget:
    """Declare a directory target and return an API for declaring files."""
    # _root_provider: the provider registered for _DirHandler via
    # register_root_target_states_provider()
    parent_ts = _root_provider.target_state(
        key=_DirKey(path=str(path)),
        spec=_DirSpec(),
    )
    # Child provider is pending until the parent sink runs
    child_provider = coco.declare_target_state_with_child(parent_ts)
    return DirTarget(child_provider)
```
Implementing attachment providers
Attachment providers let a child handler expose auxiliary target states alongside its regular children. For example, a PostgreSQL table handler manages rows as regular children, but can also manage vector indexes and SQL command attachments as separate attachment types — each tracked independently.
When to use attachments
Use attachments when a target has auxiliary state beyond its primary children — indexes, triggers, materialized views, or any side-resource that should be managed alongside the main data. Attachments use symbol keys as namespace separators so they never conflict with regular child keys.
Target state path hierarchy
Attachments create additional levels in the target state path using symbol keys (denoted with @ prefix in documentation):
```
@my_connector/table                 [level 1: root provider]
  (db_key, schema, table_name)      [level 2: table state]
    @vector_index                   [level 3: attachment namespace (symbol key)]
      index_name_1                  [level 4: attachment instance]
      index_name_2                  [level 4]
    @sql_command_attachment         [level 3: another attachment namespace]
      cmd_name_1                    [level 4]
    row_pk_1                        [level 3: regular child (row)]
    row_pk_2                        [level 3]
```
The symbol keys (@vector_index, @sql_command_attachment) are path namespaces — not target states themselves. They separate attachment instances from regular children at the same level.
How it works
- Handler implements attachment(): The child handler (e.g., row handler) returns a handler for each supported attachment type.
- User code calls provider.attachment(): On a resolved child provider, this creates (or retrieves a cached) attachment sub-provider.
- Independent tracking: Target states declared under the attachment provider are tracked independently from regular children.
- Idempotent: Calling .attachment("x") multiple times returns the same cached provider.
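To make the namespacing and caching concrete, here is a toy provider in plain Python. Symbol, ToyProvider, and ToyRowHandler are hypothetical. Modeling symbol keys as a dedicated type is one way to guarantee they never equal a regular child key, and raising ValueError for an unsupported attachment type is a choice of this sketch, not documented CocoIndex behavior.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class Symbol:
    """A path segment (written @name in the docs) that never equals a regular key."""

    name: str


class ToyRowHandler:
    """Stand-in child handler; returns a label instead of a real attachment handler."""

    def attachment(self, att_type: str) -> Optional[str]:
        return "vector-index-handler" if att_type == "vector_index" else None


@dataclass
class ToyProvider:
    path: tuple
    handler: Any
    _attachments: Dict[str, "ToyProvider"] = field(default_factory=dict, repr=False)

    def child_path(self, key: Any) -> tuple:
        return self.path + (key,)

    def attachment(self, att_type: str) -> "ToyProvider":
        cached = self._attachments.get(att_type)
        if cached is not None:
            return cached  # idempotent: the same sub-provider every time
        att_handler = self.handler.attachment(att_type)
        if att_handler is None:
            raise ValueError(f"unsupported attachment type: {att_type}")
        # The attachment namespace is a symbol key one level below this provider.
        sub = ToyProvider(self.path + (Symbol(att_type),), att_handler)
        self._attachments[att_type] = sub
        return sub


# Usage: a table provider with rows as regular children and a vector index attachment.
table = ToyProvider((Symbol("my_connector/table"), ("db", "public", "docs")), ToyRowHandler())
vec = table.attachment("vector_index")
```

A row whose key happens to be the string "vector_index" lands at a different path than the attachment namespace, so the two can never collide.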
Step 1: Implement the attachment handler
An attachment handler is just a regular TargetHandler — it implements reconcile() and has an action sink:
```python
from typing import NamedTuple, Sequence

import cocoindex as coco
from cocoindex._internal.context_keys import ContextProvider


class _VectorIndexSpec(NamedTuple):
    column: str
    metric: str
    method: str


class _VectorIndexAction(NamedTuple):
    name: str
    spec: _VectorIndexSpec | None  # None means delete


class _VectorIndexHandler:
    def __init__(self, pool, table_name, schema_name):
        self._pool = pool
        self._table_name = table_name
        self._schema_name = schema_name
        self._sink = coco.TargetActionSink.from_async_fn(self._apply_actions)

    async def _apply_actions(
        self, context_provider: ContextProvider, actions: Sequence[_VectorIndexAction]
    ) -> None:
        async with self._pool.acquire() as conn:
            for action in actions:
                if action.spec is None:
                    await conn.execute(f'DROP INDEX IF EXISTS "{action.name}"')
                else:
                    # Full CREATE INDEX statement elided
                    await conn.execute(f'CREATE INDEX "{action.name}" ...')

    def reconcile(self, key, desired_state, prev_possible_states, prev_may_be_missing, /):
        # Standard reconcile pattern: compare fingerprints, return an action or None
        ...
```
Step 2: Add attachment() to the parent handler
The parent handler (which manages regular children) returns attachment handlers by type:
```python
class _RowHandler(coco.TargetHandler[_RowValue, _RowFingerprint]):
    def __init__(self, pool, table_name, schema_name, table_schema):
        self._pool = pool
        self._table_name = table_name
        self._schema_name = schema_name
        # ...

    def attachment(self, att_type: str) -> _VectorIndexHandler | _SqlCommandHandler | None:
        if att_type == "vector_index":
            return _VectorIndexHandler(self._pool, self._table_name, self._schema_name)
        if att_type == "sql_command_attachment":
            return _SqlCommandHandler(self._pool, self._table_name, self._schema_name)
        return None  # Unsupported attachment type

    def reconcile(self, key, desired_state, prev_possible_states, prev_may_be_missing, /):
        # Regular row reconciliation
        ...
```
Step 3: Expose attachment APIs on the user-facing target
The user-facing target class calls provider.attachment() to get the attachment sub-provider, then declares target states on it:
```python
class TableTarget:
    def __init__(self, provider, table_schema):
        self._provider = provider
        self._table_schema = table_schema

    def declare_row(self, *, row):
        # Regular child target state. Deriving pk_values (the primary key tuple)
        # and row_dict from `row` via self._table_schema is omitted here.
        coco.declare_target_state(self._provider.target_state(pk_values, row_dict))

    def declare_vector_index(self, *, name, column, metric="cosine", method="ivfflat"):
        spec = _VectorIndexSpec(column=column, metric=metric, method=method)
        # Creates (or returns the cached) attachment sub-provider
        att_provider = self._provider.attachment("vector_index")
        coco.declare_target_state(att_provider.target_state(name, spec))
```
When an attachment has a teardown step (like DROP INDEX), store the full spec as the tracking record instead of a fingerprint. This lets you recover the teardown information from prev_possible_states when the attachment is deleted or changed. See the _SqlCommandHandler in the PostgreSQL connector for an example.
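The idea can be sketched as follows, assuming a hypothetical SqlCommandSpec with setup and teardown SQL that is stored verbatim as the tracking record. plan_sql is not the PostgreSQL connector's code; it only shows why a fingerprint is not enough: when the attachment is removed or changed, the teardown statement for every possible previous state is read back from prev_possible_states. The sketch assumes teardown SQL is idempotent (DROP ... IF EXISTS), so tearing down a state that may not exist is harmless.

```python
from typing import Collection, List, NamedTuple

# Stand-in for coco.NON_EXISTENCE; this sketch does not import cocoindex.
NON_EXISTENCE = object()


class SqlCommandSpec(NamedTuple):
    """Hypothetical attachment spec, also stored verbatim as the tracking record."""

    setup_sql: str
    teardown_sql: str


def plan_sql(
    desired: object,
    prev_possible_states: Collection[SqlCommandSpec],
    prev_may_be_missing: bool,
) -> List[str]:
    """Return the SQL statements to run, in order (an empty list means no action)."""
    if (
        desired is not NON_EXISTENCE
        and not prev_may_be_missing
        and all(p == desired for p in prev_possible_states)
    ):
        return []  # unchanged
    # A fingerprint could not say how to undo a previous state; the stored spec can.
    # Deduplicate while keeping order; teardown SQL is assumed idempotent.
    statements = list(dict.fromkeys(p.teardown_sql for p in prev_possible_states))
    if isinstance(desired, SqlCommandSpec):
        statements.append(desired.setup_sql)
    return statements
```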
Best practices
Idempotent actions
Actions should be idempotent — applying the same action multiple times should have the same effect as applying it once:
```python
# Good: Idempotent
path.mkdir(parents=True, exist_ok=True)
path.unlink(missing_ok=True)
await conn.execute("INSERT ... ON CONFLICT DO UPDATE ...")

# Bad: Not idempotent
path.mkdir()  # Fails if exists
await conn.execute("INSERT ...")  # Fails on duplicate key
```
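The payoff of idempotency is that a retry after an interrupted run is safe. The sketch below (plain Python, with hypothetical function names) applies the same filesystem actions twice and shows that the idempotent version converges while the non-idempotent mkdir() fails on the second attempt.

```python
import pathlib
import tempfile


def apply_idempotent(root: pathlib.Path) -> None:
    """Safe to re-run: each call leaves the same end state."""
    (root / "out").mkdir(parents=True, exist_ok=True)
    (root / "out" / "a.txt").write_bytes(b"hello")  # overwriting gives the same result
    (root / "stale.txt").unlink(missing_ok=True)  # no error if already deleted


def apply_non_idempotent(root: pathlib.Path) -> None:
    (root / "out2").mkdir()  # raises FileExistsError if it already exists


with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    (root / "stale.txt").write_bytes(b"old")
    apply_idempotent(root)
    apply_idempotent(root)  # a retry has the same effect as a single run
    content = (root / "out" / "a.txt").read_bytes()
    stale_gone = not (root / "stale.txt").exists()
    apply_non_idempotent(root)
    try:
        apply_non_idempotent(root)  # simulated retry
        retry_failed = False
    except FileExistsError:
        retry_failed = True
```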
Handle multiple previous states
Due to interrupted updates, prev_possible_states may contain multiple records. Design your reconciliation logic to handle this:
```python
# Check if ALL previous states match (conservative approach)
if not prev_may_be_missing and all(
    prev.fingerprint == target_fp for prev in prev_possible_states
):
    return None  # Safe to skip
```
Efficient change detection
Choose tracking records that enable efficient change detection without storing full content:
| Scenario | Tracking Record |
|---|---|
| File content | Content hash (fingerprint) |
| Database row | Row data hash |
| Schema/structure | Schema definition |
| Directory existence | None (presence is enough) |
Shared action sinks
If all instances of a handler use the same action logic, create a shared sink. The action function must accept context_provider as its first argument:
```python
from typing import Sequence

import cocoindex as coco
from cocoindex._internal.context_keys import ContextProvider


def _apply_actions(
    context_provider: ContextProvider, actions: Sequence[_MyAction]
) -> None:
    ...


# Module-level shared sink
_shared_sink = coco.TargetActionSink.from_fn(_apply_actions)


class _MyHandler(coco.TargetHandler[...]):
    def reconcile(self, key, desired_state, prev_possible_states, prev_may_be_missing, /):
        return coco.TargetReconcileOutput(
            action=...,
            sink=_shared_sink,  # Reuse the same sink
            tracking_record=...,
        )
```
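Since CocoIndex batches actions by their TargetActionSink, sharing a sink affects how many sink calls are made. The toy executor below assumes grouping is by sink identity and uses plain callables in place of TargetActionSink objects; execute_batched is hypothetical. A module-level function collects every action into one batch, while sinks bound to separate handler instances get one batch each.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

Sink = Callable[[Sequence[Any]], None]


def execute_batched(outputs: Sequence[Tuple[Sink, Any]]) -> int:
    """Group (sink, action) pairs by sink and call each sink once with its batch.

    Returns the number of sink calls made.
    """
    batches: Dict[Sink, List[Any]] = {}
    for sink, action in outputs:
        batches.setdefault(sink, []).append(action)
    for sink, actions in batches.items():
        sink(actions)
    return len(batches)


# A module-level function acts as one shared sink.
shared_calls: List[List[Any]] = []


def shared_sink(actions: Sequence[Any]) -> None:
    shared_calls.append(list(actions))


class PerInstanceHandler:
    """Each instance's bound method is a distinct sink."""

    def __init__(self) -> None:
        self.batches: List[List[Any]] = []

    def sink(self, actions: Sequence[Any]) -> None:
        self.batches.append(list(actions))


shared_count = execute_batched([(shared_sink, "a"), (shared_sink, "b"), (shared_sink, "c")])
h1, h2 = PerInstanceHandler(), PerInstanceHandler()
per_instance_count = execute_batched([(h1.sink, "x"), (h2.sink, "y"), (h1.sink, "z")])
```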
Complete example: local file system
Here's a simplified version of the localfs connector showing the complete pattern:
```python
from __future__ import annotations

import pathlib
from dataclasses import dataclass
from typing import Collection, NamedTuple, Sequence

import cocoindex as coco
from cocoindex._internal.context_keys import ContextProvider
from cocoindex.connectorkits.fingerprint import fingerprint_bytes

# Types
_FileName = str
_FileContent = bytes
_FileFingerprint = bytes


class _FileAction(NamedTuple):
    path: pathlib.Path
    content: _FileContent | None  # None = delete


@dataclass(frozen=True, slots=True)
class _FileTrackingRecord:
    fingerprint: _FileFingerprint


# Action execution
def _apply_actions(context_provider: ContextProvider, actions: Sequence[_FileAction]) -> None:
    for action in actions:
        if action.content is None:
            action.path.unlink(missing_ok=True)
        else:
            action.path.parent.mkdir(parents=True, exist_ok=True)
            action.path.write_bytes(action.content)


_file_sink = coco.TargetActionSink[_FileAction, None].from_fn(_apply_actions)


# Handler
class _FileHandler(coco.TargetHandler[_FileName, _FileContent, _FileTrackingRecord]):
    __slots__ = ("_base_path",)
    _base_path: pathlib.Path

    def __init__(self, base_path: pathlib.Path):
        self._base_path = base_path

    def reconcile(
        self,
        key: _FileName,
        desired_state: _FileContent | coco.NonExistenceType,
        prev_possible_states: Collection[_FileTrackingRecord],
        prev_may_be_missing: bool,
        /,
    ) -> coco.TargetReconcileOutput[_FileAction, _FileTrackingRecord] | None:
        path = self._base_path / key
        if coco.is_non_existence(desired_state):
            if not prev_possible_states and not prev_may_be_missing:
                return None
            return coco.TargetReconcileOutput(
                action=_FileAction(path=path, content=None),
                sink=_file_sink,
                tracking_record=coco.NON_EXISTENCE,
            )

        target_fp = fingerprint_bytes(desired_state)
        if not prev_may_be_missing and all(
            prev.fingerprint == target_fp for prev in prev_possible_states
        ):
            return None

        return coco.TargetReconcileOutput(
            action=_FileAction(path=path, content=desired_state),
            sink=_file_sink,
            tracking_record=_FileTrackingRecord(fingerprint=target_fp),
        )
```
See the full implementations in:
- cocoindex/connectors/localfs/target.py: File system targets
- cocoindex/connectors/postgres/target.py: PostgreSQL tables and rows