Skip to content

Stream files from a bucket

This how-to will show you how to stream files from a bucket. This is useful when you don't want to download files to your local machine first, but want to stream them directly from a bucket to your deployment. This can furthermore be useful to stream files from compressed archives, such as zip files.

Structure

First, the library used to stream files from a bucket is introduced. Secondly, a code snippet is provided that shows how to stream files from both a UbiOps bucket and an external bucket. Lastly, a general outline is given of how to integrate this code snippet in different deep learning frameworks (TensorFlow and PyTorch) for training purposes.

Smart Open

The smart_open library is a Python library that provides a uniform API for streaming files from different sources, such as local files, S3 buckets, Google Cloud Storage, HTTP(S) sources, and more. It is a drop-in replacement for Python's built-in open function, but with support for additional sources.
This is therefore the perfect library to use when you want to stream (compressed) files from different buckets.

Python code

The following code snippet shows how to print the contents of a compressed text file inside a .zip file, which is present in the default UbiOps bucket.

import ubiops
import zipfile
from smart_open import open

PROJECT_NAME = "" # Your project name
API_TOKEN = "" # Your token
BUCKET_NAME = "default" # The name of the bucket

zip_file_name = "" # The name of the file in the bucket
text_file_name = "" # The name of the text file inside the zip file


# Login with ubiops
configuration = ubiops.Configuration()
configuration.api_key['Authorization'] = API_TOKEN

client = ubiops.ApiClient(configuration)
api = ubiops.api.CoreApi(client)

print(api.service_status())

# Get signed url
signed_url = api.files_download(PROJECT_NAME, BUCKET_NAME, zip_file_name)

with open(signed_url.url, "rb") as f:
    with zipfile.ZipFile(f) as zip_file:
        # List all files in the zip
        print(zip_file.read(text_file_name).decode("utf-8"))

This code snippet can also be made to work with other (external) buckets.
An example for Google Cloud Storage is shown below:

from google.cloud import storage

bucket_name_gcs = "" # The name of the bucket

# Don't forget to include the service account json file
storage_client = storage.Client.from_service_account_json("service_account.json")

with open(f"gs://{bucket_name_gcs}/{zip_file_name}", "rb", transport_params=dict(client=storage_client)) as f:
    # Add the rest of the code here
    pass

Integration

The easiest way to integrate this file streaming functionality is to wrap this functionality into a custom data loader class. Luckily, both TensorFlow and PyTorch provide a Dataset class functionality that can be used to create your own custom data loader. By implementing these classes, no changes have to be made to the training code, as the custom data loader can be used as a drop-in replacement for the default data loader. The framework will then take care of orchestrating the data loading process.

An example for PyTorch is shown below:

import torch
from torch.utils.data import Dataset


class ZipFileDataset(Dataset):
    def __init__(self):
        # Add the rest of the code here
        pass

    def __len__(self):
        return 10 # Add the len of the dataset here

    def __getitem__(self, idx):
        data_file = stream_file_from_bucket(f"{idx}.txt") # Add the code from the previous section here
        data_tensor = process_data(data_file) # Add the code to process the data here

        return data_tensor

Do note that the stream_file_from_bucket and process_data functions are not defined in the code snippet above. These functions should be defined by the user, as they are specific to the use case.

An example for TensorFlow is shown below:

import tensorflow as tf

class ZipFileDataset(tf.data.Dataset):
    def _generator(num_samples):
        for sample_index in range(num_samples):
            data_file = stream_file_from_bucket(f"{sample_index}.txt")
            data_tensor = process_data(data_file)
            yield data_tensor

    def __new__(cls, num_samples=10):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature=tf.TensorSpec(shape=(None,), dtype=tf.float32),  # Adjust shape and dtype based on your data
            args=(num_samples, )
        )

With TensorFlow, a generator function is used to create the dataset. This generator function is then used to create the dataset with the from_generator function.