FalkorDB connector

Write to FalkorDB — a Redis-backed graph database — with support for node tables, relationship tables (edges), per-graph multitenancy, and vector indexes with cosine / euclidean / inner-product distances.

Version
v 1.0.0-alpha48

The falkordb connector writes records to FalkorDB, a Cypher-compatible graph database that runs as a Redis module. It supports node tables (labels), relationship tables (edge types), per-graph multitenancy (one Redis instance, many isolated graphs), and vector indexes.

python
from cocoindex.connectors import falkordb
Dependencies

This connector requires additional dependencies. Install with:

bash
pip install cocoindex[falkordb]

Connection setup

Create a ConnectionFactory and provide it via a ContextKey. The factory holds the FalkorDB URI plus the target graph name, and yields a graph handle on demand.

Note

The key name is load-bearing across runs — it’s the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming.

python
from collections.abc import AsyncIterator
from cocoindex.connectors import falkordb
import cocoindex as coco

KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        falkordb.ConnectionFactory(
            uri="falkor://localhost:6379",
            graph="knowledge_graph",
        ),
    )
    yield

Multitenancy

A single Redis instance can host many fully isolated graphs. Pair each graph with its own ContextKey and ConnectionFactory(graph=...):

python
KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("apis_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "falkor://localhost:6379"
    builder.provide(KG_DB, falkordb.ConnectionFactory(uri=uri, graph="knowledge_graph"))
    builder.provide(APIS_DB, falkordb.ConnectionFactory(uri=uri, graph="apis_graph"))
    yield

Different ContextKeys with different graph names produce fully separate target-state trees — changes to one never spill into the other.

As target

The falkordb connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.

Each graph.query call against FalkorDB is its own atomic unit (FalkorDB does not expose multi-statement transactions); the connector orders writes within a batch as node upserts → relation upserts → relation deletes → node deletes so dependent edges always see their endpoints.

Declaring target states

Node tables (parent state)

Declares a node label as a target state. Returns a TableTarget for declaring records.

python
def declare_table_target(
    db: ContextKey,
    table_name: str,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]

Parameters:

  • db — A ContextKey[falkordb.ConnectionFactory] for the FalkorDB connection.
  • table_name — The Cypher node label (e.g. "Document").
  • table_schema — Optional schema definition (see Table Schema). FalkorDB does not enforce per-property types server-side, so the schema participates in CocoIndex’s fingerprint (so two flows declaring the same label must agree) but no per-column DDL is emitted.
  • primary_key — Single property name used as the node’s primary key. Defaults to "id". Compound primary keys are not supported in v1.0.
  • managed_by — Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").

Returns: A pending TableTarget. Use await falkordb.mount_table_target(KG_DB, ...) to get a resolved target.

Records (child states)

Once a TableTarget is resolved, declare records to be upserted (translated to MERGE (n:Label {pk: $key_0}) SET n += $props):

python
def TableTarget.declare_record(
    self,
    *,
    row: RowT,
) -> None

Parameters:

  • row — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the primary_key field declared above.

declare_row is an alias for declare_record, for compatibility with Postgres and other RDBMS targets.

Relation tables (parent state)

Declares a relationship type as a target state. Returns a RelationTarget for declaring edges.

python
def declare_relation_target(
    db: ContextKey,
    table_name: str,
    from_table: TableTarget,
    to_table: TableTarget,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]

Parameters:

  • db — A ContextKey[falkordb.ConnectionFactory] for the FalkorDB connection.
  • table_name — The Cypher relationship type (e.g. "MENTION").
  • from_table — The TableTarget whose nodes are the source endpoints of edges in this relationship.
  • to_table — The TableTarget whose nodes are the target endpoints of edges in this relationship.
  • table_schema — Optional schema for the relationship’s own properties (see Table Schema). The relationship’s primary_key field uniquely identifies each edge.
  • primary_key — Single property name used as the edge’s primary key. Defaults to "id".
  • managed_by — Whether CocoIndex manages the relationship lifecycle ("system") or assumes it exists ("user").

Returns: A pending RelationTarget. Use await falkordb.mount_relation_target(KG_DB, ...) to get a resolved target.

Relations (child states)

Once a RelationTarget is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship.

python
def RelationTarget.declare_relation(
    self,
    *,
    from_id: Any,
    to_id: Any,
    record: RowT | None = None,
) -> None

Parameters:

  • from_id — The source node’s primary-key value. The connector MERGEs (s:FromLabel {pk: $from_id}) so endpoints are auto-created if absent.
  • to_id — The target node’s primary-key value. Same MERGE behavior.
  • record — Optional row object whose fields populate the relationship’s properties. Must include the relationship’s primary_key field if provided.

If record is omitted, the connector derives a deterministic edge id from (from_label, from_id, to_label, to_id). This is convenient when an edge has no properties of its own.

Vector indexes (attachment)

Declares a vector index on a column of a node table. Vector indexes are an attachment to a TableTarget:

python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    field: str,
    metric: Literal["cosine", "euclidean", "ip"] = "cosine",
    dimension: int,
) -> None

Parameters:

  • name — Optional logical name for the index. Defaults to f"idx_{table_name}__{field}".
  • field — The node property holding the vector.
  • metric — Similarity metric: "cosine", "euclidean", or "ip" (inner product). Translated to FalkorDB’s similarityFunction option.
  • dimension — The vector’s dimension. Required.

The connector emits CREATE VECTOR INDEX FOR (e:Label) ON (e.field) OPTIONS {dimension: N, similarityFunction: '...'}. Vectors are float32 only — wider vector dtypes are not supported.

Table schema: from Python class

Build a TableSchema by introspecting a record type:

python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    *,
    primary_key: str = "id",
    column_overrides: dict[str, FalkorType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]

Parameters:

  • record_type — A dataclass, NamedTuple, or Pydantic model.
  • primary_key — Field name to use as the table’s primary key. Defaults to "id".
  • column_overrides — Optional dict mapping field names to FalkorType or VectorSchemaProvider to override the default Python-to-FalkorDB type mapping.

Returns: A TableSchema[RowT] populated from the class’s fields.

Default Python → FalkorDB type mapping

Python typeFalkorDB typeNotes
boolboolean
int, NumPy integer scalarsinteger
float, NumPy float scalarsfloat
decimal.DecimalstringEncoded via str() — FalkorDB has no decimal type.
strstring
bytesstringEncoded as base64.
uuid.UUIDstringEncoded via str().
datetime.date / datetime.datetime / datetime.timestringEncoded via .isoformat().
datetime.timedeltaintegerEncoded as milliseconds (int(td.total_seconds() * 1000)).
numpy.ndarray (with VectorSchema annotation)vector<float32, N>Encoded as list[float].
dict, list, nested record, Anymap / arrayPassed through native parameter binding.

FalkorType

Override the default mapping for a single column with FalkorType:

python
class FalkorType(NamedTuple):
    falkor_type: str
    encoder: ValueEncoder | None = None

Use with typing.Annotated:

python
from typing import Annotated
from dataclasses import dataclass
from cocoindex.connectors.falkordb import FalkorType

@dataclass
class Row:
    id: str
    score: Annotated[float, FalkorType("decimal", encoder=str)]

The falkor_type string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree) but no DDL is emitted from it.

VectorSchemaProvider

For NumPy ndarray columns, attach a VectorSchema annotation to specify dtype + dimension. See VectorSchema for details.

Table schema: explicit column definitions

Build a TableSchema directly from a dict of column definitions when the row type is dynamic:

python
from cocoindex.connectors.falkordb import TableSchema, ColumnDef

schema = TableSchema(
    columns={
        "filename": ColumnDef(type="string"),
        "title": ColumnDef(type="string"),
        "summary": ColumnDef(type="string", nullable=True),
    },
    primary_key="filename",
)

ColumnDef fields:

  • type — The FalkorDB type string (metadata only; see table above).
  • nullable — Whether the column may be None. Defaults to True.
  • encoder — Optional Callable[[Any], Any] applied to non-None values before they’re sent to FalkorDB.

DDL: indexes and constraints

For each managed table, the connector creates the supporting Cypher index on the primary key field on first run:

  • For node tables: CREATE INDEX FOR (e:Label) ON (e.<pk>).
  • For relation tables: CREATE INDEX FOR ()-[e:RelType]-() ON (e.<pk>).

It then attempts a uniqueness constraint via the GRAPH.CONSTRAINT CREATE Redis command (best-effort — failures are logged but do not abort). Indexes and constraints are dropped on cocoindex drop or when the table is no longer declared.

When managed_by="user" is set, the connector skips DDL entirely — you’re responsible for creating and dropping the schema. Record-level upserts and deletes still work.

Example: Node tables

python
from collections.abc import AsyncIterator
from dataclasses import dataclass
import cocoindex as coco
from cocoindex.connectors import falkordb

KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")


@dataclass
class Document:
    filename: str
    title: str
    summary: str


@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(KG_DB, falkordb.ConnectionFactory(
        uri="falkor://localhost:6379", graph="knowledge_graph",
    ))
    yield


@coco.fn
async def app_main() -> None:
    schema = await falkordb.TableSchema.from_class(Document, primary_key="filename")
    documents = await falkordb.mount_table_target(
        KG_DB, "Document", schema, primary_key="filename",
    )
    documents.declare_record(
        row=Document(
            filename="overview.md",
            title="Overview",
            summary="An overview of CocoIndex...",
        )
    )


app = coco.App(coco.AppConfig(name="docs_to_falkordb"), app_main)

Example: Relation tables (knowledge graph)

python
@dataclass
class Entity:
    value: str


@dataclass
class RelationshipRow:
    id: str
    predicate: str


@coco.fn
async def kg_app_main() -> None:
    documents = await falkordb.mount_table_target(
        KG_DB, "Document",
        await falkordb.TableSchema.from_class(Document, primary_key="filename"),
        primary_key="filename",
    )
    entities = await falkordb.mount_table_target(
        KG_DB, "Entity",
        await falkordb.TableSchema.from_class(Entity, primary_key="value"),
        primary_key="value",
    )
    relationships = await falkordb.mount_relation_target(
        KG_DB, "RELATIONSHIP",
        entities, entities,
        await falkordb.TableSchema.from_class(RelationshipRow, primary_key="id"),
        primary_key="id",
    )

    # populate ...
    documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="..."))
    entities.declare_record(row=Entity(value="CocoIndex"))
    entities.declare_record(row=Entity(value="FalkorDB"))
    relationships.declare_relation(
        from_id="CocoIndex",
        to_id="FalkorDB",
        record=RelationshipRow(id="rel-1", predicate="writes_to"),
    )


kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main)

The Entity table is declared up-front (via mount_table_target) so its index and constraint are reconciled before any RELATIONSHIP edge MERGEs entity endpoints. The relationship’s three-MERGE pattern (source endpoint → target endpoint → edge) means missing endpoints are auto-created — but it’s good practice to declare them explicitly so deletion-cascade behavior stays predictable.

CocoIndex Docs Edit this page Report issue