# Neo4j connector

> **CocoIndex v1.** This page documents CocoIndex **v1** — a ground-up redesign from v0. When writing code, ignore any v0 flow-builder DSL or deprecated decorators.
>
> Source: https://cocoindex.io/docs/connectors/neo4j/ · Docs index: https://cocoindex.io/docs/llms.txt · Agent skill: https://cocoindex.io/docs/skill.md

The `neo4j` connector writes records to [Neo4j](https://neo4j.com), a property graph database. It supports node tables (labels), relation tables (relationship types), per-database multitenancy (one Neo4j cluster, many isolated databases), Cypher uniqueness constraints, and vector indexes via the `CREATE VECTOR INDEX` DDL form.

```python
from cocoindex.connectors import neo4j
```

**Note — Dependencies**
This connector requires additional dependencies. Install with:

```bash
pip install cocoindex[neo4j]
```

Targets Neo4j 5.18+. Vector-index DDL (`CREATE VECTOR INDEX … OPTIONS { indexConfig: { … } }`) shipped in 5.18 — older 5.x servers will reject the DDL the connector emits.

## Connection setup

Create a `ConnectionFactory` and provide it via a `ContextKey`. The factory holds the Bolt URI, optional auth, and the target database name; it lazily opens a Neo4j async driver and returns a graph handle on demand.

**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.

```python
from collections.abc import AsyncIterator
from cocoindex.connectors import neo4j
import cocoindex as coco

KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        neo4j.ConnectionFactory(
            uri="bolt://localhost:7687",
            auth=("neo4j", "cocoindex"),
            database="neo4j",
        ),
    )
    yield
```

`auth` is optional; omit it for unauthenticated dev instances. `database` defaults to `"neo4j"`, the default database that ships with every Neo4j 5 installation.

### Multitenancy

A single Neo4j cluster can host many isolated databases. Pair each database with its own `ContextKey` and `ConnectionFactory(database=...)`:

```python
KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("apis_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "bolt://localhost:7687"
    auth = ("neo4j", "cocoindex")
    builder.provide(KG_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="kg"))
    builder.provide(APIS_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="apis"))
    yield
```

Different `ContextKey`s with different database names produce fully separate target-state trees — changes to one never spill into the other.

## As target

The `neo4j` connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.

Each apply batch is wrapped in a single Neo4j transaction (`tx.commit()` on success, rollback on exception), so partial writes never leak into the database. Within a batch, writes are ordered as **node upserts → relation upserts → relation deletes → node deletes** so dependent edges always see their endpoints.
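
The write ordering can be pictured as a stable sort over the pending operations of one batch. The sketch below is an illustration only: `Op`, `PHASE_ORDER`, and `order_batch` are hypothetical names invented for this example, not part of the connector's API, and the real implementation may be organized differently.

```python
from dataclasses import dataclass

# Hypothetical phase ranks mirroring the order described above.
PHASE_ORDER = {
    ("node", "upsert"): 0,
    ("relation", "upsert"): 1,
    ("relation", "delete"): 2,
    ("node", "delete"): 3,
}


@dataclass(frozen=True)
class Op:
    kind: str    # "node" or "relation"
    action: str  # "upsert" or "delete"
    key: str


def order_batch(ops: list[Op]) -> list[Op]:
    """Sort ops by phase; sorted() is stable, so order within a phase is kept."""
    return sorted(ops, key=lambda op: PHASE_ORDER[(op.kind, op.action)])


batch = [
    Op("node", "delete", "old-doc"),
    Op("relation", "upsert", "rel-1"),
    Op("relation", "delete", "rel-0"),
    Op("node", "upsert", "new-doc"),
]
ordered = order_batch(batch)
```

Upserting nodes first means a new edge finds its endpoints already in place, and deleting edges before nodes means no node is removed while an edge still points at it.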

### Declaring target states

#### Node tables (parent state)

Declares a node label as a target state. Returns a `TableTarget` for declaring records.

```python
def declare_table_target(
    db: ContextKey,
    table_name: str,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```

**Parameters:**

- `db` — A `ContextKey[neo4j.ConnectionFactory]` for the Neo4j connection.
- `table_name` — The Cypher node label (e.g. `"Document"`).
- `table_schema` — Optional schema definition (see [Table Schema](#table-schema-from-python-class)). The schema participates in CocoIndex's fingerprint (so two flows declaring the same label must agree); per-property type DDL is not emitted in v1.
- `primary_key` — Single property name used as the node's primary key. Defaults to `"id"`. Compound primary keys are not supported in v1.0.
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).

**Returns:** A pending `TableTarget`. Use `await neo4j.mount_table_target(KG_DB, ...)` to get a resolved target.

#### Records (child states)

Once a `TableTarget` is resolved, declare records to be upserted (translated to `MERGE (n:Label {pk: $key_0}) SET n += $props`):

```python
def TableTarget.declare_record(
    self,
    *,
    row: RowT,
) -> None
```

**Parameters:**

- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the `primary_key` field declared above.

`declare_row` is an alias for `declare_record`, for compatibility with Postgres and other RDBMS targets.
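
To make the `MERGE ... SET n += $props` translation concrete, here is a minimal sketch of how a row could be split into a key parameter and a property map. `build_upsert` is a hypothetical helper written for this page, not a connector function, and whether the connector leaves the primary key out of `$props` is an assumption.

```python
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class Document:
    filename: str
    title: str


def build_upsert(label: str, primary_key: str, row: Any) -> tuple[str, dict[str, Any]]:
    """Render a MERGE ... SET n += $props statement and its parameters (sketch)."""
    props = asdict(row)
    # Assumption: the key is bound in the MERGE pattern and left out of $props.
    key = props.pop(primary_key)
    query = f"MERGE (n:`{label}` {{`{primary_key}`: $key_0}}) SET n += $props"
    return query, {"key_0": key, "props": props}


query, params = build_upsert("Document", "filename", Document("overview.md", "Overview"))
```

Because `SET n += $props` merges the map into the node, properties already on the node that are absent from `$props` are left untouched.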

#### Relation tables (parent state)

Declares a relationship type as a target state. Returns a `RelationTarget` for declaring edges.

```python
def declare_relation_target(
    db: ContextKey,
    table_name: str,
    from_table: TableTarget,
    to_table: TableTarget,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]
```

**Parameters:**

- `db` — A `ContextKey[neo4j.ConnectionFactory]` for the Neo4j connection.
- `table_name` — The Cypher relationship type (e.g. `"MENTION"`).
- `from_table` — The `TableTarget` whose nodes are the *source* endpoints of edges in this relationship.
- `to_table` — The `TableTarget` whose nodes are the *target* endpoints of edges in this relationship.
- `table_schema` — Optional schema for the relationship's own properties. The relationship's `primary_key` field uniquely identifies each edge.
- `primary_key` — Single property name used as the edge's primary key. Defaults to `"id"`.
- `managed_by` — Whether CocoIndex manages the relationship lifecycle (`"system"`) or assumes it exists (`"user"`).

**Returns:** A pending `RelationTarget`. Use `await neo4j.mount_relation_target(KG_DB, ...)` to get a resolved target.

#### Relations (child states)

Once a `RelationTarget` is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship.

```python
def RelationTarget.declare_relation(
    self,
    *,
    from_id: Any,
    to_id: Any,
    record: RowT | None = None,
) -> None
```

**Parameters:**

- `from_id` — The source node's primary-key value. The connector MERGEs `(s:FromLabel {pk: $from_id})` so endpoints are auto-created if absent.
- `to_id` — The target node's primary-key value. Same MERGE behavior.
- `record` — Optional row object whose fields populate the relationship's properties. Must include the relationship's `primary_key` field if provided.

If `record` is omitted, the connector derives a deterministic edge id of the form `{from_label}_{from_id}_{to_label}_{to_id}`. Convenient when an edge has no properties of its own.
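
As an illustration of the derived id, the format string above can be reproduced in a few lines. `derive_edge_id` is a hypothetical name used only for this sketch; how the connector stringifies non-string ids is an assumption here (plain `str()` formatting).

```python
from typing import Any


def derive_edge_id(from_label: str, from_id: Any, to_label: str, to_id: Any) -> str:
    """Build an id of the form {from_label}_{from_id}_{to_label}_{to_id} (sketch)."""
    return f"{from_label}_{from_id}_{to_label}_{to_id}"


edge_id = derive_edge_id("Entity", "CocoIndex", "Entity", "Neo4j")
```

Under this scheme the id depends only on the two endpoints, so two record-less declarations between the same pair of nodes resolve to the same edge. To keep parallel edges between the same endpoints, pass a `record` with its own primary-key value.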

#### Vector indexes (attachment)

Declares a vector index on a column of a node table. Vector indexes are an [attachment](../advanced_topics/custom_target_connector#implementing-attachment-providers) to a `TableTarget`:

```python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    field: str,
    metric: Literal["cosine", "euclidean"] = "cosine",
    dimension: int,
) -> None
```

**Parameters:**

- `name` — Optional logical name for the index. Defaults to `f"vec_{table_name}__{field}"`.
- `field` — The node property holding the vector.
- `metric` — Similarity metric: `"cosine"` or `"euclidean"`. Translated to Neo4j's `vector.similarity_function` option.
- `dimension` — The vector's dimension. Required.

The connector emits:

```cypher
CREATE VECTOR INDEX `coco_vec_<Label>__<field>` IF NOT EXISTS
FOR (n:`<Label>`) ON n.`<field>`
OPTIONS { indexConfig: {
  `vector.dimensions`: <N>,
  `vector.similarity_function`: '<metric>'
} }
```

Vectors are float32 only.

### Table schema: from Python class

Build a `TableSchema` by introspecting a record type:

```python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    *,
    primary_key: str = "id",
    column_overrides: dict[str, Neo4jType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```

**Parameters:**

- `record_type` — A dataclass, NamedTuple, or Pydantic model.
- `primary_key` — Field name to use as the table's primary key. Defaults to `"id"`.
- `column_overrides` — Optional dict mapping field names to `Neo4jType` or `VectorSchemaProvider` to override the default Python-to-Neo4j type mapping.

**Returns:** A `TableSchema[RowT]` populated from the class's fields.

#### Default Python → Neo4j type mapping

Most types pass through native Bolt encoding, with no per-value transform applied:

| Python type | Neo4j type | Notes |
|---|---|---|
| `bool` | `BOOLEAN` | |
| `int`, NumPy integer scalars | `INTEGER` | |
| `float`, NumPy float scalars | `FLOAT` | |
| `decimal.Decimal` | `STRING` | Encoded via `str()` — Neo4j has no decimal type. |
| `str` | `STRING` | |
| `bytes` | `BYTES` | Native Bolt type — no encoder. |
| `uuid.UUID` | `STRING` | Encoded via `str()`. |
| `datetime.date` | `DATE` | Native Bolt type. |
| `datetime.datetime` | `ZONED_DATETIME` | Native Bolt type. |
| `datetime.time` | `LOCAL_TIME` | Native Bolt type. |
| `datetime.timedelta` | `DURATION` | Native Bolt type. |
| `numpy.ndarray` (with `VectorSchema` annotation) | `LIST<FLOAT>` | Encoded via `tolist()`; paired with vector-index DDL. |
| `dict`, list, nested record, `Any` | `MAP` / `LIST<ANY>` | Passed through native parameter binding. |
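
As a rough picture of the table, only the rows marked "Encoded via" change the value before it is sent; everything else is handed to the driver unchanged. The sketch below covers the two `str()` rows using the standard library only. `encode_value` is a hypothetical name, and the NumPy `tolist()` row is left out to keep the example dependency-free.

```python
import datetime
import decimal
import uuid
from typing import Any


def encode_value(value: Any) -> Any:
    """Apply the str() encodings from the table; pass everything else through (sketch)."""
    if isinstance(value, (decimal.Decimal, uuid.UUID)):
        return str(value)
    return value


row = {
    "id": uuid.UUID("12345678-1234-5678-1234-567812345678"),
    "price": decimal.Decimal("19.99"),
    "published": datetime.date(2024, 1, 31),
    "tags": ["graph", "vector"],
}
encoded = {name: encode_value(value) for name, value in row.items()}
```

Values stored this way come back from Neo4j as strings, so code that reads them needs to convert explicitly, for example with `decimal.Decimal(s)` or `uuid.UUID(s)`.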

#### Neo4jType

Override the default mapping for a single column with `Neo4jType`:

```python
class Neo4jType(NamedTuple):
    neo4j_type: str
    encoder: ValueEncoder | None = None
```

Use with `typing.Annotated`:

```python
from typing import Annotated
from dataclasses import dataclass
from cocoindex.connectors.neo4j import Neo4jType

@dataclass
class Row:
    id: str
    score: Annotated[float, Neo4jType("STRING", encoder=str)]
```

The `neo4j_type` string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree) but no per-property type DDL is emitted from it.
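
For readers curious how an `Annotated` override can be discovered, here is a minimal sketch using only the standard library. The `Neo4jType` class below is a local stand-in that mirrors the documented shape so the snippet runs without CocoIndex installed, and `find_overrides` is a hypothetical helper; the connector's own introspection may work differently.

```python
from dataclasses import dataclass
from typing import Annotated, Any, Callable, NamedTuple, Optional, get_type_hints


class Neo4jType(NamedTuple):  # local stand-in, not the connector's class
    neo4j_type: str
    encoder: Optional[Callable[[Any], Any]] = None


@dataclass
class Row:
    id: str
    score: Annotated[float, Neo4jType("STRING", encoder=str)]


def find_overrides(record_type: type) -> dict[str, Neo4jType]:
    """Collect Neo4jType metadata attached to fields via Annotated (sketch)."""
    hints = get_type_hints(record_type, include_extras=True)
    overrides: dict[str, Neo4jType] = {}
    for field_name, hint in hints.items():
        # Only Annotated hints carry __metadata__; plain hints yield an empty tuple.
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, Neo4jType):
                overrides[field_name] = meta
    return overrides


overrides = find_overrides(Row)
encoded_score = overrides["score"].encoder(0.75)
```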

#### VectorSchemaProvider

For NumPy `ndarray` columns, attach a `VectorSchema` annotation to specify dtype + dimension. See [VectorSchema](../common_resources/vector_schema) for details.

### Table schema: explicit column definitions

Build a `TableSchema` directly from a dict of column definitions when the row type is dynamic:

```python
from cocoindex.connectors.neo4j import TableSchema, ColumnDef

schema = TableSchema(
    columns={
        "filename": ColumnDef(type="STRING"),
        "title": ColumnDef(type="STRING"),
        "summary": ColumnDef(type="STRING", nullable=True),
    },
    primary_key="filename",
)
```

`ColumnDef` fields:

- `type` — The Neo4j type string (metadata only; see table above).
- `nullable` — Whether the column may be `None`. Defaults to `True`.
- `encoder` — Optional `Callable[[Any], Any]` applied to non-`None` values before they're sent to Neo4j.
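
The interplay of `nullable` and `encoder` can be sketched as follows. `ColumnDef` here is a local stand-in with the three documented fields, and `encode_row` is a hypothetical helper. Skipping the encoder for `None` follows the description above; rejecting `None` in a non-nullable column is an assumption about how such a flag might be enforced, not documented connector behavior.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ColumnDef:  # local stand-in, not the connector's class
    type: str
    nullable: bool = True
    encoder: Optional[Callable[[Any], Any]] = None


def encode_row(columns: dict[str, ColumnDef], row: dict[str, Any]) -> dict[str, Any]:
    """Apply each column's encoder to non-None values (sketch)."""
    out: dict[str, Any] = {}
    for name, col in columns.items():
        value = row.get(name)
        if value is None:
            # Assumption: a missing value in a non-nullable column is an error.
            if not col.nullable:
                raise ValueError(f"column {name!r} is not nullable")
            out[name] = None
        elif col.encoder is not None:
            out[name] = col.encoder(value)
        else:
            out[name] = value
    return out


columns = {
    "filename": ColumnDef(type="STRING", nullable=False),
    "score": ColumnDef(type="STRING", encoder=str),
    "summary": ColumnDef(type="STRING"),
}
encoded = encode_row(columns, {"filename": "a.md", "score": 0.5, "summary": None})
```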

### DDL: indexes and constraints

For each managed table, the connector creates supporting Cypher artifacts on first run:

- For node tables: a uniqueness constraint on the primary key —
  ```cypher
  CREATE CONSTRAINT `coco_uniq_<Label>__<pk>` IF NOT EXISTS
  FOR (n:`<Label>`) REQUIRE n.`<pk>` IS UNIQUE
  ```
  Neo4j auto-creates a backing index for each constraint, so a separate `CREATE INDEX` is redundant on nodes.
- For relation tables, an index on the primary key:
  ```cypher
  CREATE INDEX `coco_idx_rel_<RelType>__<pk>` IF NOT EXISTS
  FOR ()-[r:`<RelType>`]-() ON (r.`<pk>`)
  ```

Indexes and constraints are dropped on `cocoindex drop` or when the table is no longer declared.

When `managed_by="user"` is set, the connector skips DDL entirely — you're responsible for creating and dropping the schema. Record-level upserts and deletes still work.
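
The naming scheme and the `managed_by` switch can be summarized in a short sketch. `render_table_ddl` is a hypothetical helper that reproduces the statement templates shown above as strings; it is not the connector's implementation.

```python
def render_table_ddl(
    kind: str,
    name: str,
    primary_key: str = "id",
    managed_by: str = "system",
) -> list[str]:
    """Return the supporting DDL statements for one declared table (sketch)."""
    if managed_by == "user":
        return []  # user-managed tables: no DDL is emitted
    if kind == "node":
        return [
            f"CREATE CONSTRAINT `coco_uniq_{name}__{primary_key}` IF NOT EXISTS\n"
            f"FOR (n:`{name}`) REQUIRE n.`{primary_key}` IS UNIQUE"
        ]
    if kind == "relation":
        return [
            f"CREATE INDEX `coco_idx_rel_{name}__{primary_key}` IF NOT EXISTS\n"
            f"FOR ()-[r:`{name}`]-() ON (r.`{primary_key}`)"
        ]
    raise ValueError(f"unknown table kind: {kind!r}")


node_ddl = render_table_ddl("node", "Document", primary_key="filename")
rel_ddl = render_table_ddl("relation", "RELATIONSHIP")
user_ddl = render_table_ddl("node", "Document", managed_by="user")
```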

### Example: Node tables

```python
from collections.abc import AsyncIterator
from dataclasses import dataclass
import cocoindex as coco
from cocoindex.connectors import neo4j

KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")


@dataclass
class Document:
    filename: str
    title: str
    summary: str


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(KG_DB, neo4j.ConnectionFactory(
        uri="bolt://localhost:7687",
        auth=("neo4j", "cocoindex"),
        database="neo4j",
    ))
    yield


@coco.fn
async def app_main() -> None:
    schema = await neo4j.TableSchema.from_class(Document, primary_key="filename")
    documents = await neo4j.mount_table_target(
        KG_DB, "Document", schema, primary_key="filename",
    )
    documents.declare_record(
        row=Document(
            filename="overview.md",
            title="Overview",
            summary="An overview of CocoIndex...",
        )
    )


app = coco.App(coco.AppConfig(name="docs_to_neo4j"), app_main)
```

### Example: Relation tables (knowledge graph)

```python
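# Continues the node-table example above: reuses its imports, KG_DB,
# Document, and coco_lifespan.
@dataclass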
class Entity:
    value: str


@dataclass
class RelationshipRow:
    id: str
    predicate: str


@coco.fn
async def kg_app_main() -> None:
    documents = await neo4j.mount_table_target(
        KG_DB, "Document",
        await neo4j.TableSchema.from_class(Document, primary_key="filename"),
        primary_key="filename",
    )
    entities = await neo4j.mount_table_target(
        KG_DB, "Entity",
        await neo4j.TableSchema.from_class(Entity, primary_key="value"),
        primary_key="value",
    )
    relationships = await neo4j.mount_relation_target(
        KG_DB, "RELATIONSHIP",
        entities, entities,
        await neo4j.TableSchema.from_class(RelationshipRow, primary_key="id"),
        primary_key="id",
    )

    # populate ...
    documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="..."))
    entities.declare_record(row=Entity(value="CocoIndex"))
    entities.declare_record(row=Entity(value="Neo4j"))
    relationships.declare_relation(
        from_id="CocoIndex",
        to_id="Neo4j",
        record=RelationshipRow(id="rel-1", predicate="writes_to"),
    )


kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main)
```

The `Entity` table is declared up front (via `mount_table_target`) so its uniqueness constraint is reconciled before any `RELATIONSHIP` edge MERGEs entity endpoints. The relationship's triple-MERGE pattern (source endpoint → target endpoint → edge) auto-creates missing endpoints, but it's good practice to declare them explicitly so deletion-cascade behavior stays predictable.
