Custom Sources
A custom source allows you to read data from any system you want, such as APIs, databases, file systems, cloud storage, or other external services. You can stream data incrementally and provide ordinal information for efficient updates and change tracking.
Overview
Custom sources are defined by two components:
- A source spec that configures the behavior and connection parameters for the source.
- A source connector that handles the actual data reading operations. It provides the following required methods:
  - create(): Create a connector instance from the source spec.
  - list(): List all available data items. Return keys.
  - get_value(): Get the full content for a specific data item by given key.
Source Spec
The source spec defines the configuration parameters for your custom source. When you use this source in a flow (typically by calling add_source()), you instantiate this source spec with specific parameter values.
A source spec is defined as a class that inherits from cocoindex.op.SourceSpec.
class CustomSource(cocoindex.op.SourceSpec):
    """
    Documentation for the source.
    """
    param1: str
    param2: int | None = None
    ...
Notes:
- All fields of the spec must have a type serializable / deserializable by the json module.
- All subclasses of SourceSpec can be instantiated similar to a dataclass, i.e. ClassName(param1=value1, param2=value2, ...).
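Because spec fields must round-trip through the json module, it can help to check a candidate set of parameter values before wiring the spec into a flow. The sketch below uses a plain dataclass as a stand-in for a cocoindex.op.SourceSpec subclass so that it runs without CocoIndex installed. ExampleSpec, its fields, and roundtrips_through_json are hypothetical names introduced for illustration only.

```python
from __future__ import annotations

import dataclasses
import json


@dataclasses.dataclass
class ExampleSpec:
    """Stand-in for a SourceSpec subclass (hypothetical fields)."""
    base_url: str
    page_size: int | None = None
    tags: list[str] = dataclasses.field(default_factory=list)


def roundtrips_through_json(spec) -> bool:
    """Return True if every field value survives json dumps/loads unchanged."""
    as_dict = dataclasses.asdict(spec)
    try:
        restored = json.loads(json.dumps(as_dict))
    except (TypeError, ValueError):
        # e.g. a set or a custom object that json cannot encode
        return False
    return restored == as_dict
```

A value such as a set fails the check because json cannot encode it, and a tuple fails because it comes back as a list.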
Source Connector
A source connector handles the actual data reading operations for your custom source. It defines how data should be accessed from your source system.
Source connectors implement methods to list available data and retrieve specific data items.
A source connector is defined as a class decorated by @cocoindex.op.source_connector(spec_cls=CustomSource, key_type=KeyType, value_type=ValueType).
This is a full-featured source connector definition:
@cocoindex.op.source_connector(
    spec_cls=CustomSource,
    key_type=DataKeyType,
    value_type=DataValueType
)
class CustomSourceConnector:
    @staticmethod
    async def create(spec: CustomSource) -> "CustomSourceConnector":
        """Required. Create a connector instance from the spec."""
        ...

    async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]:
        """Required. List all available data items. `options` is optional."""
        ...

    async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]:
        """Required. Get the full content for a specific data item. `options` is optional."""
        ...

    def provides_ordinal(self) -> bool:
        """Optional. Return True if this source provides ordinal information."""
        return False
Your source connector class doesn't need everything shown above:
- The options argument is optional. It carries hints about which properties the engine currently needs. You can omit this argument and always provide every available property.
- provides_ordinal() is optional. It tells the engine whether the source provides ordinal information.
- The create(), list() and get_value() methods can be async or sync.
The following data types are involved in the method definitions above: CustomSource, DataKeyType, DataValueType, PartialSourceRow, PartialSourceRowData. They should be replaced with the actual types in your implementation. We will explain each of them below.
Data Access Methods
Data access methods handle the actual reading operations - discovering available data and retrieving specific content.
async? def create(spec) (Required)
This static method creates a connector instance from the source spec. It should return a connector that can be used to access data from the source.
@staticmethod
async def create(spec: CustomSource) -> "CustomSourceConnector":
    """
    Create a connector instance. This is where you initialize connections,
    validate configuration, authenticate, etc.
    """
    # Initialize connections, authenticate, etc.
    return CustomSourceConnector(spec, ...)
async? def list(options?) (Required)
This method enumerates all available data items from the source. It should yield PartialSourceRow objects containing:
- key: A unique identifier for the data item
- data: A PartialSourceRowData object with metadata (typically just ordinal information)
The optional options parameter provides hints about what information is needed:
async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]:
    """
    List all available data items. This method is used by CocoIndex to
    discover what data is available and track changes.
    """
    # Enumerate data items from your source
    for item_metadata in await self._fetch_item_list():
        data = PartialSourceRowData[DataValueType]()

        # Include ordinal if requested and available.
        # Must provide if `provides_ordinal()` returns True and `include_ordinal` is True.
        if options.include_ordinal and item_metadata.timestamp:
            data.ordinal = item_metadata.timestamp

        # Optionally include full value if it's cheap to fetch and requested
        if options.include_value and self._can_fetch_cheaply(item_metadata):
            data.value = await self._fetch_item_content(item_metadata.id)

        yield PartialSourceRow(
            key=DataKeyType(id=item_metadata.id),
            data=data
        )
options is a hint. You don't have to take this argument.
Without it, provide all fields as long as they're available.
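The hint handling described above can be sketched without CocoIndex installed. The following uses minimal stand-in dataclasses that mirror the documented shapes of SourceReadOptions, PartialSourceRow and PartialSourceRowData. The in-memory _ITEMS store, InMemoryConnector, and collect are hypothetical; a real connector would use the CocoIndex types and the source_connector decorator.

```python
from __future__ import annotations

import asyncio
import dataclasses
from typing import Any, AsyncIterator


# Stand-ins mirroring the documented shapes (assumption: not the real classes).
@dataclasses.dataclass
class SourceReadOptions:
    include_ordinal: bool = False
    include_content_version_fp: bool = False
    include_value: bool = False


@dataclasses.dataclass
class PartialSourceRowData:
    value: Any = None
    ordinal: Any = None
    content_version_fp: bytes | None = None


@dataclasses.dataclass
class PartialSourceRow:
    key: str
    data: PartialSourceRowData


# Hypothetical backing store: key -> (last-modified ordinal, content).
_ITEMS = {"a": (100, "alpha"), "b": (200, "beta")}


class InMemoryConnector:
    async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow]:
        for key, (modified, content) in _ITEMS.items():
            data = PartialSourceRowData()
            if options.include_ordinal:
                data.ordinal = modified
            # Values are already in memory here, so including them is cheap.
            if options.include_value:
                data.value = content
            yield PartialSourceRow(key=key, data=data)


async def collect(options: SourceReadOptions) -> list[PartialSourceRow]:
    """Drain the async generator into a list."""
    return [row async for row in InMemoryConnector().list(options)]
```

Calling asyncio.run(collect(SourceReadOptions(include_ordinal=True))) yields rows whose data carries only the ordinal, while include_value=True also fills in the value.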
async? def get_value(key, options?) (Required)
This method retrieves the full content for a specific data item given its key. It should return a PartialSourceRowData object containing the actual data.
The optional options parameter provides hints about what information is needed:
async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]:
    """
    Get the full content for a specific data item. CocoIndex calls this
    when it needs the actual data content for processing.
    """
    # Fetch the full content for the given key
    content = await self._fetch_item_content(key.id)
    if content is None:
        return PartialSourceRowData(
            value=NON_EXISTENCE,
            ordinal=NO_ORDINAL,
            content_version_fp=None
        )

    data = PartialSourceRowData(
        value=DataValueType(
            title=content.title,
            content=content.body,
            author=content.author
        )
    )

    # Include ordinal if requested and available
    if options.include_ordinal and content.timestamp:
        data.ordinal = content.timestamp

    # Include content version fingerprint if requested and easily available
    if options.include_content_version_fp and content.etag:
        data.content_version_fp = content.etag.encode()

    return data
options is a hint. You don't have to take this argument.
Without it, provide all fields as long as they're available.
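As a complement, here is a minimal sketch of the sync form that takes no options argument and therefore fills in every field it has. It runs without CocoIndex: NON_EXISTENCE and NO_ORDINAL are local stand-in sentinels, PartialSourceRowData is a stand-in dataclass, and DictConnector is a hypothetical class. Using a SHA-256 digest of the content as the fingerprint is a choice made for this sketch, not something the source prescribes.

```python
from __future__ import annotations

import dataclasses
import hashlib
from typing import Any

# Stand-in sentinels; a real connector would use the CocoIndex constants.
NON_EXISTENCE = "NON_EXISTENCE"
NO_ORDINAL = "NO_ORDINAL"


@dataclasses.dataclass
class PartialSourceRowData:
    value: Any = None
    ordinal: Any = None
    content_version_fp: bytes | None = None


class DictConnector:
    """Hypothetical connector over a dict of key -> (ordinal, text)."""

    def __init__(self, items: dict[str, tuple[int, str]]) -> None:
        self._items = items

    def get_value(self, key: str) -> PartialSourceRowData:
        # No `options` argument: provide every field that is available.
        entry = self._items.get(key)
        if entry is None:
            # The item is gone: report that explicitly rather than returning None.
            return PartialSourceRowData(value=NON_EXISTENCE, ordinal=NO_ORDINAL)
        ordinal, text = entry
        return PartialSourceRowData(
            value=text,
            ordinal=ordinal,
            content_version_fp=hashlib.sha256(text.encode()).digest(),
        )
```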
def provides_ordinal() (Optional)
Returns True if this source provides ordinal information (such as timestamps) that can be used for efficient change detection. If True, CocoIndex can use this information to optimize updates.
def provides_ordinal(self) -> bool:
    """
    Return True if this source provides ordinal information.
    """
    return True  # If your source provides timestamps or version numbers
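To see why ordinals make change detection cheap, consider the following sketch. It compares the ordinals from a fresh listing with previously stored ones and picks out the keys that need refetching, without reading any values. This illustrates the general idea only and is not CocoIndex's actual update logic; keys_to_refresh and its arguments are hypothetical.

```python
from __future__ import annotations


def keys_to_refresh(
    stored: dict[str, int], listed: dict[str, int]
) -> tuple[list[str], list[str]]:
    """Return (keys to fetch again, keys to delete).

    `stored` maps key -> ordinal seen at the last run,
    `listed` maps key -> ordinal reported by the current listing.
    """
    # New keys, or keys whose ordinal moved forward, need a fresh get_value().
    changed = [k for k, o in listed.items() if k not in stored or o > stored[k]]
    # Keys that vanished from the listing no longer exist in the source.
    deleted = [k for k in stored if k not in listed]
    return changed, deleted
```

Keys whose ordinal is unchanged are skipped entirely, which is where the savings come from.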
Data Types
SourceReadOptions
The SourceReadOptions class provides hints to source connectors about what information is needed. This allows connectors to optimize their data fetching:
@dataclasses.dataclass
class SourceReadOptions:
    """Options for reading source data."""
    include_ordinal: bool = False
    include_content_version_fp: bool = False
    include_value: bool = False
Fields:
- include_ordinal: Whether to include ordinal information (timestamps, version numbers). Required in list() when provides_ordinal() returns True and this is True.
- include_content_version_fp: Whether to include content version fingerprints for change detection. Always optional.
- include_value: Whether to include full values. Required in get_value() when this is True. Optional in list() - useful when fetching values is cheap.
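The requirements in the field list above can be encoded as a small check, which may be handy in unit tests for a connector. This is a sketch under assumptions: the dataclasses are local stand-ins mirroring the documented shapes, and missing_required_fields is a hypothetical helper that encodes only the two rules stated above.

```python
from __future__ import annotations

import dataclasses
from typing import Any


# Stand-ins mirroring the documented shapes (assumption: not the real classes).
@dataclasses.dataclass
class SourceReadOptions:
    include_ordinal: bool = False
    include_content_version_fp: bool = False
    include_value: bool = False


@dataclasses.dataclass
class PartialSourceRowData:
    value: Any = None
    ordinal: Any = None
    content_version_fp: bytes | None = None


def missing_required_fields(
    method: str,
    options: SourceReadOptions,
    data: PartialSourceRowData,
    provides_ordinal: bool,
) -> list[str]:
    """Return names of fields the stated rules require but `data` leaves as None."""
    missing = []
    # Rule 1: list() must include the ordinal when the source provides it and it is requested.
    if method == "list" and provides_ordinal and options.include_ordinal:
        if data.ordinal is None:
            missing.append("ordinal")
    # Rule 2: get_value() must include the value when it is requested.
    if method == "get_value" and options.include_value:
        if data.value is None:
            missing.append("value")
    return missing
```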
PartialSourceRow
Represents a single data item from the source:
@dataclasses.dataclass
class PartialSourceRow(Generic[K, V]):
    key: K  # Unique identifier for the data item
    data: PartialSourceRowData[V]  # Metadata and optionally the value
PartialSourceRowData
Contains the data and metadata for a source item:
@dataclasses.dataclass
class PartialSourceRowData(Generic[V]):
    value: V | Literal["NON_EXISTENCE"] | None = None  # The actual data content
    ordinal: int | Literal["NO_ORDINAL"] | None = None  # Timestamp or version number
    content_version_fp: bytes | None = None  # Content fingerprint for change detection
Fields:
- value: The actual data content, or cocoindex.op.NON_EXISTENCE if the item doesn't exist, or None if not included.
- ordinal: Timestamp or version number for change tracking, or cocoindex.op.NO_ORDINAL if not available, or None if not included. An ordinal needs to be monotonically increasing.
- content_version_fp: Content fingerprint (hash, ETag, etc.) for efficient change detection, or None if not available.
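Since an ordinal is an integer that must increase monotonically, a last-modified timestamp has to be converted before it can be used. One possible conversion, shown here as a sketch, is integer microseconds since the Unix epoch. The unit is an arbitrary choice for this example, and ordinal_from_datetime is a hypothetical helper, not part of CocoIndex.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ordinal_from_datetime(ts: datetime) -> int:
    """Convert a timezone-aware datetime to integer microseconds since the epoch."""
    if ts.tzinfo is None:
        # Naive datetimes are ambiguous, so they cannot be ordered reliably.
        raise ValueError("expected a timezone-aware datetime")
    # Dividing two timedeltas with // gives an exact int, avoiding float rounding.
    return (ts - _EPOCH) // timedelta(microseconds=1)
```

Two datetimes that denote the same instant in different time zones map to the same ordinal, and a later instant always maps to a larger one.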
Key Type (DataKeyType) and Value Type (DataValueType) for Data
The data type of a custom source is a KTable, so the key type and value type follow the same requirements as the key and value of a KTable. Specifically:
- The key type uniquely identifies each data item in your source. It can be:
  - A NamedTuple or frozen dataclass with one or multiple fields.

        class DataKeyType(NamedTuple):
            id: str
            category: str | None = None

  - When the key has a single part, you can also use a simple type like str or int. The key field name will be cocoindex.KEY_FIELD_NAME in this case.
- The value type represents the actual data content. It's typically a dataclass containing all the value fields the source exposes.

      @dataclasses.dataclass
      class DataValueType:
          title: str
          content: str
          author: str | None
          created_at: datetime | None
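Both key forms mentioned above are immutable and compare by value in Python, so two keys built from the same fields are interchangeable. The sketch below demonstrates this using only the standard library. FrozenKey is a hypothetical frozen-dataclass counterpart of the DataKeyType shown above.

```python
from __future__ import annotations

import dataclasses
from typing import NamedTuple


class DataKeyType(NamedTuple):
    id: str
    category: str | None = None


@dataclasses.dataclass(frozen=True)
class FrozenKey:
    """Hypothetical frozen-dataclass form of the same key."""
    id: str
    category: str | None = None


# Keys built from equal fields are equal and hash alike,
# so either form can index a dict of already-seen items.
seen = {DataKeyType(id="42"): "first", FrozenKey(id="42"): "second"}
named_hit = seen[DataKeyType("42", None)]
frozen_hit = seen[FrozenKey("42")]
```

Attempting to assign to a field of FrozenKey raises dataclasses.FrozenInstanceError, which protects a key from being changed after it has been used.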
Example
See the custom_source_hn example for a complete end-to-end implementation.