Stream files from a bucket¶
This how-to shows you how to stream files from a bucket. This is useful when you don't want to download files first, but instead stream them directly from a bucket to your deployment. It is also useful for streaming files from compressed archives, such as zip files.
Structure¶
First, the library used to stream files from a bucket is introduced. Then, a code snippet is provided that shows how to stream files from both a UbiOps bucket and an external bucket. Lastly, a general outline is given of how to integrate this code snippet with deep learning frameworks (TensorFlow and PyTorch) for training purposes.
Smart Open¶
The smart_open library is a Python library that provides a uniform API for streaming files from different sources, such as local files, S3 buckets, Google Cloud Storage, HTTP(S) sources, and more. It is a drop-in replacement for Python's built-in open function, but with support for these additional sources.
This makes it a good fit for streaming (compressed) files from different buckets.
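As a quick illustration, the same open call works across sources; only the URL scheme changes. The paths below are placeholders, and depending on the source you may need an extra such as smart_open[gcs] installed:
from smart_open import open

# Works exactly like the built-in open for local files
with open("data/example.txt") as f:
    print(f.read())

# The same call accepts other schemes, for example:
# open("s3://my-bucket/example.txt")
# open("gs://my-bucket/example.txt")
# open("https://example.com/example.txt")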
Python code¶
The following code snippet shows how to print the contents of a text file inside a .zip archive that is stored in the default UbiOps bucket.
import ubiops
import zipfile
from smart_open import open
PROJECT_NAME = "" # Your project name
API_TOKEN = "" # Your token
BUCKET_NAME = "default" # The name of the bucket
zip_file_name = "" # The name of the file in the bucket
text_file_name = "" # The name of the text file inside the zip file
# Login with ubiops
configuration = ubiops.Configuration()
configuration.api_key['Authorization'] = API_TOKEN
client = ubiops.ApiClient(configuration)
api = ubiops.api.CoreApi(client)
print(api.service_status())
# Get signed url
signed_url = api.files_download(PROJECT_NAME, BUCKET_NAME, zip_file_name)
with open(signed_url.url, "rb") as f:
    with zipfile.ZipFile(f) as zip_file:
        # Read and print the contents of the text file inside the zip
        print(zip_file.read(text_file_name).decode("utf-8"))
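If you first want to inspect which files the archive contains, zipfile's namelist method can be used on the same object, for example:
with open(signed_url.url, "rb") as f:
    with zipfile.ZipFile(f) as zip_file:
        # Print the names of all files inside the zip
        print(zip_file.namelist())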
This code snippet can also be made to work with other (external) buckets.
An example for Google Cloud Storage is shown below:
from google.cloud import storage
bucket_name_gcs = "" # The name of the bucket
# Don't forget to include the service account json file
storage_client = storage.Client.from_service_account_json("service_account.json")
with open(f"gs://{bucket_name_gcs}/{zip_file_name}", "rb", transport_params=dict(client=storage_client)) as f:
    # Add the rest of the code here
    pass
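A similar pattern works for S3 buckets. The snippet below is a sketch that assumes recent versions of smart_open and boto3, and uses a placeholder bucket name:
import boto3

bucket_name_s3 = "" # The name of the bucket

# Pass a boto3 client so smart_open can authenticate against S3
s3_client = boto3.client("s3")
with open(f"s3://{bucket_name_s3}/{zip_file_name}", "rb", transport_params=dict(client=s3_client)) as f:
    # Add the rest of the code here
    pass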
Integration¶
The easiest way to integrate this file streaming functionality is to wrap it in a custom data loader class. Both TensorFlow and PyTorch provide a Dataset class that can be used to create such a custom data loader. By implementing these classes, no changes have to be made to the training code, as the custom data loader is a drop-in replacement for the default one. The framework then takes care of orchestrating the data loading process.
An example for PyTorch is shown below:
import torch
from torch.utils.data import Dataset
class ZipFileDataset(Dataset):
    def __init__(self):
        # Add the rest of the code here
        pass

    def __len__(self):
        return 10 # Add the length of the dataset here

    def __getitem__(self, idx):
        data_file = stream_file_from_bucket(f"{idx}.txt") # Add the code from the previous section here
        data_tensor = process_data(data_file) # Add the code to process the data here
        return data_tensor
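As a usage sketch (the batch size and number of workers below are arbitrary), this dataset can be passed to a standard PyTorch DataLoader, which then handles batching and parallel loading:
from torch.utils.data import DataLoader

dataset = ZipFileDataset()
loader = DataLoader(dataset, batch_size=2, num_workers=2)

for batch in loader:
    print(batch.shape)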
Do note that the stream_file_from_bucket and process_data functions are not defined in the snippets above. These functions should be defined by the user, as they are specific to the use case.
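As an illustration only, such a stream_file_from_bucket helper could wrap the signed-URL code from the previous section. The helper name and its argument are assumptions for this sketch, and it reuses the api object, zip_file_name and smart_open's open from the earlier snippet:
def stream_file_from_bucket(text_file_name):
    # Hypothetical helper: reads one text file out of the zip in the bucket
    signed_url = api.files_download(PROJECT_NAME, BUCKET_NAME, zip_file_name)
    with open(signed_url.url, "rb") as f:
        with zipfile.ZipFile(f) as zip_file:
            return zip_file.read(text_file_name)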
An example for TensorFlow is shown below:
import tensorflow as tf
class ZipFileDataset(tf.data.Dataset):
    def _generator(num_samples):
        for sample_index in range(num_samples):
            data_file = stream_file_from_bucket(f"{sample_index}.txt")
            data_tensor = process_data(data_file)
            yield data_tensor

    def __new__(cls, num_samples=10):
        return tf.data.Dataset.from_generator(
            cls._generator,
            output_signature=tf.TensorSpec(shape=(None,), dtype=tf.float32), # Adjust shape and dtype based on your data
            args=(num_samples,)
        )
With TensorFlow, a generator function yields the samples, and the dataset is then created from it with the from_generator function.
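As a usage sketch (the number of samples and batch size are arbitrary here), the resulting dataset behaves like any other tf.data.Dataset:
dataset = ZipFileDataset(num_samples=10).batch(2)

for batch in dataset:
    print(batch.shape)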