Apache Doris connector

Write rows to Apache Doris tables via stream load with automatic upserts and deletions, HNSW/IVF vector indexes, and inverted indexes for full-text search — with optional schema enforcement.

Version
v 1.0.0-alpha48
Last reviewed
Apr 19, 2026

The doris connector provides utilities for writing rows to Apache Doris databases, with support for vector indexes (HNSW, IVF) and inverted indexes for full-text search.

python
from cocoindex.connectors import doris
Dependencies

This connector requires additional dependencies. Install with:

bash
pip install cocoindex[doris]

Connection setup

DorisConnectionConfig

Configure the connection to your Doris cluster:

python
from cocoindex.connectors import doris

config = doris.DorisConnectionConfig(
    fe_host="localhost",
    database="my_database",
    fe_http_port=8080,
    query_port=9030,
    username="root",
    password="",
)

Parameters:

ParameterTypeDefaultDescription
fe_hoststr(required)Frontend host address
databasestr(required)Database name
fe_http_portint8080Frontend HTTP port (for stream load)
query_portint9030MySQL-compatible query port
usernamestr"root"Username
passwordstr""Password
enable_httpsboolFalseUse HTTPS for stream load
be_load_hoststr | NoneNoneOverride backend host for stream load (defaults to fe_host)
batch_sizeint10000Max rows per stream load batch
stream_load_timeoutint600Timeout (seconds) for stream load
replication_numint1Replication factor for new tables
bucketsint | str"auto"Bucket count for new tables

connect

Create a managed connection:

python
def connect(config: DorisConnectionConfig) -> ManagedConnection

Example:

python
conn = doris.connect(doris.DorisConnectionConfig(
    fe_host="localhost",
    database="my_database",
))

As target

The doris connector provides target state APIs for writing rows to tables. CocoIndex tracks what rows should exist and automatically handles upserts and deletions via Doris stream load.

Declaring target states

Setting up a connection

Create a ContextKey[doris.ManagedConnection] to identify your connection, then provide it in your lifespan:

Note

The key name is load-bearing across runs — it’s the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming.

python
import cocoindex as coco
from cocoindex.connectors import doris

DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield
    # conn is cleaned up after yield

Tables (parent state)

Declares a table as a target state. Returns a DorisTableTarget for declaring rows.

python
def declare_table_target(
    db: ContextKey[ManagedConnection],
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    managed_by: Literal["system", "user"] = "system",
    vector_indexes: list[VectorIndexDef] | None = None,
    inverted_indexes: list[InvertedIndexDef] | None = None,
) -> DorisTableTarget[RowT, coco.PendingS]

Parameters:

  • db — A ContextKey[doris.ManagedConnection] identifying the connection to use.
  • table_name — Name of the table.
  • table_schema — Schema definition including columns and primary key (see Table schema).
  • managed_by — Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").
  • vector_indexes — Optional list of vector index definitions (see Vector indexes).
  • inverted_indexes — Optional list of inverted index definitions (see Inverted indexes).

Returns: A pending DorisTableTarget. Use the convenience wrapper await doris.mount_table_target(...) to resolve.

Rows (child states)

Once a DorisTableTarget is resolved, declare rows to be upserted:

python
def DorisTableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None

Parameters:

  • row — A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.

Table schema: from Python class

Define the table structure using a Python class:

python
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    primary_key: list[str],
    *,
    column_overrides: dict[str, DorisType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]

Parameters:

  • record_type — A record type whose fields define table columns.
  • primary_key — List of column names forming the primary key.
  • column_overrides — Optional per-column overrides for type mapping or vector configuration.

Example:

python
@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]

schema = await doris.TableSchema.from_class(
    DocEmbedding,
    primary_key=["id"],
)

Python types are automatically mapped to Doris types:

Python TypeDoris Type
boolBOOLEAN
intBIGINT
floatDOUBLE
strSTRING
bytesSTRING (base64)
uuid.UUIDVARCHAR(36)
datetime.datetimeDATETIME
datetime.dateDATE
list, dict, nested structsJSON
NDArray (with vector schema)ARRAY<FLOAT>

DorisType

Use DorisType to specify a custom Doris type:

python
from typing import Annotated
from cocoindex.connectors.doris import DorisType

@dataclass
class MyRow:
    id: Annotated[int, DorisType("INT")]
    value: Annotated[float, DorisType("FLOAT")]

Vector indexes

Doris supports vector similarity search via HNSW and IVF indexes. Define them with VectorIndexDef:

python
from cocoindex.connectors.doris import VectorIndexDef

vector_idx = VectorIndexDef(
    field_name="embedding",
    index_type="HNSW",       # or "IVF"
    metric_type="l2_distance",  # or "cosine_distance"
)

Parameters:

ParameterTypeDefaultDescription
field_namestr(required)Column to index
index_typestr"HNSW"Index type: "HNSW" or "IVF"
metric_typestr"l2_distance"Distance metric: "l2_distance" or "cosine_distance"
max_degreeint | NoneNoneHNSW max degree
ef_constructionint | NoneNoneHNSW construction parameter
nlistint | NoneNoneIVF number of partitions

Inverted indexes

Doris supports inverted indexes for full-text search. Define them with InvertedIndexDef:

python
from cocoindex.connectors.doris import InvertedIndexDef

inverted_idx = InvertedIndexDef(
    field_name="text",
    parser="unicode",  # or "english", "chinese", etc.
)

Parameters:

  • field_name — Column to index.
  • parser — Optional tokenizer for full-text search (e.g., "unicode", "english", "chinese"). If None, the index supports exact matching only.

Query helpers

build_vector_search_query

Build a vector similarity search SQL query:

python
def build_vector_search_query(
    table: str,
    vector_field: str,
    query_vector: list[float],
    metric: str = "l2_distance",
    limit: int = 10,
    select_columns: list[str] | None = None,
    where_clause: str | None = None,
) -> str

Example:

python
sql = doris.build_vector_search_query(
    table="doc_embeddings",
    vector_field="embedding",
    query_vector=query_vec.tolist(),
    metric="cosine_distance",
    limit=5,
)

connect_async

Create an async MySQL connection for running queries:

python
async def connect_async(
    fe_host: str,
    query_port: int = 9030,
    username: str = "root",
    password: str = "",
    database: str | None = None,
) -> Any  # aiomysql connection

Example

python
from typing import Annotated, Iterator
from dataclasses import dataclass

from numpy.typing import NDArray

import cocoindex as coco
from cocoindex.connectors import doris
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder

DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")

embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")

@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]

@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield

@coco.fn
async def app_main() -> None:
    table = await doris.mount_table_target(
        DORIS_DB,
        "doc_embeddings",
        await doris.TableSchema.from_class(
            DocEmbedding,
            primary_key=["id"],
        ),
        vector_indexes=[
            doris.VectorIndexDef(
                field_name="embedding",
                index_type="HNSW",
                metric_type="cosine_distance",
            ),
        ],
    )

    # Declare rows
    for doc in documents:
        table.declare_row(row=doc)
CocoIndex Docs Edit this page Report issue