Kafka connector
Consume Kafka topics as live keyed maps (topic_as_map → LiveMapFeed) or as raw event streams (topic_as_stream → LiveStream), and produce messages as target states, with automatic offset commits (at-least-once semantics) and partition rebalance handling.
The kafka connector supports Kafka as both a source (consuming messages as a live keyed map, or as a raw event stream) and a target (producing messages for declared target states).
from cocoindex.connectors import kafka
This connector requires additional dependencies. Install with:
pip install cocoindex[kafka]
As source
The kafka connector can treat a Kafka topic as a live keyed map — each message is an upsert or delete for a key. It returns a LiveMapFeed for use with mount_each().
Setting up a consumer
Create an AIOConsumer directly; no ContextKey is needed. Do not subscribe the consumer yourself: the connector subscribes internally so that it can manage partition rebalance callbacks.
from confluent_kafka.aio import AIOConsumer
consumer = AIOConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "enable.auto.commit": "false",
})
topic_as_map()
def topic_as_map(
    consumer: AIOConsumer,
    topics: list[str],
    *,
    is_deletion: IsDeleteFn | None = None,
) -> LiveMapFeed[bytes | str, Message]:
Parameters:
- consumer: An unsubscribed AIOConsumer. Auto-commit should be disabled.
- topics: Topics to subscribe to.
- is_deletion: Optional predicate (message: Message) -> bool for custom deletion detection on non-tombstone messages (see Deletion handling).
Returns: A LiveMapFeed[bytes | str, Message] where each item is keyed by the message key and the value is the full confluent_kafka.Message object.
Deletion handling
Messages with None value (Kafka tombstones) are always treated as deletions. The optional is_deletion predicate provides additional deletion logic for non-tombstone messages:
# Default: only tombstones are deletions
items = kafka.topic_as_map(consumer, ["my-topic"])
# Custom: also treat messages whose value is b"DELETED" as deletions
items = kafka.topic_as_map(
    consumer, ["my-topic"],
    is_deletion=lambda msg: msg.value() == b"DELETED",
)
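To make the upsert/delete rules concrete, here is a minimal, self-contained sketch of how a sequence of keyed messages folds into a map: a tombstone (None value) always deletes, an optional predicate can mark further messages as deletions, and None-keyed messages are skipped. `FakeMessage` and `apply_message` are hypothetical names for illustration only; they mimic the key()/value() accessors of confluent_kafka.Message and are not part of the connector's API.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in mimicking confluent_kafka.Message's key()/value() accessors."""

    _key: bytes | None
    _value: bytes | None

    def key(self) -> bytes | None:
        return self._key

    def value(self) -> bytes | None:
        return self._value


def apply_message(
    state: dict[bytes, bytes],
    msg: FakeMessage,
    is_deletion: Callable[[FakeMessage], bool] | None = None,
) -> None:
    """Fold one message into a keyed map (illustrative sketch, not the connector's code)."""
    key = msg.key()
    if key is None:
        # The connector logs None-keyed messages as errors and skips them.
        return
    value = msg.value()
    # Tombstones (None value) always delete; the optional predicate adds more deletions.
    if value is None or (is_deletion is not None and is_deletion(msg)):
        state.pop(key, None)
    else:
        state[key] = value


state: dict[bytes, bytes] = {}
for m in [
    FakeMessage(b"a", b"1"),
    FakeMessage(b"b", b"2"),
    FakeMessage(b"a", None),        # tombstone: deletes key b"a"
    FakeMessage(b"b", b"DELETED"),  # deleted only because of the predicate
    FakeMessage(b"c", b"3"),
]:
    apply_message(state, m, is_deletion=lambda msg: msg.value() == b"DELETED")
print(state)  # {b'c': b'3'}
```

Without a predicate, a value such as b"DELETED" is just an ordinary upsert; only the tombstone check is unconditional.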
Offset management
Offsets are committed automatically with at-least-once semantics. Messages are processed in parallel, but an offset is only committed after all earlier messages in the same partition have been fully processed. Messages with None keys are logged as errors and skipped.
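The commit rule above (process in parallel, but commit only past a contiguous prefix of finished messages) can be illustrated with a small per-partition tracker. This is a hedged sketch of the general technique, not CocoIndex's implementation; `PartitionCommitTracker` is a hypothetical name. It tracks the offsets actually handed out rather than assuming offsets are consecutive (compacted topics can have gaps), and it follows Kafka's convention that a committed offset is the offset of the next message to read, i.e. the last fully processed offset plus one.

```python
from __future__ import annotations

from collections import deque


class PartitionCommitTracker:
    """Per-partition commit bookkeeping for out-of-order processing (illustrative sketch)."""

    def __init__(self) -> None:
        # Offsets handed out for processing, in the order they were consumed.
        self._in_flight: deque[int] = deque()
        # Offsets that finished while an earlier offset was still being processed.
        self._done: set[int] = set()

    def dispatch(self, offset: int) -> None:
        # Within one partition, Kafka delivers offsets in increasing order.
        self._in_flight.append(offset)

    def mark_done(self, offset: int) -> int | None:
        """Record a finished message; return the new commit offset, or None if unchanged."""
        self._done.add(offset)
        new_commit = None
        # Advance only over the contiguous prefix of finished messages.
        while self._in_flight and self._in_flight[0] in self._done:
            head = self._in_flight.popleft()
            self._done.discard(head)
            # Kafka commits the offset of the *next* message to read.
            new_commit = head + 1
        return new_commit


tracker = PartitionCommitTracker()
for offset in (10, 11, 15):  # gap at 12-14, e.g. from log compaction
    tracker.dispatch(offset)
print(tracker.mark_done(11))  # None: offset 10 is still being processed
print(tracker.mark_done(10))  # 12
print(tracker.mark_done(15))  # 16
```

If the process crashes after offset 11 finishes but before 10 does, nothing past 10 has been committed, so both are redelivered. That redelivery is what "at-least-once" means here.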
Readiness
The feed signals readiness after catching up to the high watermark offsets that existed when consumption started. After that, it continues consuming indefinitely until the app is stopped.
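Readiness as described above amounts to comparing each partition's consume position with the high watermark snapshotted at startup. The sketch below shows only that comparison; `is_caught_up` and its dict inputs are hypothetical. In a real consumer the (low, high) snapshot would come from broker metadata; for example, the synchronous confluent_kafka.Consumer exposes get_watermark_offsets() for this. The high watermark is one past the last existing message, so a partition is caught up once its next-read position reaches it.

```python
from __future__ import annotations


def is_caught_up(
    positions: dict[int, int],
    start_watermarks: dict[int, tuple[int, int]],
) -> bool:
    """Return True once every partition has reached its startup high watermark.

    positions: partition -> offset of the next message the consumer will read.
    start_watermarks: partition -> (low, high) snapshotted when consumption started;
        `high` is one past the last message that existed at that moment.
    """
    for partition, (low, high) in start_watermarks.items():
        if low == high:
            # The partition was empty at startup, so there is nothing to catch up on.
            continue
        # A partition with no known position yet is treated as sitting at `low`.
        if positions.get(partition, low) < high:
            return False
    return True


snapshot = {0: (0, 100), 1: (50, 50), 2: (0, 10)}
print(is_caught_up({0: 100, 2: 7}, snapshot))   # False: partition 2 is at 7 < 10
print(is_caught_up({0: 100, 2: 10}, snapshot))  # True
```

Because the snapshot is fixed, messages produced after startup do not delay readiness; they are simply consumed afterwards.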
Example
import pathlib
from confluent_kafka import Message
from confluent_kafka.aio import AIOConsumer
from cocoindex.connectors import kafka, localfs
import cocoindex as coco
@coco.fn(memo=True)
async def process_message(msg: Message, target: localfs.DirTarget) -> None:
    key = msg.key()
    value = msg.value()
    # Kafka keys arrive as bytes; decode them to build a readable filename.
    if isinstance(key, bytes):
        key = key.decode()
    target.declare_file(filename=f"{key}.bin", content=value)
@coco.fn
async def app_main(outdir: pathlib.Path) -> None:
    target = await localfs.mount_dir_target(outdir)
    consumer = AIOConsumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "enable.auto.commit": "false",
    })
    items = kafka.topic_as_map(consumer, ["my-topic"])
    await coco.mount_each(process_message, items, target)
app = coco.App(
    coco.AppConfig(name="KafkaToFiles"),
    app_main,
    outdir=pathlib.Path("./out"),
)
app.update_blocking(live=True)
As a live stream
Some downstream connectors don’t model Kafka messages as a keyed map — they treat each message as an opaque event payload (e.g. the OCI Object Storage live mode ingests Object Storage event JSON delivered through OCI Streaming). For these, the kafka connector exposes a topic as a LiveStream — CocoIndex’s keyless counterpart to LiveMapFeed, an opaque sequence of messages with the same readiness and back-pressure machinery but no key/delete semantics.
Set up the consumer the same way as for topic_as_map().
topic_as_stream()
def topic_as_stream(
    consumer: AIOConsumer,
    topics: list[str],
) -> TopicStream:
Parameters:
- consumer: An unsubscribed AIOConsumer. Auto-commit should be disabled.
- topics: Topics to subscribe to.
Returns: A TopicStream (a LiveStream[Message]) that delivers raw confluent_kafka.Message objects to its subscriber. Offset management and readiness behave identically to topic_as_map().
TopicStream.payloads()
def payloads(self) -> LiveStream[bytes]:
A view over the same TopicStream that yields each message’s value (the byte payload) instead of the full Message. Null-valued messages (Kafka tombstones) are filtered out of this view; consumers that need tombstone semantics should subscribe to the TopicStream directly.
This is the typical input for sources that consume opaque event payloads:
from confluent_kafka.aio import AIOConsumer
from cocoindex.connectors import kafka, oci_object_storage
consumer = AIOConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "enable.auto.commit": "false",
})
topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])
# `client` is an OCI Object Storage client created elsewhere (not shown here).
walker = oci_object_storage.list_objects(
    client, "my-namespace", "my-bucket",
    live_stream=topic_stream.payloads(),
)
For a complete app using this pattern, see the OCI Object Storage live-mode example.
A TopicStream (and any payloads() view of it) supports at most one active watcher, because the underlying consumer can hold only one subscription. This constraint is not checked at runtime.
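The relationship between a TopicStream and its payloads() view (map each message to its value, drop tombstones) can be sketched with a plain async generator. This illustrates the filtering rule only; `payloads_view` and `FakeMessage` are hypothetical names and say nothing about how LiveStream is actually implemented.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in mimicking confluent_kafka.Message's value() accessor."""

    _value: bytes | None

    def value(self) -> bytes | None:
        return self._value


async def payloads_view(messages: AsyncIterator[FakeMessage]) -> AsyncIterator[bytes]:
    """Yield each message's byte payload, skipping tombstones (value is None)."""
    async for msg in messages:
        payload = msg.value()
        if payload is not None:
            yield payload


async def demo() -> list[bytes]:
    async def source() -> AsyncIterator[FakeMessage]:
        # The middle message is a tombstone and should not reach the payload view.
        for v in (b'{"event": 1}', None, b'{"event": 2}'):
            yield FakeMessage(v)

    return [p async for p in payloads_view(source())]


print(asyncio.run(demo()))  # [b'{"event": 1}', b'{"event": 2}']
```

Consumers that need to see tombstones would iterate the underlying message stream directly instead of the filtered view.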
As target
The kafka connector provides target state APIs for producing messages to Kafka topics. Topics are user-managed (CocoIndex does not create or drop topics) — CocoIndex only produces messages to them.
Setting up a producer
Create a ContextKey[AIOProducer] to identify your producer, then provide it in your lifespan:
The key name is load-bearing across runs — it’s the stable identity CocoIndex uses to track target state for topics produced through this key. See ContextKey as stable identity before renaming.
from collections.abc import AsyncIterator
from confluent_kafka.aio import AIOProducer
import cocoindex as coco
KAFKA_PRODUCER = coco.ContextKey[AIOProducer]("my_kafka_producer")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})
    builder.provide(KAFKA_PRODUCER, producer)
    yield
Declaring target states
Topics (parent state)
Declares a topic as a target state. Returns a KafkaTopicTarget for declaring messages.
def declare_kafka_topic_target(
    producer: ContextKey[AIOProducer],
    topic: str,
    *,
    deletion_value_fn: DeletionValueFn | None = None,
) -> KafkaTopicTarget[coco.PendingS]
Parameters:
- producer: A ContextKey[AIOProducer] identifying the producer to use.
- topic: The Kafka topic name.
- deletion_value_fn: Optional callback that produces a deletion value for a given key (see Deletion handling).
Returns: A pending KafkaTopicTarget. Use the async convenience wrapper mount_kafka_topic_target() to obtain a resolved target:
topic_target = await kafka.mount_kafka_topic_target(
    KAFKA_PRODUCER, "my-topic"
)
Messages (child states)
Once a KafkaTopicTarget is resolved, declare target states to produce messages:
def KafkaTopicTarget.declare_target_state(
    self,
    *,
    key: bytes | str,
    value: bytes | str,
) -> None
Parameters:
- key: The message key, used as the stable identity for change detection.
- value: The message value.
CocoIndex fingerprints the value and only produces a message when it has changed since the last run.
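The change-detection rule above (fingerprint each value and produce only when the fingerprint differs from the previous run) can be sketched as follows. The SHA-256 fingerprint, the in-memory `previous` dict, and the names `fingerprint` and `plan_produce` are assumptions for illustration. CocoIndex's actual fingerprinting and state storage are internal and may differ.

```python
from __future__ import annotations

import hashlib


def fingerprint(value: bytes | str) -> str:
    # str values are UTF-8 encoded before hashing (an assumption of this sketch).
    data = value.encode("utf-8") if isinstance(value, str) else value
    return hashlib.sha256(data).hexdigest()


def plan_produce(
    previous: dict[bytes | str, str],
    declared: dict[bytes | str, bytes | str],
) -> list[tuple[bytes | str, bytes | str]]:
    """Return the (key, value) pairs that need a message this run.

    `previous` maps each key to the fingerprint recorded in the last run and is
    updated in place. A real system would record a fingerprint only after the
    produce succeeds; this sketch skips that step.
    """
    to_produce: list[tuple[bytes | str, bytes | str]] = []
    for key, value in declared.items():
        fp = fingerprint(value)
        if previous.get(key) != fp:
            to_produce.append((key, value))
            previous[key] = fp
    return to_produce


state: dict[bytes | str, str] = {}
print(plan_produce(state, {b"a": b"1", b"b": b"2"}))  # [(b'a', b'1'), (b'b', b'2')]
print(plan_produce(state, {b"a": b"1", b"b": b"3"}))  # [(b'b', b'3')]
```

Keys that disappear between runs are not handled here; that is the job of the deletion logic described next.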
Deletion handling
When a previously declared target state is no longer declared, CocoIndex produces a deletion message. The behavior depends on deletion_value_fn:
- Without callback (default): Produces a message with the key and no value (Kafka tombstone).
- With callback: Calls deletion_value_fn(key) to produce the deletion value.
# Tombstone on deletion (default)
topic_target = await kafka.mount_kafka_topic_target(
    KAFKA_PRODUCER, "my-topic"
)
# Custom deletion value
topic_target = await kafka.mount_kafka_topic_target(
    KAFKA_PRODUCER, "my-topic",
    deletion_value_fn=lambda key: b'{"deleted": true}',
)
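The two deletion modes can be sketched as a set difference between the keys declared in the previous run and those declared in the current run. `plan_deletions` and its `previous_keys`/`current_keys` arguments are hypothetical. The only part taken from the API above is the deletion_value_fn(key) callback shape, and a None value stands in for a Kafka tombstone.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable


def plan_deletions(
    previous_keys: Iterable[bytes | str],
    current_keys: Iterable[bytes | str],
    deletion_value_fn: Callable[[bytes | str], bytes | str] | None = None,
) -> list[tuple[bytes | str, bytes | str | None]]:
    """Return (key, value) messages for keys that are no longer declared.

    A value of None represents a Kafka tombstone.
    """
    current = set(current_keys)
    # Keep the previous run's order so the output is deterministic.
    stale = [key for key in previous_keys if key not in current]
    if deletion_value_fn is None:
        # Default: key with no value (tombstone).
        return [(key, None) for key in stale]
    # Custom: ask the callback for a deletion value per key.
    return [(key, deletion_value_fn(key)) for key in stale]


print(plan_deletions([b"a", b"b", b"c"], [b"b"]))
# [(b'a', None), (b'c', None)]
print(plan_deletions([b"a", b"b"], [b"b"], lambda key: b'{"deleted": true}'))
# [(b'a', b'{"deleted": true}')]
```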
Example
from collections.abc import AsyncIterator
from confluent_kafka.aio import AIOProducer
from cocoindex.connectors import kafka, localfs
import cocoindex as coco
KAFKA_PRODUCER = coco.ContextKey[AIOProducer]("my_kafka_producer")
@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    producer = AIOProducer({"bootstrap.servers": "localhost:9092"})
    builder.provide(KAFKA_PRODUCER, producer)
    yield
@coco.fn(memo=True)
async def process_file(
    file: localfs.File, topic_target: kafka.KafkaTopicTarget
) -> None:
    content = await file.read_bytes()
    # Use the file's relative path as the message key (stable identity).
    topic_target.declare_target_state(
        key=file.file_path.path.as_posix().encode(),
        value=content,
    )
@coco.fn
async def app_main() -> None:
    topic_target = await kafka.mount_kafka_topic_target(
        KAFKA_PRODUCER, "file-contents"
    )
    files = localfs.walk_dir(localfs.FilePath(path="./data"))
    await coco.mount_each(process_file, files.items(), topic_target)
app = coco.App(
    coco.AppConfig(name="FilesToKafka"),
    app_main,
)
app.update_blocking(report_to_stdout=True)