Skip to main content

Custom Sources

A custom source allows you to read data from any system you want, such as APIs, databases, file systems, cloud storage, or other external services. You can stream data incrementally and provide ordinal information for efficient updates and change tracking.

Overview

Custom sources are defined by two components:

  • A source spec that configures the behavior and connection parameters for the source.
  • A source connector that handles the actual data reading operations. It provides the following required methods:
    • create(): Create a connector instance from the source spec.
    • list(): List all available data items. Return keys.
    • get_value(): Get the full content for a specific data item by given key.

Source Spec

The source spec defines the configuration parameters for your custom source. When you use this source in a flow (typically by calling add_source()), you instantiate this source spec with specific parameter values.

A source spec is defined as a class that inherits from cocoindex.op.SourceSpec.

class CustomSource(cocoindex.op.SourceSpec):
"""
Documentation for the source.
"""
param1: str
param2: int | None = None
...

Notes:

  • All fields of the spec must have a type serializable / deserializable by the json module.
  • All subclasses of SourceSpec can be instantiated similar to a dataclass, i.e. ClassName(param1=value1, param2=value2, ...).

Source Connector

A source connector handles the actual data reading operations for your custom source. It defines how data should be accessed from your source system.

Source connectors implement methods to list available data and retrieve specific data items.

A source connector is defined as a class decorated by @cocoindex.op.source_connector(spec_cls=CustomSource, key_type=KeyType, value_type=ValueType). This is a full-featured source connector definition:

@cocoindex.op.source_connector(
spec_cls=CustomSource,
key_type=DataKeyType,
value_type=DataValueType
)
class CustomSourceConnector:
@staticmethod
async def create(spec: CustomSource) -> "CustomSourceConnector":
"""Required. Create a connector instance from the spec."""
...

async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]:
"""Required. List all available data items. `options` is optional."""
...

async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]:
"""Required. Get the full content for a specific data item. `options` is optional."""
...

def provides_ordinal(self) -> bool:
"""Optional. Return True if this source provides ordinal information."""
return False

Your source connector class doesn't have to have everything above:

  • options arguments are optional. It provides additional hints to tell if the engine needs a certain property currently. You don't have to take this argument and always provide available properties.
  • provides_ordinal() is optional. It's a hint to tell if the engine needs ordinal information.
  • create(), list() and get_value() methods can be async or sync.

The following data types are involved in the method definitions above: CustomSource, DataKeyType, DataValueType, PartialSourceRow, PartialSourceRowData. They should be replaced with the actual types in your implementation. We will explain each of them below.

Data Access Methods

Data access methods handle the actual reading operations - discovering available data and retrieving specific content.

async? def create(spec) (Required)

This static method creates a connector instance from the source spec. It should return a connector that can be used to access data from the source.

@staticmethod
async def create(spec: CustomSource) -> "CustomSourceConnector":
"""
Create a connector instance. This is where you initialize connections,
validate configuration, authenticate, etc.
"""
# Initialize connections, authenticate, etc.
return CustomSourceConnector(spec, ...)

async? def list(options?) (Required)

This method enumerates all available data items from the source. It should yield PartialSourceRow objects containing:

  • key: A unique identifier for the data item
  • data: A PartialSourceRowData object with metadata (typically just ordinal information)

The optional options parameter provides hints about what information is needed:

async def list(self, options: SourceReadOptions) -> AsyncIterator[PartialSourceRow[DataKeyType, DataValueType]]:
"""
List all available data items. This method is used by CocoIndex to
discover what data is available and track changes.
"""
# Enumerate data items from your source
for item_metadata in await self._fetch_item_list():
data = PartialSourceRowData[DataValueType]()

# Include ordinal if requested and available.
# Must provide if `provides_ordinal()` returns True and `include_ordinal` is True.
if options.include_ordinal and item_metadata.timestamp:
data.ordinal = item_metadata.timestamp

# Optionally include full value if it's cheap to fetch and requested
if options.include_value and self._can_fetch_cheaply(item_metadata):
data.value = await self._fetch_item_content(item_metadata.id)

yield PartialSourceRow(
key=DataKeyType(id=item_metadata.id),
data=data
)

options is a hint. You don't have to take this argument. Without it, provide all fields as long as they're available.

async? def get_value(key, options?) (Required)

This method retrieves the full content for a specific data item given its key. It should return a PartialSourceRowData object containing the actual data.

The optional options parameter provides hints about what information is needed:

async def get_value(self, key: DataKeyType, options: SourceReadOptions) -> PartialSourceRowData[DataValueType]:
"""
Get the full content for a specific data item. CocoIndex calls this
when it needs the actual data content for processing.
"""
# Fetch the full content for the given key
content = await self._fetch_item_content(key.id)
if content is None:
return PartialSourceRowData(
value=NON_EXISTENCE,
ordinal=NO_ORDINAL,
content_version_fp=None
)

data = PartialSourceRowData(
value=DataValueType(
title=content.title,
content=content.body,
author=content.author
)
)

# Include ordinal if requested and available
if options.include_ordinal and content.timestamp:
data.ordinal = content.timestamp

# Include content version fingerprint if requested and easily available
if options.include_content_version_fp and content.etag:
data.content_version_fp = content.etag.encode()

return data

options is a hint. You don't have to take this argument. Without it, provide all fields as long as they're available.

def provides_ordinal() (Optional)

Returns True if this source provides ordinal information (such as timestamps) that can be used for efficient change detection. If True, CocoIndex can use this information to optimize updates.

def provides_ordinal(self) -> bool:
"""
Return True if this source provides ordinal information.
"""
return True # If your source provides timestamps or version numbers

Data Types

SourceReadOptions

The SourceReadOptions class provides hints to source connectors about what information is needed. This allows connectors to optimize their data fetching:

@dataclasses.dataclass
class SourceReadOptions:
"""Options for reading source data."""
include_ordinal: bool = False
include_content_version_fp: bool = False
include_value: bool = False

Fields:

  • include_ordinal: Whether to include ordinal information (timestamps, version numbers). Required in list() when provides_ordinal() returns True and this is True.
  • include_content_version_fp: Whether to include content version fingerprints for change detection. Always optional.
  • include_value: Whether to include full values. Required in get_value() when this is True. Optional in list() - useful when fetching values is cheap.

PartialSourceRow

Represents a single data item from the source:

@dataclasses.dataclass
class PartialSourceRow(Generic[K, V]):
key: K # Unique identifier for the data item
data: PartialSourceRowData[V] # Metadata and optionally the value

PartialSourceRowData

Contains the data and metadata for a source item:

@dataclasses.dataclass
class PartialSourceRowData(Generic[V]):
value: V | Literal["NON_EXISTENCE"] | None = None # The actual data content
ordinal: int | Literal["NO_ORDINAL"] | None = None # Timestamp or version number
content_version_fp: bytes | None = None # Content fingerprint for change detection

Fields:

  • value: The actual data content, or cocoindex.op.NON_EXISTENCE if the item doesn't exist, or None if not included.
  • ordinal: Timestamp or version number for change tracking, or cocoindex.op.NO_ORDINAL if not available, or None if not included. An ordinal needs to be monotonically increasing.
  • content_version_fp: Content fingerprint (hash, ETag, etc.) for efficient change detection, or None if not available.

Key Type (DataKeyType) and Value Type (DataValueType) for Data

The data type of custom source is a KTable, so the key type and value type also follows the same requirements as key and value of a KTable. Specifically:

  • The key type uniquely identifies each data item in your source. It can be:

    • A NamedTuple or frozen dataclass with one or multiple fields.

      class DataKeyType(NamedTuple):
      id: str
      category: str | None = None
    • When there's a single key part, you can also use a simple type like str or int for single-field keys. The key field name will be cocoindex.KEY_FIELD_NAME for this case.

  • The value type represents the actual data content. It's typically a dataclass containing all the value fields the source exposes.

    @dataclasses.dataclass
    class DataValueType:
    title: str
    content: str
    author: str | None
    created_at: datetime | None

Example

See the custom_source_hn example for a complete end-to-end implementation.