# Real-time product recommendations with LLM + graph DB

> Build a real-time product recommendation engine with an LLM and a graph database, from the aspect of product category (taxonomy) understanding.

Published: 2025-05-07 · Canonical: https://cocoindex.io/blogs/product-recommendation/

In this blog, we will build a real-time product recommendation engine with an LLM and a graph database. In particular, we will use an LLM to understand the category (taxonomy) of a product. In addition, we will use the LLM to enumerate the complementary products that users are likely to buy together with the current product (pencil and notebook). We will use a graph to explore the relationships between products that can be further used for product recommendations or labeling.

We are constantly improving, and more features and examples are coming soon. 
Stay tuned and follow our progress by starring our [GitHub repo](https://github.com/cocoindex-io/cocoindex).

Product taxonomy is a way to organize product catalogs in a logical and hierarchical structure; a great detailed explanation can be found [here](https://help.shopify.com/en/manual/products/details/product-category). In practice, it is a complicated problem: a product can be part of multiple categories, and a category can have multiple parents.

The source code is available at [Product Recommendation Code](https://github.com/cocoindex-io/cocoindex/tree/main/examples/product_recommendation).

## Prerequisites
*   [Install PostgreSQL](https://cocoindex.io/docs/getting_started/installation/). CocoIndex uses PostgreSQL internally for [incremental processing](https://cocoindex.io/blogs/incremental-processing). 
*   [Install Neo4j](https://cocoindex.io/docs/connectors/neo4j/), a graph database.
*   [Configure your OpenAI API key](https://cocoindex.io/docs/ops/litellm/). Alternatively, you can switch to [Ollama](https://cocoindex.io/blogs/cocoindex-ollama-structured-extraction-from-pdf), which runs LLM models locally: [guide](https://cocoindex.io/docs/ops/litellm/).

## Documentation
You can read the official CocoIndex Documentation for Property Graph Targets [here](https://cocoindex.io/docs/programming_guide/target_state/).

## Data flow to build knowledge graph

### Overview

The core flow is about [~100 lines of Python code](https://github.com/cocoindex-io/cocoindex/blob/1d42ab31692c73743425f7712c9af395ef98c80e/examples/product_taxonomy_knowledge_graph/main.py#L75-L177).

We are going to declare a [data flow](https://cocoindex.io/docs/programming_guide/core_concepts/):
1.  ingest products (in JSON)
2.  for each product, 
    - parse JSON
    - map & clean up data 
    - extract taxonomy from the mapped data
3.  collect data 
4.  export data to Neo4j

### Add documents as source

```python
@cocoindex.flow_def(name="StoreProduct")
def store_product_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
    data_scope["products"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(path="products",
                                    included_patterns=["*.json"]),
        refresh_interval=datetime.timedelta(seconds=5))
```

Here `flow_builder.add_source` creates a [KTable](https://cocoindex.io/docs/common_resources/data_types/).
`filename` is the key of the KTable.

### Add data collectors

Add collectors at the root scope to collect the product, taxonomy and complementary taxonomy.

```python
product_node = data_scope.add_collector()
product_taxonomy = data_scope.add_collector()
product_complementary_taxonomy = data_scope.add_collector()
```

### Process each product

We will parse the JSON file for each product, and transform the data to the format that we need for downstream processing. 

#### Data mapping

```python
@cocoindex.op.function(behavior_version=2)
def extract_product_info(product: cocoindex.typing.Json, filename: str) -> ProductInfo:
    return ProductInfo(
        id=f"{filename.removesuffix('.json')}",
        url=product["source"],
        title=product["title"],
        price=float(product["price"].lstrip("$").replace(",", "")),
        detail=Template(PRODUCT_TEMPLATE).render(**product),
    )
```

Here we define a [custom function](https://cocoindex.io/docs/programming_guide/function/) for data mapping, e.g., 
- clean up the `id` field
- map `title` -> `title`
- clean up the `price` field
- generate a markdown string for the product detail based on all the fields (for LLM to extract taxonomy and complementary taxonomy, we find that markdown works best as context for the LLM).

#### Flow

Within the flow, we plug in the data mapping transformation to process each product JSON.

```python
with data_scope["products"].row() as product:
    data = (product["content"]
            .transform(cocoindex.functions.ParseJson(), language="json")
            .transform(extract_product_info, filename=product["filename"]))
    product_node.collect(id=data["id"], url=data["url"], title=data["title"], price=data["price"])
```

1. The first `transform()` parses the JSON file.
2. The second `transform()` performs the defined data mapping.
3. We collect the fields we need for the product node in Neo4j.

### Extract taxonomy and complementary taxonomy using LLM

#### Product taxonomy definition

Since we are using an LLM to extract product taxonomy, we need to provide detailed instructions in the class-level docstring.

```python
@dataclasses.dataclass
class ProductTaxonomy:
    """
    Taxonomy for the product.
    
    A taxonomy is a concise noun (or short noun phrase), based on its core functionality, without specific details such as branding, style, etc.

    Always use the most common words in US English.

    Use lowercase without punctuation, unless it's a proper noun or acronym.

    A product may have multiple taxonomies. Avoid large categories like "office supplies" or "electronics". Use specific ones, like "pen" or "printer".
    """
    name: str
```

#### Define product taxonomy info

Basically we want to extract all possible taxonomies for a product, and think about what other products are likely to be bought together with the current product.

```python
@dataclasses.dataclass
class ProductTaxonomyInfo:
    """
    Taxonomy information for the product.

    Fields:
    - taxonomies: Taxonomies for the current product.
    - complementary_taxonomies: Think about when customers buy this product, what else they might need as complementary products. Put labels for these complementary products.
    """
    taxonomies: list[ProductTaxonomy]
    complementary_taxonomies: list[ProductTaxonomy]
```

For each product, we want some insight about its taxonomy and complementary taxonomy and we could use that as a bridge to find related products using a knowledge graph. 

#### LLM extraction

Finally, we will use `cocoindex.functions.ExtractByLlm` to extract the taxonomy and complementary taxonomy from the product detail.

```python
taxonomy = data["detail"].transform(cocoindex.functions.ExtractByLlm(
            llm_spec=cocoindex.LlmSpec(
                api_type=cocoindex.LlmApiType.OPENAI, model="gpt-4.1"),
                output_type=ProductTaxonomyInfo))
```

For example, the LLM takes the description of the *gel pen*, and extracts taxonomy to be *gel pen*.
Meanwhile, it suggests that when people buy *gel pen*, they may also be interested in *notebook* etc. as complementary taxonomy.

And then we will collect the taxonomy and complementary taxonomy into the collector.
```python
with taxonomy['taxonomies'].row() as t:
    product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"])
with taxonomy['complementary_taxonomies'].row() as t:
    product_complementary_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"])
```

### Build knowledge graph
 
#### Basic concepts
All nodes for Neo4j need two things:
1. Label: The type of the node. E.g., `Product`, `Taxonomy`.
2. Primary key field: The field that uniquely identifies the node. E.g., `id` for `Product` nodes.

CocoIndex uses the primary key field to match the nodes and deduplicate them. If you have multiple nodes with the same primary key, CocoIndex keeps only one of them.

There are two ways to map nodes:
1. When you have a collector just for the node, you can directly export it to Neo4j. For example `Product`. We've collected each product explicitly.
2. When you have a collector for relationships connecting to the node, you can map nodes from selected fields in the relationship collector. You must declare a node label and primary key field.  

For example,

```python
product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"])
```

This collects a relationship, and a taxonomy node is created from the relationship.

#### Configure Neo4j connection:

```python
conn_spec = cocoindex.add_auth_entry(
    "Neo4jConnection",
    cocoindex.storages.Neo4jConnection(
        uri="bolt://localhost:7687",
        user="neo4j",
        password="cocoindex",
))
```

#### Export `Product` nodes to Neo4j

```python
product_node.export(
    "product_node",
    cocoindex.storages.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.storages.Nodes(label="Product")
    ),
    primary_key_fields=["id"],
)
```

This exports Neo4j nodes with label `Product` from the `product_node` collector.
- It declares Neo4j node label `Product`. It specifies `id` as the primary key field.
- It carries all the fields from the `product_node` collector to Neo4j nodes with label `Product`.

#### Export `Taxonomy` nodes to Neo4j

We don't have an explicit collector for `Taxonomy` nodes. 
They are part of the `product_taxonomy` and `product_complementary_taxonomy` collectors and fields are collected during the taxonomy extraction.

To export them as Neo4j nodes, we need to first declare `Taxonomy` nodes. 

```python
flow_builder.declare(
    cocoindex.storages.Neo4jDeclaration(
        connection=conn_spec,
        nodes_label="Taxonomy",
        primary_key_fields=["value"],
    )
)
```

Next, export the `product_taxonomy` as a relationship to Neo4j.

```python
product_taxonomy.export(
    "product_taxonomy",
    cocoindex.storages.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.storages.Relationships(
            rel_type="PRODUCT_TAXONOMY",
            source=cocoindex.storages.NodeFromFields(
                label="Product",
                fields=[
                    cocoindex.storages.TargetFieldMapping(
                        source="product_id", target="id"),
                ]
            ),
            target=cocoindex.storages.NodeFromFields(
                label="Taxonomy",
                fields=[
                    cocoindex.storages.TargetFieldMapping(
                        source="taxonomy", target="value"),
                ]
            ),
        ),
    ),
    primary_key_fields=["id"],
)
```

Similarly, we can export the `product_complementary_taxonomy` as a relationship to Neo4j.
```python
    product_complementary_taxonomy.export(
        "product_complementary_taxonomy",
        cocoindex.storages.Neo4j(
            connection=conn_spec,
            mapping=cocoindex.storages.Relationships(
                rel_type="PRODUCT_COMPLEMENTARY_TAXONOMY",
                source=cocoindex.storages.NodeFromFields(
                    label="Product",
                    fields=[
                        cocoindex.storages.TargetFieldMapping(
                            source="product_id", target="id"),
                    ]
                ),
                target=cocoindex.storages.NodeFromFields(
                    label="Taxonomy",
                    fields=[
                        cocoindex.storages.TargetFieldMapping(
                            source="taxonomy", target="value"),
                    ]
                ),
            ),
        ),
        primary_key_fields=["id"],
    )
```

The `cocoindex.storages.Relationships` declares how to map relationships in Neo4j.

In a relationship, there's:
1.  A source node and a target node.
2.  A relationship connecting the source and target.
Note that different relationships may share the same source and target nodes.

`NodeFromFields` takes the fields from the `entity_relationship` collector and creates `Taxonomy` nodes. 

## Query and test your index
🎉 Now you are all set!

1.  Install the dependencies:

    ```
    pip install -e .
    ```

2.  Run the following command to set up and update the index.
    ```sh
    cocoindex update --setup main
    ```
    
    You'll see the index update state in the terminal. For example, you'll see the following output:

    ```
    documents: 9 added, 0 removed, 0 updated
    ```

3.  (Optional) I used CocoInsight to troubleshoot the index generation and understand the data lineage of the pipeline. 
It is in free beta now. You can give it a try. Run the following command to start CocoInsight:

    ```
    cocoindex server -ci main
    ```

    And then open the URL https://cocoindex.io/cocoinsight.  It just connects to your local CocoIndex server, with zero pipeline data retention.

### Browse the knowledge graph
After the knowledge graph is built, you can explore the knowledge graph you built in Neo4j Browser.

For the dev environment, you can connect to Neo4j Browser using credentials:
- username: `Neo4j`
- password: `cocoindex`
which are pre-configured in our docker compose [config.yaml](https://raw.githubusercontent.com/cocoindex-io/cocoindex/refs/heads/main/dev/Neo4j.yaml).

You can open it at [http://localhost:7474](http://localhost:7474), and run the following Cypher query to get all relationships:

```cypher
MATCH p=()-->() RETURN p
```

## Support us
We are constantly improving, and more features and examples are coming soon. 
If you love this article, please give us a star ⭐ at [GitHub repo](https://github.com/cocoindex-io/cocoindex) to help us grow.

Thanks for reading!
