Apache Iggy connector
Consume Apache Iggy topics as live streams/maps and produce messages as target states.
The iggy connector supports Apache Iggy as
both a source and a target. Iggy organizes messages as
streams → topics → partitions; CocoIndex follows that model and treats an
Iggy topic partition either as a raw live stream or as an application-keyed
live map.
from cocoindex.connectors import iggy
Install the optional dependency:
pip install cocoindex[iggy]
The connector expects an apache_iggy.IggyClient that you create, connect, and
provide through your app context. Streams and topics are user-managed:
CocoIndex does not create or drop them.
As Source
As a live stream
Use topic_as_stream() when every Iggy message is an event and downstream code
does not need map-style deletion semantics.
def topic_as_stream(
client: IggyClient,
consumer_group: str,
stream: str,
topic: str,
*,
partition_id: int = 0,
batch_length: int = 100,
allow_replay: bool = False,
initial_high_watermark: int | None = None,
) -> TopicStream
TopicStream.payloads() adapts the stream to LiveStream[bytes] when the
processing logic only needs message payload bytes.
events = iggy.topic_as_stream(
client,
consumer_group="cocoindex-worker",
stream="orders",
topic="events",
).payloads()
As a live keyed map
Use topic_as_map() when message payloads encode an application-level key and
you want CocoIndex to treat the topic as a live map.
def topic_as_map(
client: IggyClient,
consumer_group: str,
stream: str,
topic: str,
*,
key: KeyFn,
is_deletion: IsDeleteFn | None = None,
partition_id: int = 0,
batch_length: int = 100,
allow_replay: bool = False,
initial_high_watermark: int | None = None,
) -> LiveMapFeed[StableKey, ReceiveMessage]
Iggy Python messages do not expose Kafka-style keys or tombstones, so key is
required. Return None from key to skip a message. Pass is_deletion if your
payload format has application-level delete events.
import json
def key(message) -> str | None:
payload = json.loads(message.payload())
return payload.get("id")
items = iggy.topic_as_map(
client,
consumer_group="cocoindex-worker",
stream="orders",
topic="events",
key=key,
)
Readiness and offsets
The source connector disables Iggy auto-commit and stores offsets after the downstream readiness handle completes. This mirrors Kafka-style back-pressure: an offset is stored only after CocoIndex has finished processing the message.
For single-partition topics, the connector can infer the initial high watermark
from Iggy’s topic details. For multi-partition topics, pass
initial_high_watermark for the consumed partition; the current Python SDK does
not expose per-partition high-watermark callbacks.
As Target
The target connector sends bytes or strings to a user-managed Iggy stream/topic/partition.
IGGY = coco.ContextKey[IggyClient]("iggy")
target = await iggy.mount_iggy_topic_target(
IGGY,
stream="orders",
topic="derived-events",
partition=0,
)
target.declare_target_state(key="order-123", value=b'{"status":"ready"}')
Deletes need an application-level delete payload because Iggy does not have Kafka-style tombstones:
target = await iggy.mount_iggy_topic_target(
IGGY,
stream="orders",
topic="derived-events",
deletion_value_fn=lambda key: f'{{"id":{key!r},"deleted":true}}',
)
Target APIs
def declare_iggy_topic_target(
client: ContextKey[IggyClient],
stream: str,
topic: str,
*,
partition: int = 0,
deletion_value_fn: DeletionValueFn | None = None,
) -> IggyTopicTarget[PendingS]
async def mount_iggy_topic_target(
client: ContextKey[IggyClient],
stream: str,
topic: str,
*,
partition: int = 0,
deletion_value_fn: DeletionValueFn | None = None,
) -> IggyTopicTarget[ResolvedS]