OCI Object Storage connector
Read objects from Oracle Cloud Infrastructure (OCI) Object Storage buckets, with prefix filters, path matchers, size limits, and an optional live mode driven by OCI Object Storage events delivered through OCI Streaming.
```python
from cocoindex.connectors import oci_object_storage
```
This connector requires the `oci` SDK. Install with:

```shell
pip install cocoindex[oci_object_storage]
```

For live mode, you also need the Kafka connector to consume OCI Streaming:

```shell
pip install cocoindex[kafka]
```

As source
The connector provides three ways to read from OCI Object Storage:
- `list_objects()` — List and iterate over objects in a bucket (with optional prefix and filtering)
- `get_object()` — Fetch a single object by its name
- `read()` — Read object content directly without first fetching metadata
All require an `oci.object_storage.ObjectStorageClient`, which you create and manage yourself. The OCI SDK is synchronous; this connector wraps SDK calls with `asyncio.to_thread`, so the public API remains async.

```python
import oci
from oci.object_storage import ObjectStorageClient

config = oci.config.from_file()  # or instance principals, etc.
client = ObjectStorageClient(config)
```
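The `asyncio.to_thread` wrapping mentioned above can be sketched in isolation. This is a minimal illustration, not connector code: `slow_head_object` is a hypothetical stand-in for a blocking SDK call.

```python
import asyncio
import time


def slow_head_object(name: str) -> dict:
    # Stand-in for a blocking OCI SDK call (hypothetical, for illustration).
    time.sleep(0.01)
    return {"name": name, "size": 42}


async def fetch_meta(name: str) -> dict:
    # Offload the blocking call to a worker thread so the event loop stays free.
    return await asyncio.to_thread(slow_head_object, name)


print(asyncio.run(fetch_meta("data/config.json")))
```

This is why a single shared `ObjectStorageClient` works from async code: each blocking call runs on a thread-pool thread while the event loop keeps serving other tasks.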
list_objects
List objects in an OCI Object Storage bucket. Returns an OCIWalker that supports async iteration.
```python
def list_objects(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    *,
    prefix: str = "",
    path_matcher: FilePathMatcher | None = None,
    max_file_size: int | None = None,
    live_stream: LiveStream[bytes] | None = None,
) -> OCIWalker
```
Parameters:
- `client` — An `oci.object_storage.ObjectStorageClient`.
- `namespace` — The OCI Object Storage namespace.
- `bucket_name` — The bucket name.
- `prefix` — Only list objects whose name starts with this prefix. The prefix is stripped from relative paths in the returned files.
- `path_matcher` — Optional filter for files. Patterns are matched against the relative path (after prefix stripping). See PatternFilePathMatcher.
- `max_file_size` — Skip objects larger than this size in bytes.
- `live_stream` — Optional `LiveStream[bytes]` of OCI Object Storage event payloads. When provided, `OCIWalker.items()` returns a `LiveMapView` that performs an initial scan and continues watching for changes via the supplied stream. See Live mode.
Returns: An OCIWalker that can be used with async for loops.
Iterating files
list_objects() returns an OCIWalker that yields OCIFile objects (implementing the FileLike base class):
```python
import oci
from oci.object_storage import ObjectStorageClient

from cocoindex.connectors import oci_object_storage

config = oci.config.from_file()
client = ObjectStorageClient(config)

async for file in oci_object_storage.list_objects(
    client, "my-namespace", "my-bucket", prefix="data/"
):
    text = await file.read_text()
    ...
```
See FileLike for details on the file objects.
Keyed iteration with items()
OCIWalker.items() yields (str, OCIFile) pairs, useful for associating each file with a stable string key (its relative path):
```python
async for key, file in oci_object_storage.list_objects(client, "ns", "my-bucket").items():
    content = await file.read()
```
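The shape of keyed iteration can be shown in miniature with a plain async generator standing in for `OCIWalker.items()` (the generator and its data are illustrative, not connector code):

```python
import asyncio
from typing import AsyncIterator


async def fake_items() -> AsyncIterator[tuple[str, bytes]]:
    # Toy stand-in for OCIWalker.items(): yields (relative path, content) pairs.
    objects = {"docs/readme.md": b"# readme", "docs/guide.md": b"# guide"}
    for key, content in objects.items():
        yield key, content


async def collect() -> dict[str, bytes]:
    seen: dict[str, bytes] = {}
    async for key, content in fake_items():
        seen[key] = content  # the relative path serves as a stable string key
    return seen


print(asyncio.run(collect()))
```

The stable key is what lets downstream machinery (such as `mount_each`) associate each file with prior processing state.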
Filtering files
Use PatternFilePathMatcher to filter which objects are included. Patterns are matched against the relative path (after prefix stripping):
```python
from cocoindex.connectors import oci_object_storage
from cocoindex.resources.file import PatternFilePathMatcher

matcher = PatternFilePathMatcher(included_patterns=["**/*.json"])

async for file in oci_object_storage.list_objects(
    client, "ns", "my-bucket", prefix="data/", path_matcher=matcher,
):
    process(file)
```
Limiting file size
Use max_file_size to skip objects that exceed a size threshold:
```python
# Skip objects larger than 10 MB
async for file in oci_object_storage.list_objects(
    client, "ns", "my-bucket", max_file_size=10 * 1024 * 1024,
):
    process(file)
```
get_object
Fetch a single object from an OCI bucket by its full object name.
```python
async def get_object(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    object_name: str,
) -> OCIFile
```
Parameters:
- `client` — An `oci.object_storage.ObjectStorageClient`.
- `namespace` — The OCI namespace.
- `bucket_name` — The bucket name.
- `object_name` — The full object name.
Returns: An OCIFile (FileLike) for the specified object, with its metadata pre-populated.
Example:
```python
f = await oci_object_storage.get_object(
    client, "my-namespace", "my-bucket", "data/config.json",
)
data = await f.read()
```
read
Read object content directly without first fetching metadata.
```python
async def read(
    client: ObjectStorageClient,
    namespace: str,
    bucket_name: str,
    object_name: str,
    size: int = -1,
) -> bytes
```
Parameters:
- `client` — An `oci.object_storage.ObjectStorageClient`.
- `namespace` — The OCI namespace.
- `bucket_name` — The bucket name.
- `object_name` — The full object name.
- `size` — Number of bytes to read. If `-1` (default), read the entire object.
Returns: The object content as bytes.
Example:
```python
data = await oci_object_storage.read(client, "my-namespace", "my-bucket", "data/config.json")
```
OCIFilePath
Each file returned by the connector has an OCIFilePath — a FilePath specialized for OCI Object Storage:
- Relative path (`file.file_path.path`) — The object name relative to the walker prefix (or the full name if no prefix was used).
- Resolved path (`file.file_path.resolve()`) — The full OCI object name (the value passed to `head_object` / `get_object`).
- Namespace and bucket — `file.file_path.namespace`, `file.file_path.bucket_name`.
For example, with `prefix="data/"` and an object named `"data/docs/readme.md"`:

- `file.file_path.path` → `PurePath("docs/readme.md")`
- `file.file_path.resolve()` → `"data/docs/readme.md"`
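The prefix stripping above can be sketched with plain `pathlib`. This is a standalone illustration; `relative_path` is a hypothetical helper, not part of the connector:

```python
from pathlib import PurePath


def relative_path(object_name: str, prefix: str) -> PurePath:
    # Hypothetical helper mirroring the connector's prefix stripping:
    # drop the walker prefix, keep the rest as the relative path.
    assert object_name.startswith(prefix)
    return PurePath(object_name[len(prefix):])


print(relative_path("data/docs/readme.md", "data/").as_posix())
```

With an empty prefix (the default), the relative path is simply the full object name.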
OCIFile.exists()
OCIFile provides an async exists() method that probes whether the object currently exists in OCI:
```python
if await oci_file.exists():
    data = await oci_file.read()
else:
    print("Object no longer exists")
```
When `exists()` returns `True`, the metadata fetched by the probe is cached, so subsequent `size()` / `read()` calls reuse it without re-probing.
Live bucket watching
When live_stream is provided, items() returns a LiveMapView instead of a plain AsyncIterable. Combined with mount_each(), this enables automatic incremental bucket watching — newly created, updated, and deleted objects are processed without a full rescan.
The typical setup uses OCI Streaming. Configure an OCI Events Rule that forwards Object Storage events (createobject, updateobject, deleteobject) to an OCI Streaming stream, then consume that stream as a LiveStream[bytes] via the Kafka connector — see topic_as_stream() and payloads() for the consumer / stream setup — and pass it as live_stream:
```python
from confluent_kafka.aio import AIOConsumer

from cocoindex.connectors import kafka, oci_object_storage
from cocoindex.resources.file import PatternFilePathMatcher

consumer = AIOConsumer({
    "bootstrap.servers": "<oci-streaming-bootstrap-host>:9092",
    "group.id": "my-group",
    "enable.auto.commit": "false",
})
topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])

walker = oci_object_storage.list_objects(
    client, "my-namespace", "my-bucket",
    path_matcher=PatternFilePathMatcher(included_patterns=["**/*.md"]),
    live_stream=topic_stream.payloads(),
)
await coco.mount_each(process_file, walker.items(), target)
```
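For reference, OCI Events delivers Object Storage events as CloudEvents-style JSON. The sketch below decodes one such payload; the exact shape depends on your Events Rule configuration, so treat the fields as illustrative rather than exhaustive:

```python
import json

# Illustrative OCI Events payload (CloudEvents JSON); real payloads carry
# additional fields and depend on the Events Rule configuration.
payload = b"""
{
  "eventType": "com.oraclecloud.objectstorage.createobject",
  "data": {
    "additionalDetails": {"namespace": "my-namespace", "bucketName": "my-bucket"},
    "resourceName": "docs/readme.md"
  }
}
"""

event = json.loads(payload)
action = event["eventType"].rsplit(".", 1)[-1]  # e.g. "createobject"
object_name = event["data"]["resourceName"]
print(action, object_name)
```

The connector consumes these payloads from the `live_stream` and maps them back to objects, so you normally never parse them yourself; the sketch only shows what flows through the stream.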
See Live Mode for how live mode works and how to enable it on the app.
Example
```python
import pathlib

from confluent_kafka.aio import AIOConsumer
import oci
from oci.object_storage import ObjectStorageClient

import cocoindex as coco
from cocoindex.connectors import kafka, oci_object_storage, localfs
from cocoindex.resources.file import FileLike, PatternFilePathMatcher


@coco.fn(memo=True)
async def process_file(file: FileLike[str], target: localfs.DirTarget) -> None:
    text = await file.read_text()
    target.declare_file(filename=file.file_path.path.as_posix(), content=text)


@coco.fn
async def app_main(
    namespace: str, bucket: str, outdir: pathlib.Path,
) -> None:
    target = await localfs.mount_dir_target(outdir)

    config = oci.config.from_file()
    client = ObjectStorageClient(config)

    consumer = AIOConsumer({
        "bootstrap.servers": "<oci-streaming-host>:9092",
        "group.id": "my-group",
        "enable.auto.commit": "false",
    })
    topic_stream = kafka.topic_as_stream(consumer, ["object-storage-events"])

    matcher = PatternFilePathMatcher(included_patterns=["**/*.md"])
    walker = oci_object_storage.list_objects(
        client, namespace, bucket,
        prefix="docs/",
        path_matcher=matcher,
        live_stream=topic_stream.payloads(),
    )
    await coco.mount_each(process_file, walker.items(), target)


app = coco.App(
    coco.AppConfig(name="OCIToFiles"),
    app_main,
    namespace="my-namespace",
    bucket="my-bucket",
    outdir=pathlib.Path("./out"),
)
app.update_blocking(live=True)
```