Data Consistency in Indexing Pipelines

An indexing pipeline builds indexes derived from source data. The index should always converge to the current version of the source data: once a new version of the source data has been processed by the pipeline, no data derived from previous versions should remain in the target index storage. This is the data consistency requirement for an indexing pipeline.

The Challenge of Concurrent Updates

Compared to building single-process in-memory transformation logic, an indexing pipeline faces two additional challenges in maintaining data consistency:

  1. Temporally, it is long-running: execution may span multiple processes. An earlier run may terminate in the middle due to a SIGTERM, a failure to commit to the target storage, or a reboot. When the process restarts later, it should pick up the existing state and keep moving the target storage toward the desired state. This is both tedious and error-prone: for example, if a source entry is only partially processed and is updated before the restarted pipeline processes it again, how can we make sure the states derived from the old version are properly cleared?

  2. Spatially, execution happens concurrently, and for large workloads it may be distributed across multiple machines. This introduces problems caused by out-of-order processing. Consider a scenario where your source data is updated rapidly:

    1. Version 1 (V1) exists in the system
    2. Version 2 (V2) and Version 3 (V3) arrive within a second of each other
    3. V2 takes longer to process than V3

Without proper handling, you might end up with V2 data in your index even though V3 is the latest version - effectively rolling back to stale data. The sketch below illustrates how this plays out.
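
Here is a minimal, self-contained sketch of that race, using toy names and a naive last-writer-wins commit (nothing here is CocoIndex API; it only demonstrates the failure mode):

```python
import threading
import time

# Toy in-memory "index": source key -> indexed payload. All names here are
# hypothetical; the point is only the out-of-order commit.
index = {}

def process_and_commit(source_key: str, version: str, processing_time: float) -> None:
    """Simulate a worker transforming one version of a source entry."""
    time.sleep(processing_time)   # V2 happens to take longer than V3
    index[source_key] = version   # naive commit: last writer wins

# V2 and V3 arrive almost simultaneously and are processed concurrently.
w2 = threading.Thread(target=process_and_commit, args=("doc-1", "V2", 0.2))
w3 = threading.Thread(target=process_and_commit, args=("doc-1", "V3", 0.05))
w2.start(); w3.start()
w2.join(); w3.join()

print(index["doc-1"])  # "V2" - the index has rolled back to stale data
```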

Common Issues and Gaps

Data Exposure Risks

Data inconsistency in indexing systems can lead to several critical issues:

System Integrity

  • Stale data persists in indexes despite source updates
  • Inconsistent states across different storage systems
  • Mixed versions of the same source entry coexisting in the index

Security and Compliance

  • Deleted sensitive information remains accessible through search interfaces
  • Outdated personal data violates privacy requirements (e.g., GDPR)
  • Regulatory non-compliance due to improperly retained records

AI System Reliability

  • RAG systems incorporating outdated information into responses
  • Semantic search surfacing deprecated or retracted content
  • AI agents making decisions based on stale data

Business Operations

  • Customer-facing applications showing incorrect information
  • Loss of trust due to inconsistent data presentation
  • Financial impact from outdated pricing or product details

Approaches for Maintaining Data Consistency

1. Tracking Source-to-Target Key Mappings

If all your state management and storage systems live within a single transactional system (e.g., PostgreSQL), you can leverage database transactions for consistency. However, this is often not the case in real-world scenarios. For example, you may use an internal storage (e.g., PostgreSQL) to track per-source metadata, and an external vector store (e.g., Pinecone or Milvus) to store vector embeddings.

There are extra challenges here:

  • We have to track, in the internal storage, all the keys in the target vector store that are derived from each source key. This is necessary when a source entry is updated or deleted: we need to locate the previously derived keys and delete those that no longer exist after processing the new version.

  • However, the internal storage is not transactional with the external vector store, as they are two independent storage systems. If the process terminates abnormally in the middle, the states in the two systems can fall out of sync.

So we need to carefully design the order of write operations during commit, so that no data is leaked even when the process is only partially executed:

  1. Add new target keys to the internal store
  2. Write to target storage
    • Create new records
    • Delete old records
    Note that for any target storage that supports atomic writes, this allows all updates derived from the same source key to be written atomically, which avoids serving mixed-version data at query time.
  3. Remove target keys that no longer exist from the internal store

This ordering ensures a key invariant: the set of keys tracked in the internal storage is always a superset of the keys that actually exist in the target store. All data in the target store is therefore always tracked, so nothing is ever leaked.
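
Below is a minimal sketch of this commit order, assuming hypothetical `internal_store` and `target_store` interfaces (e.g., a PostgreSQL tracking table and an external vector store); only the ordering of the writes matters.

```python
# Sketch of the 3-phase commit order. `internal_store` and `target_store` are
# hypothetical interfaces; nothing here is a real library API.

def commit(source_key: str,
           new_records: dict[str, bytes],  # target key -> payload derived from the new version
           internal_store,
           target_store) -> None:
    old_keys: set[str] = internal_store.get_tracked_keys(source_key)
    new_keys: set[str] = set(new_records)

    # Phase 1: track the new target keys first, so the internal store remains
    # a superset of whatever exists in the target store.
    internal_store.track_keys(source_key, old_keys | new_keys)

    # Phase 2: write to the target storage.
    target_store.upsert(new_records)           # create new records
    target_store.delete(old_keys - new_keys)   # delete old records

    # Phase 3: only now drop the keys that no longer exist in the target store.
    internal_store.track_keys(source_key, new_keys)
```

If the process dies between any two phases, every record still present in the target store remains tracked by the internal store, so a later run can find and clean it up.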

2. Ordinal Bookkeeping

Ordinals are unique identifiers that establish a strict ordering of data updates in your pipeline. They help bookkeep the sequence and timing of changes to ensure consistency. Common examples include:

  • Timestamps (e.g., 2024-01-06T12:34:56Z) - Precise time-based ordering
  • Version numbers (e.g., v1.2.3, 20240106.1) - Sequential numbering

Bookkeep ordinals carefully in two key stages:

  • During ingestion: Record and preserve the source ordinal (e.g., timestamp or version) from the original data
  • During commit: Verify the source ordinal to prevent newer versions from being overwritten by older ones

Ordinal bookkeeping happens in Phase 1, which rejects out-of-order commits: they produce no updates in Phase 1, and Phases 2 and 3 are not executed at all.
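
As a concrete illustration, here is a minimal sketch of the Phase 1 ordinal check, assuming the internal store is PostgreSQL, the per-source metadata lives in a hypothetical source_tracking table, and a DB-API cursor with psycopg2-style parameters is used:

```python
# Hypothetical `source_tracking` table with columns (source_key, ordinal).
# The conditional upsert only advances the ordinal; a stale ordinal changes
# nothing, and the caller skips Phases 2 and 3.
REJECT_STALE = """
INSERT INTO source_tracking (source_key, ordinal)
VALUES (%(source_key)s, %(ordinal)s)
ON CONFLICT (source_key) DO UPDATE
    SET ordinal = EXCLUDED.ordinal
    WHERE source_tracking.ordinal < EXCLUDED.ordinal
"""

def try_begin_commit(cursor, source_key: str, ordinal: int) -> bool:
    """Return True if this version is newer than what is already tracked."""
    cursor.execute(REJECT_STALE, {"source_key": source_key, "ordinal": ordinal})
    return cursor.rowcount == 1  # 0 means the commit was out of order
```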

3. Versioned Unique Key Assignment with Soft Deletion

When multiple processings of the same source key are in flight, writes to the target storage (Phase 2 above) may arrive out of order, causing results derived from old versions to overwrite newer ones.

To avoid this, we need to:

  • For data in the target storage, make sure the keys are unique even across different versions.
  • Leverage soft deletion in the target storage, i.e., a deleted version is represented by a row with a deleted field set.

Specifically:

  • In the transaction in Phase 1, we establish a full view of all derived keys for the current source key: collect existing keys, and generate new keys.
  • In Phase 2, we do UPSERT for both new and deleted versions - the only difference is that the deleted field is set for deleted versions and left untouched for new versions.
  • At query time, we filter out all records with the deleted field set.

In addition, we need an offline GC process to garbage-collect rows in the target storage whose deleted field is set. For this reason, the deleted field can be a deletion timestamp, so we can decide when to GC a specific version based on it.
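
A minimal sketch of this scheme, with a hypothetical target_store interface and a deleted_at timestamp standing in for the deleted field:

```python
import time

def derive_target_key(source_key: str, version_ordinal: int, chunk_id: int) -> str:
    # Including the version ordinal makes keys unique across versions, so
    # out-of-order writes for different versions never touch the same row.
    return f"{source_key}:v{version_ordinal}:{chunk_id}"

def commit_phase2(target_store, new_rows: dict[str, dict], obsolete_keys: set[str]) -> None:
    # Both new and deleted versions go through UPSERT; the only difference is
    # that obsolete versions get `deleted_at` set, while new rows leave it unset.
    target_store.upsert(new_rows)
    target_store.upsert({key: {"deleted_at": time.time()} for key in obsolete_keys})

def search(target_store, query_vector):
    # Query time: filter out any row whose `deleted_at` is set. The offline GC
    # later removes rows whose `deleted_at` is old enough.
    return target_store.search(query_vector, filter={"deleted_at": None})
```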

The CocoIndex Paradigm

The discussion above covers only part of the complexity of building and maintaining long-running or distributed indexing pipelines. Compared to building single-process in-memory transformation logic, there are other complexities, such as:

  • CRUD across different APIs
  • Incremental historical state management
  • Reusing existing computations
  • Data migration when processing logic changes

The CocoIndex framework aims to handle these complexities so that users can focus on pure transformations:

  • Users write transformations as pure functions

    • Output depends only on input
    • No side effects or hidden states
    • Easy to understand and maintain
  • Framework handles:

    • Previous computation result management
    • Incremental processing
    • State management
    • Data consistency
    • Data upgrade on processing logic changes

By exposing a data-driven programming paradigm, CocoIndex allows developers to focus on their core business logic while maintaining data consistency and freshness.
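
To make the paradigm concrete, here is an illustrative, framework-agnostic sketch (not the actual CocoIndex API): the user supplies only a pure transformation, and a hypothetical framework driver owns change tracking, the commit protocol above, and consistency.

```python
def embed(text: str) -> list[float]:
    # Deterministic placeholder standing in for a real embedding model,
    # so the sketch stays self-contained.
    return [float(ord(c)) for c in text[:8]]

def transform(doc_text: str) -> list[tuple[str, list[float]]]:
    """Pure function: output depends only on input, with no side effects.
    The framework decides when to (re)run it, tracks the derived keys, applies
    the commit protocol described above, and keeps the index consistent."""
    chunks = [doc_text[i:i + 512] for i in range(0, len(doc_text), 512)]
    return [(chunk, embed(chunk)) for chunk in chunks]
```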

Join Our Community!

Interested in learning more about CocoIndex? Join our community!

  • Follow our GitHub repository to stay up to date with the latest developments.
  • Check out our documentation to learn more about how CocoIndex can help build robust AI applications with properly indexed data.
  • Join our Discord community to connect with other developers and get support.
  • Follow us on Twitter for the latest updates.