Skip to main content

Control Processing Concurrency in CocoIndex

· 6 min read

cover

CocoIndex is designed to be production-ready from day one—built to process data in parallel, maximizing throughput while keeping your systems safe. Today, we’ll look at how to optimize performance without overloading your environment. With CocoIndex, it’s just one configuration away.

CocoIndex’s parallelism model boosts speed by processing multiple data items at once, but more parallelism isn’t always better. Left unconstrained, excessive concurrency can strain—or even destabilize—your systems. That’s why CocoIndex includes built-in concurrency control mechanisms that strike the right balance between raw performance and system stability, even at massive scale.

Processing too many items simultaneously can cause:

  • Memory exhaustion – large datasets loaded at once consume massive amounts of RAM.
  • Resource contention – CPU, disk I/O, and network bandwidth get overwhelmed by competing operations.
  • System instability – timeouts, degraded performance, or outright crashes.

Unlike generic concurrency features, CocoIndex lets you:

  • Constrain both data volume (rows) and memory usage (bytes).
  • Set limits at multiple layers: global, per source, and per-row iteration.
  • Combine controls: all specified constraints must be satisfied before processing proceeds.

This layered approach ensures that resource-heavy sources don’t overwhelm the system, and nested tasks (such as splitting documents into chunks) remain predictable and safe.

flow-control

You can review the full documentation here. CocoIndex is powering users process at the scale of millions in production, star us if you like it!

Concurrency Options

CocoIndex provides two primary settings:

OptionPurposeUnit
max_inflight_rowsMaximum number of rows processed concurrently.rows
max_inflight_bytesMaximum memory footprint of concurrently processed data (before transformations).bytes

When a limit is reached, CocoIndex pauses new processing until some existing work completes. This keeps throughput high without pushing your system past its limits.

note

For simplicity, max_inflight_bytes only measures the size of data already in memory before any transformations—it does not include temporary memory used during processing steps.

Where to Apply Concurrency Controls

1. Source Level

Controls how many rows from a data source are processed simultaneously. This prevents overwhelming your system when ingesting large datasets.

Source level control happens at two different granularities

  • Global, in which all sources across all indexing flows share the same budget.
  • Per-source, in which each source has its own budget.

Both global and per-source limits must pass before a new row is processed—providing two layers of safety.

Global Concurrency: One Setting to Shield All Flows

global-level concurrency

Global limits ensure your system never overshoots safe operating thresholds, even if individual flows attempt higher concurrency.

Apply system-wide protections either via environment variables or programmatic control:

The easiest way is to control it via environment variables:

export COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS=256
export COCOINDEX_SOURCE_MAX_INFLIGHT_BYTES=1048576

Programmatically, configure it when calling cocoindex.init(), which will take precedence over the environment variable:

from cocoindex import GlobalExecutionOptions

cocoindex.init(
cocoindex.Settings(
...,
global_execution_options = GlobalExecutionOptions(
source_max_inflight_rows=256,
source_max_inflight_bytes=1_048_576
)
)
)

Currently, CocoIndex uses 1024 as the default value of global max inflight rows, if you don't explicitly set it.

Per-Source Concurrency: Granular Customization

per-source concurrency

Set different limits for each source according to workload and data characteristics:

@cocoindex.flow_def(name="DemoFlow")
def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope):
data_scope["documents"] = flow_builder.add_source(
DemoSourceSpec(...),
max_inflight_rows=10,
max_inflight_bytes=100*1024*1024 # 100 MB
)

2. Nested Iteration Level Concurrency: Deep Structural Control

nested-iteration concurrency

When processing nested rows, such as processing each chunk of each document, you can configure the maximum concurrent rows and/or bytes:

with data_scope["documents"].row() as doc:
doc["chunks"] = doc["content"].transform(SplitRecursively(...))
with doc["chunks"].row(max_inflight_rows=100, max_inflight_bytes=100*1000*1000):
# Process up to 100 chunks in parallel per document
...

Summary Table: Concurrency Configuration in CocoIndex

LevelConfiguration PathApplies To
GlobalEnvironment variables, or pass GlobalExecutionOptions to cocoindex.init()All sources, all flows, added together
Per-SourceArguments to FlowBuilder.add_source()Specific source/flow
Row IterationArguments to DataSlice.row(max_inflight_rows=...)Nested iterations

Best Practices

In actual incremental pipelines, the processing bottleneck is usually at a few heavy operations, such as running inference using an AI model locally or via a remote API. It's common to keep more data in memory even if it cannot be processed immediately—in this way, once the busy backend becomes available, new workloads can be taken on right away to keep the backends busy. However, we need a reasonable bound on this to prevent memory exhaustion and similar issues. That's where concurrency control comes in.

tuning-guide

Most Situations - Default is Good Enough

For most cases, the default global source max rows limit (COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS: 1024) already fits the situation described above: loading more than what heavy operations can consume, but still within a reasonable bound. You don't need to do anything.

When and What to Tune

Decrease the limit if it's not stable enough (e.g. memory overuse, timeout observed on certain operations), or increase the limit if it's very stable but you want it to go faster. What to tune?

  • Most cases, tuning the global source row limit (COCOINDEX_SOURCE_MAX_INFLIGHT_ROWS) will just work.

  • Only start to touch more specific knobs when deemed necessary - usually only for some complex and unbalanced situations.

    • On highly unbalanced row (file) size, set the max bytes limit to prevent a small number of abnormally large inputs from overloading the system. e.g., when the distribution of your input data size follows a long-tail distribution rather than a normal distribution).
    • On highly unbalanced complexity across sources, set per-source concurrency. This happens when you want to run multiple flows within the same process, or have multiple sources within the same flow, and they vary in processing complexity (e.g., one source goes through a very heavy and slow model, and another only does simple data movement).
    • On high fanout in nested iterations and unbalanced nested rows, set concurrency control options on nested iterations. For example, you have a high number of nested rows to process, and the specific number varies significantly.

This concurrency control framework gives you safe, scalable, and customizable flow performance. You gain flexibility (configure per-flow), control (set global limits), and the confidence to scale cocoindex flows smoothly across diverse workloads.

Support us

We’re constantly improving our runtime. Please ⭐ star CocoIndex on GitHub and share it with others.

Need help crafting a more detailed code snippet, or insight into using byte-based or default concurrency settings? Just let me know!