Apache Iggy connector

Consume Apache Iggy topics as live streams or live maps, and write target states to Iggy topics as messages.

Version: v1.0.7
Last reviewed: Jun 3, 2026

The iggy connector supports Apache Iggy as both a source and a target. Iggy organizes messages as streams → topics → partitions; CocoIndex follows that model and treats an Iggy topic partition either as a raw live stream or as an application-keyed live map.

python
from cocoindex.connectors import iggy

Install the optional dependency:

bash
pip install cocoindex[iggy]

The connector expects an apache_iggy.IggyClient that you create, connect, and provide through your app context. Streams and topics are user-managed: CocoIndex does not create or drop them.

As Source

As a live stream

Use topic_as_stream() when every Iggy message is an event and downstream code does not need map-style deletion semantics.

python
def topic_as_stream(
    client: IggyClient,
    consumer_group: str,
    stream: str,
    topic: str,
    *,
    partition_id: int = 0,
    batch_length: int = 100,
    allow_replay: bool = False,
    initial_high_watermark: int | None = None,
) -> TopicStream

TopicStream.payloads() adapts the stream to LiveStream[bytes] when the processing logic only needs message payload bytes.

python
events = iggy.topic_as_stream(
    client,
    consumer_group="cocoindex-worker",
    stream="orders",
    topic="events",
).payloads()

As a live keyed map

Use topic_as_map() when message payloads encode an application-level key and you want CocoIndex to treat the topic as a live map.

python
def topic_as_map(
    client: IggyClient,
    consumer_group: str,
    stream: str,
    topic: str,
    *,
    key: KeyFn,
    is_deletion: IsDeleteFn | None = None,
    partition_id: int = 0,
    batch_length: int = 100,
    allow_replay: bool = False,
    initial_high_watermark: int | None = None,
) -> LiveMapFeed[StableKey, ReceiveMessage]

Messages in the Iggy Python SDK do not expose Kafka-style keys or tombstones, so key is required: it derives the map key from each message. Return None from key to skip a message. Pass is_deletion if your payload format has application-level delete events.

python
import json


def key(message) -> str | None:
    payload = json.loads(message.payload())
    return payload.get("id")


items = iggy.topic_as_map(
    client,
    consumer_group="cocoindex-worker",
    stream="orders",
    topic="events",
    key=key,
)
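The key function above raises on a payload that is not valid JSON. A more defensive variant, paired with a deletion predicate, might look like the sketch below. It rests on several assumptions: FakeMessage is a hypothetical stand-in that models only payload(), the "deleted" field is an illustrative payload convention, and is_deletion is assumed to receive the same message object as key and return a bool (the exact IsDeleteFn signature is not shown here).

```python
import json
from typing import Optional


class FakeMessage:
    """Hypothetical stand-in for an Iggy message; only payload() is modeled."""

    def __init__(self, data: bytes) -> None:
        self._data = data

    def payload(self) -> bytes:
        return self._data


def _decode(message) -> Optional[dict]:
    """Parse the payload as a JSON object, or return None if it is not one."""
    try:
        doc = json.loads(message.payload())
    except ValueError:  # covers malformed JSON and undecodable bytes
        return None
    return doc if isinstance(doc, dict) else None


def key(message) -> Optional[str]:
    """Return the application-level key, or None to skip the message."""
    doc = _decode(message)
    if doc is None or "id" not in doc:
        return None
    return str(doc["id"])


def is_deletion(message) -> bool:
    """Assumed predicate shape: True when the payload marks a delete event."""
    doc = _decode(message)
    return bool(doc and doc.get("deleted") is True)
```

Both functions could then be passed as key=key and is_deletion=is_deletion to topic_as_map().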

Readiness and offsets

The source connector disables Iggy auto-commit and stores offsets after the downstream readiness handle completes. This mirrors Kafka-style back-pressure: an offset is stored only after CocoIndex has finished processing the message.

For single-partition topics, the connector can infer the initial high watermark from Iggy’s topic details. For multi-partition topics, pass initial_high_watermark for the consumed partition; the current Python SDK does not expose per-partition high-watermark callbacks.
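The store-after-processing rule can be illustrated with a small, self-contained model. This is not the connector's implementation: OffsetTracker is a hypothetical class, and it assumes completions may arrive out of order, in which case only the highest contiguous completed offset is safe to store.

```python
from typing import Optional


class OffsetTracker:
    """Tracks processed offsets and reports which offset is safe to store."""

    def __init__(self, next_offset: int = 0) -> None:
        self._next = next_offset  # lowest offset not yet completed
        self._done = set()        # completed offsets at or above _next

    def complete(self, offset: int) -> Optional[int]:
        """Mark an offset as processed; return the new offset to store, if any."""
        if offset < self._next:
            return None  # already covered by an earlier stored offset
        self._done.add(offset)
        advanced = False
        # Advance past every contiguous completed offset.
        while self._next in self._done:
            self._done.remove(self._next)
            self._next += 1
            advanced = True
        return self._next - 1 if advanced else None


tracker = OffsetTracker()
print(tracker.complete(1))  # offset 0 is still in flight, nothing to store
print(tracker.complete(0))  # offsets 0 and 1 are both done
```

With this rule, a crash before an offset is stored causes the unfinished messages to be read again on restart rather than skipped.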

As Target

The target connector sends bytes or strings to a user-managed Iggy stream/topic/partition.

python
import cocoindex as coco
from apache_iggy import IggyClient
from cocoindex.connectors import iggy

IGGY = coco.ContextKey[IggyClient]("iggy")

target = await iggy.mount_iggy_topic_target(
    IGGY,
    stream="orders",
    topic="derived-events",
    partition=0,
)

target.declare_target_state(key="order-123", value=b'{"status":"ready"}')

Deletes need an application-level delete payload because Iggy does not have Kafka-style tombstones:

python
import json

target = await iggy.mount_iggy_topic_target(
    IGGY,
    stream="orders",
    topic="derived-events",
    # json.dumps quotes and escapes the key so the payload stays valid JSON.
    deletion_value_fn=lambda key: json.dumps({"id": key, "deleted": True}),
)
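A delete payload is only useful if consumers can parse it, so it may be worth checking that the builder round-trips through a JSON parser. The sketch below assumes string keys and the illustrative {"id": ..., "deleted": true} shape; deletion_value is a hypothetical helper name, not part of the connector.

```python
import json


def deletion_value(key: str) -> str:
    """Build a JSON delete event for `key` (hypothetical payload shape)."""
    return json.dumps({"id": key, "deleted": True})


# Keys containing quotes are escaped, so the payload still parses.
payload = deletion_value('order-"123"')
event = json.loads(payload)
print(event["id"], event["deleted"])
```

A function like this could be passed as deletion_value_fn=deletion_value.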

Target APIs

python
def declare_iggy_topic_target(
    client: ContextKey[IggyClient],
    stream: str,
    topic: str,
    *,
    partition: int = 0,
    deletion_value_fn: DeletionValueFn | None = None,
) -> IggyTopicTarget[PendingS]

python
async def mount_iggy_topic_target(
    client: ContextKey[IggyClient],
    stream: str,
    topic: str,
    *,
    partition: int = 0,
    deletion_value_fn: DeletionValueFn | None = None,
) -> IggyTopicTarget[ResolvedS]