OCI Object Storage connector

Read objects from Oracle Cloud Infrastructure (OCI) Object Storage buckets — with prefix filters, path matchers, size limits, and an optional live mode driven by OCI Object Storage events delivered through OCI Streaming.

Version
v 1.0.0-alpha48

The oci_object_storage connector provides utilities for reading objects from Oracle Cloud Infrastructure (OCI) Object Storage buckets, with an optional live mode driven by OCI Object Storage events delivered through OCI Streaming.

python
from cocoindex.connectors import oci_object_storage

Installation

This connector requires the oci SDK. Install with:

bash
pip install "cocoindex[oci_object_storage]"

Live mode consumes OCI Streaming through its Kafka-compatible API, so it also needs the Kafka connector:

bash
pip install "cocoindex[kafka]"

As source

The connector provides three ways to read from OCI Object Storage:

  • list_objects() — List and iterate over objects in a bucket (with optional prefix and filtering)
  • get_object() — Fetch a single object by its name
  • read() — Read object content directly without first fetching metadata

All three require an oci.object_storage.ObjectStorageClient, which you create and manage yourself. The OCI SDK is synchronous, so the connector runs each SDK call in a worker thread with asyncio.to_thread, and the public API stays async.

python
import oci
from oci.object_storage import ObjectStorageClient

config = oci.config.from_file()  # or instance principals, etc.
client = ObjectStorageClient(config)
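The thread-offloading pattern mentioned above can be illustrated with a minimal sketch. It is not the connector's code: FakeObjectStorageClient is a hypothetical blocking stand-in for the OCI SDK client, and head_object_async and head_many are hypothetical helpers that show how asyncio.to_thread keeps blocking calls off the event loop.

```python
import asyncio
import time


class FakeObjectStorageClient:
    """Hypothetical blocking client standing in for the OCI SDK (not the real class)."""

    def head_object(self, namespace_name: str, bucket_name: str, object_name: str) -> dict:
        time.sleep(0.05)  # simulate a blocking HTTP round trip
        return {"name": object_name, "size": 42}


async def head_object_async(client, namespace: str, bucket: str, name: str) -> dict:
    # Run the blocking SDK call in a worker thread so the event loop is not blocked.
    return await asyncio.to_thread(client.head_object, namespace, bucket, name)


async def head_many(names: list[str]) -> list[dict]:
    client = FakeObjectStorageClient()
    # Each call runs in its own worker thread, so the sleeps overlap instead of adding up.
    return await asyncio.gather(
        *(head_object_async(client, "ns", "bucket", n) for n in names)
    )


if __name__ == "__main__":
    print(asyncio.run(head_many(["a.txt", "b.txt", "c.txt"])))
```

asyncio.gather returns results in the order the awaitables were passed, so callers can still zip results back to object names.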

list_objects

List objects in an OCI Object Storage bucket. Returns an OCIWalker that supports async iteration.

python
def list_objects(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    *,
    prefix: str = "",
    path_matcher: FilePathMatcher | None = None,
    max_file_size: int | None = None,
    live_stream: LiveStream[bytes] | None = None,
) -> OCIWalker

Parameters:

  • client — An oci.object_storage.ObjectStorageClient.
  • namespace — The OCI Object Storage namespace.
  • bucket_name — The bucket name.
  • prefix — Only list objects whose name starts with this prefix. The prefix is stripped from relative paths in the returned files.
  • path_matcher — Optional filter for files. Patterns are matched against the relative path (after prefix stripping). See PatternFilePathMatcher.
  • max_file_size — Skip objects larger than this size in bytes.
  • live_stream — Optional LiveStream[bytes] of OCI Object Storage event payloads. When provided, OCIWalker.items() returns a LiveMapView that performs an initial scan and continues watching for changes via the supplied stream. See Live mode.

Returns: An OCIWalker that can be used with async for loops.
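To make the interaction of prefix, path_matcher, and max_file_size concrete, here is a sketch of the filtering order the parameter descriptions imply: strip the prefix, match the relative path, then apply the size limit. It runs over plain (name, size) tuples; select_objects and the predicate-style matcher are hypothetical, and the real connector may order or implement these checks differently.

```python
from __future__ import annotations

from pathlib import PurePosixPath
from typing import Callable, Iterable, Iterator


def select_objects(
    listing: Iterable[tuple[str, int]],
    prefix: str = "",
    path_matcher: Callable[[PurePosixPath], bool] | None = None,
    max_file_size: int | None = None,
) -> Iterator[tuple[PurePosixPath, str, int]]:
    """Yield (relative_path, object_name, size) for each object that passes all filters."""
    for name, size in listing:
        if not name.startswith(prefix):
            continue  # OCI's list_objects also accepts a prefix, so this is usually redundant
        relative = PurePosixPath(name[len(prefix):])
        if path_matcher is not None and not path_matcher(relative):
            continue  # patterns see the path after prefix stripping
        if max_file_size is not None and size > max_file_size:
            continue  # oversized objects are skipped, not truncated
        yield relative, name, size


if __name__ == "__main__":
    listing = [
        ("data/a.json", 10),
        ("data/sub/b.json", 5_000),
        ("data/c.txt", 1),
        ("other/d.json", 1),
    ]
    # Hypothetical matcher: keep JSON files only.
    for item in select_objects(
        listing, prefix="data/", path_matcher=lambda p: p.suffix == ".json", max_file_size=1_000
    ):
        print(item)
```

Because the matcher receives the stripped path, a pattern written for "sub/b.json" would not match if it were written against "data/sub/b.json".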

Iterating files

list_objects() returns an OCIWalker that yields OCIFile objects (implementing the FileLike base class):

python
import oci
from oci.object_storage import ObjectStorageClient
from cocoindex.connectors import oci_object_storage

config = oci.config.from_file()
client = ObjectStorageClient(config)

async for file in oci_object_storage.list_objects(client, "my-namespace", "my-bucket", prefix="data/"):
    text = await file.read_text()
    ...

See FileLike for details on the file objects.

Keyed iteration with items()

OCIWalker.items() yields (str, OCIFile) pairs, useful for associating each file with a stable string key (its relative path):

python
async for key, file in oci_object_storage.list_objects(client, "ns", "my-bucket").items():
    content = await file.read()
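The difference between plain and keyed iteration can be modeled with a small async class. SketchWalker is a hypothetical stand-in for OCIWalker in which each "file" is just its full object name; it shows only the key choice (the path relative to the prefix), not the connector's implementation.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator


class SketchWalker:
    """Hypothetical stand-in for OCIWalker; each 'file' is just its full object name."""

    def __init__(self, names: list[str], prefix: str = "") -> None:
        self._prefix = prefix
        self._names = [n for n in names if n.startswith(prefix)]

    async def __aiter__(self) -> AsyncIterator[str]:
        for name in self._names:
            await asyncio.sleep(0)  # stands in for awaiting a listing page
            yield name

    async def items(self) -> AsyncIterator[tuple[str, str]]:
        async for name in self:
            # The key is the path relative to the prefix, as a string.
            yield name[len(self._prefix):], name


async def collect(walker: SketchWalker) -> dict[str, str]:
    return {key: name async for key, name in walker.items()}


if __name__ == "__main__":
    walker = SketchWalker(["data/a.json", "data/sub/b.json", "logs/x.txt"], prefix="data/")
    print(asyncio.run(collect(walker)))
```

Keying by relative path means the same object keeps the same key across runs, which is what lets downstream processing match new results to old ones.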

Filtering files

Use PatternFilePathMatcher to filter which objects are included. Patterns are matched against the relative path (after prefix stripping):

python
from cocoindex.connectors import oci_object_storage
from cocoindex.resources.file import PatternFilePathMatcher

matcher = PatternFilePathMatcher(included_patterns=["**/*.json"])

async for file in oci_object_storage.list_objects(
    client, "ns", "my-bucket", prefix="data/", path_matcher=matcher,
):
    process(file)

Limiting file size

Use max_file_size to skip objects that exceed a size threshold:

python
# Skip objects larger than 10 MiB (10 * 1024 * 1024 bytes)
async for file in oci_object_storage.list_objects(
    client, "ns", "my-bucket", max_file_size=10 * 1024 * 1024,
):
    process(file)

get_object

Fetch a single object from an OCI bucket by its full object name.

python
async def get_object(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    object_name: str,
) -> OCIFile

Parameters:

  • client — An oci.object_storage.ObjectStorageClient.
  • namespace — The OCI namespace.
  • bucket_name — The bucket name.
  • object_name — The full object name.

Returns: An OCIFile (FileLike) for the specified object, with its metadata pre-populated.

Example:

python
f = await oci_object_storage.get_object(
    client, "my-namespace", "my-bucket", "data/config.json",
)
data = await f.read()

read

Read object content directly without first fetching metadata.

python
async def read(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    object_name: str,
    size: int = -1,
) -> bytes

Parameters:

  • client — An oci.object_storage.ObjectStorageClient.
  • namespace — The OCI namespace.
  • bucket_name — The bucket name.
  • object_name — The full object name.
  • size — Number of bytes to read. If -1 (default), read the entire object.

Returns: The object content as bytes.

Example:

python
data = await oci_object_storage.read(client, "my-namespace", "my-bucket", "data/config.json")
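One plausible way to honor size without fetching metadata is an HTTP range request for the first size bytes. The OCI SDK's ObjectStorageClient.get_object accepts a range keyword argument, but whether this connector uses it is an assumption; range_for_size is a hypothetical helper that only builds the header value.

```python
from __future__ import annotations


def range_for_size(size: int = -1) -> str | None:
    """Build an HTTP Range value (RFC 7233) covering the first `size` bytes.

    Returns None for size == -1, meaning "send no Range header; read the whole object".
    """
    if size == -1:
        return None
    if size < 1:
        # This sketch rejects 0 and other negative values instead of guessing their meaning.
        raise ValueError(f"size must be -1 or a positive byte count, got {size}")
    # Byte ranges are inclusive at both ends, so N bytes is 0 through N-1.
    return f"bytes=0-{size - 1}"


if __name__ == "__main__":
    print(range_for_size(1024))  # bytes=0-1023
```

With the OCI SDK, a non-None value would be supplied as the range keyword of get_object; None would mean omitting the keyword entirely.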

OCIFilePath

Each file returned by the connector has an OCIFilePath — a FilePath specialized for OCI Object Storage:

  • Relative path (file.file_path.path) — The object name relative to the walker prefix (or the full name if no prefix was used).
  • Resolved path (file.file_path.resolve()) — The full OCI object name (the value passed to head_object / get_object).
  • Namespace and bucket: file.file_path.namespace and file.file_path.bucket_name.

For example, with prefix="data/" and an object named "data/docs/readme.md":

  • file.file_path.path → PurePath("docs/readme.md")
  • file.file_path.resolve() → "data/docs/readme.md"
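The split between relative and resolved paths can be modeled with a small dataclass. SketchOCIFilePath is hypothetical and mirrors only the attributes listed above (path, resolve(), namespace, bucket_name); the real OCIFilePath extends the connector's FilePath class and may store these differently.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class SketchOCIFilePath:
    """Hypothetical model of OCIFilePath's relative/resolved split."""

    namespace: str
    bucket_name: str
    prefix: str
    path: PurePosixPath  # relative to the walker prefix

    @classmethod
    def from_object_name(
        cls, namespace: str, bucket_name: str, prefix: str, object_name: str
    ) -> SketchOCIFilePath:
        if not object_name.startswith(prefix):
            raise ValueError(f"{object_name!r} does not start with prefix {prefix!r}")
        return cls(namespace, bucket_name, prefix, PurePosixPath(object_name[len(prefix):]))

    def resolve(self) -> str:
        # Object names are flat strings, so the prefix is re-attached verbatim.
        return self.prefix + self.path.as_posix()


if __name__ == "__main__":
    p = SketchOCIFilePath.from_object_name("my-namespace", "my-bucket", "data/", "data/docs/readme.md")
    print(p.path, p.resolve())
```

One caveat of modeling object names as paths: PurePosixPath collapses repeated slashes, so an object literally named "a//b" would not round-trip through this sketch.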

OCIFile.exists()

OCIFile provides an async exists() method that probes whether the object currently exists in OCI:

python
if await oci_file.exists():
    data = await oci_file.read()
else:
    print("Object no longer exists")

Call exists() first and branch on the result. If it returns True, subsequent size() and read() calls reuse the cached metadata instead of probing OCI again.
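The caching behavior described above follows a common probe-once pattern. In this sketch, CachedObject and the injected head function are hypothetical: head returns the object's size, or None if the object is missing. The call counter in demo() shows that size() after a successful exists() does not probe again.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class CachedObject:
    """Hypothetical model of probe-once metadata caching (not OCIFile itself)."""

    def __init__(self, name: str, head: Callable[[str], Awaitable[int | None]]) -> None:
        self.name = name
        self._head = head  # returns the object size, or None if the object is missing
        self._probed = False
        self._size: int | None = None

    async def _probe(self) -> None:
        self._size = await self._head(self.name)
        self._probed = True

    async def exists(self) -> bool:
        await self._probe()  # always re-check: the object may have changed since listing
        return self._size is not None

    async def size(self) -> int:
        if not self._probed:
            await self._probe()  # only probe if exists() has not already done so
        if self._size is None:
            raise FileNotFoundError(self.name)
        return self._size


async def demo() -> tuple[bool, int, int]:
    calls = 0

    async def fake_head(name: str) -> int | None:
        nonlocal calls
        calls += 1
        return {"docs/a.md": 120}.get(name)  # simulated bucket contents

    obj = CachedObject("docs/a.md", fake_head)
    found = await obj.exists()
    size = await obj.size()  # served from the cached probe
    return found, size, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```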

Live bucket watching

When live_stream is provided, items() returns a LiveMapView instead of a plain AsyncIterable. Combined with mount_each(), this enables automatic incremental bucket watching — newly created, updated, and deleted objects are processed without a full rescan.

The typical setup uses OCI Streaming:

  • Enable Emit Object Events on the bucket so that Object Storage publishes object-level events.
  • Create an OCI Events rule that forwards Object Storage events (createobject, updateobject, deleteobject) to an OCI Streaming stream.
  • Consume that stream as a LiveStream[bytes] via the Kafka connector; see topic_as_stream() and payloads() for the consumer and stream setup.

Then pass the stream as live_stream:

python
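import cocoindex as coco
from confluent_kafka.aio import AIOConsumer
from cocoindex.connectors import kafka, oci_object_storage
from cocoindex.resources.file import PatternFilePathMatcher

# OCI Streaming's Kafka endpoint requires SASL_SSL; copy the bootstrap host
# from the stream pool's Kafka connection settings.
consumer = AIOConsumer({
    "bootstrap.servers": "<oci-streaming-bootstrap-host>:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<tenancy>/<user>/<stream-pool-ocid>",
    "sasl.password": "<auth-token>",
    "group.id": "my-group",
    "enable.auto.commit": "false",
})
# With OCI Streaming, the Kafka topic name is the stream name.
topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])

# client, process_file, and target are defined as in the full example below;
# run this inside an async function.
walker = oci_object_storage.list_objects(
    client, "my-namespace", "my-bucket",
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
    live_stream=topic_stream.payloads(),
)
await coco.mount_each(process_file, walker.items(), target)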
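Each message that OCI Events delivers to the stream carries a JSON event envelope. The sketch below extracts what a watcher needs from one payload: the action, namespace, bucket, and object name. The field names (eventType, data.resourceName, data.additionalDetails.bucketName and namespace) follow OCI's documented Object Storage event format, but treat them as an assumption to verify against your own events. parse_object_event and ObjectEvent are hypothetical and not part of the connector.

```python
from __future__ import annotations

import json
from typing import NamedTuple

_TYPE_PREFIX = "com.oraclecloud.objectstorage."
_OBJECT_ACTIONS = {"createobject", "updateobject", "deleteobject"}


class ObjectEvent(NamedTuple):
    action: str  # "createobject", "updateobject", or "deleteobject"
    namespace: str
    bucket_name: str
    object_name: str


def parse_object_event(payload: bytes) -> ObjectEvent | None:
    """Extract an object event from one stream payload; return None for anything else."""
    try:
        event = json.loads(payload)
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not JSON: skip it instead of stopping the watcher
    if not isinstance(event, dict):
        return None
    event_type = event.get("eventType")
    if not isinstance(event_type, str) or not event_type.startswith(_TYPE_PREFIX):
        return None
    action = event_type[len(_TYPE_PREFIX):]
    if action not in _OBJECT_ACTIONS:
        return None  # e.g. bucket-level events such as createbucket
    data = event.get("data") or {}
    details = data.get("additionalDetails") or {}
    return ObjectEvent(
        action=action,
        namespace=details.get("namespace", ""),
        bucket_name=details.get("bucketName", ""),
        object_name=data.get("resourceName", ""),
    )
```

Returning None for unrecognized payloads keeps one malformed or unrelated message from halting the stream.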

See Live Mode for how live mode works and how to enable it on the app.
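Conceptually, live mode turns each event into an upsert or a delete on the keyed view that items() produces. The sketch below applies one already-parsed event to a plain dict keyed by relative path, reusing the walker's prefix and matcher rules, and assumes events were already filtered to the watched bucket. apply_event is hypothetical; the real LiveMapView also fetches content, handles ordering and errors, and coordinates with mount_each().

```python
from __future__ import annotations

from pathlib import PurePosixPath
from typing import Callable


def apply_event(
    view: dict[str, str],
    action: str,
    object_name: str,
    prefix: str = "",
    path_matcher: Callable[[PurePosixPath], bool] | None = None,
) -> None:
    """Update `view` (relative key -> full object name) in place for one event."""
    if not object_name.startswith(prefix):
        return  # object lies outside this walker's prefix
    key = object_name[len(prefix):]
    if path_matcher is not None and not path_matcher(PurePosixPath(key)):
        return  # filtered out, the same way as during the initial scan
    if action == "deleteobject":
        view.pop(key, None)  # tolerate deletes for keys never seen
    elif action in ("createobject", "updateobject"):
        view[key] = object_name  # upsert: downstream would re-process this key
```

Because create and update both map to an upsert, replaying the same event twice leaves the view unchanged, which makes at-least-once delivery from the stream harmless in this model.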

Example

python
import pathlib

from confluent_kafka.aio import AIOConsumer
import oci
from oci.object_storage import ObjectStorageClient

import cocoindex as coco
from cocoindex.connectors import kafka, oci_object_storage, localfs
from cocoindex.resources.file import FileLike, PatternFilePathMatcher


@coco.fn(memo=True)
async def process_file(file: FileLike[str], target: localfs.DirTarget) -> None:
    text = await file.read_text()
    target.declare_file(filename=file.file_path.path.as_posix(), content=text)


@coco.fn
async def app_main(
    namespace: str, bucket: str, outdir: pathlib.Path,
) -> None:
    target = await localfs.mount_dir_target(outdir)

    config = oci.config.from_file()
    client = ObjectStorageClient(config)

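    consumer = AIOConsumer({
        "bootstrap.servers": "<oci-streaming-bootstrap-host>:9092",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "<tenancy>/<user>/<stream-pool-ocid>",
        "sasl.password": "<auth-token>",
        "group.id": "my-group",
        "enable.auto.commit": "false",
    })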
    topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])

    matcher = PatternFilePathMatcher(included_patterns=["**/*.md"])
    walker = oci_object_storage.list_objects(
        client, namespace, bucket,
        prefix="docs/",
        path_matcher=matcher,
        live_stream=topic_stream.payloads(),
    )

    await coco.mount_each(process_file, walker.items(), target)


app = coco.App(
    coco.AppConfig(name="OCIToFiles"),
    app_main,
    namespace="my-namespace",
    bucket="my-bucket",
    outdir=pathlib.Path("./out"),
)
app.update_blocking(live=True)
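Connecting to OCI Streaming's Kafka-compatible endpoint requires SASL authentication: security.protocol SASL_SSL, sasl.mechanism PLAIN, a username of the form <tenancy-name>/<user-name>/<stream-pool-OCID>, and an OCI auth token as the password. The helper below collects these into a config dict for AIOConsumer. oci_streaming_consumer_config is a hypothetical name, not part of CocoIndex or confluent_kafka, and the bootstrap server should be copied from the stream pool's Kafka connection settings.

```python
def oci_streaming_consumer_config(
    bootstrap_servers: str,
    tenancy_name: str,
    user_name: str,
    stream_pool_ocid: str,
    auth_token: str,
    group_id: str,
) -> dict[str, str]:
    """Build a confluent_kafka consumer config for OCI Streaming's Kafka endpoint."""
    return {
        "bootstrap.servers": bootstrap_servers,  # from the stream pool's Kafka settings
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # OCI Streaming expects "<tenancy>/<user>/<stream pool OCID>" as the username.
        "sasl.username": f"{tenancy_name}/{user_name}/{stream_pool_ocid}",
        "sasl.password": auth_token,  # an OCI auth token, not the console password
        "group.id": group_id,
        "enable.auto.commit": "false",
    }
```

In app_main, AIOConsumer(oci_streaming_consumer_config(...)) could replace the inline dict, with the auth token read from an environment variable or secret store rather than hard-coded.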