
Live CSV → Kafka with CocoIndex's New Kafka Target Connector

CocoIndex now ships a Kafka target connector. We walk through a tiny live pipeline that watches a folder of CSV files and publishes each row as a JSON message to a StreamNative-hosted Kafka topic — incrementally, with no glue code.


CocoIndex just got a Kafka target connector. You can now declare a Kafka topic as a target of your pipeline the same way you’d declare a Postgres table or a vector index, and CocoIndex will incrementally produce messages as your source data changes — no producer loop, no bookkeeping, no “did I already publish this row?” logic.

In this post we walk through a small but complete example: watch a local directory of CSV files, turn each row into a JSON message keyed by the row’s primary key, and publish to a Kafka topic hosted on StreamNative. The whole pipeline is about 60 lines of Python, and it runs in live mode — edit a CSV, and within a second only the changed rows show up on the topic.

The annoying part of getting data into Kafka has always been the glue. You write a producer, you track which rows you’ve already sent, you handle deletes, you re-bootstrap when the schema changes, you debounce file watchers. None of that is interesting, and all of it is easy to get subtly wrong.

The new connector lets you skip all of it. You hand CocoIndex an AIOProducer, a topic name, and a function that says “for this source row, here’s the key and the value.” CocoIndex handles the rest — including only producing messages for rows that actually changed since the last run.

Why choose CocoIndex with Kafka

Kafka is great at durable, high-throughput event streaming, but it expects you to solve data shaping, schema evolution, and incremental change detection yourself. CocoIndex turns that into a declarative problem:

  • Define transformations like formulas: “from this table or API, derive these messages and fields.” CocoIndex computes and re-computes derived data whenever sources change, similar to spreadsheet formulas reacting to edits.

  • Built-in incremental processing: CocoIndex keeps local state (checksums, primary keys, or version markers) so subsequent runs push only changed or deleted rows into Kafka topics.

  • Schema-aware updates: when schemas change upstream, CocoIndex re-derives downstream messages instead of forcing you to manually re-bootstrap topics or write ad-hoc migration scripts.

  • Pluggable sinks beyond Kafka: the same flow that feeds Kafka can also populate vector databases, OLTP/OLAP stores, or search indexes, so Kafka events and your AI/search backends stay in sync from a single definition.

Partnership with StreamNative

We are excited to build the real-time data stack together with StreamNative. StreamNative’s native Kafka service, powered by the Ursa engine, provides a reliable real-time data foundation for modern AI and data platforms. Together, CocoIndex and StreamNative enable organizations to continuously transform and index streaming data, helping teams accelerate the delivery of AI-ready and search-ready data products.

The example: CSV files → JSON messages

The full example lives at examples/csv_to_kafka in the CocoIndex repo. Here’s the shape of it.

We have a data/ folder with a couple of CSV files:

csv
# data/products.csv
sku,name,category,price
SKU001,Wireless Mouse,Electronics,29.99
SKU002,Mechanical Keyboard,Electronics,89.99
SKU003,USB-C Hub,Accessories,45.00
csv
# data/employees.csv
emp_id,first_name,last_name,department,email
E101,Alice,Chen,Engineering,[email protected]
E102,Bob,Smith,Marketing,[email protected]

The goal: every row becomes a JSON message on a Kafka topic, keyed by the value of the row’s first column (the primary key). When a CSV file is edited, only the rows that actually changed should be re-published.
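
Concretely, after the first run the SKU001 row lands on the topic with key SKU001 and the JSON-encoded row as its value — note that csv.DictReader keeps every field a string, so price stays "29.99":

json
{"sku": "SKU001", "name": "Wireless Mouse", "category": "Electronics", "price": "29.99"}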

The pipeline

First, the Kafka producer is set up once at app startup using a lifespan hook, and stashed in a ContextKey so the rest of the pipeline can grab it without passing it around:

python
import csv
import io
import json
import os

import cocoindex as coco
from cocoindex.connectors import kafka, localfs
from confluent_kafka.aio import AIOProducer

# Broker settings are read from the environment (populated from .env in the
# repo example; the exact variable names there may differ).
KAFKA_BOOTSTRAP_SERVERS = os.environ["KAFKA_BOOTSTRAP_SERVERS"]
KAFKA_SASL_USERNAME = os.environ.get("KAFKA_SASL_USERNAME", "")
KAFKA_SASL_PASSWORD = os.environ.get("KAFKA_SASL_PASSWORD", "")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "cocoindex-csv-rows")

KAFKA_PRODUCER = coco.ContextKey[AIOProducer]("kafka_producer", tracked=False)

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder):
    config = {
        "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_SSL",
        "sasl.username": KAFKA_SASL_USERNAME,
        "sasl.password": KAFKA_SASL_PASSWORD,
    }
    producer = AIOProducer(config)
    builder.provide(KAFKA_PRODUCER, producer)
    yield

ContextKey is how CocoIndex shares the producer across components without threading it through every function call; we’ll come back to it later. The SASL block is what StreamNative (or any production broker) wants — for a local broker you can drop those four lines and just point bootstrap.servers at localhost:9092.
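
For reference, the local-broker variant of that config really is just one field:

python
# Local, auth-free broker — no SASL block needed.
config = {"bootstrap.servers": "localhost:9092"}
producer = AIOProducer(config)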

Next, the per-file processor. This is where the new Kafka API shows up:

python
@coco.fn(memo=True)
async def process_csv(file: FileLike, topic_target: kafka.KafkaTopicTarget) -> None:
    text = await file.read_text()
    reader = csv.DictReader(io.StringIO(text))

    headers = reader.fieldnames
    if not headers:
        return  # empty file: declare nothing, so any previously declared rows get deleted
    first_col = headers[0]  # the first column acts as the row's primary key

    for row in reader:
        key_value = row.get(first_col)
        if key_value is not None:
            value = json.dumps(row)
            topic_target.declare_target_state(key=key_value, value=value)
The @coco.fn(memo=True) decorator means the per-file work itself is memoized too — if a file’s contents haven’t changed, process_csv doesn’t even run.

Declare states, not messages

python
topic_target.declare_target_state(key=key_value, value=value)

It’s deliberately not called send_message() or produce() or declare_message(). It’s declare_target_state.

CocoIndex is a state-driven data framework. The mental model is the same one you’d use for a spreadsheet, a React component tree, or a SQL materialized view: you describe what the target should look like as a function of the source, and the framework figures out the transitions. You don’t compute deltas. You don’t track “what did I send last time.” You don’t handle insert vs. update vs. delete as separate code paths. You just say, “given this CSV row, the target state for key SKU001 is this JSON blob,” and that’s it.

Kafka makes that distinction unusually visible because Kafka’s wire model is the opposite: a topic is a log of events, not a snapshot. Producers send change events; consumers (or compacted topics) reconstruct state from the log. So the question is: who’s responsible for the gap between “I have desired states” and “the broker needs to receive change events”?

In CocoIndex, the framework owns that gap. When you call declare_target_state(key=k, value=v):

  • If k is new or v differs from the last value the framework remembers for k, it emits an upsert message — (k, v) on the wire.
  • If k was previously declared but isn’t declared this time, it emits a delete message — (k, None), or (k, deletion_value_fn(k)) if you supplied a tombstone constructor.
  • If k was previously declared with the same v, nothing is sent. No message, no broker round-trip, no consumer wakeup.

Figure: declared states above the line, derived Kafka messages below — five declared states produce four wire-level messages because the unchanged one is silent.
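
To make the rule concrete, here’s a minimal sketch of that reconciliation step — not CocoIndex’s actual implementation, just the three bullets above written as code:

python
# Sketch only: derive wire-level messages by comparing declared states
# against the values remembered from the previous run.
def reconcile(declared: dict[str, str], remembered: dict[str, str]) -> list[tuple[str, str | None]]:
    messages: list[tuple[str, str | None]] = []
    for k, v in declared.items():
        if remembered.get(k) != v:        # new key, or value changed
            messages.append((k, v))       # -> upsert message
    for k in remembered:
        if k not in declared:             # key no longer declared
            messages.append((k, None))    # -> delete message (tombstone)
    return messages                       # unchanged keys emit nothing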

Messages are derived from state transitions. You only ever talk about states. This is exactly the same pattern as the Postgres target (declare_target_state → INSERT / UPDATE / DELETE) and the vector index targets — the wire-level operations differ, but the user-facing API is the same shape, because the semantics are the same.

The reason this matters in practice: it means the same process_csv function works correctly the first time you run it, every subsequent time you run it, when a row is edited, when a row is removed, when a file is deleted, when the whole pipeline crashes and restarts. There is no separate “initial load” code path versus “incremental update” code path. There’s just “given the source, here’s what the target should look like,” and that statement is true whether the target is empty, half-populated, or already in sync.

Finally, wire it all together:

python
@coco.fn
async def app_main() -> None:
    topic_target = await kafka.mount_kafka_topic_target(KAFKA_PRODUCER, KAFKA_TOPIC)

    files = localfs.walk_dir(
        localfs.FilePath(path="./data"),
        path_matcher=PatternFilePathMatcher(included_patterns=["**/*.csv"]),
        live=True,
    )
    await coco.mount_each(process_csv, files.items(), topic_target)

app = coco.App(coco.AppConfig(name="CsvToKafka"), app_main)

Two things to notice here:

  1. mount_kafka_topic_target(...) resolves the producer from the context key and gives back a handle. The topic itself is user-managed — CocoIndex never creates or deletes topics, it just produces into one you already own.
  2. localfs.walk_dir(..., live=True) turns the directory walker into a live source — it does an initial scan, then keeps watching the filesystem and pushes incremental updates downstream. Combined with cocoindex update -L, the whole pipeline runs continuously instead of one-shot.

Live mode: one flag, everything else is the same

So far we’ve described what happens on a single run. But in reality, source files get edited, rows get added and removed, and you usually want the topic to keep up. The same pipeline can run as a catch-up job — scan once, reconcile everything that’s changed since last time, exit — or as a continuously running process that keeps watching for changes. The diff between the two is one keyword argument and one CLI flag:

Catch-up run:

python
files = localfs.walk_dir(
    localfs.FilePath(path="./data"),
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.csv"]),
)
await coco.mount_each(process_csv, files.items(), topic_target)
sh
cocoindex update main.py

Live:

python
files = localfs.walk_dir(
    localfs.FilePath(path="./data"),
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.csv"]),
    live=True,                 # ← +1 line
)
await coco.mount_each(process_csv, files.items(), topic_target)
sh
cocoindex update -L main.py   # ← +1 flag

That’s the entire diff. process_csv doesn’t change. The Kafka target doesn’t change. There’s no separate “streaming” code path to maintain.

Run the live version as above (cocoindex update -L main.py). CocoIndex does a full scan first, publishes one message per row, then sits there watching data/:

  • Edit one cell in products.csv — exactly one Kafka message is produced for that one row (modulo broker retries; the producer is at-least-once by default). The other four rows are silent.
  • Add a new row — one new message.
  • Delete a row — one delete message (no value, since this example doesn’t supply a deletion_value_fn; more on that after this list).
  • Add a brand new CSV file — process_csv runs once for that file, publishing its rows.
  • Delete a CSV file — every row from that file gets a delete message.
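
If your consumers prefer structured tombstones over bare null values, this is where the deletion_value_fn from earlier comes in. A hedged sketch — we’re assuming the constructor is supplied when mounting the target, so check the connector reference for the exact parameter placement:

python
# Hypothetical parameter placement: emit a JSON tombstone instead of a
# null value whenever a previously declared key disappears.
topic_target = await kafka.mount_kafka_topic_target(
    KAFKA_PRODUCER,
    KAFKA_TOPIC,
    deletion_value_fn=lambda key: json.dumps({"key": key, "deleted": True}),
)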

What the flag actually does (and doesn’t do)

It’s worth being precise about this, because it’s easy to assume the flag is doing more magic than it is.

Reconciliation happens in both modes. Whether you run cocoindex update or cocoindex update -L, you declare target states and CocoIndex computes deltas against the previous run, emitting only the upsert and delete messages needed to bring the topic in line. To do that, CocoIndex maintains an internal state store that remembers, for every declared key, the fingerprint of the value last sent. That store survives restarts, so when you stop and restart the pipeline it picks up where it left off — no re-broadcast of unchanged rows.

This is also where the ContextKey from earlier earns its keep. The state store doesn’t identify “this Kafka topic” by SASL credentials or bootstrap address — it identifies it by the ContextKey it was anchored to (here, KAFKA_PRODUCER) plus the topic name. If you later rotate the SASL password, swap the broker endpoint, or replace the AIOProducer with a differently configured one, the same ContextKey is enough for CocoIndex to recognize the backend as the same one, and the state store carries over.

The flag controls when reconciliation runs. That’s it. In a catch-up run, CocoIndex scans sources once, reconciles up to the moment, and exits. In live mode, it does the same initial catch-up, then keeps watching the sources for changes and reconciles incrementally as they arrive. Same code path, same reconciliation logic, same target API — just catch-up-and-exit vs. catch-up-and-keep-watching.

The “watching” half comes from the source itself, not the flag. localfs.walk_dir(..., live=True) returns a live source backed by watchfiles; it knows how to deliver filesystem events. A non-live source (or live=False) just enumerates current state and stops. The CLI -L flag tells the runtime to keep the app alive so the watcher’s events have something to drive.
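
For intuition, here’s roughly what that watcher half looks like if you drive watchfiles directly — a standalone sketch, not the connector’s code:

python
import asyncio
from watchfiles import awatch

async def watch_data_dir() -> None:
    # awatch yields batches of (change_type, path) tuples as files change.
    async for changes in awatch("./data"):
        for change, path in changes:
            print(change.name, path)  # e.g. "modified ./data/products.csv"

asyncio.run(watch_data_dir())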

One Kafka detail worth mentioning since the connector doesn’t change it: each declared key is hashed to a partition by Kafka’s default partitioner, so the same key always lands on the same partition. Log compaction and key-based consumers behave exactly the way they’d behave with any hand-rolled producer.

Looking at the topic

We pointed this at a Kafka cluster on StreamNative Cloud — it gave us a real SASL_SSL endpoint in one click, with a hosted console for inspecting messages without writing a consumer. (Plain localhost:9092 works too if you skip the SASL fields in the producer config.)

Here’s what shows up in the console for the cocoindex-csv-rows topic after running the example:

Figure: messages on the StreamNative-hosted Kafka topic after running the CSV → Kafka pipeline.

Keys are the row’s primary-key column (SKU001, E101, …); values are the JSON-encoded rows. Edit a CSV locally, refresh, and a new message with the same key appears — same key, latest value wins, log-compacted consumers get exactly the current state.
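
No console handy? A minimal confluent_kafka consumer shows the same picture — local-broker settings assumed here; add the SASL fields for StreamNative:

python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "csv-rows-inspector",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["cocoindex-csv-rows"])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise RuntimeError(msg.error())
        key = msg.key().decode() if msg.key() else None
        value = msg.value().decode() if msg.value() else None  # None => delete
        print(key, "->", value)
finally:
    consumer.close()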

Try it

The Kafka connector ships as an optional dependency group — pip install cocoindex[kafka] if you’re pulling it into an existing project. To run the example directly:

sh
git clone https://github.com/cocoindex-io/cocoindex
cd cocoindex/examples/csv_to_kafka
cp .env.example .env  # fill in your Kafka bootstrap + SASL creds
pip install -e .
cocoindex update -L main.py

No cloud broker handy? Point KAFKA_BOOTSTRAP_SERVERS at localhost:9092 and leave the SASL fields empty — the example works against any Kafka the confluent_kafka client can connect to.
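
For that local setup the .env boils down to something like this (KAFKA_BOOTSTRAP_SERVERS is the name used above; the other names are illustrative — match whatever .env.example defines):

sh
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SASL_USERNAME=
KAFKA_SASL_PASSWORD=
KAFKA_TOPIC=cocoindex-csv-rows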

Then go edit data/products.csv and watch the messages land. ⭐ us on GitHub if you’d like to follow along — more connectors are on the way.


If you’ve ever written “the script that watches a folder and pushes things to Kafka,” this is the version of that script that doesn’t break the second time you change a row.