# OCI Object Storage connector

> **CocoIndex v1.** This page documents CocoIndex **v1** — a ground-up redesign from v0. When writing code, ignore any v0 flow-builder DSL or deprecated decorators.
>
> Source: https://cocoindex.io/docs/connectors/oci_object_storage/ · Docs index: https://cocoindex.io/docs/llms.txt · Agent skill: https://cocoindex.io/docs/skill.md

The `oci_object_storage` connector provides utilities for reading objects from Oracle Cloud Infrastructure (OCI) Object Storage buckets, with an optional [live mode](../programming_guide/live_mode) driven by OCI Object Storage events delivered through OCI Streaming.

```python
from cocoindex.connectors import oci_object_storage
```

**Note — Installation**
This connector requires the `oci` SDK. Install with:

```bash
pip install "cocoindex[oci_object_storage]"
```

For live mode, you also need the Kafka connector to consume OCI Streaming:

```bash
pip install "cocoindex[kafka]"
```

## As source

The connector provides three ways to read from OCI Object Storage:

- `list_objects()` — List and iterate over objects in a bucket (with optional prefix and filtering)
- `get_object()` — Fetch a single object by its name
- `read()` — Read object content directly without first fetching metadata

All three require an `oci.object_storage.ObjectStorageClient`, which you create and manage yourself. The OCI SDK is synchronous, so the connector wraps each SDK call in `asyncio.to_thread`. The public API is therefore async, and blocking network calls stay off the event loop.

```python
import oci
from oci.object_storage import ObjectStorageClient

config = oci.config.from_file()  # or instance principals, etc.
client = ObjectStorageClient(config)
```
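The `asyncio.to_thread` wrapping described above is a common pattern. The sketch below uses a stand-in client so it runs without OCI credentials. `BlockingClient`, `fetch`, and `read_object_async` are hypothetical names and are not part of the connector or the OCI SDK.

```python
import asyncio
import time


class BlockingClient:
    """Hypothetical stand-in for a synchronous SDK client."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self._objects = objects

    def fetch(self, name: str) -> bytes:
        time.sleep(0.05)  # simulate a blocking network round trip
        return self._objects[name]


async def read_object_async(client: BlockingClient, name: str) -> bytes:
    # Run the blocking call in the default thread pool so other
    # coroutines keep running while it waits.
    return await asyncio.to_thread(client.fetch, name)


async def main() -> list[bytes]:
    client = BlockingClient({"a.txt": b"alpha", "b.txt": b"beta"})
    # Each read runs on its own worker thread, so the two overlap.
    return await asyncio.gather(
        read_object_async(client, "a.txt"),
        read_object_async(client, "b.txt"),
    )
```

`asyncio.run(main())` returns both payloads in request order. Because each blocking call runs on a worker thread, the two sleeps overlap rather than run back to back.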

### list_objects

List objects in an OCI Object Storage bucket. Returns an `OCIWalker` that supports async iteration.

```python
def list_objects(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    *,
    prefix: str = "",
    path_matcher: FilePathMatcher | None = None,
    max_file_size: int | None = None,
    live_stream: LiveStream[bytes] | None = None,
) -> OCIWalker
```

**Parameters:**

- `client` — An `oci.object_storage.ObjectStorageClient`.
- `namespace` — The OCI Object Storage namespace.
- `bucket_name` — The bucket name.
- `prefix` — Only list objects whose name starts with this prefix. The prefix is stripped from relative paths in the returned files.
- `path_matcher` — Optional filter for files. Patterns are matched against the relative path (after prefix stripping). See [PatternFilePathMatcher](../common_resources/data_types#patternfilepathmatcher).
- `max_file_size` — Skip objects larger than this size in bytes.
- `live_stream` — Optional `LiveStream[bytes]` of OCI Object Storage event payloads. When provided, `OCIWalker.items()` returns a [`LiveMapView`](../advanced_topics/live_component#livemapfeed-and-livemapview) that performs an initial scan and continues watching for changes via the supplied stream. See [Live mode](#live-bucket-watching).

**Returns:** An `OCIWalker` that can be used with `async for` loops.

### Iterating files

`list_objects()` returns an `OCIWalker` that yields `OCIFile` objects (implementing the [`FileLike`](../common_resources/data_types#filelike) base class):

```python
import oci
from oci.object_storage import ObjectStorageClient
from cocoindex.connectors import oci_object_storage

config = oci.config.from_file()
client = ObjectStorageClient(config)

async for file in oci_object_storage.list_objects(client, "my-namespace", "my-bucket", prefix="data/"):
    text = await file.read_text()
    ...
```

See [`FileLike`](../common_resources/data_types#filelike) for details on the file objects.

### Keyed iteration with `items()`

`OCIWalker.items()` yields `(str, OCIFile)` pairs, useful for associating each file with a stable string key (its relative path):

```python
async for key, file in oci_object_storage.list_objects(client, "ns", "my-bucket").items():
    content = await file.read()
```
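The two iteration styles can be illustrated with a minimal in-memory sketch. `SketchWalker` and `SketchFile` are hypothetical stand-ins and are not the connector's `OCIWalker` / `OCIFile`. A plain `async for` yields files, while `items()` yields `(key, file)` pairs keyed by the prefix-relative path.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class SketchFile:
    """Hypothetical file object: a relative path plus its content."""
    path: str
    content: bytes

    async def read(self) -> bytes:
        return self.content


class SketchWalker:
    """Hypothetical walker: async-iterable over files, plus keyed items()."""

    def __init__(self, objects: dict[str, bytes], prefix: str = "") -> None:
        self._objects = objects
        self._prefix = prefix

    async def _files(self) -> AsyncIterator[SketchFile]:
        for name in sorted(self._objects):
            if name.startswith(self._prefix):
                # Key each file by its name relative to the prefix.
                yield SketchFile(name[len(self._prefix):], self._objects[name])

    def __aiter__(self) -> AsyncIterator[SketchFile]:
        # Supports `async for file in walker`.
        return self._files()

    async def items(self) -> AsyncIterator[tuple[str, SketchFile]]:
        # Supports `async for key, file in walker.items()`.
        async for f in self._files():
            yield f.path, f


async def collect_keys(walker: SketchWalker) -> list[str]:
    return [key async for key, _ in walker.items()]
```

Because the key is derived from the object name rather than from listing order, the same object maps to the same key on every scan.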

### Filtering files

Use `PatternFilePathMatcher` to filter which objects are included. Patterns are matched against the relative path (after prefix stripping):

```python
from cocoindex.connectors import oci_object_storage
from cocoindex.resources.file import PatternFilePathMatcher

matcher = PatternFilePathMatcher(included_patterns=["**/*.json"])

async for file in oci_object_storage.list_objects(
    client, "ns", "my-bucket", prefix="data/", path_matcher=matcher,
):
    process(file)  # your own per-file handler
```

### Limiting file size

Use `max_file_size` to skip objects that exceed a size threshold:

```python
# Skip objects larger than 10 MB
async for file in oci_object_storage.list_objects(
    client, "ns", "my-bucket", max_file_size=10 * 1024 * 1024,
):
    process(file)  # your own per-file handler
```
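Here is a rough sketch of how `prefix`, `path_matcher`, and `max_file_size` combine for each listed object. `select_objects` is a hypothetical helper, and `fnmatch` stands in for `PatternFilePathMatcher`. Its `*` also matches `/`, so its semantics differ from the connector's glob patterns. The sketch does follow the parameter descriptions on two points: patterns are tested against the prefix-stripped path, and oversized objects are skipped, not truncated.

```python
import fnmatch
from pathlib import PurePosixPath
from typing import Optional


def select_objects(
    listing: list[tuple[str, int]],
    prefix: str = "",
    pattern: Optional[str] = None,
    max_file_size: Optional[int] = None,
) -> list[PurePosixPath]:
    """Return prefix-relative paths that pass the pattern and size checks.

    Hypothetical helper; `listing` holds (object name, size in bytes) pairs.
    """
    selected = []
    for name, size in listing:
        if not name.startswith(prefix):
            continue  # outside the listed prefix
        relative = name[len(prefix):]
        # The pattern sees the relative path, i.e. after prefix stripping.
        # Caveat: fnmatch's "*" also matches "/", unlike the connector's globs.
        if pattern is not None and not fnmatch.fnmatchcase(relative, pattern):
            continue
        if max_file_size is not None and size > max_file_size:
            continue  # too large: skipped entirely
        selected.append(PurePosixPath(relative))
    return selected
```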

### get_object

Fetch a single object from an OCI bucket by its full object name.

```python
async def get_object(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    object_name: str,
) -> OCIFile
```

**Parameters:**

- `client` — An `oci.object_storage.ObjectStorageClient`.
- `namespace` — The OCI namespace.
- `bucket_name` — The bucket name.
- `object_name` — The full object name.

**Returns:** An `OCIFile` (FileLike) for the specified object, with its metadata pre-populated.

**Example:**

```python
f = await oci_object_storage.get_object(
    client, "my-namespace", "my-bucket", "data/config.json",
)
data = await f.read()
```

### read

Read object content directly without first fetching metadata.

```python
async def read(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    object_name: str,
    size: int = -1,
) -> bytes
```

**Parameters:**

- `client` — An `oci.object_storage.ObjectStorageClient`.
- `namespace` — The OCI namespace.
- `bucket_name` — The bucket name.
- `object_name` — The full object name.
- `size` — Number of bytes to read. If -1 (default), read the entire object.

**Returns:** The object content as bytes.

**Example:**

```python
data = await oci_object_storage.read(client, "my-namespace", "my-bucket", "data/config.json")
```
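The practical difference between `get_object()` and `read()` is the number of round trips. `get_object()` returns a file with metadata already populated, so fetching its content presumably costs a metadata request plus a content request. `read()` skips the metadata step. The fake client below is hypothetical and only counts calls. Here the `size` cutoff is applied by slicing after the fetch, whereas a real client could request a byte range instead.

```python
import asyncio


class CountingClient:
    """Hypothetical fake client that counts metadata and content requests."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self.objects = objects
        self.head_calls = 0
        self.get_calls = 0

    def head(self, name: str) -> int:
        self.head_calls += 1
        return len(self.objects[name])  # metadata only: the size

    def get(self, name: str) -> bytes:
        self.get_calls += 1
        return self.objects[name]


async def get_object_then_read(client: CountingClient, name: str) -> tuple[int, bytes]:
    # get_object-style: fetch metadata first, then the content.
    size = await asyncio.to_thread(client.head, name)
    data = await asyncio.to_thread(client.get, name)
    return size, data


async def read_direct(client: CountingClient, name: str, size: int = -1) -> bytes:
    # read-style: a single content request, with no metadata round trip.
    data = await asyncio.to_thread(client.get, name)
    return data if size < 0 else data[:size]
```

Use `get_object()` when you also need metadata such as the size. Use `read()` when you only need the bytes.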

### OCIFilePath

Each file returned by the connector has an `OCIFilePath` — a [`FilePath`](../common_resources/data_types#filepath) specialized for OCI Object Storage:

- **Relative path** (`file.file_path.path`) — The object name relative to the walker prefix (or the full name if no prefix was used).
- **Resolved path** (`file.file_path.resolve()`) — The full OCI object name (the value passed to `head_object` / `get_object`).
- **Namespace and bucket** — `file.file_path.namespace`, `file.file_path.bucket_name`.

For example, with `prefix="data/"` and an object named `"data/docs/readme.md"`:
- `file.file_path.path` → `PurePath("docs/readme.md")`
- `file.file_path.resolve()` → `"data/docs/readme.md"`
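The relationship between the two path views can be modeled in a few lines. `SketchOCIFilePath` and `from_object_name` are hypothetical and are not the connector's `OCIFilePath`. They reproduce the `prefix="data/"` example above: the relative path drops the prefix, and `resolve()` glues it back on.

```python
from dataclasses import dataclass
from pathlib import PurePath


@dataclass(frozen=True)
class SketchOCIFilePath:
    """Hypothetical model of the relative and resolved path views."""
    namespace: str
    bucket_name: str
    prefix: str
    path: PurePath  # relative to the walker prefix

    def resolve(self) -> str:
        # Full object name: prefix + relative path.
        # as_posix() keeps "/" separators on every platform.
        return self.prefix + self.path.as_posix()


def from_object_name(namespace: str, bucket_name: str, prefix: str, object_name: str) -> SketchOCIFilePath:
    if not object_name.startswith(prefix):
        raise ValueError(f"{object_name!r} is not under prefix {prefix!r}")
    relative = PurePath(object_name[len(prefix):])
    return SketchOCIFilePath(namespace, bucket_name, prefix, relative)
```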

### OCIFile.exists()

`OCIFile` provides an async `exists()` method that probes whether the object currently exists in OCI:

```python
if await oci_file.exists():
    data = await oci_file.read()
else:
    print("Object no longer exists")
```

Call `exists()` first and branch on the result. If it returns `True`, later `size()` / `read()` calls on the same `OCIFile` reuse the cached metadata instead of probing OCI again.
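The caching behavior can be pictured with a small stand-in. `CachingFile` is hypothetical and assumes the only metadata is the object size. A successful `exists()` probe is reused by `size()`, so the pair costs one metadata request rather than two.

```python
import asyncio
from typing import Optional


class CachingFile:
    """Hypothetical file handle that caches the result of its metadata probe."""

    def __init__(self, store: dict[str, bytes], name: str) -> None:
        self._store = store
        self._name = name
        self._size: Optional[int] = None  # cached once a probe succeeds
        self.probes = 0

    def _head(self) -> Optional[int]:
        # Stand-in for a blocking metadata request; None means "not found".
        self.probes += 1
        data = self._store.get(self._name)
        return None if data is None else len(data)

    async def exists(self) -> bool:
        self._size = await asyncio.to_thread(self._head)
        return self._size is not None

    async def size(self) -> int:
        if self._size is None:  # no cached metadata yet: probe now
            self._size = await asyncio.to_thread(self._head)
            if self._size is None:
                raise FileNotFoundError(self._name)
        return self._size
```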

### Live bucket watching

When `live_stream` is provided, `items()` returns a [`LiveMapView`](../advanced_topics/live_component#livemapfeed-and-livemapview) instead of a plain `AsyncIterable`. Combined with [`mount_each()`](../programming_guide/processing_component#mount_each), this enables automatic incremental bucket watching — newly created, updated, and deleted objects are processed without a full rescan.

The typical setup uses **OCI Streaming**. Configure an [OCI Events Rule](https://docs.oracle.com/en-us/iaas/Content/Events/Concepts/eventsoverview.htm) that forwards Object Storage events (`createobject`, `updateobject`, `deleteobject`) to an OCI Streaming stream. Object events are only emitted for buckets that have **Emit Object Events** enabled. Then consume that stream as a `LiveStream[bytes]` through the Kafka connector (see [`topic_as_stream()` and `payloads()`](./kafka#as-a-live-stream) for the consumer and stream setup), and pass it as `live_stream`:

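```python
import cocoindex as coco
from confluent_kafka.aio import AIOConsumer
from cocoindex.connectors import kafka, oci_object_storage
from cocoindex.resources.file import PatternFilePathMatcher

consumer = AIOConsumer({
    "bootstrap.servers": "<oci-streaming-bootstrap-host>:9092",
    # OCI Streaming's Kafka-compatible endpoint authenticates with SASL_SSL / PLAIN.
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<tenancy>/<user>/<stream-pool-ocid>",
    "sasl.password": "<auth-token>",
    "group.id": "my-group",
    "enable.auto.commit": "false",
})
# The OCI stream name is used as the Kafka topic name.
topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])

# Inside your app function; `client`, `process_file`, and `target` are set up as in the full example below.
walker = oci_object_storage.list_objects(
    client, "my-namespace", "my-bucket",
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
    live_stream=topic_stream.payloads(),
)
await coco.mount_each(process_file, walker.items(), target)
```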

See [Live Mode](../programming_guide/live_mode) for how live mode works and how to enable it on the app.
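Each payload on the stream is one OCI Events message. The stdlib-only sketch below decodes a payload and applies it to an in-memory view keyed by relative path. `parse_event` and `apply_event` are hypothetical and are not the connector's implementation. The payload shape is an assumption based on OCI Events' CloudEvents-style messages: an `eventType` such as `com.oraclecloud.objectstorage.createobject`, the object name in `data.resourceName`, and the namespace and bucket in `data.additionalDetails`. Check it against a real payload before relying on it. The sketch only illustrates why a create, update, or delete event touches a single key instead of forcing a full rescan.

```python
import json
from typing import Optional

OBJECT_ACTIONS = ("createobject", "updateobject", "deleteobject")


def parse_event(payload: bytes) -> Optional[tuple[str, str, str, str]]:
    """Return (action, namespace, bucket, object name), or None if irrelevant.

    Assumed payload shape; see the lead-in above.
    """
    event = json.loads(payload)
    action = event.get("eventType", "").rsplit(".", 1)[-1]
    if action not in OBJECT_ACTIONS:
        return None  # e.g. bucket-level events
    data = event.get("data", {})
    details = data.get("additionalDetails", {})
    return (
        action,
        details.get("namespace", ""),
        details.get("bucketName", ""),
        data.get("resourceName", ""),
    )


def apply_event(view: dict[str, int], payload: bytes, namespace: str,
                bucket: str, prefix: str = "") -> None:
    """Apply one event to an in-memory {relative path: revision} view."""
    parsed = parse_event(payload)
    if parsed is None:
        return
    action, ns, bkt, name = parsed
    if (ns, bkt) != (namespace, bucket) or not name.startswith(prefix):
        return  # another bucket, or outside the watched prefix
    key = name[len(prefix):]
    if action == "deleteobject":
        view.pop(key, None)
    else:
        # create/update: bump a revision counter to mark the key for reprocessing.
        view[key] = view.get(key, 0) + 1
```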

### Example

```python
import pathlib

from confluent_kafka.aio import AIOConsumer
import oci
from oci.object_storage import ObjectStorageClient

import cocoindex as coco
from cocoindex.connectors import kafka, oci_object_storage, localfs
from cocoindex.resources.file import FileLike, PatternFilePathMatcher


@coco.fn(memo=True)
async def process_file(file: FileLike[str], target: localfs.DirTarget) -> None:
    text = await file.read_text()
    target.declare_file(filename=file.file_path.path.as_posix(), content=text)


@coco.fn
async def app_main(
    namespace: str, bucket: str, outdir: pathlib.Path,
) -> None:
    target = await localfs.mount_dir_target(outdir)

    config = oci.config.from_file()
    client = ObjectStorageClient(config)

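    consumer = AIOConsumer({
        "bootstrap.servers": "<oci-streaming-host>:9092",
        # SASL settings required by OCI Streaming's Kafka-compatible endpoint.
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "<tenancy>/<user>/<stream-pool-ocid>",
        "sasl.password": "<auth-token>",
        "group.id": "my-group",
        "enable.auto.commit": "false",
    })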
    topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])

    matcher = PatternFilePathMatcher(included_patterns=["**/*.md"])
    walker = oci_object_storage.list_objects(
        client, namespace, bucket,
        prefix="docs/",
        path_matcher=matcher,
        live_stream=topic_stream.payloads(),
    )

    await coco.mount_each(process_file, walker.items(), target)


app = coco.App(
    coco.AppConfig(name="OCIToFiles"),
    app_main,
    namespace="my-namespace",
    bucket="my-bucket",
    outdir=pathlib.Path("./out"),
)
app.update_blocking(live=True)
```
