# Valkey connector

> **CocoIndex v1.** This page documents CocoIndex **v1** — a ground-up redesign from v0. When writing code, ignore any v0 flow-builder DSL or deprecated decorators.
>
> Source: https://cocoindex.io/docs/connectors/valkey/ · Docs index: https://cocoindex.io/docs/llms.txt · Agent skill: https://cocoindex.io/docs/skill.md
>
> v0→v1 quick map — if you reach for these v0 symbols, stop and use the v1 form: `@cocoindex.flow_def`/`FlowBuilder` → `coco.App` + a `@coco.fn` main function; `add_collector()`/`collect()`/`export()` → declare target states (`declare_row`, `declare_file`); `cocoindex.sources/functions/targets.*` → connector APIs (`localfs.walk_dir`, `coco.ops.*`, `postgres.declare_table_target`). Full mapping + API reference: https://cocoindex.io/docs/skill.md.

The `valkey` connector provides utilities for writing documents to Valkey with vector search support, using the valkey-search module for similarity search indexes.

```python
from cocoindex.connectors import valkey
```

**Note — Dependencies**
This connector requires additional dependencies. Install with:

```bash
pip install cocoindex[valkey]
```

Requires a Valkey server with the [valkey-search](https://github.com/valkey-io/valkey-search) module loaded.

## Connection setup

`create_client_config()` creates a configuration for connecting to Valkey.

```python
def create_client_config(
    host: str = "localhost",
    port: int = 6379,
    *,
    password: str | None = None,
    use_tls: bool = False,
    client_name: str | None = "cocoindex_vector_store",
    **kwargs: Any,
) -> GlideClientConfiguration
```

**Parameters:**

- `host` — Valkey server host (default: `"localhost"`).
- `port` — Valkey server port (default: `6379`).
- `password` — Optional authentication password.
- `use_tls` — Whether to use TLS for the connection (default: `False`).
- `client_name` — Client name for the connection, visible in `CLIENT LIST` and monitoring dashboards (default: `"cocoindex_vector_store"`). Pass `None` to disable.
- `**kwargs` — Additional keyword arguments passed to `GlideClientConfiguration`.

For advanced configurations not covered by these parameters, construct `GlideClientConfiguration` directly.

**Returns:** A `GlideClientConfiguration` instance.

**Example:**

```python
from glide import GlideClient
config = valkey.create_client_config("localhost", 6379)
client = await GlideClient.create(config)
```

## As target

The `valkey` connector provides target state APIs for writing documents to search indexes. CocoIndex tracks what documents should exist and automatically handles upserts and deletions.

### Declaring target states

#### Setting up a connection

Create a `ContextKey[GlideClient]` to identify your Valkey client, then provide it in your lifespan:

**Note**
The key name is load-bearing across runs — it's the stable identity CocoIndex uses to track managed indexes. See [ContextKey as stable identity](../programming_guide/context#contextkey-as-stable-identity) before renaming.

```python
from glide import GlideClient
import cocoindex as coco
from cocoindex.connectors import valkey

VALKEY_DB = coco.ContextKey[GlideClient]("my_valkey")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    config = valkey.create_client_config("localhost", 6379)
    client = await GlideClient.create(config)
    builder.provide(VALKEY_DB, client)
    yield
    await client.close()
```

#### Indexes (parent state)

Declares a search index as a target state. Returns an `IndexTarget` for declaring documents.

```python
from cocoindex.connectorkits.target import ManagedBy

def declare_index_target(
    db: ContextKey[GlideClient],
    index_name: str,
    schema: IndexSchema,
    *,
    managed_by: ManagedBy = ManagedBy.SYSTEM,
) -> IndexTarget[coco.PendingS]
```

**Parameters:**

- `db` — A `ContextKey[GlideClient]` identifying the Valkey client to use.
- `index_name` — Name of the search index.
- `schema` — Schema definition specifying vector configuration (see [Index Schema](#index-schema)).
- `managed_by` — `ManagedBy.SYSTEM` (default): CocoIndex creates and drops the index automatically. `ManagedBy.USER`: assumes the index already exists and never drops it.

**Returns:** A pending `IndexTarget`. Use the convenience wrapper `await valkey.mount_index_target(VALKEY_DB, index_name, schema)` to resolve.

#### Documents (child states)

Once an `IndexTarget` is resolved, declare documents to be upserted:

```python
def IndexTarget.declare_document(
    self,
    document: valkey.Document,
) -> None
```

**Parameters:**

- `document` — A `valkey.Document` containing:
  - `id` — Document ID (string)
  - `vector` — Vector data (list of floats or numpy array)
  - `payload` — Optional metadata dict (values must be str, int, or float)

### Index schema

Define vector configuration for an index using `IndexSchema`.

```python
class IndexSchema:
    @classmethod
    async def create(
        cls,
        vectors: VectorDef,
        fields: list[FieldDef] | None = None,
    ) -> IndexSchema
```

**Parameters:**

- `vectors` — A `VectorDef` specifying the vector field configuration.
- `fields` — Optional list of `FieldDef` for indexed payload fields (enables search/filtering).

#### VectorDef

Specifies vector configuration including dimension, distance metric, and algorithm:

```python
class VectorDef(NamedTuple):
    schema: VectorSchemaProvider | ContextKey[VectorSchemaProvider]
    distance: Literal["cosine", "l2", "ip"] = "cosine"
    algorithm: Literal["hnsw", "flat"] = "hnsw"
```

**Parameters:**

- `schema` — A `VectorSchemaProvider` or `ContextKey` that defines vector dimensions.
- `distance` — Distance metric for similarity search (default: `"cosine"`).
- `algorithm` — Vector index algorithm (default: `"hnsw"`).

#### VectorSchemaProvider

The `schema` field of `VectorDef` accepts a [`VectorSchemaProvider`](../common_resources/vector_schema#vectorschemaprovider), a `ContextKey`, or an explicit `VectorSchema` to specify the vector dimension and dtype. See [Vector Schema](../common_resources/vector_schema#vectorschemaprovider) for details.

### Distance metrics

The `distance` parameter in `VectorDef` specifies the similarity metric:

- `"cosine"` — Cosine similarity (default)
- `"l2"` — Euclidean distance (L2)
- `"ip"` — Inner product (dot product)

### Index algorithms

The `algorithm` parameter specifies the vector indexing strategy:

- `"hnsw"` — Hierarchical Navigable Small World graph (default, best for most use cases)
- `"flat"` — Brute-force flat index (exact results, slower for large datasets)

### Data storage

Documents are stored as Valkey HASH keys with a prefix pattern:

- Key format: `{index_name}:{document_id}`
- Vector field: stored as binary float32 blob in a `vector` hash field
- Payload fields: stored as individual hash fields (string values)

The search index is created with `FT.CREATE ON HASH PREFIX 1 {index_name}:` to automatically index all documents with the matching prefix.

### Example

```python
from glide import GlideClient
import cocoindex as coco
from cocoindex.connectors import valkey
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
from typing import AsyncIterator

VALKEY_DB = coco.ContextKey[GlideClient]("main_vectors")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder]("embedder")

@coco.lifespan
async def coco_lifespan(builder: coco.EnvironmentBuilder) -> AsyncIterator[None]:
    config = valkey.create_client_config("localhost", 6379)
    client = await GlideClient.create(config)
    builder.provide(VALKEY_DB, client)
    builder.provide(EMBEDDER, SentenceTransformerEmbedder("all-MiniLM-L6-v2"))
    yield
    await client.close()

@coco.fn
async def process_document(
    doc_id: str,
    text: str,
    target: valkey.IndexTarget,
) -> None:
    embedder = coco.use_context(EMBEDDER)
    embedding = await embedder.embed(text)

    target.declare_document(valkey.Document(
        id=doc_id,
        vector=embedding.tolist(),
        payload={"text": text},
    ))

@coco.fn
async def app_main() -> None:
    embedder = coco.use_context(EMBEDDER)

    index = await valkey.mount_index_target(
        VALKEY_DB,
        "documents",
        await valkey.IndexSchema.create(
            vectors=valkey.VectorDef(schema=embedder, distance="cosine"),
        ),
    )

    for doc_id, text in documents:
        await coco.mount(
            coco.component_subpath("doc", doc_id),
            process_document,
            doc_id,
            text,
            index,
        )
```

## Vector search

The connector focuses on writing documents to Valkey. For vector search queries, use the Valkey GLIDE client directly:

```python
from glide import GlideClient
from glide.async_commands import ft
from glide.async_commands.ft import FtSearchOptions
import struct

# Build a KNN query
query_vec = struct.pack(f"<{len(embedding)}f", *embedding)
query = f"*=>[KNN 10 @vector $query_vec AS score]"

results = await ft.search(
    client,
    "documents",
    query,
    options=FtSearchOptions(params={"query_vec": query_vec}),
)
```
