Fast iterate your indexing strategy - trace back from query to data

· 4 min read
Linghua Jin
CocoIndex Maintainer

cover

We are launching a major feature in both CocoIndex and CocoInsight to help users quickly iterate on their indexing strategy and trace results all the way back to the source data, making the transformation experience more seamlessly integrated with the end goal.

We deeply care about making the overall experience seamless. With the new launch, you can define query handlers, so that you can easily run queries in tools like CocoInsight.

CocoInsight

Does my data transformation create a meaningful index for retrieval?

In CocoInsight, we’ve added a Query mode, which you can enable by adding a CocoIndex query handler. You can quickly query the index and view the collected information for any entity.

CocoInsight Query Mode

Each result is directly linked and can be traced back, step by step, to how the data was generated on the indexing path.

Where are the results coming from?

For example, this snippet comes from the file docs/docs/core/flow_def.mdx. The file was split into 30 chunks after transformation.

trace back data

Why is my chunk / snippet not showing in the search result?

When you perform a query, the ranking path usually involves a scoring mechanism. In CocoInsight, you can quickly locate any file you have in mind and, for any chunk, inspect its score in the same context.

missing chunks

This gives you a powerful toolset with direct insight into the end-to-end data transformation, so you can quickly iterate on your indexing strategy without the headache of building additional UI or tools.

Integrate Query Logic with CocoIndex

Query Handler

To run queries in CocoInsight, you need to define query handlers. You can use any libraries or frameworks of your choice to perform queries.

You can read more in the documentation about Query Handler.

Query handlers let you expose a simple function that takes a query string and returns structured results. They are discoverable by tools like CocoInsight so you can query your indexes without building your own UI.

For example:

# Declaring it as a query handler, so that you can easily run queries in CocoInsight.
@code_embedding_flow.query_handler(
    result_fields=cocoindex.QueryHandlerResultFields(
        embedding=["embedding"], score="score"
    )
)
def search(query: str) -> cocoindex.QueryOutput:
    # Get the table name for the export target in the code_embedding_flow above.
    table_name = cocoindex.utils.get_target_default_name(
        code_embedding_flow, "code_embeddings"
    )
    # Evaluate the shared transform flow with the input query, to get the embedding.
    query_vector = code_to_embedding.eval(query)
    # Run the query and get the results.
    with connection_pool().connection() as conn:
        register_vector(conn)
        with conn.cursor() as cur:
            cur.execute(
                f"""
                SELECT filename, code, embedding, embedding <=> %s AS distance, start, "end"
                FROM {table_name} ORDER BY distance LIMIT %s
                """,
                (query_vector, TOP_K),
            )
            return cocoindex.QueryOutput(
                query_info=cocoindex.QueryInfo(
                    embedding=query_vector,
                    similarity_metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
                ),
                results=[
                    {
                        "filename": row[0],
                        "code": row[1],
                        "embedding": row[2],
                        "score": 1.0 - row[3],
                        "start": row[4],
                        "end": row[5],
                    }
                    for row in cur.fetchall()
                ],
            )

This code defines a query handler that:

  1. Turns the input query into an embedding vector. code_to_embedding is a transformation flow shared between the query and indexing paths; see the detailed explanation below.
  2. Searches a database of code embeddings using cosine similarity.
  3. Returns the top matching code snippets with their filename, code, embedding, score, and positions.
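The ranking in the SQL above boils down to cosine distance. As a minimal, self-contained sketch (plain Python with made-up vectors and filenames, not CocoIndex API), the equivalent of `ORDER BY distance LIMIT k` looks like this:

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    # Cosine distance = 1 - cosine similarity, matching pgvector's <=> operator.
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm

def top_k(query_vec: list[float], rows: list[tuple[str, list[float]]], k: int) -> list[str]:
    # Rank rows by ascending cosine distance, as `ORDER BY distance LIMIT k` does.
    ranked = sorted(rows, key=lambda r: cosine_distance(query_vec, r[1]))
    return [name for name, _ in ranked[:k]]

rows = [("a.py", [1.0, 0.0]), ("b.py", [0.0, 1.0]), ("c.py", [0.9, 0.1])]
print(top_k([1.0, 0.0], rows, 2))  # ['a.py', 'c.py']
```

In production the database does this work with a vector index; the sketch only illustrates the math behind the ordering.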

Sharing Logic Between Indexing and Query

Sometimes, transformation logic needs to be shared between indexing and querying. For example, when we build a vector index and query against it, the embedding computation must be consistent between the two.

You can find the documentation about Transformation Flow.

You can use @cocoindex.transform_flow() to define shared logic. For example:

@cocoindex.transform_flow()
def text_to_embedding(text: cocoindex.DataSlice[str]) -> cocoindex.DataSlice[NDArray[np.float32]]:
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2"))

In your indexing flow, you can call it directly:

with doc["chunks"].row() as chunk:
    chunk["embedding"] = text_to_embedding(chunk["text"])

In your query logic, call the eval() method with a specific value:

def search(query: str) -> cocoindex.QueryOutput:
    # Evaluate the transform flow defined above with the input query, to get the embedding.
    query_vector = code_to_embedding.eval(query)

Examples

Beyond Vector Index

We use a vector index in this blog, but CocoIndex is a powerful data transformation framework that goes beyond vector indexes. You can use it to build vector indexes, knowledge graphs, structured extraction and transformation, and any custom logic you need for efficient retrieval from fresh data.

Support Us

We’re constantly adding more examples and improving our runtime. ⭐ Star CocoIndex on GitHub and share the love ❤️ !

And let us know what you are building with CocoIndex — we’d love to feature it.

Incrementally Transform Structured + Unstructured Data from Postgres with AI

· 7 min read
Linghua Jin
CocoIndex Maintainer

PostgreSQL Product Indexing Flow

CocoIndex is one framework for building incremental data flows across structured and unstructured sources.

In CocoIndex, AI steps -- like generating embeddings -- are just transforms in the same flow as your other transformations, such as data mappings and calculations.

Why One Framework for Structured + Unstructured?

  • One mental model: Treat files, APIs, and databases uniformly; AI steps are ordinary ops.
  • Incremental by default: Use an ordinal column to sync only changes; no fragile glue jobs.
  • Consistency: Embeddings are always derived from the exact transformed row state.
  • Operational simplicity: One deployment, one lineage view, fewer moving parts.

This blog introduces the new PostgreSQL source and shows how to take a PostgreSQL table as a source, transform it with both AI models and non-AI calculations, and write the results into a new PostgreSQL table for semantic + structured search.

If this helps you, ⭐ Star CocoIndex GitHub!

The Example: PostgreSQL Product Indexing Flow

PostgreSQL Product Indexing Flow

Our example demonstrates

  • Reading data from a PostgreSQL table source_products.
  • Computing additional fields (total_value, full_description).
  • Generating embeddings for semantic search.
  • Storing the results in another PostgreSQL table with a vector index using pgvector.

This example is open sourced - examples/postgres_source.

Connect to source

flow_builder.add_source reads rows from source_products.

@cocoindex.flow_def(name="PostgresProductIndexing")
def postgres_product_indexing_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope) -> None:

    data_scope["products"] = flow_builder.add_source(
        cocoindex.sources.Postgres(
            table_name="source_products",
            # Optional. Use the default CocoIndex database if not specified.
            database=cocoindex.add_transient_auth_entry(
                cocoindex.DatabaseConnectionSpec(
                    url=os.environ["SOURCE_DATABASE_URL"],
                )
            ),
            # Optional.
            ordinal_column="modified_time",
            notification=cocoindex.sources.PostgresNotification(),
        ),
    )

This step adds source data from the PostgreSQL table source_products to the flow as a KTable.

Add PostgreSQL Source

  • Incremental Sync: When new or updated rows are found, only those rows are run through the pipeline, so downstream indexes and search results reflect the latest data while unchanged rows are untouched.

  • ordinal_column is recommended for change detection, so the pipeline only processes rows that changed.

  • notification: when present, enables change capture based on Postgres LISTEN/NOTIFY.

Check Postgres source for more details.
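Conceptually, ordinal-based change detection works like the following sketch (a hypothetical `rows_to_process` helper with made-up rows, not part of the CocoIndex API): the pipeline remembers the highest ordinal it has processed and re-reads only rows with a newer ordinal:

```python
def rows_to_process(rows: list[dict], last_seen_ordinal: int) -> list[dict]:
    # Only rows modified after the last processed ordinal are re-run
    # through the pipeline; unchanged rows are untouched.
    return [r for r in rows if r["modified_time"] > last_seen_ordinal]

rows = [
    {"id": 1, "modified_time": 100},  # unchanged since last sync
    {"id": 2, "modified_time": 205},  # updated since last sync
    {"id": 3, "modified_time": 301},  # new row
]
changed = rows_to_process(rows, last_seen_ordinal=200)
print([r["id"] for r in changed])  # [2, 3]
```

CocoIndex handles this bookkeeping for you when you set ordinal_column; the sketch only shows the idea.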

If you use the Postgres database hosted by Supabase, please click Connect on your project dashboard and find the URL there. Check DatabaseConnectionSpec for more details.

Simple Data Mapping / Transformation

Create a simple transformation to calculate the total price.

@cocoindex.op.function()
def calculate_total_value(price: float, amount: int) -> float:
    """Compute total value for each product."""
    return price * amount
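Stripped of the decorator, the op is ordinary Python and can be sanity-checked directly (sample values are made up):

```python
def calculate_total_value(price: float, amount: int) -> float:
    """Compute total value for each product."""
    return price * amount

print(calculate_total_value(2.5, 4))  # 10.0
```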

Plug into the flow:

with data_scope["products"].row() as product:
    # Compute total value
    product["total_value"] = flow_builder.transform(
        calculate_total_value,
        product["price"],
        product["amount"],
    )

Calculate Total Value

Data Transformation & AI Transformation

Create a custom function that builds a full_description field by combining the product’s category, name, and description.

@cocoindex.op.function()
def make_full_description(category: str, name: str, description: str) -> str:
    """Create a detailed product description for embedding."""
    return f"Category: {category}\nName: {name}\n\n{description}"

Embeddings often perform better with more context. By combining fields into a single text string, we ensure that the semantic meaning of the product is captured fully.
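Stripped of the decorator, the function is plain Python; with made-up sample values it produces:

```python
def make_full_description(category: str, name: str, description: str) -> str:
    """Create a detailed product description for embedding."""
    return f"Category: {category}\nName: {name}\n\n{description}"

print(make_full_description("Stationery", "Notebook", "A5 ruled notebook, 200 pages."))
# Category: Stationery
# Name: Notebook
#
# A5 ruled notebook, 200 pages.
```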

Now plug into the flow:

with data_scope["products"].row() as product:
    # ... other transformations

    # Compute full description
    product["full_description"] = flow_builder.transform(
        make_full_description,
        product["product_category"],
        product["product_name"],
        product["description"],
    )

    # Generate embeddings
    product["embedding"] = product["full_description"].transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2"
        )
    )

    # Collect data
    indexed_product.collect(
        product_category=product["product_category"],
        product_name=product["product_name"],
        description=product["description"],
        price=product["price"],
        amount=product["amount"],
        total_value=product["total_value"],
        embedding=product["embedding"],
    )

This takes each product row and does the following:

  1. builds a rich description.

    Make Full Description

  2. turns it into an embedding

    Embed Full Description

  3. collects the embedding along with structured fields (category, name, price, etc.).

    Collect Embedding

Export

indexed_product.export(
    "output",
    cocoindex.targets.Postgres(),
    primary_key_fields=["product_category", "product_name"],
    vector_indexes=[
        cocoindex.VectorIndexDef(
            field_name="embedding",
            metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
        )
    ],
)

All transformed rows are collected and exported to a new PostgreSQL table with a vector index, ready for semantic search.

Field lineage

When the transform flow starts getting complex, it's hard to understand how each field is derived. CocoIndex provides a way to visualize the lineage of each field, making it easier to trace and troubleshoot field origins and downstream dependencies.

For example, the following image shows the lineage of the embedding field: you can click from the final output backward, all the way to the source fields, step by step.

Field Lineage

Running the Pipeline

  1. Set up dependencies:

    pip install -e .
  2. Create the source table with sample data:

    psql "postgres://cocoindex:cocoindex@localhost/cocoindex" -f ./prepare_source_data.sql
  3. Set up tables and update the index:

    cocoindex update --setup main
  4. Run CocoInsight:

    cocoindex server -ci main

    You can walk through the project step by step in CocoInsight to see exactly how each field is constructed and what happens behind the scenes. It connects to your local CocoIndex server, with zero pipeline data retention.

Continuous Updating

For continuous updating when the source changes, add -L:

cocoindex server -ci -L main

Check live updates for more details.

Search and Query the Index

Query

Runs a semantic similarity search over the indexed products table, returning the top matches for a given query.

def search(pool: ConnectionPool, query: str, top_k: int = 5) -> list[dict[str, Any]]:
    # Get the table name for the export target in the flow above.
    table_name = cocoindex.utils.get_target_default_name(
        postgres_product_indexing_flow, "output"
    )
    # Evaluate the transform flow defined above with the input query, to get the embedding.
    query_vector = text_to_embedding.eval(query)
    # Run the query and get the results.
    with pool.connection() as conn:
        register_vector(conn)
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute(
                f"""
                SELECT
                    product_category,
                    product_name,
                    description,
                    amount,
                    total_value,
                    (embedding <=> %s) AS distance
                FROM {table_name}
                ORDER BY distance ASC
                LIMIT %s
                """,
                (query_vector, top_k),
            )
            return cur.fetchall()

This function:

  • Converts the query text into an embedding (query_vector).
  • Compares it with each product’s stored embedding (embedding) using vector distance.
  • Returns the closest matches, including both metadata and the vector distance (distance).
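Since `<=>` returns a cosine distance (lower is more similar), converting it to a similarity score, as the interactive loop later does with `1.0 - distance`, is a one-liner (the helper name here is ours, not a CocoIndex API):

```python
def similarity_score(distance: float) -> float:
    # pgvector's <=> returns cosine distance; similarity = 1 - distance,
    # so identical vectors score 1.0 and orthogonal vectors 0.0.
    return 1.0 - distance

print(similarity_score(0.25))  # 0.75
```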

Create an interactive command-line loop:

def _main() -> None:
    # Initialize the database connection pool.
    pool = ConnectionPool(os.environ["COCOINDEX_DATABASE_URL"])
    # Run queries in a loop to demonstrate the query capabilities.
    while True:
        query = input("Enter search query (or Enter to quit): ")
        if query == "":
            break
        # Run the query function with the database connection pool and the query.
        results = search(pool, query)
        print("\nSearch results:")
        for result in results:
            score = 1.0 - result["distance"]
            print(
                f"[{score:.3f}] {result['product_category']} | {result['product_name']} | {result['amount']} | {result['total_value']}"
            )
            print(f"    {result['description']}")
            print("---")
        print()

if __name__ == "__main__":
    load_dotenv()
    cocoindex.init()
    _main()

Run as a Service

This example runs as a service using FastAPI.

Summary

This approach unlocks powerful new possibilities for businesses to build fast and consistent semantic + structured search experiences, enabling advanced recommendations, knowledge discovery, and contextual analytics from hybrid data at scale.

With a single deployment, one lineage view, and a coherent mental model, CocoIndex is a future-ready framework that drives the next generation of data- and AI-powered applications with simplicity, rigor, and operational excellence.

Support Us

We’re constantly adding more examples and improving our runtime. ⭐ Star CocoIndex on GitHub and share the love ❤️! And let us know what you are building with CocoIndex — we’d love to feature it.

Build a Visual Document Index from multiple formats all at once - PDFs, Images, Slides - with ColPali

· 5 min read
Linghua Jin
CocoIndex Maintainer

Colpali

Do you have a messy collection of scanned documents, PDFs, academic papers, presentation slides, and standalone images — all mixed together with charts, tables, and figures — that you want to process into the same vector space for semantic search or to power an AI agent?

In this example, we’ll walk through how to build a visual document indexing pipeline using ColPali for embedding both PDFs and images — and then query the index using natural language.
We’ll skip OCR entirely — ColPali can directly understand document layouts, tables, and figures from images, making it perfect for semantic search across visual-heavy content.

CocoIndex Changelog 2025-08-18

· 13 min read
Linghua Jin
CocoIndex Maintainer

CocoIndex Changelog 2025-08-15

We’ve shipped 20+ releases — packed with production-ready features, scalability upgrades, and runtime improvements. 🚀 Huge thanks to our amazing users for the feedback and for running CocoIndex at scale!

Index Images with ColPali: Multi-Modal Context Engineering

· 7 min read
Linghua Jin
CocoIndex Maintainer

Colpali

We’re excited to announce that CocoIndex now supports native integration with ColPali — enabling multi-vector, patch-level image indexing using cutting-edge multimodal models.

With just a few lines of code, you can now embed and index images with ColPali’s late-interaction architecture, fully integrated into CocoIndex’s composable flow system.

Multi-Dimensional Vector Support in CocoIndex

· 6 min read
Linghua Jin
CocoIndex Maintainer

Custom Targets

CocoIndex now provides robust and flexible support for typed vector data — from simple numeric arrays to deeply nested multi-dimensional vectors. This support is designed for seamless integration with high-performance vector databases such as Qdrant, and enables advanced indexing, embedding, and retrieval workflows across diverse data modalities.

Bring your own building blocks: Export anywhere with Custom Targets

· 8 min read
Linghua Jin
CocoIndex Maintainer

Custom Targets

We’re excited to announce that CocoIndex now officially supports custom targets — giving you the power to export data to any destination, whether it's a local file, cloud storage, a REST API, or your own bespoke system.

This new capability unlocks a whole new level of flexibility for integrating CocoIndex into your pipelines and allows you to bring your own "building blocks" into our flow model.

Indexing Faces for Scalable Visual Search - Build your own Google Photo Search

· 5 min read
Linghua Jin
CocoIndex Maintainer

Face Detection

CocoIndex supports multi-modal processing natively - it can process both text and images with the same programming model and observe them in the same user flow (in CocoInsight).

In this blog, we’ll walk through a comprehensive example of building a scalable face recognition pipeline using CocoIndex. We’ll show how to extract and embed faces from images, structure the data relationally, and export everything into a vector database for real-time querying.

CocoInsight can now visualize identified sections of an image based on the bounding boxes and makes it easier to understand and evaluate AI extractions - seamlessly attaching computed features in the context of unstructured visual data.

Introducing CocoInsight

· 4 min read
Linghua Jin
CocoIndex Maintainer

CocoInsight

From day zero, we envisioned CocoInsight as a fundamental companion to CocoIndex — not just a tool, but a philosophy: making data explainable, auditable, and actionable at every stage of the data pipeline with AI workloads. CocoInsight has been in private beta for a while; it is one of the most loved features among our users building ETL with coco, significantly boosting developer velocity and lowering the barrier to entry for data engineering.

We are officially launching CocoInsight today. It has zero pipeline data retention and connects to your on-premise CocoIndex server for pipeline insights, making data directly visible and ETL pipelines easier to develop.

Flow-based schema inference for Qdrant

· 7 min read
Linghua Jin
CocoIndex Maintainer

CocoIndex + Qdrant Automatic Schema Setup

CocoIndex supports Qdrant natively - the integration features a high performance Rust stack with incremental processing end to end for scale and data freshness. 🎉 We just rolled out our latest change that handles automatic target schema setup with Qdrant from CocoIndex indexing flow.

Build Real-Time Product Recommendation Engine with LLM and Graph Database

· 8 min read
Linghua Jin
CocoIndex Maintainer

Product Graph

In this blog, we will build a real-time product recommendation engine with an LLM and a graph database. In particular, we will use the LLM to understand the category (taxonomy) of a product, and to enumerate complementary products that users are likely to buy together with the current product (e.g., pencil and notebook). We will use the graph to explore relationships between products, which can be further used for product recommendations or labeling.