Neo4j connector

Write to Neo4j — a property graph database — with support for node tables, relationship tables (edges), per-database multitenancy, atomic-batch writes via Bolt transactions, real CREATE CONSTRAINT uniqueness, and vector indexes with cosine / euclidean similarity.

Version
v1.0.2

The neo4j connector writes records to Neo4j, a property graph database. It supports node tables (labels), relationship tables (edge types), per-database multitenancy (one Neo4j cluster, many isolated databases), real Cypher uniqueness constraints, and vector indexes via the CREATE VECTOR INDEX DDL form.

python
from cocoindex.connectors import neo4j

Dependencies

This connector requires additional dependencies. Install with:

bash
pip install cocoindex[neo4j]

Targets Neo4j 5.18+. Vector-index DDL (CREATE VECTOR INDEX … OPTIONS { indexConfig: { … } }) shipped in 5.18 — older 5.x servers will reject the DDL the connector emits.

Connection setup

Create a ConnectionFactory and provide it via a ContextKey. The factory holds the Bolt URI, optional auth, and the target database name; it lazily opens a Neo4j async driver and returns a graph handle on demand.

Note

The key name is load-bearing across runs — it’s the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming.

python
from collections.abc import AsyncIterator
from cocoindex.connectors import neo4j
import cocoindex as coco

KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        neo4j.ConnectionFactory(
            uri="bolt://localhost:7687",
            auth=("neo4j", "cocoindex"),
            database="neo4j",
        ),
    )
    yield

auth is optional — omit it for unauthenticated dev instances. database defaults to "neo4j" (the default db that ships with every Neo4j 5 installation).

Multitenancy

A single Neo4j cluster can host many isolated databases. Pair each database with its own ContextKey and ConnectionFactory(database=...):

python
KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("apis_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "bolt://localhost:7687"
    auth = ("neo4j", "cocoindex")
    builder.provide(KG_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="kg"))
    builder.provide(APIS_DB, neo4j.ConnectionFactory(uri=uri, auth=auth, database="apis"))
    yield

Different ContextKeys with different database names produce fully separate target-state trees — changes to one never spill into the other.

As target

The neo4j connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.

Each apply batch is wrapped in a single Neo4j transaction (tx.commit() on success, rollback on exception), so partial writes never leak into the database. Within a batch, writes are ordered as node upserts → relation upserts → relation deletes → node deletes so dependent edges always see their endpoints.

Declaring target states

Node tables (parent state)

Declares a node label as a target state. Returns a TableTarget for declaring records.

python
def declare_table_target(
    db: ContextKey,
    table_name: str,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]

Parameters:

  • db — A ContextKey[neo4j.ConnectionFactory] for the Neo4j connection.
  • table_name — The Cypher node label (e.g. "Document").
  • table_schema — Optional schema definition (see Table Schema). The schema participates in CocoIndex’s fingerprint (so two flows declaring the same label must agree); per-property type DDL is not emitted in v1.
  • primary_key — Single property name used as the node’s primary key. Defaults to "id". Compound primary keys are not supported in v1.0.
  • managed_by — Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").

Returns: A pending TableTarget. Use await neo4j.mount_table_target(KG_DB, ...) to get a resolved target.

Records (child states)

Once a TableTarget is resolved, declare records to be upserted (translated to MERGE (n:Label {pk: $key_0}) SET n += $props):

python
def TableTarget.declare_record(
    self,
    *,
    row: RowT,
) -> None

Parameters:

  • row — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the primary_key field declared above.

declare_row is an alias for declare_record, for compatibility with Postgres and other RDBMS targets.
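To make the MERGE translation above concrete, here is a minimal sketch that builds the statement and parameter map for one row. upsert_statement is a hypothetical helper for illustration only, not part of the connector's API or its actual internals:

```python
def upsert_statement(label: str, pk: str, row: dict) -> tuple[str, dict]:
    # Sketch of the documented MERGE-based upsert: match (or create) the node
    # by its primary key, then overlay all row fields as properties.
    cypher = f"MERGE (n:`{label}` {{`{pk}`: $key_0}}) SET n += $props"
    return cypher, {"key_0": row[pk], "props": row}

cypher, params = upsert_statement(
    "Document", "filename", {"filename": "overview.md", "title": "Overview"}
)
# cypher: MERGE (n:`Document` {`filename`: $key_0}) SET n += $props
```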

Relation tables (parent state)

Declares a relationship type as a target state. Returns a RelationTarget for declaring edges.

python
def declare_relation_target(
    db: ContextKey,
    table_name: str,
    from_table: TableTarget,
    to_table: TableTarget,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]

Parameters:

  • db — A ContextKey[neo4j.ConnectionFactory] for the Neo4j connection.
  • table_name — The Cypher relationship type (e.g. "MENTION").
  • from_table — The TableTarget whose nodes are the source endpoints of edges in this relationship.
  • to_table — The TableTarget whose nodes are the target endpoints of edges in this relationship.
  • table_schema — Optional schema for the relationship’s own properties. The relationship’s primary_key field uniquely identifies each edge.
  • primary_key — Single property name used as the edge’s primary key. Defaults to "id".
  • managed_by — Whether CocoIndex manages the relationship lifecycle ("system") or assumes it exists ("user").

Returns: A pending RelationTarget. Use await neo4j.mount_relation_target(KG_DB, ...) to get a resolved target.

Relations (child states)

Once a RelationTarget is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship.

python
def RelationTarget.declare_relation(
    self,
    *,
    from_id: Any,
    to_id: Any,
    record: RowT | None = None,
) -> None

Parameters:

  • from_id — The source node’s primary-key value. The connector MERGEs (s:FromLabel {pk: $from_id}) so endpoints are auto-created if absent.
  • to_id — The target node’s primary-key value. Same MERGE behavior.
  • record — Optional row object whose fields populate the relationship’s properties. Must include the relationship’s primary_key field if provided.

If record is omitted, the connector derives a deterministic edge id of the form {from_label}_{from_id}_{to_label}_{to_id}, which is convenient when an edge has no properties of its own.
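The derived-id format can be pinned down with a one-line helper. derived_edge_id is illustrative only, not a connector function:

```python
def derived_edge_id(from_label: str, from_id, to_label: str, to_id) -> str:
    # Reproduces the documented {from_label}_{from_id}_{to_label}_{to_id} format.
    return f"{from_label}_{from_id}_{to_label}_{to_id}"

derived_edge_id("Entity", "CocoIndex", "Entity", "Neo4j")
# "Entity_CocoIndex_Entity_Neo4j"
```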

Vector indexes (attachment)

Declares a vector index on a column of a node table. Vector indexes are an attachment to a TableTarget:

python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    field: str,
    metric: Literal["cosine", "euclidean"] = "cosine",
    dimension: int,
) -> None

Parameters:

  • name — Optional logical name for the index. Defaults to f"vec_{table_name}__{field}".
  • field — The node property holding the vector.
  • metric — Similarity metric: "cosine" or "euclidean". Translated to Neo4j’s vector.similarity_function option.
  • dimension — The vector’s dimension. Required.

The connector emits:

cypher
CREATE VECTOR INDEX `coco_vec_<Label>__<field>` IF NOT EXISTS
FOR (n:`Label`) ON n.`field`
OPTIONS { indexConfig: {
  `vector.dimensions`: <N>,
  `vector.similarity_function`: '<metric>'
} }

Vectors are float32 only.
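A small sketch of how the emitted DDL varies with the declared parameters. vector_index_ddl is a hypothetical helper mirroring the DDL shape shown above, not the connector's actual emitter:

```python
def vector_index_ddl(label: str, field: str, dimension: int,
                     metric: str = "cosine") -> str:
    # Mirrors the CREATE VECTOR INDEX shape documented above.
    return (
        f"CREATE VECTOR INDEX `coco_vec_{label}__{field}` IF NOT EXISTS\n"
        f"FOR (n:`{label}`) ON n.`{field}`\n"
        "OPTIONS { indexConfig: {\n"
        f"  `vector.dimensions`: {dimension},\n"
        f"  `vector.similarity_function`: '{metric}'\n"
        "} }"
    )

print(vector_index_ddl("Document", "embedding", 384))
```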

Table schema: from Python class

Build a TableSchema by introspecting a record type:

python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    *,
    primary_key: str = "id",
    column_overrides: dict[str, Neo4jType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]

Parameters:

  • record_type — A dataclass, NamedTuple, or Pydantic model.
  • primary_key — Field name to use as the table’s primary key. Defaults to "id".
  • column_overrides — Optional dict mapping field names to Neo4jType or VectorSchemaProvider to override the default Python-to-Neo4j type mapping.

Returns: A TableSchema[RowT] populated from the class’s fields.

Default Python → Neo4j type mapping

Most types pass through native Bolt encoding — no per-value transform applied:

| Python type | Neo4j type | Notes |
| --- | --- | --- |
| bool | BOOLEAN | |
| int, NumPy integer scalars | INTEGER | |
| float, NumPy float scalars | FLOAT | |
| decimal.Decimal | STRING | Encoded via str() — Neo4j has no decimal type. |
| str | STRING | |
| bytes | BYTES | Native Bolt type — no encoder. |
| uuid.UUID | STRING | Encoded via str(). |
| datetime.date | DATE | Native Bolt type. |
| datetime.datetime | ZONED_DATETIME | Native Bolt type. |
| datetime.time | LOCAL_TIME | Native Bolt type. |
| datetime.timedelta | DURATION | Native Bolt type. |
| numpy.ndarray (with VectorSchema annotation) | LIST<FLOAT> | Encoded via tolist(); paired with vector-index DDL. |
| dict, list, nested record, Any | MAP / LIST<ANY> | Passed through native parameter binding. |
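The str()-based encodings the table describes for decimal.Decimal and uuid.UUID can be checked directly in plain Python:

```python
from decimal import Decimal
from uuid import UUID

# Both types reach Neo4j as STRING properties, encoded via str():
price = str(Decimal("19.99"))                               # "19.99"
doc_id = str(UUID("12345678-1234-5678-1234-567812345678"))  # canonical hex form
```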

Neo4jType

Override the default mapping for a single column with Neo4jType:

python
class Neo4jType(NamedTuple):
    neo4j_type: str
    encoder: ValueEncoder | None = None

Use with typing.Annotated:

python
from typing import Annotated
from dataclasses import dataclass
from cocoindex.connectors.neo4j import Neo4jType

@dataclass
class Row:
    id: str
    score: Annotated[float, Neo4jType("STRING", encoder=str)]

The neo4j_type string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree) but no per-property type DDL is emitted from it.

VectorSchemaProvider

For NumPy ndarray columns, attach a VectorSchema annotation to specify dtype + dimension. See VectorSchema for details.

Table schema: explicit column definitions

Build a TableSchema directly from a dict of column definitions when the row type is dynamic:

python
from cocoindex.connectors.neo4j import TableSchema, ColumnDef

schema = TableSchema(
    columns={
        "filename": ColumnDef(type="STRING"),
        "title": ColumnDef(type="STRING"),
        "summary": ColumnDef(type="STRING", nullable=True),
    },
    primary_key="filename",
)

ColumnDef fields:

  • type — The Neo4j type string (metadata only; see table above).
  • nullable — Whether the column may be None. Defaults to True.
  • encoder — Optional Callable[[Any], Any] applied to non-None values before they’re sent to Neo4j.
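The encoder contract (applied only to non-None values) can be sketched as follows; apply_encoder is an illustrative helper, not connector API:

```python
from decimal import Decimal

def apply_encoder(value, encoder):
    # ColumnDef semantics as documented: None passes through untouched,
    # non-None values go through the encoder when one is set.
    if value is None or encoder is None:
        return value
    return encoder(value)

apply_encoder(Decimal("1.5"), str)  # "1.5"
apply_encoder(None, str)            # None
```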

DDL: indexes and constraints

For each managed table, the connector creates supporting Cypher artifacts on first run:

  • For node tables: a uniqueness constraint on the primary key —
    cypher
    CREATE CONSTRAINT `coco_uniq_<Label>__<pk>` IF NOT EXISTS
    FOR (n:`<Label>`) REQUIRE n.`<pk>` IS UNIQUE
    Neo4j auto-creates a backing index for each constraint, so a separate CREATE INDEX is redundant on nodes.
  • For relation tables:
    cypher
    CREATE INDEX `coco_idx_rel_<RelType>__<pk>` IF NOT EXISTS
    FOR ()-[r:`<RelType>`]-() ON (r.`<pk>`)

Indexes and constraints are dropped on cocoindex drop or when the table is no longer declared.

When managed_by="user" is set, the connector skips DDL entirely — you’re responsible for creating and dropping the schema. Record-level upserts and deletes still work.
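The generated artifact names follow a fixed pattern; the sketch below reproduces the DDL shapes above with hypothetical helpers (uniqueness_constraint_ddl and relation_index_ddl are not connector functions):

```python
def uniqueness_constraint_ddl(label: str, pk: str) -> str:
    # Node-table uniqueness constraint, per the shape documented above.
    return (
        f"CREATE CONSTRAINT `coco_uniq_{label}__{pk}` IF NOT EXISTS\n"
        f"FOR (n:`{label}`) REQUIRE n.`{pk}` IS UNIQUE"
    )

def relation_index_ddl(rel_type: str, pk: str) -> str:
    # Relationship-property index, per the shape documented above.
    return (
        f"CREATE INDEX `coco_idx_rel_{rel_type}__{pk}` IF NOT EXISTS\n"
        f"FOR ()-[r:`{rel_type}`]-() ON (r.`{pk}`)"
    )
```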

Example: Node tables

python
from collections.abc import AsyncIterator
from dataclasses import dataclass
import cocoindex as coco
from cocoindex.connectors import neo4j

KG_DB: coco.ContextKey[neo4j.ConnectionFactory] = coco.ContextKey("kg_db")


@dataclass
class Document:
    filename: str
    title: str
    summary: str


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(KG_DB, neo4j.ConnectionFactory(
        uri="bolt://localhost:7687",
        auth=("neo4j", "cocoindex"),
        database="neo4j",
    ))
    yield


@coco.fn
async def app_main() -> None:
    schema = await neo4j.TableSchema.from_class(Document, primary_key="filename")
    documents = await neo4j.mount_table_target(
        KG_DB, "Document", schema, primary_key="filename",
    )
    documents.declare_record(
        row=Document(
            filename="overview.md",
            title="Overview",
            summary="An overview of CocoIndex...",
        )
    )


app = coco.App(coco.AppConfig(name="docs_to_neo4j"), app_main)

Example: Relation tables (knowledge graph)

python
@dataclass
class Entity:
    value: str


@dataclass
class RelationshipRow:
    id: str
    predicate: str


@coco.fn
async def kg_app_main() -> None:
    documents = await neo4j.mount_table_target(
        KG_DB, "Document",
        await neo4j.TableSchema.from_class(Document, primary_key="filename"),
        primary_key="filename",
    )
    entities = await neo4j.mount_table_target(
        KG_DB, "Entity",
        await neo4j.TableSchema.from_class(Entity, primary_key="value"),
        primary_key="value",
    )
    relationships = await neo4j.mount_relation_target(
        KG_DB, "RELATIONSHIP",
        entities, entities,
        await neo4j.TableSchema.from_class(RelationshipRow, primary_key="id"),
        primary_key="id",
    )

    # populate ...
    documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="..."))
    entities.declare_record(row=Entity(value="CocoIndex"))
    entities.declare_record(row=Entity(value="Neo4j"))
    relationships.declare_relation(
        from_id="CocoIndex",
        to_id="Neo4j",
        record=RelationshipRow(id="rel-1", predicate="writes_to"),
    )


kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main)

The Entity table is declared up-front (via mount_table_target) so its uniqueness constraint is reconciled before any RELATIONSHIP edge MERGEs entity endpoints. The relationship’s three-MERGE pattern (source endpoint → target endpoint → edge) means missing endpoints are auto-created — but it’s good practice to declare them explicitly so deletion-cascade behavior stays predictable.
