FalkorDB connector
Write to FalkorDB — a Redis-backed graph database — with support for node tables, relationship tables (edges), per-graph multitenancy, and vector indexes with cosine / euclidean / inner-product distances.
The falkordb connector writes records to FalkorDB, a Cypher-compatible graph database that runs as a Redis module. It supports node tables (labels), relationship tables (edge types), per-graph multitenancy (one Redis instance, many isolated graphs), and vector indexes.
```python
from cocoindex.connectors import falkordb
```
This connector requires additional dependencies. Install with:

```shell
pip install cocoindex[falkordb]
```

Connection setup
Create a ConnectionFactory and provide it via a ContextKey. The factory holds the FalkorDB URI plus the target graph name, and yields a graph handle on demand.
The key name is load-bearing across runs — it’s the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming.
```python
from collections.abc import AsyncIterator

from cocoindex.connectors import falkordb
import cocoindex as coco

KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(
        KG_DB,
        falkordb.ConnectionFactory(
            uri="falkor://localhost:6379",
            graph="knowledge_graph",
        ),
    )
    yield
```
Multitenancy
A single Redis instance can host many fully isolated graphs. Pair each graph with its own ContextKey and ConnectionFactory(graph=...):
```python
KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")
APIS_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("apis_db")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    uri = "falkor://localhost:6379"
    builder.provide(KG_DB, falkordb.ConnectionFactory(uri=uri, graph="knowledge_graph"))
    builder.provide(APIS_DB, falkordb.ConnectionFactory(uri=uri, graph="apis_graph"))
    yield
```
Different ContextKeys with different graph names produce fully separate target-state trees — changes to one never spill into the other.
As target
The falkordb connector provides target state APIs for writing records to node tables and relation tables. CocoIndex tracks what records should exist and automatically handles upserts and deletions.
Each graph.query call against FalkorDB is its own atomic unit (FalkorDB does not expose multi-statement transactions); the connector orders writes within a batch as node upserts → relation upserts → relation deletes → node deletes so dependent edges always see their endpoints.
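The ordering above can be sketched as a small pure function; the phase names and operation shape here are invented for illustration, not the connector's internals:

```python
# Sketch of the documented per-batch write ordering. The phase names and
# operation shape are invented for this example; the connector's real
# internals are not exposed.
PHASES = ["node_upsert", "relation_upsert", "relation_delete", "node_delete"]

def order_batch(ops: list[dict]) -> list[dict]:
    # sorted() is stable, so operations within a phase keep declaration order.
    return sorted(ops, key=lambda op: PHASES.index(op["phase"]))

batch = [
    {"phase": "node_delete", "key": "stale.md"},
    {"phase": "relation_upsert", "key": ("a", "b")},
    {"phase": "node_upsert", "key": "a"},
]
print([op["phase"] for op in order_batch(batch)])
# ['node_upsert', 'relation_upsert', 'node_delete']
```

Because each phase completes before the next begins, a relation upsert can rely on its endpoint nodes already existing, and a node delete never strands a still-declared edge.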
Declaring target states
Node tables (parent state)
Declares a node label as a target state. Returns a TableTarget for declaring records.
```python
def declare_table_target(
    db: ContextKey,
    table_name: str,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> TableTarget[RowT, coco.PendingS]
```
Parameters:
- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection.
- `table_name` — The Cypher node label (e.g. `"Document"`).
- `table_schema` — Optional schema definition (see Table Schema). FalkorDB does not enforce per-property types server-side, so the schema participates in CocoIndex’s fingerprint (two flows declaring the same label must agree) but no per-column DDL is emitted.
- `primary_key` — Single property name used as the node’s primary key. Defaults to `"id"`. Compound primary keys are not supported in v1.0.
- `managed_by` — Whether CocoIndex manages the table lifecycle (`"system"`) or assumes it exists (`"user"`).
Returns: A pending `TableTarget`. Use `await falkordb.mount_table_target(KG_DB, ...)` to get a resolved target.
Records (child states)
Once a `TableTarget` is resolved, declare records to be upserted (translated to `MERGE (n:Label {pk: $key_0}) SET n += $props`):
```python
def TableTarget.declare_record(
    self,
    *,
    row: RowT,
) -> None
```
Parameters:
- `row` — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include the `primary_key` field declared above.
`declare_row` is an alias for `declare_record`, for compatibility with Postgres and other RDBMS targets.
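For intuition, the upsert translation described above can be reproduced as a plain string template. The helper below is hypothetical; the real connector builds its Cypher internally:

```python
# Hypothetical helper reproducing the documented upsert translation
# (MERGE on the primary key, then SET n += $props).
def upsert_cypher(label: str, pk: str) -> str:
    return f"MERGE (n:{label} {{{pk}: $key_0}}) SET n += $props"

print(upsert_cypher("Document", "filename"))
# MERGE (n:Document {filename: $key_0}) SET n += $props
```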
Relation tables (parent state)
Declares a relationship type as a target state. Returns a RelationTarget for declaring edges.
```python
def declare_relation_target(
    db: ContextKey,
    table_name: str,
    from_table: TableTarget,
    to_table: TableTarget,
    table_schema: TableSchema[RowT] | None = None,
    *,
    primary_key: str = "id",
    managed_by: Literal["system", "user"] = "system",
) -> RelationTarget[RowT, coco.PendingS]
```
Parameters:
- `db` — A `ContextKey[falkordb.ConnectionFactory]` for the FalkorDB connection.
- `table_name` — The Cypher relationship type (e.g. `"MENTION"`).
- `from_table` — The `TableTarget` whose nodes are the source endpoints of edges in this relationship.
- `to_table` — The `TableTarget` whose nodes are the target endpoints of edges in this relationship.
- `table_schema` — Optional schema for the relationship’s own properties (see Table Schema). The relationship’s `primary_key` field uniquely identifies each edge.
- `primary_key` — Single property name used as the edge’s primary key. Defaults to `"id"`.
- `managed_by` — Whether CocoIndex manages the relationship lifecycle (`"system"`) or assumes it exists (`"user"`).
Returns: A pending `RelationTarget`. Use `await falkordb.mount_relation_target(KG_DB, ...)` to get a resolved target.
Relations (child states)
Once a RelationTarget is resolved, declare edges. Each declaration produces a triple-MERGE: source endpoint, target endpoint, then the relationship.
```python
def RelationTarget.declare_relation(
    self,
    *,
    from_id: Any,
    to_id: Any,
    record: RowT | None = None,
) -> None
```
Parameters:
- `from_id` — The source node’s primary-key value. The connector `MERGE`s `(s:FromLabel {pk: $from_id})`, so endpoints are auto-created if absent.
- `to_id` — The target node’s primary-key value. Same MERGE behavior.
- `record` — Optional row object whose fields populate the relationship’s properties. Must include the relationship’s `primary_key` field if provided.
If `record` is omitted, the connector derives a deterministic edge id from `(from_label, from_id, to_label, to_id)`. This is convenient when an edge has no properties of its own.
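One plausible derivation is a hash of the endpoint quadruple. The sketch below is for illustration only; the connector's actual scheme is an internal detail and may differ:

```python
import hashlib

# Illustrative only: derive a stable edge id from the endpoint quadruple.
def derive_edge_id(from_label: str, from_id: str, to_label: str, to_id: str) -> str:
    # NUL-separated join avoids ambiguity between e.g. ("ab", "c") and ("a", "bc").
    raw = "\x00".join([from_label, str(from_id), to_label, str(to_id)])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

# Same inputs always produce the same id, so re-runs upsert instead of duplicating.
assert derive_edge_id("Entity", "CocoIndex", "Entity", "FalkorDB") == \
       derive_edge_id("Entity", "CocoIndex", "Entity", "FalkorDB")
```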
Vector indexes (attachment)
Declares a vector index on a column of a node table. Vector indexes are an attachment to a TableTarget:
```python
def TableTarget.declare_vector_index(
    self,
    *,
    name: str | None = None,
    field: str,
    metric: Literal["cosine", "euclidean", "ip"] = "cosine",
    dimension: int,
) -> None
```
Parameters:
- `name` — Optional logical name for the index. Defaults to `f"idx_{table_name}__{field}"`.
- `field` — The node property holding the vector.
- `metric` — Similarity metric: `"cosine"`, `"euclidean"`, or `"ip"` (inner product). Translated to FalkorDB’s `similarityFunction` option.
- `dimension` — The vector’s dimension. Required.
The connector emits `CREATE VECTOR INDEX FOR (e:Label) ON (e.field) OPTIONS {dimension: N, similarityFunction: '...'}`. Vectors are `float32` only — wider vector dtypes are not supported.
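Because only float32 is accepted, narrow wider embeddings before declaring records. A minimal sketch (the 768 dimension is an arbitrary example):

```python
import numpy as np

embedding = np.random.rand(768)       # np.random.rand returns float64
vec = embedding.astype(np.float32)    # narrow to the only supported dtype
assert vec.dtype == np.float32 and vec.shape == (768,)
```

At query time, FalkorDB serves such an index through its `db.idx.vector.queryNodes` procedure; see the FalkorDB documentation for the query syntax.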
Table schema: from Python class
Build a TableSchema by introspecting a record type:
```python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    *,
    primary_key: str = "id",
    column_overrides: dict[str, FalkorType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
```
Parameters:
- `record_type` — A dataclass, NamedTuple, or Pydantic model.
- `primary_key` — Field name to use as the table’s primary key. Defaults to `"id"`.
- `column_overrides` — Optional dict mapping field names to `FalkorType` or `VectorSchemaProvider` to override the default Python-to-FalkorDB type mapping.
Returns: A TableSchema[RowT] populated from the class’s fields.
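For a dataclass, the raw material for this introspection is just the field names and annotations. A sketch using plain `dataclasses.fields` (not CocoIndex internals):

```python
from dataclasses import dataclass, fields

@dataclass
class Document:
    filename: str
    title: str
    summary: str

# Field names and annotated types are what drive the default type mapping below.
print([(f.name, f.type) for f in fields(Document)])
```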
Default Python → FalkorDB type mapping
| Python type | FalkorDB type | Notes |
|---|---|---|
| `bool` | `boolean` | |
| `int`, NumPy integer scalars | `integer` | |
| `float`, NumPy float scalars | `float` | |
| `decimal.Decimal` | `string` | Encoded via `str()` — FalkorDB has no decimal type. |
| `str` | `string` | |
| `bytes` | `string` | Encoded as base64. |
| `uuid.UUID` | `string` | Encoded via `str()`. |
| `datetime.date` / `datetime.datetime` / `datetime.time` | `string` | Encoded via `.isoformat()`. |
| `datetime.timedelta` | `integer` | Encoded as milliseconds (`int(td.total_seconds() * 1000)`). |
| `numpy.ndarray` (with `VectorSchema` annotation) | `vector<float32, N>` | Encoded as `list[float]`. |
| `dict`, `list`, nested record, `Any` | `map` / `array` | Passed through native parameter binding. |
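The scalar encodings in the table can be reproduced in plain Python, which is handy when checking what a stored property will look like:

```python
import base64
from datetime import date, timedelta
from decimal import Decimal

print(str(Decimal("1.25")))                    # '1.25'
print(base64.b64encode(b"\x00\xff").decode())  # 'AP8='
print(date(2024, 1, 2).isoformat())            # '2024-01-02'
td = timedelta(seconds=1.5)
print(int(td.total_seconds() * 1000))          # 1500
```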
FalkorType
Override the default mapping for a single column with FalkorType:
```python
class FalkorType(NamedTuple):
    falkor_type: str
    encoder: ValueEncoder | None = None
```
Use with typing.Annotated:
```python
from typing import Annotated
from dataclasses import dataclass

from cocoindex.connectors.falkordb import FalkorType

@dataclass
class Row:
    id: str
    score: Annotated[float, FalkorType("decimal", encoder=str)]
```
The falkor_type string is metadata-only — it participates in the schema fingerprint (so two flows declaring the same table must agree) but no DDL is emitted from it.
VectorSchemaProvider
For NumPy ndarray columns, attach a VectorSchema annotation to specify dtype + dimension. See VectorSchema for details.
Table schema: explicit column definitions
Build a TableSchema directly from a dict of column definitions when the row type is dynamic:
```python
from cocoindex.connectors.falkordb import TableSchema, ColumnDef

schema = TableSchema(
    columns={
        "filename": ColumnDef(type="string"),
        "title": ColumnDef(type="string"),
        "summary": ColumnDef(type="string", nullable=True),
    },
    primary_key="filename",
)
```
ColumnDef fields:
- `type` — The FalkorDB type string (metadata only; see table above).
- `nullable` — Whether the column may be `None`. Defaults to `True`.
- `encoder` — Optional `Callable[[Any], Any]` applied to non-`None` values before they’re sent to FalkorDB.
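The `nullable`/`encoder` semantics compose per value. A standalone sketch mirroring the documented behavior (not importing the real `ColumnDef`):

```python
from decimal import Decimal
from typing import Any, Callable

def encode_row(row: dict[str, Any], encoders: dict[str, Callable[[Any], Any]]) -> dict[str, Any]:
    # Per the docs above, encoders run only on non-None values; None passes through.
    return {
        key: encoders[key](value) if key in encoders and value is not None else value
        for key, value in row.items()
    }

row = {"filename": "a.md", "price": Decimal("9.99"), "summary": None}
print(encode_row(row, {"price": str}))
# {'filename': 'a.md', 'price': '9.99', 'summary': None}
```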
DDL: indexes and constraints
For each managed table, the connector creates the supporting Cypher index on the primary key field on first run:
- For node tables: `CREATE INDEX FOR (e:Label) ON (e.<pk>)`.
- For relation tables: `CREATE INDEX FOR ()-[e:RelType]-() ON (e.<pk>)`.
It then attempts a uniqueness constraint via the `GRAPH.CONSTRAINT CREATE` Redis command (best-effort — failures are logged but do not abort). Indexes and constraints are dropped on `cocoindex drop` or when the table is no longer declared.
When `managed_by="user"` is set, the connector skips DDL entirely — you’re responsible for creating and dropping the schema. Record-level upserts and deletes still work.
Example: Node tables
```python
from collections.abc import AsyncIterator
from dataclasses import dataclass

import cocoindex as coco
from cocoindex.connectors import falkordb

KG_DB: coco.ContextKey[falkordb.ConnectionFactory] = coco.ContextKey("kg_db")

@dataclass
class Document:
    filename: str
    title: str
    summary: str

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    builder.provide(KG_DB, falkordb.ConnectionFactory(
        uri="falkor://localhost:6379", graph="knowledge_graph",
    ))
    yield

@coco.fn
async def app_main() -> None:
    schema = await falkordb.TableSchema.from_class(Document, primary_key="filename")
    documents = await falkordb.mount_table_target(
        KG_DB, "Document", schema, primary_key="filename",
    )
    documents.declare_record(
        row=Document(
            filename="overview.md",
            title="Overview",
            summary="An overview of CocoIndex...",
        )
    )

app = coco.App(coco.AppConfig(name="docs_to_falkordb"), app_main)
```
Example: Relation tables (knowledge graph)
```python
# Continues the setup (KG_DB, Document) from the node-table example above.
@dataclass
class Entity:
    value: str

@dataclass
class RelationshipRow:
    id: str
    predicate: str

@coco.fn
async def kg_app_main() -> None:
    documents = await falkordb.mount_table_target(
        KG_DB, "Document",
        await falkordb.TableSchema.from_class(Document, primary_key="filename"),
        primary_key="filename",
    )
    entities = await falkordb.mount_table_target(
        KG_DB, "Entity",
        await falkordb.TableSchema.from_class(Entity, primary_key="value"),
        primary_key="value",
    )
    relationships = await falkordb.mount_relation_target(
        KG_DB, "RELATIONSHIP",
        entities, entities,
        await falkordb.TableSchema.from_class(RelationshipRow, primary_key="id"),
        primary_key="id",
    )

    # populate ...
    documents.declare_record(row=Document(filename="overview.md", title="Overview", summary="..."))
    entities.declare_record(row=Entity(value="CocoIndex"))
    entities.declare_record(row=Entity(value="FalkorDB"))
    relationships.declare_relation(
        from_id="CocoIndex",
        to_id="FalkorDB",
        record=RelationshipRow(id="rel-1", predicate="writes_to"),
    )

kg_app = coco.App(coco.AppConfig(name="kg_app"), kg_app_main)
```
The Entity table is declared up-front (via mount_table_target) so its index and constraint are reconciled before any RELATIONSHIP edge MERGEs entity endpoints. The relationship’s three-MERGE pattern (source endpoint → target endpoint → edge) means missing endpoints are auto-created — but it’s good practice to declare them explicitly so deletion-cascade behavior stays predictable.