Work with the collect subrequests operator

UbiOps pipelines come with some predefined operators that you can use. One of those operators is the collect subrequests operator. This operator is almost always used together with the create subrequests operator: it ends the parallelization started by that operator by collecting the outputs of all subrequests into a single list. Read on to find out how you can work with it.

Configuring the collect subrequests operator

Once you have added the collect subrequests operator to your pipeline canvas, the UI will prompt you to provide a destination object. This destination object is the object that the collected subrequests are sent to. If you want to add this operator using the Client Library, you need to provide the following parameters:

Field                    Description
name                     many-to-one
reference name           many-to-one
reference type           operator
configuration (example)  {
                           # Input fields should be the same as the input fields of the destination object
                           "input_fields": [{"name": "integer", "data_type": "int"}],
                           # Input fields and output fields should be the same
                           "output_fields": [{"name": "integer", "data_type": "int"}]
                         }
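
As a rough illustration of the table above, the operator object could be assembled as a plain Python dictionary before it is handed to the Client Library. The snake_case keys (`reference_name`, `reference_type`) and the helper `build_collect_operator` are assumptions made for this sketch, not confirmed Client Library names; only the values come from the table.

```python
def build_collect_operator(destination_input_fields):
    """Build a collect subrequests operator object (hypothetical helper).

    The operator's input fields mirror the destination object's input
    fields, and its output fields are identical to its input fields.
    """
    fields = [dict(field) for field in destination_input_fields]
    return {
        "name": "many-to-one",
        "reference_name": "many-to-one",  # assumed key name
        "reference_type": "operator",     # assumed key name
        "configuration": {
            "input_fields": fields,
            # Copy the fields so input and output lists stay independent
            "output_fields": [dict(field) for field in fields],
        },
    }


operator = build_collect_operator([{"name": "integer", "data_type": "int"}])
```

Deriving both field lists from the destination object's input fields keeps the two constraints from the table satisfied by construction.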

Let's say, for instance, that you have a video processing pipeline in which each frame is processed in parallel using the create subrequests operator, as described in the accompanying how-to.

In this scenario you would have a pipeline with the following objects:

  • a pipeline object that splits the videos into frames (video-splitter)
  • a pipeline object that processes a single frame (process-video-frame)
  • a pipeline object that collects these processed frames and puts them back together into a single video (frames-to-video).

To make sure the frames-to-video object has access to all the processed frames, we need the collect subrequests operator. The operator would then have the frames-to-video object as its destination.

Example pipeline with collect subrequests operator
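
The flow of this example pipeline can be sketched as a list of connections. This is only an illustration of the order of the objects: the `(source, destination)` tuple layout, the operator labels `create-subrequests` and `collect-subrequests`, and the helper `destinations_of` are hypothetical and do not reflect the actual UbiOps attachment format.

```python
# Each pair is (source, destination); labels for the operators are hypothetical
CONNECTIONS = [
    ("video-splitter", "create-subrequests"),
    ("create-subrequests", "process-video-frame"),
    ("process-video-frame", "collect-subrequests"),
    ("collect-subrequests", "frames-to-video"),
]


def destinations_of(name, connections):
    """Return every object that receives data directly from `name`."""
    return [dst for src, dst in connections if src == name]


# The collect subrequests operator hands its list to frames-to-video
collect_destination = destinations_of("collect-subrequests", CONNECTIONS)
```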

Preparing your deployment code to work with the operator

To be able to use the collect subrequests operator with your deployment, your deployment code should have a requests function, as opposed to the standard request function. UbiOps passes a list of inputs to the requests function, instead of a single input. The operator compiles this list from all the subrequest outputs and makes sure it gets passed correctly to your requests function. In the case of our video processing example, the frames-to-video code should thus look something like this:

class Deployment:

    def __init__(self, base_directory, context):
        print("Initialising My Deployment")

    def requests(self, data):
        print("Processing request for frames-to-video")

        # data is a list with one entry per collected subrequest output
        input_array = []
        for item in data:
            input_array.append(item['processed_frame'])

        # < code to put frames back together; it should define processed_video >

        # In the return statement we return a single video
        return {"processed_video": processed_video}

The same deployment in R:

init <- function(base_directory, context) {
    print("Initialising My Deployment")
}

requests <- function(input_data, base_directory, context) {
    print("Processing request for My Deployment")

    # input_data is a list with one entry per collected subrequest output
    input_list <- list()
    for (item in input_data) {
        input_list[[length(input_list) + 1]] <- item[["processed_frame"]]
    }

    # < code to put frames back together; it should define processed_video >

    # We output a single video
    list(processed_video = processed_video)
}

Our example has its input defined as processed_frame of type file. You do not need to change your input type to array to work with this operator.
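
To see the shape of the data locally, the requests function can be called by hand with a list that mimics what the operator would compile. This is a minimal sketch, assuming each file input arrives as a path string; the frame file names and the string join that stands in for the real video assembly are made up for illustration.

```python
class Deployment:
    def __init__(self, base_directory, context):
        print("Initialising My Deployment")

    def requests(self, data):
        # data is a list with one dictionary per subrequest output; each
        # dictionary has the same single-input shape (processed_frame)
        frames = [item["processed_frame"] for item in data]
        # Stand-in for real video assembly: just record the frame order
        processed_video = "+".join(frames)
        return {"processed_video": processed_video}


# Simulate the list that the operator would compile from three subrequests
collected = [{"processed_frame": f"frame_{i}.png"} for i in range(3)]
result = Deployment(base_directory=".", context={}).requests(collected)
```

Each list item keeps the original processed_frame field, which is why the input type itself does not have to become an array.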

That's all there is to this operator. If you run into any issues though, do not hesitate to contact support.