# FalkorDB connector

> **CocoIndex v1.** This page documents CocoIndex **v1** — a ground-up redesign from v0. When writing code, ignore any v0 flow-builder DSL or deprecated decorators.
>
> Source: https://cocoindex.io/docs/connectors/falkordb/ · Docs index: https://cocoindex.io/docs/llms.txt · Agent skill: https://cocoindex.io/docs/skill.md

The `falkordb` connector writes records to FalkorDB, a Cypher-compatible graph database that runs as a Redis module. It supports node tables (labels), relationship tables (edge types), per-graph multitenancy (one Redis instance, many isolated graphs), and vector indexes.

```python
from cocoindex.connectors import falkordb
```

**Note — Dependencies**
This connector requires additional dependencies. Install with:

```bash
pip install "cocoindex[falkordb]"
```

## Connection setup

Create a `ConnectionFactory` and provide it via a `ContextKey`. The factory holds the FalkorDB URI plus the target graph name, and yields a graph handle on demand.

**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed rows. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.

```python
from collections.abc import AsyncIterator
from cocoindex.connectors import falkordb
import cocoindex as coco

KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        falkordb.ConnectionFactory(
            uri="falkor://localhost:6379",
            graph="knowledge_graph",
        ),
    )
    yield
```

### Multitenancy

A single Redis instance can host many fully isolated graphs. Pair each graph with its own `ContextKey` and `ConnectionFactory(graph=...)`:

```python
KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("apis_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "falkor://localhost:6379"
    builder.provide(KG_DB, falkordb.ConnectionFactory(uri=uri, graph="knowledge_graph"))
    builder.provide(APIS_DB, falkordb.ConnectionFactory(uri=uri, graph="apis_graph"))
    yield
```

Different `ContextKey`s with different graph names produce fully separate target-state trees — changes to one never spill into the other.

## As target

The `falkordb` connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.

Each `graph.query` call against FalkorDB is its own atomic unit (FalkorDB does not expose multi-statement transactions); the connector orders writes within a batch as **node upserts → relation upserts → relation deletes → node deletes** so dependent edges always see their endpoints.
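
The write ordering above can be sketched as a stable sort over phase ranks. This is only an illustration of the ordering rule, not the connector's implementation: `PendingWrite`, `PHASE_ORDER`, and `order_batch` are hypothetical names introduced here.

```python
from dataclasses import dataclass

# Hypothetical phase ranks mirroring the order described above.
PHASE_ORDER = {
    "node_upsert": 0,
    "relation_upsert": 1,
    "relation_delete": 2,
    "node_delete": 3,
}


@dataclass(frozen=True)
class PendingWrite:
    """Illustrative stand-in for one queued write; not a connector type."""

    phase: str
    target: str


def order_batch(writes: list[PendingWrite]) -> list[PendingWrite]:
    # sorted() is stable, so writes within the same phase keep their order.
    return sorted(writes, key=lambda w: PHASE_ORDER[w.phase])


batch = [
    PendingWrite("node_delete", "Entity:stale"),
    PendingWrite("relation_upsert", "RELATIONSHIP:rel-1"),
    PendingWrite("node_upsert", "Entity:CocoIndex"),
    PendingWrite("relation_delete", "RELATIONSHIP:rel-0"),
]
ordered_phases = [w.phase for w in order_batch(batch)]
```

Because each write is its own atomic unit, ordering of this kind is what keeps an edge from being written before its endpoints exist, or a node from being removed while edges still reference it.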

### Declaring target states

#### Node tables (parent state)

Declares a node label as a target state. Returns a `TableTarget` for declaring records.

```python
def declare_table_target(
    db: ContextKey,
    table_name: str,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```

**Parameters:**

- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection.
- `table_name` — The Cypher node label (e.g. `"Document"`).
- `table_schema` — Optional schema definition (see [Table Schema](#table-schema-from-python-class)). FalkorDB does not enforce per-property types server-side, so no per-column DDL is emitted. The schema still participates in CocoIndex's fingerprint, which means two flows declaring the same label must agree on it.
- `primary_key` — Single property name used as the node's primary key. Defaults to `"id"`. Compound primary keys are not supported in v1.0.
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).

**Returns:** A pending `TableTarget`. Use `await falkordb.mount_table_target(KG_DB, ...)` to get a resolved target.

#### Records (child states)

Once a `TableTarget` is resolved, declare records to be upserted (translated to `MERGE (n:Label {pk: $key_0}) SET n += $props`):

```python
def TableTarget.declare_record(
    self,
    *,
    row: RowT,
) -> None
```

**Parameters:**

- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the `primary_key` field declared above.

`declare_row` is an alias for `declare_record`, for compatibility with Postgres and other RDBMS targets.
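
To make the `MERGE ... SET n += $props` translation concrete, here is a minimal sketch of how such a statement and its parameters could be assembled. `build_node_upsert` is a hypothetical helper, and both the backtick quoting and the choice to leave the primary key out of `$props` are assumptions rather than documented connector behavior.

```python
from dataclasses import asdict, dataclass
from typing import Any


def build_node_upsert(
    label: str, primary_key: str, row: dict[str, Any]
) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: build a MERGE statement and its parameters."""
    if primary_key not in row:
        raise ValueError(f"row is missing primary key field {primary_key!r}")
    # Labels and property names cannot be bound as parameters, so they are
    # interpolated (backtick-quoted); values go through parameter binding.
    query = f"MERGE (n:`{label}` {{`{primary_key}`: $key_0}}) SET n += $props"
    # Assumption: the key is matched in MERGE and omitted from $props.
    props = {k: v for k, v in row.items() if k != primary_key}
    return query, {"key_0": row[primary_key], "props": props}


@dataclass
class Document:
    filename: str
    title: str


query, params = build_node_upsert(
    "Document",
    "filename",
    asdict(Document(filename="overview.md", title="Overview")),
)
```

Matching on the primary key alone is what makes the write an upsert: a second declaration with the same key updates the existing node's properties instead of creating a duplicate.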

#### Relation tables (parent state)

Declares a relationship type as a target state. Returns a `RelationTarget` for declaring edges.

```python
def declare_relation_target(
    db: ContextKey,
    table_name: str,
    from_table: TableTarget,
    to_table: TableTarget,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]
```

**Parameters:**

- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection.
- `table_name` — The Cypher relationship type (e.g. `"MENTION"`).
- `from_table` — The `TableTarget` whose nodes are the *source* endpoints of edges in this relationship.
- `to_table` — The `TableTarget` whose nodes are the *target* endpoints of edges in this relationship.
- `table_schema` — Optional schema for the relationship's own properties (see [Table Schema](#table-schema-from-python-class)). The relationship's `primary_key` field uniquely identifies each edge.
- `primary_key` — Single property name used as the edge's primary key. Defaults to `"id"`.
- `managed_by` — Whether CocoIndex manages the relationship lifecycle (`"system"`) or assumes it exists (`"user"`).

**Returns:** A pending `RelationTarget`. Use `await falkordb.mount_relation_target(KG_DB, ...)` to get a resolved target.

#### Relations (child states)

Once a `RelationTarget` is resolved, declare edges. Each declaration produces three MERGE clauses: the source endpoint, the target endpoint, then the relationship itself.

```python
def RelationTarget.declare_relation(
    self,
    *,
    from_id: Any,
    to_id: Any,
    record: RowT | None = None,
) -> None
```

**Parameters:**

- `from_id` — The source node's primary-key value. The connector MERGEs `(s:FromLabel {pk: $from_id})` so endpoints are auto-created if absent.
- `to_id` — The target node's primary-key value. Same MERGE behavior.
- `record` — Optional row object whose fields populate the relationship's properties. Must include the relationship's `primary_key` field if provided.
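
The endpoint MERGE behavior described for `from_id` and `to_id` can be pictured as a single statement with three MERGE clauses. The sketch below only assembles a plausible Cypher string; `build_relation_upsert` and the parameter names `$rel_id` and `$props` are hypothetical, and the exact text the connector emits may differ.

```python
def build_relation_upsert(
    rel_type: str,
    from_label: str,
    from_pk: str,
    to_label: str,
    to_pk: str,
    rel_pk: str,
) -> str:
    """Hypothetical helper: source MERGE, target MERGE, then the edge MERGE."""
    return (
        f"MERGE (s:{from_label} {{{from_pk}: $from_id}}) "
        f"MERGE (t:{to_label} {{{to_pk}: $to_id}}) "
        f"MERGE (s)-[r:{rel_type} {{{rel_pk}: $rel_id}}]->(t) "
        "SET r += $props"
    )


relation_query = build_relation_upsert(
    "RELATIONSHIP", "Entity", "value", "Entity", "value", "id"
)
```

The first two clauses are what create an endpoint node when it does not exist yet; the third matches the edge on its own primary key so repeated declarations stay idempotent.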

If `record` is omitted, the connector derives a deterministic edge id from `(from_label, from_id, to_label, to_id)`. This is convenient when an edge has no properties of its own.
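
The exact derivation is not specified here. As an illustration of what "deterministic" means in practice, the sketch below hashes a canonical encoding of the four parts; `derive_edge_id` and the use of SHA-256 over JSON are assumptions, not the connector's algorithm.

```python
import hashlib
import json
from typing import Any


def derive_edge_id(from_label: str, from_id: Any, to_label: str, to_id: Any) -> str:
    """Hypothetical derivation: hash a canonical encoding of the four parts."""
    canonical = json.dumps(
        [from_label, from_id, to_label, to_id],
        separators=(",", ":"),
        default=str,  # fall back to str() for ids such as UUIDs
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


forward = derive_edge_id("Entity", "CocoIndex", "Entity", "FalkorDB")
again = derive_edge_id("Entity", "CocoIndex", "Entity", "FalkorDB")
reverse = derive_edge_id("Entity", "FalkorDB", "Entity", "CocoIndex")
```

The same endpoints always map to the same id, so re-running a flow updates the existing edge, while the reversed direction yields a different id and therefore a distinct edge.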

#### Vector indexes (attachment)

Declares a vector index on a column of a node table. Vector indexes are an [attachment](../advanced_topics/custom_target_connector#implementing-attachment-providers) to a `TableTarget`:

```python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    field: str,
    metric: Literal["cosine", "euclidean", "ip"] = "cosine",
    dimension: int,
) -> None
```

**Parameters:**

- `name` — Optional logical name for the index. Defaults to `f"idx_{table_name}__{field}"`.
- `field` — The node property holding the vector.
- `metric` — Similarity metric: `"cosine"`, `"euclidean"`, or `"ip"` (inner product). Translated to FalkorDB's `similarityFunction` option.
- `dimension` — The vector's dimension. Required.

The connector emits `CREATE VECTOR INDEX FOR (e:Label) ON (e.field) OPTIONS {dimension: N, similarityFunction: '...'}`. Vectors are float32 only — wider vector dtypes are not supported.
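
As a rough sketch of how the default index name and the statement above fit together, the helpers below format both from the declared arguments. `default_index_name` and `build_vector_index_ddl` are hypothetical names, and passing `metric` straight through as the `similarityFunction` value is an assumption: the connector's actual translation of each metric is not shown on this page.

```python
SUPPORTED_METRICS = ("cosine", "euclidean", "ip")


def default_index_name(table_name: str, field: str) -> str:
    # Mirrors the documented default: f"idx_{table_name}__{field}".
    return f"idx_{table_name}__{field}"


def build_vector_index_ddl(
    label: str, field: str, dimension: int, metric: str = "cosine"
) -> str:
    """Hypothetical helper that formats the CREATE VECTOR INDEX statement."""
    if metric not in SUPPORTED_METRICS:
        raise ValueError(f"unsupported metric: {metric!r}")
    if dimension <= 0:
        raise ValueError("dimension must be a positive integer")
    # Assumption: the metric name is used verbatim as similarityFunction.
    return (
        f"CREATE VECTOR INDEX FOR (e:{label}) ON (e.{field}) "
        f"OPTIONS {{dimension: {dimension}, similarityFunction: '{metric}'}}"
    )


index_name = default_index_name("Document", "embedding")
index_ddl = build_vector_index_ddl("Document", "embedding", 384)
```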

### Table schema: from Python class

Build a `TableSchema` by introspecting a record type:

```python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    *,
    primary_key: str = "id",
    column_overrides: dict[str, FalkorType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```

**Parameters:**

- `record_type` — A dataclass, NamedTuple, or Pydantic model.
- `primary_key` — Field name to use as the table's primary key. Defaults to `"id"`.
- `column_overrides` — Optional dict mapping field names to `FalkorType` or `VectorSchemaProvider` to override the default Python-to-FalkorDB type mapping.

**Returns:** A `TableSchema[RowT]` populated from the class's fields.

#### Default Python → FalkorDB type mapping

| Python type | FalkorDB type | Notes |
|---|---|---|
| `bool` | `boolean` | |
| `int`, NumPy integer scalars | `integer` | |
| `float`, NumPy float scalars | `float` | |
| `decimal.Decimal` | `string` | Encoded via `str()` — FalkorDB has no decimal type. |
| `str` | `string` | |
| `bytes` | `string` | Encoded as base64. |
| `uuid.UUID` | `string` | Encoded via `str()`. |
| `datetime.date` / `datetime.datetime` / `datetime.time` | `string` | Encoded via `.isoformat()`. |
| `datetime.timedelta` | `integer` | Encoded as milliseconds (`int(td.total_seconds() * 1000)`). |
| `numpy.ndarray` (with `VectorSchema` annotation) | `vector<float32, N>` | Encoded as `list[float]`. |
| `dict`, list, nested record, `Any` | `map` / `array` | Passed through native parameter binding. |
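
The encodings in the table can be approximated with a small standard-library function. This is a sketch of the mapping only, not the connector's encoder: `encode_value` is a hypothetical name, NumPy types are left out to keep it dependency-free, and standard (not URL-safe) base64 is assumed for `bytes`.

```python
import base64
import datetime as dt
import decimal
import uuid
from typing import Any


def encode_value(value: Any) -> Any:
    """Hypothetical encoder following the table above (NumPy types omitted)."""
    if value is None or isinstance(value, (bool, int, float, str)):
        return value  # native boolean / integer / float / string
    if isinstance(value, (decimal.Decimal, uuid.UUID)):
        return str(value)
    if isinstance(value, bytes):
        # Assumption: standard base64 alphabet, returned as ASCII text.
        return base64.b64encode(value).decode("ascii")
    if isinstance(value, (dt.date, dt.time)):
        return value.isoformat()  # dt.datetime is a subclass of dt.date
    if isinstance(value, dt.timedelta):
        return int(value.total_seconds() * 1000)  # milliseconds
    return value  # dicts, lists, nested records: passed through


encoded = {
    "price": encode_value(decimal.Decimal("1.50")),
    "blob": encode_value(b"hi"),
    "day": encode_value(dt.date(2024, 1, 2)),
    "elapsed": encode_value(dt.timedelta(seconds=1.5)),
}
```

Note that values encoded to `string` do not come back as their original Python type automatically; a reader has to parse them again on the way out.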

#### FalkorType

Override the default mapping for a single column with `FalkorType`:

```python
class FalkorType(NamedTuple):
    falkor_type: str
    encoder: ValueEncoder | None = None
```

Use with `typing.Annotated`:

```python
from typing import Annotated
from dataclasses import dataclass
from cocoindex.connectors.falkordb import FalkorType

@dataclass
class Row:
    id: str
    score: Annotated[float, FalkorType("decimal", encoder=str)]
```

The `falkor_type` string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree) but no DDL is emitted from it.
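
For readers unfamiliar with `typing.Annotated`, the sketch below shows how metadata of this shape can be read back from a record class with the standard library. It uses a local stand-in for `FalkorType` so that it runs without the connector installed; `find_overrides` is a hypothetical helper and says nothing about how `TableSchema.from_class` is implemented.

```python
from dataclasses import dataclass
from typing import Annotated, Any, Callable, NamedTuple, Optional, get_type_hints


class FalkorType(NamedTuple):
    """Local stand-in mirroring the shape shown above."""

    falkor_type: str
    encoder: Optional[Callable[[Any], Any]] = None


@dataclass
class Row:
    id: str
    score: Annotated[float, FalkorType("decimal", encoder=str)]


def find_overrides(record_type: type) -> dict[str, FalkorType]:
    """Hypothetical helper: collect FalkorType metadata per field."""
    overrides: dict[str, FalkorType] = {}
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hints = get_type_hints(record_type, include_extras=True)
    for name, hint in hints.items():
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, FalkorType):
                overrides[name] = meta
    return overrides


overrides = find_overrides(Row)
encoded_score = overrides["score"].encoder(0.25)
```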

#### VectorSchemaProvider

For NumPy `ndarray` columns, attach a `VectorSchema` annotation to specify dtype + dimension. See [VectorSchema](../common_resources/vector_schema) for details.

### Table schema: explicit column definitions

Build a `TableSchema` directly from a dict of column definitions when the row type is dynamic:

```python
from cocoindex.connectors.falkordb import TableSchema, ColumnDef

schema = TableSchema(
    columns={
        "filename": ColumnDef(type="string"),
        "title": ColumnDef(type="string"),
        "summary": ColumnDef(type="string", nullable=True),
    },
    primary_key="filename",
)
```

`ColumnDef` fields:

- `type` — The FalkorDB type string (metadata only; see table above).
- `nullable` — Whether the column may be `None`. Defaults to `True`.
- `encoder` — Optional `Callable[[Any], Any]` applied to non-`None` values before they're sent to FalkorDB.
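
The interaction between `nullable` and `encoder` can be sketched as follows. `ColumnDef` here is a local stand-in with the three fields listed above, and `encode_row` is a hypothetical helper; raising on a missing non-nullable value is an assumption about validation, not documented behavior.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class ColumnDef:
    """Local stand-in with the three fields listed above."""

    type: str
    nullable: bool = True
    encoder: Optional[Callable[[Any], Any]] = None


def encode_row(columns: dict[str, ColumnDef], row: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: apply encoders to non-None values only."""
    out: dict[str, Any] = {}
    for name, col in columns.items():
        value = row.get(name)
        if value is None:
            # Assumption: a None in a non-nullable column is rejected.
            if not col.nullable:
                raise ValueError(f"column {name!r} is not nullable")
            out[name] = None
        elif col.encoder is not None:
            out[name] = col.encoder(value)
        else:
            out[name] = value
    return out


columns = {
    "filename": ColumnDef(type="string", nullable=False),
    "tags": ColumnDef(type="string", encoder=",".join),
    "summary": ColumnDef(type="string"),
}
encoded_row = encode_row(columns, {"filename": "a.md", "tags": ["x", "y"]})
```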

### DDL: indexes and constraints

For each managed table, the connector creates the supporting Cypher index on the primary key field on first run:

- For node tables: `CREATE INDEX FOR (e:Label) ON (e.<pk>)`.
- For relation tables: `CREATE INDEX FOR ()-[e:RelType]-() ON (e.<pk>)`.

It then attempts a uniqueness constraint via the `GRAPH.CONSTRAINT CREATE` Redis command (best-effort — failures are logged but do not abort). Indexes and constraints are dropped on `cocoindex drop` or when the table is no longer declared.
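
The statements above can be sketched as simple string builders. The helper names are hypothetical, and the argument layout for `GRAPH.CONSTRAINT CREATE` follows FalkorDB's documented command form as best understood here; treat it as an assumption and check it against the FalkorDB command reference before relying on it.

```python
def node_index_ddl(label: str, pk: str) -> str:
    return f"CREATE INDEX FOR (e:{label}) ON (e.{pk})"


def relation_index_ddl(rel_type: str, pk: str) -> str:
    return f"CREATE INDEX FOR ()-[e:{rel_type}]-() ON (e.{pk})"


def unique_constraint_command(
    graph: str, entity_kind: str, name: str, pk: str
) -> list[str]:
    """Hypothetical helper: arguments for a single-property unique constraint.

    entity_kind is "NODE" or "RELATIONSHIP".
    """
    if entity_kind not in ("NODE", "RELATIONSHIP"):
        raise ValueError(f"unknown entity kind: {entity_kind!r}")
    return [
        "GRAPH.CONSTRAINT", "CREATE", graph,
        "UNIQUE", entity_kind, name,
        "PROPERTIES", "1", pk,
    ]


doc_index = node_index_ddl("Document", "filename")
rel_index = relation_index_ddl("RELATIONSHIP", "id")
doc_constraint = unique_constraint_command(
    "knowledge_graph", "NODE", "Document", "filename"
)
```

The index is created before the constraint because a unique constraint in FalkorDB depends on an existing index over the same property.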

When `managed_by="user"` is set, the connector skips DDL entirely — you're responsible for creating and dropping the schema. Record-level upserts and deletes still work.

### Example: Node tables

```python
from collections.abc import AsyncIterator
from dataclasses import dataclass
import cocoindex as coco
from cocoindex.connectors import falkordb

KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")


@dataclass
class Document:
    filename: str
    title: str
    summary: str


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(KG_DB, falkordb.ConnectionFactory(
        uri="falkor://localhost:6379", graph="knowledge_graph",
    ))
    yield


@coco.fn
async def app_main() -> None:
    schema = await falkordb.TableSchema.from_class(Document, primary_key="filename")
    documents = await falkordb.mount_table_target(
        KG_DB, "Document", schema, primary_key="filename",
    )
    documents.declare_record(
        row=Document(
            filename="overview.md",
            title="Overview",
            summary="An overview of CocoIndex...",
        )
    )


app = coco.App(coco.AppConfig(name="docs_to_falkordb"), app_main)
```

### Example: Relation tables (knowledge graph)

```python
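# Continues from the node-table example above: reuses its imports,
# KG_DB, the Document dataclass, and coco_lifespan.
@dataclass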
class Entity:
    value: str


@dataclass
class RelationshipRow:
    id: str
    predicate: str


@coco.fn
async def kg_app_main() -> None:
    documents = await falkordb.mount_table_target(
        KG_DB, "Document",
        await falkordb.TableSchema.from_class(Document, primary_key="filename"),
        primary_key="filename",
    )
    entities = await falkordb.mount_table_target(
        KG_DB, "Entity",
        await falkordb.TableSchema.from_class(Entity, primary_key="value"),
        primary_key="value",
    )
    relationships = await falkordb.mount_relation_target(
        KG_DB, "RELATIONSHIP",
        entities, entities,
        await falkordb.TableSchema.from_class(RelationshipRow, primary_key="id"),
        primary_key="id",
    )

    # populate ...
    documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="..."))
    entities.declare_record(row=Entity(value="CocoIndex"))
    entities.declare_record(row=Entity(value="FalkorDB"))
    relationships.declare_relation(
        from_id="CocoIndex",
        to_id="FalkorDB",
        record=RelationshipRow(id="rel-1", predicate="writes_to"),
    )


kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main)
```

The `Entity` table is declared up front (via `mount_table_target`) so that its index and constraint are reconciled before any `RELATIONSHIP` edge MERGEs entity endpoints. Because of the relationship's three-MERGE pattern (source endpoint → target endpoint → edge), missing endpoints are auto-created. It is still good practice to declare them explicitly so that deletion-cascade behavior stays predictable.
