
24 posts tagged with "Examples"

CocoIndex examples and implementation guides


Extracting Structured Data from Patient Intake Forms with DSPy and CocoIndex

· 13 min read
Linghua Jin
CocoIndex Maintainer

Extracting Structured Data from Patient Intake Forms with DSPy and CocoIndex

Patient intake forms are a rich source of structured clinical data, but traditional OCR + regex pipelines fail to reliably capture their nested, conditional, and variable structure, leaving most of that value locked in unstructured text or manual data entry.

In today's example, we are going to show how to extract clean, typed, Pydantic-validated structured data directly from PDFs, using:

  • DSPy (for multimodal structured extraction with Gemini 2.5 Flash vision)
  • CocoIndex (for incremental processing, caching, and database storage)

No manual text extraction, no brittle Markdown conversion — just connect to the source, transform the PDFs, and get validated patient models that are ready for production.

The entire example is open sourced under the Apache 2.0 license. To see more examples built with CocoIndex, refer to the examples page. ⭐ Star the project on GitHub if you find it helpful!

Why DSPy + CocoIndex?

Before jumping in, here’s what each component contributes:

DSPy: A programming framework for LLMs

Traditional LLM apps rely on prompt engineering: you write a prompt with instructions, few‑shot examples, and formatting, then call the model and parse the raw text. This approach is fragile:

  • Small changes in the prompt, model, or data can break the output format or quality.
  • Logic is buried in strings, making it hard to test, compose, or version.

DSPy replaces this with a programming model: you define what each LLM step should do (inputs, outputs, constraints), and the framework figures out how to prompt the model to satisfy that spec.

CocoIndex: An ultra-performant data processing engine for AI workloads

CocoIndex is an ultra-performant compute framework for AI workloads with incremental processing. You write simple in-memory computations in Python, and CocoIndex runs them as a resilient, scalable data pipeline (backed by a Rust engine) with fresh data always ready for serving. The same flow definition you use in a notebook can be lifted into production with little change.

With CocoIndex, changes in sources or transformation logic only trigger minimal recompute, cutting cold-start “backfill” latencies from hours to seconds while reducing GPU/API spend. In production, this manifests as always-fresh targets: you run in “live” mode with change data capture or polling, and CocoIndex keeps derived stores in sync with complex unstructured sources like codebases, PDFs, and multi-hop API compositions.

Because every transformation step is observable with lineage, teams get auditability and explainability out of the box, which helps for regulated scenarios like healthcare extraction or financial workflows.

DSPy & CocoIndex Synergy

The synergy shows up most clearly in end-to-end AI data products: DSPy defines robust, typed extractors or decision modules, and CocoIndex wires them into a resilient, incremental pipeline that can meet SLOs and compliance needs. Any change in documents, code, or business rules is reflected quickly and explainably in the targets and features those agents consume.

Flow Overview

Prerequisites

Before getting started, make sure you have the following set up:

  1. Install Postgres if you don't have one, and ensure you can connect to it from your development environment.
  2. Install Python dependencies:
pip install -U cocoindex dspy-ai pydantic pymupdf
  3. Create a .env file:
# Postgres database address for cocoindex
COCOINDEX_DATABASE_URL=postgres://cocoindex:cocoindex@localhost/cocoindex

# Gemini API key
GEMINI_API_KEY=YOUR_GEMINI_API_KEY

Pydantic Models: Define the structured schema

We define Pydantic models (Contact, Address, Insurance, etc.) to match a FHIR-inspired patient schema, enabling structured and validated representations of patient data. Each model corresponds to a key aspect of a patient's record, ensuring both type safety and nested relationships.

Patient schema

1. Contact Model

class Contact(BaseModel):
    name: str
    phone: str
    relationship: str
  • Represents an emergency or personal contact for the patient.
  • Fields:
    • name: Contact's full name.
    • phone: Contact phone number.
    • relationship: Relation to the patient (e.g., parent, spouse, friend).

2. Address Model

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str
  • Represents a postal address.
  • Fields:
    • street, city, state, zip_code: Standard address fields.

3. Pharmacy Model

class Pharmacy(BaseModel):
    name: str
    phone: str
    address: Address
  • Represents the patient’s preferred pharmacy.
  • Fields:
    • name: Pharmacy name.
    • phone: Pharmacy contact number.
    • address: Uses the Address model for structured address information.

4. Insurance Model

class Insurance(BaseModel):
    provider: str
    policy_number: str
    group_number: str | None = None
    policyholder_name: str
    relationship_to_patient: str
  • Represents the patient’s insurance information.
  • Fields:
    • provider: Insurance company name.
    • policy_number: Unique policy number.
    • group_number: Optional group number.
    • policyholder_name: Name of the person covered under the insurance.
    • relationship_to_patient: Relationship to patient (e.g., self, parent).

5. Condition Model

class Condition(BaseModel):
    name: str
    diagnosed: bool
  • Represents a medical condition.
  • Fields:
    • name: Condition name (e.g., Diabetes).
    • diagnosed: Boolean indicating whether it has been officially diagnosed.

6. Medication Model

class Medication(BaseModel):
    name: str
    dosage: str
  • Represents a current medication the patient is taking.
  • Fields:
    • name: Medication name.
    • dosage: Dosage information (e.g., "10mg daily").

7. Allergy Model

class Allergy(BaseModel):
    name: str
  • Represents a known allergy.
  • Fields:
    • name: Name of the allergen (e.g., peanuts, penicillin).

8. Surgery Model

class Surgery(BaseModel):
    name: str
    date: str
  • Represents a surgery or procedure the patient has undergone.
  • Fields:
    • name: Surgery name (e.g., Appendectomy).
    • date: Surgery date (as a string, ideally ISO format).

9. Patient Model

class Patient(BaseModel):
    name: str
    dob: datetime.date
    gender: str
    address: Address
    phone: str
    email: str
    preferred_contact_method: str
    emergency_contact: Contact
    insurance: Insurance | None = None
    reason_for_visit: str
    symptoms_duration: str
    past_conditions: list[Condition] = Field(default_factory=list)
    current_medications: list[Medication] = Field(default_factory=list)
    allergies: list[Allergy] = Field(default_factory=list)
    surgeries: list[Surgery] = Field(default_factory=list)
    occupation: str | None = None
    pharmacy: Pharmacy | None = None
    consent_given: bool
    consent_date: str | None = None
  • Represents a complete patient record with personal, medical, and administrative information.
  • Key fields:
    • name, dob, gender: Basic personal info.
    • address, phone, email: Contact info.
    • preferred_contact_method: How the patient prefers to be reached.
    • emergency_contact: Nested Contact model.
    • insurance: Optional nested Insurance model.
    • reason_for_visit, symptoms_duration: Visit details.
    • past_conditions, current_medications, allergies, surgeries: Lists of nested models for comprehensive medical history.
    • occupation: Optional job info.
    • pharmacy: Optional nested Pharmacy model.
    • consent_given, consent_date: Legal/administrative consent info.

Why Use Pydantic Here?

  1. Validation: Ensures all fields are the correct type (e.g., dob is a date).
  2. Structured Nested Models: Patient has nested objects like Address, Contact, and Insurance.
  3. Default Values & Optional Fields: Handles optional fields and defaults (Field(default_factory=list) ensures empty lists if no data).
  4. Serialization: Easily convert models to JSON for APIs or databases.
  5. Error Checking: Automatically raises errors if invalid data is provided.
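
To make the validation point concrete, here is a minimal sketch (the inline record is hypothetical) showing how the Patient model parses nested dicts, coerces dob into a datetime.date, and rejects incomplete records:

from pydantic import ValidationError

record = Patient.model_validate({
    "name": "Jane Doe",
    "dob": "1990-04-12",  # coerced into datetime.date automatically
    "gender": "female",
    "address": {"street": "1 Main St", "city": "Springfield", "state": "IL", "zip_code": "62704"},
    "phone": "555-0100",
    "email": "jane@example.com",
    "preferred_contact_method": "email",
    "emergency_contact": {"name": "John Doe", "phone": "555-0101", "relationship": "spouse"},
    "reason_for_visit": "annual checkup",
    "symptoms_duration": "2 weeks",
    "consent_given": True,
})
print(record.dob)  # datetime.date(1990, 4, 12)

try:
    Patient.model_validate({"name": "Incomplete Record"})  # missing required fields
except ValidationError as exc:
    print(exc.error_count(), "validation errors")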

DSPy Vision Extractor

DSPy Signature

Let’s define PatientExtractionSignature. A Signature describes what data your module expects and what it will produce. Think of it as a schema for an AI task.

PatientExtractionSignature is a dspy.Signature, which is DSPy's way of declaring what the model should do, not how it does it.

# DSPy Signature for patient information extraction from images
class PatientExtractionSignature(dspy.Signature):
    """Extract structured patient information from a medical intake form image."""

    form_images: list[dspy.Image] = dspy.InputField(
        desc="Images of the patient intake form pages"
    )
    patient: Patient = dspy.OutputField(
        desc="Extracted patient information with all available fields filled"
    )

This signature defines the task contract for patient information extraction.

  • Inputs: form_images – a list of images of the intake form.
  • Outputs: patient – a structured Patient object.

From DSPy's point of view, this Signature is a "spec": a mapping from an image-based context to a structured, Pydantic-backed semantic object that can later be optimized, trained, and composed with other modules.

PatientExtractor Module

PatientExtractor is a dspy.Module, which in DSPy is a composable, potentially trainable building block that implements the Signature.

class PatientExtractor(dspy.Module):
    """DSPy module for extracting patient information from intake form images."""

    def __init__(self) -> None:
        super().__init__()
        self.extract = dspy.ChainOfThought(PatientExtractionSignature)

    def forward(self, form_images: list[dspy.Image]) -> Patient:
        """Extract patient information from form images and return as a Pydantic model."""
        result = self.extract(form_images=form_images)
        return result.patient  # type: ignore
  • In __init__, ChainOfThought is a DSPy primitive module that knows how to call an LLM with reasoning-style prompting to satisfy the given Signature. In other words, it is a default "strategy" for solving the "extract patient from images" task.
  • The forward method is DSPy's standard interface for executing a module. You pass form_images into self.extract(). DSPy then handles converting this call into an LLM interaction (or a trained program) that produces a patient field as declared in the Signature.

Conceptually, PatientExtractor is an ETL operator: the Signature describes the input/output types, and the internal ChainOfThought module is the function that fills that contract.

Single-Step Extraction

Now let’s wire the DSPy module to extract from a single PDF. At a high level:

  • The extractor receives PDF bytes directly
  • Internally converts PDF pages to DSPy Image objects using PyMuPDF
  • Processes images with vision model
  • Returns Pydantic model directly
@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient:
    """Extract patient information from PDF content."""

    # Convert PDF pages to DSPy Image objects
    pdf_doc = pymupdf.open(stream=pdf_content, filetype="pdf")

    form_images = []
    for page in pdf_doc:
        # Render page to pixmap (image) at 2x resolution for better quality
        pix = page.get_pixmap(matrix=pymupdf.Matrix(2, 2))
        # Convert to PNG bytes
        img_bytes = pix.tobytes("png")
        # Create DSPy Image from bytes
        form_images.append(dspy.Image(img_bytes))

    pdf_doc.close()

    # Extract patient information using DSPy with vision
    extractor = PatientExtractor()
    patient = extractor(form_images=form_images)

    return patient  # type: ignore

This function is a CocoIndex function (decorated with @cocoindex.op.function) that takes PDF bytes as input and returns a fully structured Patient Pydantic object.

  • cache=True allows repeated calls with the same PDF to reuse results.
  • behavior_version=1 ensures versioning of the function for reproducibility.

Create DSPy Image objects

We open PDF from bytes using PyMuPDF (pymupdf), then we iterate over each page.

  • Useful trick: Render the page as a high-resolution image (2x) for better OCR/vision performance.
  • Convert the rendered page to PNG bytes.
  • Wrap the PNG bytes in a DSPy Image object.

DSPy Extraction

The list of form_images is passed to the DSPy module:

  1. ChainOfThought reasoning interprets each image.
  2. Vision + NLP extract relevant text fields.
  3. Populate Pydantic Patient object with structured patient info.

CocoIndex Flow

CocoIndex Flow

  • Loads PDFs from local directory as binary
  • For each document, applies single transform: PDF bytes → Patient data
  • Exports the results in a PostgreSQL table

Declare Flow

Declare a CocoIndex flow, connect to the source, add a data collector to collect processed data.

@cocoindex.flow_def(name="PatientIntakeExtractionDSPy")
def patient_intake_extraction_dspy_flow(
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.LocalFile(path="data/patient_forms", binary=True)
)

patients_index = data_scope.add_collector()
  • @cocoindex.flow_def tells CocoIndex that this function is a flow definition, not regular runtime code.
  • add_source() registers a LocalFile source that traverses the data/patient_forms directory and creates a logical table named documents.

Ingesting Data

You can connect to various sources, or even a custom source, with CocoIndex if native connectors are not available. CocoIndex is designed to keep your indexes synchronized with your data sources through a feature called live updates, which automatically detects changes in your sources and updates your indexes accordingly, so your search results and data analysis are always based on the most current information. You can read more at https://cocoindex.io/docs/tutorials/live_updates.
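
As a hedged sketch, the same LocalFile source above can be polled periodically by passing refresh_interval to add_source (the parameter usage mirrors the HackerNews example later on this page; the five-minute interval is just an illustration):

import datetime

data_scope["documents"] = flow_builder.add_source(
    cocoindex.sources.LocalFile(path="data/patient_forms", binary=True),
    refresh_interval=datetime.timedelta(minutes=5),  # poll for new/changed PDFs in live mode
)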

Process documents

with data_scope["documents"].row() as doc:
# Extract patient information directly from PDF using DSPy with vision
# (PDF->Image conversion happens inside the extractor)
doc["patient_info"] = doc["content"].transform(extract_patient)

# Collect the extracted patient information
patients_index.collect(
filename=doc["filename"],
patient_info=doc["patient_info"],
)

This iterates over each document. We transform doc["content"] (the bytes) by our extract_patient function. The result is stored in a new field patient_info.

Then we collect a row with the filename and extracted patient_info.

Transforming Data

Nested Data

Export to Postgres

    patients_index.export(
        "patients",
        cocoindex.storages.Postgres(table_name="patients_info_dspy"),
        primary_key_fields=["filename"],
    )

We export the collected index to Postgres. This will create and maintain a table patients_info_dspy keyed by filename, automatically deleting or updating rows if inputs change.

Because CocoIndex tracks data lineage, it will handle updates/deletions of source files incrementally.
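
Once an update has run, you can sanity-check the exported rows with a short script. This is only a sketch: it assumes the table name patients_info_dspy from the export above and a Postgres instance reachable via COCOINDEX_DATABASE_URL.

import os
import psycopg

with psycopg.connect(os.environ["COCOINDEX_DATABASE_URL"]) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT filename, patient_info FROM patients_info_dspy LIMIT 5")
        for filename, patient_info in cur.fetchall():
            print(filename, patient_info)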

Configure CocoIndex settings

Define a CocoIndex settings function that configures the AI model for DSPy:

@cocoindex.settings
def cocoindex_settings() -> cocoindex.Settings:
    # Configure the model used in DSPy
    lm = dspy.LM("gemini/gemini-2.5-flash")
    dspy.configure(lm=lm)

    return cocoindex.Settings.from_env()

It returns a cocoindex.Settings object initialized from environment variables, enabling the system to use the configured model and environment settings for all DSPy operations.

Running the Pipeline

Update the index:

cocoindex update main

CocoInsight

I used CocoInsight (in free beta) to troubleshoot the index generation and understand the data lineage of the pipeline. It connects to your local CocoIndex server with zero pipeline data retention.

cocoindex server -ci main

Scalable Open ecosystem, not a closed box

CocoIndex is intentionally "composable by default": it gives you a fast, incremental data engine and a clean flow abstraction, but never locks you into a specific model, vector DB, processing module, or orchestration stack.

CocoIndex treats everything — sources, ops, and storages — as pluggable interfaces rather than proprietary primitives. You can read from local files, S3, APIs, or custom sources, call any data transformation logic (beyond SQL, DSPy modules, any complex Python transformations, generated parsers etc), and export to relational databases, vector databases, search engines, or custom sinks through its storage layer.

Why DSPy + CocoIndex fits this philosophy

DSPy is itself a compositional framework for LLMs: you define typed Signatures and Modules, and it learns how to implement them, making the LLM layer programmable, testable, and optimizable.

CocoIndex treats these modules as first-class operators in the flow, so you get a clean separation of concerns: DSPy owns “how the model thinks,” while CocoIndex owns “how data moves, is cached, and is served” across changing PDFs, code, or APIs.

This pairing is powerful because neither system tries to be the entire stack: CocoIndex does not prescribe a prompt framework, and DSPy does not prescribe a data pipeline engine. Instead, they interlock: DSPy modules become composable building blocks inside CocoIndex flows, and CocoIndex gives those modules a production context with retries, batching, caching, and live updates.

Comparison of DSPy vs BAML examples

For this DSPy and BAML example with CocoIndex, there are some differences in the data flow and the schema enforcement.

| Aspect | This tutorial (DSPy) | Previous intake tutorials (OpenAI/BAML) |
| --- | --- | --- |
| Model interaction | DSPy Signatures + Modules, trainable and optimizable. | Direct prompts or BAML functions with generated clients. |
| Input format | PDF bytes → images → Gemini Vision (no Markdown step). | PDF/DOCX → Markdown via parsers (e.g., MarkItDown) → text LLM. |
| Schema enforcement | Pydantic models used directly as DSPy output types. | Dataclasses or BAML types mapped into CocoIndex collectors. |
| Pipeline engine | CocoIndex incremental flow with LocalFile source + Postgres sink. | Same engine. |

DSPy and BAML both help build LLM applications with structured outputs, but they emphasize different things: DSPy focuses on modular Python workflows and optimizer-driven prompt tuning, while BAML focuses on a typed DSL, schema guarantees, and multi-language client generation.

Support CocoIndex ❤️

If this example was helpful, the easiest way to support CocoIndex is to give the project a ⭐ on GitHub.

Your stars help us grow the community, stay motivated, and keep shipping better tools for real-time data ingestion and transformation.

Building a Knowledge Graph from Meeting Notes that automatically updates

· 15 min read
Linghua Jin
CocoIndex Maintainer

Building a Knowledge Graph from Meeting Notes that automatically updates

Meeting notes are goldmines of organizational intelligence. They capture decisions, action items, participant information, and the relationships between people and tasks. Yet most organizations treat them as static documents — searchable only through basic text search. Imagine instead being able to query your meetings like a database: "Who attended meetings where the topic was 'budget planning'?" or "What tasks did Sarah get assigned across all meetings?"

This is where knowledge graphs shine. By extracting structured information from unstructured meeting notes and building a graph representation, you unlock powerful relationship-based queries and insights that would be impossible with traditional document storage.

In this post, we'll explore a practical CocoIndex example that demonstrates exactly this — building a knowledge graph of meetings from Markdown documents stored in Google Drive, powered by LLM-based extraction, and persisted in Neo4j.

The source code is open sourced and available at Meeting Notes Graph Code.

Building a Real-Time HackerNews Trending Topics Detector with CocoIndex: A Deep Dive into Custom Sources and AI

· 17 min read
Linghua Jin
CocoIndex Maintainer

Building a Real-Time HackerNews Trending Topics Detector with CocoIndex: A Deep Dive into Custom Sources and AI

In the age of information overload, understanding what's trending—and why—is crucial for developers, researchers, and data engineers. HackerNews is one of the most influential tech communities, but manually tracking emerging topics across thousands of threads and comments is practically impossible.

What if you could automatically index HackerNews content, extract topics using AI, and query trending discussions in real-time? That's exactly what CocoIndex enables through its Custom Sources framework combined with LLM-powered extraction.

In this post, we'll explore the HackerNews Trending Topics example, a production-ready pipeline that demonstrates some of the most powerful concepts in CocoIndex: incremental data syncing, LLM-powered information extraction, and queryable indexes.

Extract structured information from HackerNews with a Custom Source and keep it in sync with Postgres

· 12 min read
Linghua Jin
CocoIndex Maintainer

Extract structured information from HackerNews with a Custom Source and export to Postgres

Custom Sources are one of the most powerful concepts in CocoIndex. They let you turn any API — internal or external — into a first-class, incremental data stream that the framework can automatically diff, track, and sync.

Think of it as React for data flows: you describe the shape of your data, and CocoIndex handles incremental updates, state persistence, lineage, and downstream sync. You get predictable, debuggable, fault-tolerant pipelines without the usual orchestration overhead.

In this example, we build a custom connector for HackerNews. It fetches recent stories + nested comments, indexes them, and exposes a simple search interface powered by Postgres full-text search.

If this example is helpful, we’d appreciate a ⭐ on CocoIndex GitHub!

Why Use a Custom Source?

In many scenarios, pipelines don't just read from clean tables. They depend on:

  • Internal REST services
  • Partner APIs
  • Legacy systems
  • Non-standard data models that don’t fit traditional connectors

CocoIndex’s Custom Source API makes these integrations declarative, incremental, and safe by default. Instead of writing ad-hoc scripts, you wrap your API as a “source component,” and CocoIndex takes it from there.

Project Walkthrough — Building a HackerNews Index

Goals

  1. Call HackerNews Search API
  2. Fetch nested comments
  3. Update only modified threads
  4. Store content in Postgres
  5. Expose a text search interface

CocoIndex handles change detection, idempotency, lineage, and state sync automatically.

Overview

HackerNews Custom Source Pipeline

The pipeline consists of three major parts:

  1. Define a custom source (HackerNewsConnector)
    • Calls HackerNews API
    • Emits rows for changed/updated threads
    • Pulls full thread + comment tree
  2. Build an index with CocoIndex Flow
    • Collect thread content
    • Collect all comments recursively
    • Export to a Postgres table (hn_messages)
  3. Add a lightweight query handler
    • Uses PostgreSQL full-text search
    • Returns ranked matches for a keyword query

Each cocoindex update only processes changed HN threads and keeps everything in sync.

The project is open source and available on GitHub.

Prerequisites

As with the other examples on this page, you need a running Postgres instance and the project's Python dependencies installed; see "Running Your HackerNews Custom Source" below for the install commands.

Defining the Data Model

Every custom source defines two lightweight data types:

  • Key Type → uniquely identifies an item
  • Value Type → the full content for that item

In HackerNews, each news item is a thread, and each thread can have multiple comments. HackerNews Thread and Comments

For HackerNews, let’s define keys like this:

class _HackerNewsThreadKey(NamedTuple):
    """Row key type for HackerNews source."""
    thread_id: str

Keys must be:

  • hashable
  • serializable
  • stable (doesn’t change over time)

Values hold the actual dataset:

@dataclasses.dataclass
class _HackerNewsComment:
    id: str
    author: str | None
    text: str | None
    created_at: datetime | None

@dataclasses.dataclass
class _HackerNewsThread:
    """Value type for HackerNews source."""
    author: str | None
    text: str
    url: str | None
    created_at: datetime | None
    comments: list[_HackerNewsComment]

This tells CocoIndex exactly what every HackerNews “item” looks like when fully fetched. _HackerNewsThread holds a post and all its comments, while _HackerNewsComment represents individual comments.

Building a Custom Source Connector

A Custom Source has two parts:

  1. SourceSpec — declarative configuration
  2. SourceConnector — operational logic for reading data

Writing the SourceSpec

A SourceSpec in CocoIndex is a declarative configuration that tells the system what data to fetch and how to connect to a source. It doesn’t fetch data itself — that’s handled by the source connector.

class HackerNewsSource(SourceSpec):
    """Source spec for HackerNews API."""
    tag: str | None = None
    max_results: int = 100

Fields:

  • tag
    • Optional filter for the type of HackerNews content.
    • Example: "story", "job", "poll".
    • If None, it fetches all types.
  • max_results
    • Maximum number of threads to fetch from HackerNews at a time.
    • Helps limit the size of the index for performance or testing.

Defining the connector

Sets up the connector's configuration and HTTP session so it can fetch HackerNews data efficiently.

@source_connector(
    spec_cls=HackerNewsSource,
    key_type=_HackerNewsThreadKey,
    value_type=_HackerNewsThread,
)
class HackerNewsConnector:
    """Custom source connector for HackerNews API."""

    _spec: HackerNewsSource
    _session: aiohttp.ClientSession

    def __init__(self, spec: HackerNewsSource, session: aiohttp.ClientSession):
        self._spec = spec
        self._session = session

    @staticmethod
    async def create(spec: HackerNewsSource) -> "HackerNewsConnector":
        """Create a HackerNews connector from the spec."""
        return HackerNewsConnector(spec, aiohttp.ClientSession())
  • source_connector tells CocoIndex that this class is a custom source connector. It specifies:
    • spec_cls: the configuration class (HackerNewsSource)
    • key_type: how individual items are identified (_HackerNewsThreadKey)
    • value_type: the structure of the data returned (_HackerNewsThread)
  • create() is called by CocoIndex to initialize the connector, and it sets up a fresh aiohttp.ClientSession for making HTTP requests.

Listing Available Threads

The list() method in HackerNewsConnector is responsible for discovering all available HackerNews threads that match the given criteria (tag, max results) and returning metadata about them. CocoIndex uses this to know which threads exist and which may have changed.

    async def list(
        self,
    ) -> AsyncIterator[PartialSourceRow[_HackerNewsThreadKey, _HackerNewsThread]]:
        """List HackerNews threads using the search API."""
        # Use HackerNews search API
        search_url = "https://hn.algolia.com/api/v1/search_by_date"
        params: dict[str, Any] = {"hitsPerPage": self._spec.max_results}

        if self._spec.tag:
            params["tags"] = self._spec.tag
        async with self._session.get(search_url, params=params) as response:
            response.raise_for_status()
            data = await response.json()
            for hit in data.get("hits", []):
                if thread_id := hit.get("objectID", None):
                    utime = hit.get("updated_at")
                    ordinal = (
                        int(datetime.fromisoformat(utime).timestamp())
                        if utime
                        else NO_ORDINAL
                    )
                    yield PartialSourceRow(
                        key=_HackerNewsThreadKey(thread_id=thread_id),
                        data=PartialSourceRowData(ordinal=ordinal),
                    )

list() fetches metadata for all recent HackerNews threads.

  • For each thread:
    • It generates a PartialSourceRow with:
      • key: the thread ID
      • ordinal: the last updated timestamp
  • Purpose: allows CocoIndex to track what threads exist and which have changed without fetching full thread content.

Fetching Full Thread Content

This async method fetches a single HackerNews thread (including its comments) from the API, and wraps the result in a PartialSourceRowData object — the structure CocoIndex uses for row-level ingestion.

    async def get_value(
        self, key: _HackerNewsThreadKey
    ) -> PartialSourceRowData[_HackerNewsThread]:
        """Get a specific HackerNews thread by ID using the items API."""

        # Use HackerNews items API to get full thread with comments
        item_url = f"https://hn.algolia.com/api/v1/items/{key.thread_id}"

        async with self._session.get(item_url) as response:
            response.raise_for_status()
            data = await response.json()

            if not data:
                return PartialSourceRowData(
                    value=NON_EXISTENCE,
                    ordinal=NO_ORDINAL,
                    content_version_fp=None,
                )
            return PartialSourceRowData(
                value=HackerNewsConnector._parse_hackernews_thread(data)
            )
  • get_value() fetches the full content of a specific thread, including comments.
  • Parses the raw JSON into structured Python objects (_HackerNewsThread + _HackerNewsComment).
  • Returns a PartialSourceRowData containing the full thread.

Ordinal Support

Tells CocoIndex that this source provides timestamps (ordinals).

    def provides_ordinal(self) -> bool:
        return True

CocoIndex uses ordinals to incrementally update only changed threads, improving efficiency.

Parsing JSON into Structured Data

This static method takes the raw JSON response from the API and turns it into a normalized _HackerNewsThread object containing:

  • The post (title, text, metadata)
  • All nested comments, flattened into a single list
  • Proper Python datetime objects

It performs recursive traversal of the comment tree.

    @staticmethod
    def _parse_hackernews_thread(data: dict[str, Any]) -> _HackerNewsThread:
        comments: list[_HackerNewsComment] = []

        def _add_comments(parent: dict[str, Any]) -> None:
            children = parent.get("children", None)
            if not children:
                return
            for child in children:
                ctime = child.get("created_at")
                if comment_id := child.get("id", None):
                    comments.append(
                        _HackerNewsComment(
                            id=str(comment_id),
                            author=child.get("author", ""),
                            text=child.get("text", ""),
                            created_at=datetime.fromisoformat(ctime) if ctime else None,
                        )
                    )
                _add_comments(child)

        _add_comments(data)

        ctime = data.get("created_at")
        text = data.get("title", "")
        if more_text := data.get("text", None):
            text += "\n\n" + more_text
        return _HackerNewsThread(
            author=data.get("author"),
            text=text,
            url=data.get("url"),
            created_at=datetime.fromisoformat(ctime) if ctime else None,
            comments=comments,
        )
  • Converts raw HackerNews API response into _HackerNewsThread and _HackerNewsComment.
  • _add_comments() recursively parses nested comments.
  • Combines title + text into the main thread content.
  • Produces a fully structured object ready for indexing.

Putting It All Together in a Flow

Your flow now reads exactly like a React component.

Define the flow and connect source

@cocoindex.flow_def(name="HackerNewsIndex")
def hackernews_flow(
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:

# Add the custom source to the flow
data_scope["threads"] = flow_builder.add_source(
HackerNewsSource(tag="story", max_results=500),
refresh_interval=timedelta(minutes=1),
)

# Create collectors for different types of searchable content
message_index = data_scope.add_collector()


Process each thread and collect structured information

with data_scope["threads"].row() as thread:
# Index the main thread content
message_index.collect(
id=thread["thread_id"],
thread_id=thread["thread_id"],
content_type="thread",
author=thread["author"],
text=thread["text"],
url=thread["url"],
created_at=thread["created_at"],
)

Process each comment of a thread and collect structured information

with thread["comments"].row() as comment:
message_index.collect(
id=comment["id"],
thread_id=thread["thread_id"],
content_type="comment",
author=comment["author"],
text=comment["text"],
created_at=comment["created_at"],
)

Export to database tables

    message_index.export(
        "hn_messages",
        cocoindex.targets.Postgres(),
        primary_key_fields=["id"],
    )

CocoIndex now:

  • polls the HackerNews API
  • tracks changes incrementally
  • flattens nested comments
  • exports to Postgres
  • supports live mode

Your app can now query it as a real-time search index.

Querying & Searching the HackerNews Index

At this point you are done with the index flow. As the next step, you could define query handlers — so you can run queries in CocoInsight. You can use any library or framework of your choice to perform queries. You can read more in the documentation about Query Handler.

@hackernews_flow.query_handler()
def search_text(query: str) -> cocoindex.QueryOutput:
    """Search HackerNews threads by title and content."""
    table_name = cocoindex.utils.get_target_default_name(hackernews_flow, "hn_messages")

    with connection_pool().connection() as conn:
        with conn.cursor() as cur:
            # Simple text search using PostgreSQL's text search capabilities
            cur.execute(
                f"""
                SELECT id, thread_id, author, content_type, text, created_at,
                       ts_rank(to_tsvector('english', text), plainto_tsquery('english', %s)) as rank
                FROM {table_name}
                WHERE to_tsvector('english', text) @@ plainto_tsquery('english', %s)
                ORDER BY rank DESC, created_at DESC
                """,
                (query, query),
            )

            results = []
            for row in cur.fetchall():
                results.append(
                    {
                        "id": row[0],
                        "thread_id": row[1],
                        "author": row[2],
                        "content_type": row[3],
                        "text": row[4],
                        "created_at": row[5].isoformat(),
                    }
                )

    return cocoindex.QueryOutput(results=results)

This code defines a query handler that searches HackerNews threads and comments indexed in CocoIndex. It determines the database table storing the messages, then uses PostgreSQL full-text search (to_tsvector and plainto_tsquery) to find rows matching the query.

Results are ranked by relevance (ts_rank) and creation time, formatted into dictionaries, and returned as a structured cocoindex.QueryOutput. Essentially, it performs a full-text search over the indexed content and delivers ranked, structured results.
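
Because search_text is a plain Python function, you can also call it directly once the flow has been updated at least once (the query string below is just an example):

output = search_text("rust async runtime")
for item in output.results[:3]:
    print(item["author"], item["content_type"], item["text"][:80])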

Running Your HackerNews Custom Source

Once your custom source and flow are ready, running it with CocoIndex is straightforward. You can either update the index on-demand or keep it continuously in sync with HackerNews.

1. Install Dependencies

Make sure you have Python installed and then install your project in editable mode:

pip install -e .

This installs CocoIndex along with all required dependencies, letting you develop and update the connector without reinstalling.

2. Update the Target (On-Demand)

To populate your target (e.g., Postgres) with the latest HackerNews threads:

cocoindex update main
  • Only threads that have changed will be re-processed.
  • Your target remains in sync with the most recent 500 HackerNews threads.
  • Efficient incremental updates save time and compute resources.

Note that each time you run the update command, CocoIndex only re-processes threads that have changed and keeps the target in sync with the most recent 500 threads from HackerNews. You can also run the update command in live mode, which keeps the target in sync with the source continuously:

cocoindex update -L main
  • Runs the flow in live mode, polling HackerNews periodically.
  • CocoIndex automatically handles incremental changes and keeps the target synchronized.
  • Ideal for dashboards, search, or AI pipelines that require real-time data.

3. Troubleshoot & Inspect with CocoInsight

CocoInsight lets you visualize and debug your flow, see the lineage of your data, and understand what’s happening under the hood.

Start the server:

cocoindex server -ci main

Then open the UI in your browser: https://cocoindex.io/cocoinsight

CocoInsight has zero pipeline data retention — it’s safe for debugging and inspecting your flows locally.

Note that this requires the query handler set up in the previous step.

What You Can Build Next

This simple example opens the door to a lot more:

  • Build a trending-topic detector
  • Run LLM summarization pipelines on top of indexed threads
  • Add embeddings + vector search
  • Mirror HN into your internal data warehouse
  • Build a real-time HN dashboard
  • Extend to other news sources (Reddit, Lobsters, etc.)

Because the whole pipeline is declarative and incremental, extending it is straightforward.

Since Custom Sources allow you to wrap any Python logic into an incremental data stream, the best use cases are usually "Hard-to-Reach" data — systems that don't have standard database connectors, have complex nesting, or require heavy pre-processing.

The Knowledge Aggregator for LLM Context

Building a context engine for an AI bot often requires pulling from non-standard documentation sources.

The "Composite" Entity (Data Stitching)

Most companies have user data fragmented across multiple microservices. You can build a Custom Source that acts as a "virtual join" before the data ever hits your index. For example, the source:

  1. Fetches a User ID from an Auth Service (Okta/Auth0).
  2. Uses that ID to fetch billing status from Stripe API.
  3. Uses that ID to fetch usage logs from an Internal Redis.

Instead of managing complex ETL joins downstream, the Custom Source yields a single User360 object. CocoIndex tracks the state of this composite object; if the user upgrades in Stripe or changes their email in Auth0, the index updates automatically.
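
Here is a schematic sketch of such a composite source, following the same custom-source shape as the HackerNews connector above. The auth/billing/usage helper functions are hypothetical stand-ins for your own service clients, and imports of the CocoIndex names (SourceSpec, source_connector, PartialSourceRow, PartialSourceRowData) are elided just as they are in the example above.

import dataclasses
from typing import AsyncIterator, NamedTuple


class _UserKey(NamedTuple):
    user_id: str


@dataclasses.dataclass
class _User360:
    email: str
    billing_status: str
    recent_usage: list[str]


class User360Source(SourceSpec):
    """Declarative config for the composite user source."""
    max_users: int = 1000


@source_connector(spec_cls=User360Source, key_type=_UserKey, value_type=_User360)
class User360Connector:
    def __init__(self, spec: User360Source):
        self._spec = spec

    @staticmethod
    async def create(spec: User360Source) -> "User360Connector":
        return User360Connector(spec)

    async def list(self) -> AsyncIterator[PartialSourceRow[_UserKey, _User360]]:
        # Enumerate user IDs from the auth service (hypothetical helper).
        for user_id in await list_user_ids_from_auth(self._spec.max_users):
            yield PartialSourceRow(key=_UserKey(user_id=user_id), data=PartialSourceRowData())

    async def get_value(self, key: _UserKey) -> PartialSourceRowData[_User360]:
        # Stitch three systems into one object; CocoIndex diffs the result downstream.
        email = await fetch_email_from_auth(key.user_id)        # hypothetical client
        billing = await fetch_billing_from_stripe(key.user_id)  # hypothetical client
        usage = await fetch_usage_from_redis(key.user_id)       # hypothetical client
        return PartialSourceRowData(
            value=_User360(email=email, billing_status=billing, recent_usage=usage)
        )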

The "Legacy Wrapper" (Modernization Layer)

Enterprises often have valuable data locked in systems that are painful to query (SOAP, XML, mainframes). A Custom Source can wrap those interfaces, so you get a modern, queryable SQL interface (via the CocoIndex target) on top of a 20-year-old system without rewriting the legacy system itself.

Public Data Monitor (Competitive Intelligence)

Tracking changes on public websites or APIs that don't offer webhooks.

  • The Source:
    • Competitor Pricing: Scraping e-commerce product pages.
    • Regulatory Feeds: Polling a government RSS feed or FDA drug approval database.
    • Crypto/Stocks: Hitting a CoinGecko or Yahoo Finance API.

The CocoIndex Value: Using the diff capabilities, you can trigger downstream alerts only when a price changes by >5% or a new regulation is posted, rather than spamming your database with identical polling results.

Why This Matters

Custom Sources extend this model to any API — internal, external, legacy, or real-time.

This unlocks a simple but powerful pattern:

If you can fetch it, CocoIndex can index it, diff it, and sync it.

Whether you’re indexing HackerNews or orchestrating dozens of enterprise services, the framework gives you a stable backbone with:

  • persistent state
  • deterministic updates
  • automatic lineage
  • flexible target exports
  • minimal infrastructure overhead

⭐ Try It, Fork It, Star It

If you found this useful, a star on GitHub means a lot — it helps others discover CocoIndex and supports further development.

Extracting Intake Forms with BAML and CocoIndex

· 8 min read
Linghua Jin
CocoIndex Maintainer

Extracting Intake Forms with BAML and CocoIndex

This tutorial shows how to use BAML together with CocoIndex to build a data pipeline that extracts structured patient information from PDF intake forms. The BAML definitions describe the desired output schema and prompt logic, while CocoIndex orchestrates file input, transformation, and incremental indexing.

We’ll walk through setup, defining the BAML schema, generating the Python client, writing the CocoIndex flow, and running the pipeline. Throughout, we follow best practices (e.g. caching heavy steps) and cite documentation for key concepts.

The full project is open sourced here ⭐. To see more examples built with CocoIndex, you can refer to the examples page.

BAML

BAML, created by BoundaryML, is a typed prompt engineering language that makes LLM workflows predictable, testable, and production-safe. Instead of treating prompts as fragile strings, BAML lets developers define clear input parameters, output schemas, and model configurations — transforming prompts into strongly typed functions.

CocoIndex

CocoIndex is a unified data processing engine built for AI-native applications. It lets you define transformations in one declarative workflow — then keeps everything continuously up to date with real-time, incremental processing. Designed for reliability and scale, CocoIndex ensures that every derived artifact (embeddings, metadata, extractions, models) always reflects the latest source data, making it the foundation for fast, consistent RAG, analytics, and automation pipelines.

Flow Overview

Flow Overview

  • Read PDF files from a directory.
  • For each file, call the BAML function to get a structured Patient.
  • Collect results and export to Postgres.

Prerequisites

  1. Install Postgres if you don't have one.

  2. Install dependencies

    pip install -U cocoindex baml-py
  3. Create a .env file. You can copy it from .env.example first:

    cp .env.example .env

    Then edit the file to fill in your GEMINI_API_KEY.

Structured Extraction Component with BAML

Create a baml_src/ directory for your BAML definitions. We’ll define a schema for patient intake data (nested classes) and a function that prompts Gemini to extract those fields from a PDF. Save this as baml_src/patient.baml.

Define Patient Schema

Classes: We defined Pydantic-style classes (Contact, Address, Insurance, etc.) to match the FHIR-inspired patient schema. These become typed output models. Required fields are non-nullable; optional fields use ?.

Schema

class Contact {
  name string
  phone string
  relationship string
}

class Address {
  street string
  city string
  state string
  zip_code string
}

class Pharmacy {
  name string
  phone string
  address Address
}

class Insurance {
  provider string
  policy_number string
  group_number string?
  policyholder_name string
  relationship_to_patient string
}

class Condition {
  name string
  diagnosed bool
}

class Medication {
  name string
  dosage string
}

class Allergy {
  name string
}

class Surgery {
  name string
  date string
}

class Patient {
  name string
  dob string
  gender string
  address Address
  phone string
  email string
  preferred_contact_method string
  emergency_contact Contact
  insurance Insurance?
  reason_for_visit string
  symptoms_duration string
  past_conditions Condition[]
  current_medications Medication[]
  allergies Allergy[]
  surgeries Surgery[]
  occupation string?
  pharmacy Pharmacy?
  consent_given bool
  consent_date string?
}

Define the BAML function to extract patient info from a PDF

function ExtractPatientInfo(intake_form: pdf) -> Patient {
  client Gemini
  prompt #"
    Extract all patient information from the following intake form document.
    Please be thorough and extract all available information accurately.
    {{ _.role("user") }}
    {{ intake_form }}

    Fill in with "N/A" for required fields if the information is not available.

    {{ ctx.output_format }}
  "#
}

We specify client Gemini and a prompt template. The special variable {{ intake_form }} injects the PDF, and {{ ctx.output_format }} tells BAML to expect the structured format defined by the return type. The prompt explicitly asks Gemini to extract all fields, filling “N/A” if missing.

BAML PDF Extraction: Crucial Prompt Role Gotcha

When using BAML to extract structured data (like a Patient record) from PDFs, it is absolutely critical to ensure the PDF content is injected as part of the user message in the prompt. Specifically, you need to include {{ _.role("user") }} before you insert your file data with {{ intake_form }}:

Why role("user") matters?

  • For OpenAI models (e.g., GPT-4, GPT-4o), if the file's content is not presented in the user message, the model won't "see" the PDF at all — your extraction will fail or be empty.
  • For Gemini and Anthropic, it's more forgiving and can sometimes work anyway, which makes this confusing to debug across providers.

We only discovered this after a discussion on the BAML repo and our own investigations. If you skip the explicit role("user"), you might waste hours debugging inconsistent extractions.

Takeaway:
When building extraction flows with BAML, always set the role to "user" before adding file content to your prompt. That makes your workflow robust and portable across LLM providers.

Thanks to Deepu and Prashanth from our discord community for working with us on this issue. You can see a real-world debugging journey in our Discord thread.

Configure the LLM client to use Google’s Gemini model

client<llm> Gemini {
  provider google-ai
  options {
    model gemini-2.5-flash
    api_key env.GEMINI_API_KEY
  }
}

Configure BAML generator

In baml_src folder add generator.baml:

generator python_client {
  output_type python/pydantic
  output_dir "../"
  version "0.213.0"
}

The generator block tells baml-cli to create a Python client with Pydantic models in the parent directory.

When we run baml-cli generate:

This will compile the .baml definitions into a baml_client/ Python package in your project root. It contains:

  • baml_client/types.py with Pydantic classes (Patient, etc.).
  • baml_client/sync_client.py and async_client.py with a callable b object. For example, b.ExtractPatientInfo(pdf) will return a Patient.
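
As a quick sanity check of the generated client, here is a minimal sketch calling the sync b object directly on one PDF (the file path is hypothetical; the Pdf construction mirrors the CocoIndex function shown later):

import base64

import baml_py
from baml_client.sync_client import b
from baml_client.types import Patient

with open("data/patient_forms/sample_form.pdf", "rb") as f:
    pdf = baml_py.Pdf.from_base64(base64.b64encode(f.read()).decode("utf-8"))

patient: Patient = b.ExtractPatientInfo(pdf)
print(patient.name, patient.dob)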

Continuous Data Transformation flow with incremental processing

Next we will define the data transformation flow with CocoIndex. Once you declare the state and transformation logic, CocoIndex takes care of all the state changes for you, from source to target.

CocoIndex Flow

Declare Flow

Declare a Cocoindex flow, connect to the source, add a data collector to collect processed data.

@cocoindex.flow_def(name="PatientIntakeExtractionBaml")
def patient_intake_extraction_flow(
flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.LocalFile(
path=os.path.join("data", "patient_forms"), binary=True
)
)

patients_index = data_scope.add_collector()

This declares the flow, registers a LocalFile source that reads PDF files from data/patient_forms as binary, and adds a collector that will gather the extracted patient data.

Ingesting Data

Define a custom function to use BAML extraction to transform a PDF

@cocoindex.op.function(cache=True, behavior_version=1)
async def extract_patient_info(content: bytes) -> Patient:
    pdf = baml_py.Pdf.from_base64(base64.b64encode(content).decode("utf-8"))
    return await b.ExtractPatientInfo(pdf)
  • The extract_patient_info function is decorated with @cocoindex.op.function(cache=True, behavior_version=1). Setting cache=True causes CocoIndex to cache outputs of this function for incremental runs (so unchanged inputs skip rerunning the LLM). We increase behavior_version (start at 1) so that any prompt or logic changes will force a refresh.
  • Inside the function, we convert bytes to a BAML Pdf (via base64) and then call await b.ExtractPatientInfo(pdf). This returns a Patient dataclass instance (mapped from the BAML output)

Process each document

  1. Transform each doc with BAML.
  2. Collect the structured output.

    with data_scope["documents"].row() as doc:
        doc["patient_info"] = doc["content"].transform(extract_patient_info)

        patients_index.collect(
            filename=doc["filename"],
            patient_info=doc["patient_info"],
        )

Transforming Data

It is common to have heavily nested data; CocoIndex is natively designed to handle heavily nested data structures.

Nested Data

Export to Postgres

    patients_index.export(
        "patients",
        cocoindex.storages.Postgres(),
        primary_key_fields=["filename"],
    )

We export the collected index to Postgres. This will create and maintain a table patients keyed by filename, automatically deleting or updating rows if inputs change. Because CocoIndex tracks data lineage, it handles updates and deletions of source files incrementally.

Running the Pipeline

Generate the BAML client code (required, in case you didn’t do it earlier):

baml-cli generate

This generates the baml_client/ directory with Python code to call your BAML functions.

Update the index:

cocoindex update main

CocoInsight

I used CocoInsight (in free beta) to troubleshoot the index generation and understand the data lineage of the pipeline. It connects to your local CocoIndex server, with zero pipeline data retention.

cocoindex server -ci main

Composable by Default: Use the Best Components for Your Use Case

While CocoIndex provides a rich set of building blocks for building LLM pipelines, it is fundamentally designed as an open system. Developers can bring in their preferred transformation components tailored to their domain — from document parsers to structured extractors like BAML.

This flexibility enables deep composability with other open ecosystems. The synergy between CocoIndex and BAML highlights this philosophy: BAML brings powerful prompt-driven schema extraction, while CocoIndex orchestrates and maintains the flow at scale. There’s no lock-in — developers and enterprises experimenting at the frontier can adapt, extend, and integrate freely.

Summary

By combining BAML and CocoIndex, we get a robust, schema-driven workflow: BAML ensures the prompt-to-schema mapping is correct and type-safe, while CocoIndex handles data ingestion, transformation, and incremental storage. This example extracted patient intake information (names, insurance, medications, etc.) from PDFs, but the pattern applies to any structured data extraction task.

Index PDF elements - text, images with mixed embedding models and metadata

· 7 min read
Linghua Jin
CocoIndex Maintainer

Index PDF elements - text, images with mixed encoders and citations with metadata

PDFs are rich with both text and visual content — from descriptive paragraphs to illustrations and tables. This example builds an end-to-end flow that parses, embeds, and indexes both, with full traceability to the original page.

In this example, we split out both text and images, link them back to page metadata, and enable unified semantic search. We’ll use CocoIndex to define the flow, SentenceTransformers for text embeddings, and CLIP for image embeddings — all stored in Qdrant for retrieval.

Automated invoice processing with AI, Snowflake and CocoIndex - with incremental processing

· 17 min read
Dhilip Subramanian
Data & AI Practitioner, CocoIndex Community Contributor

cover

I recently worked with a clothing manufacturer who wanted to simplify their invoice process. Every day, they receive around 20–22 supplier invoices in PDF format. All these invoices are stored in Azure Blob Storage. The finance team used to open each PDF manually and copy the details into their system. This took a lot of time and effort. On top of that, they already had a backlog of 8,000 old invoices waiting to be processed.

At first, I built a flow using n8n. This solution read the invoices from Azure Blob Storage, used Mistral AI to pull out the fields from each PDF, and then loaded the results into Snowflake. The setup worked fine for a while. But as the number of invoices grew, the workflow started to break. Debugging errors inside a no-code tool like n8n became harder and harder. That’s when I decided to switch to a coding solution.

I came across CocoIndex, an open-source ETL framework designed to transform data for AI, with support for real-time incremental processing. It allowed me to build a pipeline that was both reliable and scalable for this use case.

Fast iterate your indexing strategy - trace back from query to data

· 4 min read
Linghua Jin
CocoIndex Maintainer

cover

We are launching a major feature in both CocoIndex and CocoInsight to help users fast iterate with the indexing strategy, and trace back all the way to the data — to make the transformation experience more seamlessly integrated with the end goal.

We deeply care about making the overall experience seamless. With the new launch, you can define query handlers, so that you can easily run queries in tools like CocoInsight.

Incrementally Transform Structured + Unstructured Data from Postgres with AI

· 7 min read
Linghua Jin
CocoIndex Maintainer

PostgreSQL Product Indexing Flow

CocoIndex is one framework for building incremental data flows across structured and unstructured sources.

In CocoIndex, AI steps, like generating embeddings, are just transforms in the same flow as your other types of transformations (data mappings, calculations, etc.).

Why One Framework for Structured + Unstructured?

  • One mental model: Treat files, APIs, and databases uniformly; AI steps are ordinary ops.
  • Incremental by default: Use an ordinal column to sync only changes; no fragile glue jobs.
  • Consistency: Embeddings are always derived from the exact transformed row state.
  • Operational simplicity: One deployment, one lineage view, fewer moving parts.

This blog introduces the new PostgreSQL source and shows how to take data from PostgreSQL table as source, transform with both AI models and non-AI calculations, and write them into a new PostgreSQL table for semantic + structured search.

Build a Visual Document Index from multiple formats all at once - PDFs, Images, Slides - with ColPali

· 5 min read
Linghua Jin
CocoIndex Maintainer

Colpali

Do you have a messy collection of scanned documents, PDFs, academic papers, presentation slides, and standalone images — all mixed together with charts, tables, and figures — that you want to process into the same vector space for semantic search or to power an AI agent?

In this example, we’ll walk through how to build a visual document indexing pipeline using ColPali for embedding both PDFs and images — and then query the index using natural language.
We’ll skip OCR entirely — ColPali can directly understand document layouts, tables, and figures from images, making it perfect for semantic search across visual-heavy content.

Index Images with ColPali: Multi-Modal Context Engineering

· 7 min read
Linghua Jin
CocoIndex Maintainer

Colpali

We’re excited to announce that CocoIndex now supports native integration with ColPali — enabling multi-vector, patch-level image indexing using cutting-edge multimodal models.

With just a few lines of code, you can now embed and index images with ColPali’s late-interaction architecture, fully integrated into CocoIndex’s composable flow system.

Bring your own building blocks: Export anywhere with Custom Targets

· 8 min read
Linghua Jin
CocoIndex Maintainer

Custom Targets

We’re excited to announce that CocoIndex now officially supports custom targets — giving you the power to export data to any destination, whether it's a local file, cloud storage, a REST API, or your own bespoke system.

This new capability unlocks a whole new level of flexibility for integrating CocoIndex into your pipelines and allows you to bring your own "building blocks" into our flow model.

Indexing Faces for Scalable Visual Search - Build your own Google Photo Search

· 5 min read
Linghua Jin
CocoIndex Maintainer

Face Detection

CocoIndex supports multi-modal processing natively: it can process both text and images with the same programming model and observe them in the same user flow (in CocoInsight).

In this blog, we’ll walk through a comprehensive example of building a scalable face recognition pipeline using CocoIndex. We’ll show how to extract and embed faces from images, structure the data relationally, and export everything into a vector database for real-time querying.

CocoInsight can now visualize identified sections of an image based on the bounding boxes and makes it easier to understand and evaluate AI extractions - seamlessly attaching computed features in the context of unstructured visual data.

Build Real-Time Product Recommendation Engine with LLM and Graph Database

· 8 min read
Linghua Jin
CocoIndex Maintainer

Product Graph

In this blog, we will build a real-time product recommendation engine with an LLM and a graph database. In particular, we will use the LLM to understand the category (taxonomy) of a product and to enumerate complementary products that users are likely to buy together with the current product (e.g., pencil and notebook). We will use the graph to explore relationships between products that can be further used for product recommendations or labeling.

Build text embeddings from Google Drive for RAG

· 9 min read

Text Embedding from Google Drive

In this blog, we will show you how to use CocoIndex to build text embeddings from Google Drive for RAG step by step including how to setup Google Cloud Service Account for Google Drive. CocoIndex is an open source framework to build fresh indexes from your data for AI. It is designed to be easy to use and extend.