Apache Doris connector
Write rows to Apache Doris tables via stream load with automatic upserts and deletions, HNSW/IVF vector indexes, and inverted indexes for full-text search — with optional schema enforcement.
The doris connector provides utilities for writing rows to Apache Doris databases, with support for vector indexes (HNSW, IVF) and inverted indexes for full-text search.
from cocoindex.connectors import doris
This connector requires additional dependencies. Install with:
pip install cocoindex[doris]
Connection setup
DorisConnectionConfig
Configure the connection to your Doris cluster:
from cocoindex.connectors import doris
config = doris.DorisConnectionConfig(
    fe_host="localhost",
    database="my_database",
    fe_http_port=8080,
    query_port=9030,
    username="root",
    password="",
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| fe_host | str | (required) | Frontend host address |
| database | str | (required) | Database name |
| fe_http_port | int | 8080 | Frontend HTTP port (for stream load) |
| query_port | int | 9030 | MySQL-compatible query port |
| username | str | "root" | Username |
| password | str | "" | Password |
| enable_https | bool | False | Use HTTPS for stream load |
| be_load_host | str \| None | None | Override backend host for stream load (defaults to fe_host) |
| batch_size | int | 10000 | Max rows per stream load batch |
| stream_load_timeout | int | 600 | Timeout (seconds) for stream load |
| replication_num | int | 1 | Replication factor for new tables |
| buckets | int \| str | "auto" | Bucket count for new tables |
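To make the stream load parameters concrete, the sketch below shows how fe_host, fe_http_port, enable_https, and batch_size could map onto Doris's stream load HTTP endpoint (/api/{db}/{table}/_stream_load). The helpers stream_load_url and split_batches are hypothetical names used only for illustration; they are not part of the connector, and how the connector builds its requests internally is not described here.

```python
from typing import Iterator, Sequence


def stream_load_url(
    host: str,
    database: str,
    table: str,
    http_port: int = 8080,
    enable_https: bool = False,
) -> str:
    """Build the Doris stream load endpoint for one table (illustrative helper)."""
    scheme = "https" if enable_https else "http"
    return f"{scheme}://{host}:{http_port}/api/{database}/{table}/_stream_load"


def split_batches(rows: Sequence[dict], batch_size: int = 10000) -> Iterator[Sequence[dict]]:
    """Yield consecutive slices of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


url = stream_load_url("localhost", "my_database", "doc_embeddings")
batches = list(split_batches([{"id": i} for i in range(25)], batch_size=10))
print(url)
print([len(b) for b in batches])
```

A smaller batch_size means more HTTP requests but less memory per request; stream_load_timeout would then apply to each request separately.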
connect
Create a managed connection:
def connect(config: DorisConnectionConfig) -> ManagedConnection
Example:
conn = doris.connect(doris.DorisConnectionConfig(
    fe_host="localhost",
    database="my_database",
))
As target
The doris connector provides target state APIs for writing rows to tables. CocoIndex tracks what rows should exist and automatically handles upserts and deletions via Doris stream load.
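As a mental model for "tracks what rows should exist", the sketch below diffs the rows recorded on a previous run against the rows declared on the current run, keyed by primary key. This is a conceptual illustration only: plan_changes is a hypothetical helper, and the actual bookkeeping CocoIndex performs is not shown in this document.

```python
from typing import Hashable, Mapping

Row = Mapping[str, object]


def plan_changes(
    previous: Mapping[Hashable, Row],
    desired: Mapping[Hashable, Row],
) -> tuple[list[Row], list[Hashable]]:
    """Return (rows to upsert, primary keys to delete)."""
    # A row is upserted when it is new or its content changed.
    upserts = [row for key, row in desired.items() if previous.get(key) != row]
    # A key is deleted when it was tracked before but is no longer declared.
    deletes = [key for key in previous if key not in desired]
    return upserts, deletes


previous = {1: {"id": 1, "text": "a"}, 2: {"id": 2, "text": "b"}}
desired = {2: {"id": 2, "text": "B"}, 3: {"id": 3, "text": "c"}}
upserts, deletes = plan_changes(previous, desired)
print(upserts)
print(deletes)
```

Here row 1 is no longer declared and is deleted, row 2 changed and is upserted, and row 3 is new and is upserted.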
Declaring target states
Setting up a connection
Create a ContextKey[doris.ManagedConnection] to identify your connection, then provide it in your lifespan:
The key name is load-bearing across runs — it’s the stable identity CocoIndex uses to track managed rows. See ContextKey as stable identity before renaming.
from typing import Iterator
import cocoindex as coco
from cocoindex.connectors import doris
DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")
@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield
    # conn is cleaned up after yield
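The lifespan function is a generator: code before the yield runs at startup, and code after it runs at shutdown. The sketch below shows that general Python pattern with contextlib and a stand-in connection class. FakeConnection and lifespan are hypothetical names, and whether coco.lifespan is built on contextlib is an assumption.

```python
from contextlib import contextmanager
from typing import Iterator

events: list[str] = []


class FakeConnection:
    """Stand-in for a managed connection; it only records open and close."""

    def __init__(self) -> None:
        events.append("open")

    def close(self) -> None:
        events.append("close")


@contextmanager
def lifespan() -> Iterator[FakeConnection]:
    conn = FakeConnection()  # setup runs before the yield
    try:
        yield conn           # the application runs while the generator is suspended
    finally:
        conn.close()         # teardown runs after the yield, even on error


with lifespan() as conn:
    events.append("run")

print(events)
```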
Tables (parent state)
Declares a table as a target state. Returns a DorisTableTarget for declaring rows.
def declare_table_target(
    db: ContextKey[ManagedConnection],
    table_name: str,
    table_schema: TableSchema[RowT],
    *,
    managed_by: Literal["system", "user"] = "system",
    vector_indexes: list[VectorIndexDef] | None = None,
    inverted_indexes: list[InvertedIndexDef] | None = None,
) -> DorisTableTarget[RowT, coco.PendingS]
Parameters:
- db: A ContextKey[doris.ManagedConnection] identifying the connection to use.
- table_name: Name of the table.
- table_schema: Schema definition including columns and primary key (see Table schema).
- managed_by: Whether CocoIndex manages the table lifecycle ("system") or assumes it exists ("user").
- vector_indexes: Optional list of vector index definitions (see Vector indexes).
- inverted_indexes: Optional list of inverted index definitions (see Inverted indexes).
Returns: A pending DorisTableTarget. Use the convenience wrapper await doris.mount_table_target(...) to resolve.
Rows (child states)
Once a DorisTableTarget is resolved, declare rows to be upserted:
def DorisTableTarget.declare_row(
    self,
    *,
    row: RowT,
) -> None
Parameters:
- row: A row object (dict, dataclass, NamedTuple, or Pydantic model). Must include all primary key columns.
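Because every row must carry all primary key columns, it can help to check rows before declaring them. The sketch below is one way to do that for dicts, dataclasses, and NamedTuples. missing_primary_keys is a hypothetical helper, not a connector API, and Pydantic models are left out to keep it dependency-free.

```python
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any


def missing_primary_keys(row: Any, primary_key: list[str]) -> list[str]:
    """Return the primary key columns that are absent from the row."""
    if is_dataclass(row) and not isinstance(row, type):
        data = asdict(row)        # dataclass instance
    elif hasattr(row, "_asdict"):
        data = row._asdict()      # NamedTuple
    else:
        data = dict(row)          # plain mapping
    return [col for col in primary_key if col not in data]


@dataclass
class Doc:
    id: int
    text: str


print(missing_primary_keys(Doc(id=1, text="hello"), ["id"]))
print(missing_primary_keys({"text": "hello"}, ["id"]))
```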
Table schema: from Python class
Define the table structure using a Python class:
@classmethod
async def TableSchema.from_class(
    cls,
    record_type: type[RowT],
    primary_key: list[str],
    *,
    column_overrides: dict[str, DorisType | VectorSchemaProvider] | None = None,
) -> TableSchema[RowT]
Parameters:
- record_type: A record type whose fields define table columns.
- primary_key: List of column names forming the primary key.
- column_overrides: Optional per-column overrides for type mapping or vector configuration.
Example:
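from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray
# embedder is defined elsewhere, e.g. the SentenceTransformerEmbedder in the full example below
@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]
schema = await doris.TableSchema.from_class(
    DocEmbedding,
    primary_key=["id"],
)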
Python types are automatically mapped to Doris types:
| Python Type | Doris Type |
|---|---|
| bool | BOOLEAN |
| int | BIGINT |
| float | DOUBLE |
| str | STRING |
| bytes | STRING (base64) |
| uuid.UUID | VARCHAR(36) |
| datetime.datetime | DATETIME |
| datetime.date | DATE |
| list, dict, nested structs | JSON |
| NDArray (with vector schema) | ARRAY<FLOAT> |
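The mapping table can be read as a simple lookup. The sketch below mirrors it for the scalar and container types and shows why bytes end up as base64 text and why a UUID fits in VARCHAR(36). DORIS_TYPES, doris_type_for, and encode_bytes are hypothetical names; the connector's real mapping code is not shown here, and vectors and nested structs are omitted.

```python
import base64
import datetime
import uuid

# Mirrors the mapping table; vectors and nested structs are left out for brevity.
DORIS_TYPES: dict[type, str] = {
    bool: "BOOLEAN",
    int: "BIGINT",
    float: "DOUBLE",
    str: "STRING",
    bytes: "STRING",
    uuid.UUID: "VARCHAR(36)",
    datetime.datetime: "DATETIME",
    datetime.date: "DATE",
    list: "JSON",
    dict: "JSON",
}


def doris_type_for(py_type: type) -> str:
    """Look up the Doris column type for an exact Python type."""
    try:
        return DORIS_TYPES[py_type]
    except KeyError:
        raise TypeError(f"no mapping sketched for {py_type!r}") from None


def encode_bytes(value: bytes) -> str:
    """Encode raw bytes as base64 text so they fit in a STRING column."""
    return base64.b64encode(value).decode("ascii")


print(doris_type_for(bool), doris_type_for(int))
print(len(str(uuid.uuid4())))
print(encode_bytes(b"doris"))
```

The lookup uses the exact type, so bool maps to BOOLEAN even though bool is a subclass of int in Python.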
DorisType
Use DorisType to specify a custom Doris type:
from dataclasses import dataclass
from typing import Annotated
from cocoindex.connectors.doris import DorisType
@dataclass
class MyRow:
    id: Annotated[int, DorisType("INT")]
    value: Annotated[float, DorisType("FLOAT")]
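Annotated attaches metadata to a field without changing its Python type, which is what makes per-field overrides like this possible. The sketch below shows how such metadata can be read back with typing.get_type_hints. ColumnType is a hypothetical stand-in for DorisType so the example runs without the connector installed, and column_overrides_from is an illustrative helper, not the library's implementation.

```python
from dataclasses import dataclass
from typing import Annotated, get_type_hints


@dataclass(frozen=True)
class ColumnType:
    """Hypothetical stand-in for DorisType: wraps a Doris type name."""
    name: str


@dataclass
class MyRow:
    id: Annotated[int, ColumnType("INT")]
    value: Annotated[float, ColumnType("FLOAT")]
    label: str  # no override, so the default mapping would apply


def column_overrides_from(record_type: type) -> dict[str, str]:
    """Collect the ColumnType annotation, if any, for each field."""
    hints = get_type_hints(record_type, include_extras=True)
    overrides: dict[str, str] = {}
    for field_name, hint in hints.items():
        # Annotated types expose their extra arguments as __metadata__.
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, ColumnType):
                overrides[field_name] = meta.name
    return overrides


print(column_overrides_from(MyRow))
```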
Vector indexes
Doris supports vector similarity search via HNSW and IVF indexes. Define them with VectorIndexDef:
from cocoindex.connectors.doris import VectorIndexDef
vector_idx = VectorIndexDef(
    field_name="embedding",
    index_type="HNSW",  # or "IVF"
    metric_type="l2_distance",  # or "cosine_distance"
)
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| field_name | str | (required) | Column to index |
| index_type | str | "HNSW" | Index type: "HNSW" or "IVF" |
| metric_type | str | "l2_distance" | Distance metric: "l2_distance" or "cosine_distance" |
| max_degree | int \| None | None | HNSW max degree |
| ef_construction | int \| None | None | HNSW construction parameter |
| nlist | int \| None | None | IVF number of partitions |
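The two metric_type values rank neighbours differently: L2 distance is sensitive to vector length, while cosine distance only compares direction. The sketch below computes both in plain Python using the standard definitions (Euclidean distance, and one minus cosine similarity). It illustrates the metrics themselves and is not how Doris evaluates them.

```python
import math
from typing import Sequence


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """One minus the cosine similarity of two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


a, b = [3.0, 4.0], [6.0, 8.0]  # same direction, different length
print(l2_distance(a, b))       # non-zero: the lengths differ
print(cosine_distance(a, b))   # approximately zero: the directions match
```

If embeddings are normalized to unit length, both metrics produce the same ranking, so the choice mostly matters for unnormalized vectors.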
Inverted indexes
Doris supports inverted indexes for full-text search. Define them with InvertedIndexDef:
from cocoindex.connectors.doris import InvertedIndexDef
inverted_idx = InvertedIndexDef(
    field_name="text",
    parser="unicode",  # or "english", "chinese", etc.
)
Parameters:
- field_name: Column to index.
- parser: Optional tokenizer for full-text search (e.g., "unicode", "english", "chinese"). If None, the index supports exact matching only.
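Once a column has an inverted index with a parser, it can be searched with Doris's full-text operators (MATCH_ANY, MATCH_ALL, MATCH_PHRASE). The sketch below builds such a query string. build_match_query is a hypothetical helper and not part of the connector; the identifier check and quote escaping are deliberately conservative assumptions.

```python
import re

_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_MODES = {"MATCH_ANY", "MATCH_ALL", "MATCH_PHRASE"}


def build_match_query(
    table: str,
    field: str,
    keywords: str,
    mode: str = "MATCH_ANY",
    limit: int = 10,
) -> str:
    """Build a full-text search query against an inverted-indexed column."""
    if mode not in _MODES:
        raise ValueError(f"unsupported mode: {mode}")
    for ident in (table, field):
        if not _IDENT.match(ident):
            raise ValueError(f"unsafe identifier: {ident!r}")
    # Escape backslashes and single quotes inside the string literal.
    escaped = keywords.replace("\\", "\\\\").replace("'", "\\'")
    return f"SELECT * FROM {table} WHERE {field} {mode} '{escaped}' LIMIT {int(limit)}"


sql = build_match_query("doc_embeddings", "text", "vector index")
print(sql)
```

MATCH_ANY matches rows containing any of the tokens, MATCH_ALL requires all of them, and MATCH_PHRASE requires them in order.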
Query helpers
build_vector_search_query
Build a vector similarity search SQL query:
def build_vector_search_query(
    table: str,
    vector_field: str,
    query_vector: list[float],
    metric: str = "l2_distance",
    limit: int = 10,
    select_columns: list[str] | None = None,
    where_clause: str | None = None,
) -> str
Example:
sql = doris.build_vector_search_query(
    table="doc_embeddings",
    vector_field="embedding",
    query_vector=query_vec.tolist(),
    metric="cosine_distance",
    limit=5,
)
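The exact SQL that build_vector_search_query returns is not shown in this document. As a rough picture of what a query of this kind looks like, the sketch below assembles one from the same parameters using Doris's l2_distance and cosine_distance functions. sketch_vector_search_sql is a hypothetical stand-in; the real helper may produce different SQL (for example, different function names or column aliases).

```python
from typing import Optional, Sequence


def sketch_vector_search_sql(
    table: str,
    vector_field: str,
    query_vector: Sequence[float],
    metric: str = "l2_distance",
    limit: int = 10,
    select_columns: Optional[Sequence[str]] = None,
    where_clause: Optional[str] = None,
) -> str:
    """Assemble an illustrative nearest-neighbour query (not the library's output)."""
    if metric not in ("l2_distance", "cosine_distance"):
        raise ValueError(f"unsupported metric: {metric}")
    columns = ", ".join(select_columns) if select_columns else "*"
    # Render the query vector as a Doris array literal, e.g. [0.5, 1.0].
    vector_literal = "[" + ", ".join(repr(float(x)) for x in query_vector) + "]"
    sql = (
        f"SELECT {columns}, {metric}({vector_field}, {vector_literal}) AS distance "
        f"FROM {table}"
    )
    if where_clause:
        sql += f" WHERE {where_clause}"
    # Smaller distance means more similar, so sort ascending.
    sql += f" ORDER BY distance ASC LIMIT {int(limit)}"
    return sql


print(sketch_vector_search_sql("doc_embeddings", "embedding", [0.5, 1.0], limit=5))
```

Note that where_clause is spliced in as raw SQL in this sketch, so it should only ever come from trusted code, never from user input.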
connect_async
Create an async MySQL connection for running queries:
async def connect_async(
    fe_host: str,
    query_port: int = 9030,
    username: str = "root",
    password: str = "",
    database: str | None = None,
) -> Any # aiomysql connection
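Since connect_async returns an aiomysql connection, queries run through the usual async cursor pattern. The sketch below wraps that pattern in a small helper and exercises it with stand-in classes so it runs without a Doris cluster. fetch_all, FakeCursor, and FakeConnection are hypothetical names introduced for this illustration.

```python
import asyncio
from typing import Any


async def fetch_all(conn: Any, sql: str) -> list:
    """Run one query on an aiomysql-style connection and return all rows."""
    async with conn.cursor() as cur:
        await cur.execute(sql)
        return list(await cur.fetchall())


# --- Stand-ins so the sketch runs without a database ---
class FakeCursor:
    def __init__(self, rows: list) -> None:
        self._rows = rows
        self.executed: list[str] = []

    async def __aenter__(self) -> "FakeCursor":
        return self

    async def __aexit__(self, *exc: object) -> bool:
        return False

    async def execute(self, sql: str) -> None:
        self.executed.append(sql)

    async def fetchall(self) -> list:
        return self._rows


class FakeConnection:
    def __init__(self, rows: list) -> None:
        self.cur = FakeCursor(rows)

    def cursor(self) -> FakeCursor:
        return self.cur


fake = FakeConnection([(1, "hello", 0.12)])
rows = asyncio.run(fetch_all(fake, "SELECT 1"))
print(rows)
```

With a real cluster, the connection would instead come from await doris.connect_async(fe_host="localhost", database="my_database"), and the SQL from a helper such as build_vector_search_query.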
Example
from typing import Annotated, Iterator
from dataclasses import dataclass
from numpy.typing import NDArray
import cocoindex as coco
from cocoindex.connectors import doris
from cocoindex.ops.sentence_transformers import SentenceTransformerEmbedder
DORIS_DB = coco.ContextKey[doris.ManagedConnection]("my_doris")
embedder = SentenceTransformerEmbedder("sentence-transformers/all-MiniLM-L6-v2")
@dataclass
class DocEmbedding:
    id: int
    text: str
    embedding: Annotated[NDArray, embedder]
@coco.lifespan
def coco_lifespan(builder: coco.EnvironmentBuilder) -> Iterator[None]:
    conn = doris.connect(doris.DorisConnectionConfig(
        fe_host="localhost",
        database="my_database",
    ))
    builder.provide(DORIS_DB, conn)
    yield
@coco.fn
async def app_main() -> None:
    table = await doris.mount_table_target(
        DORIS_DB,
        "doc_embeddings",
        await doris.TableSchema.from_class(
            DocEmbedding,
            primary_key=["id"],
        ),
        vector_indexes=[
            doris.VectorIndexDef(
                field_name="embedding",
                index_type="HNSW",
                metric_type="cosine_distance",
            ),
        ],
    )
    # Declare rows (documents is assumed to be an iterable of DocEmbedding rows built elsewhere)
    for doc in documents:
        table.declare_row(row=doc)