UbiOps Operators #1: Parallel processing of video frames


Exciting things you can build with the new UbiOps operators!

On the 24th of November we introduced pipeline operators in UbiOps. Pipeline operators are a library of pipeline objects that you can use to quickly create pipelines for more complex use cases. They can make certain branches of your pipeline execute only when specific conditions are met, let you parallelize parts of your pipeline, and much more. To give you an idea of what's possible, I will walk you through a complete example use case that specifically uses the create subrequests and collect subrequests operators. Let's dive in!

The example use case: video processing

Let’s have a look at a video processing use case. In this case we have an input video that contains some hand motions, and we want to end up with a processed video where all the hand positions are marked like this:

 

The set-up

In order to achieve this I came up with the following workflow:

  1. Split the video into individual frames
  2. Process each frame with a hand gesture recognition algorithm
  3. Combine the processed frames back together into a full video

That translates to three different deployments:

  1. A video-splitter deployment that returns a list of frames
  2. A frame-processor deployment
  3. A video-compiler that compiles the processed video

Since we will need to process a bunch of frames in exactly the same way, I want to make sure that multiple frames are processed in parallel, making use of the autoscaling functionality of UbiOps. This will greatly speed things up! I can do that by using the create subrequests and collect subrequests operators. The final pipeline should then look something like this:

 

The create subrequests operator will split the list of frames returned by the video-splitter into separate subrequests to be processed by the frame-processor. If I set the max_instances of the frame-processor to a number higher than 1, let's say 10, these subrequests can be handled in parallel.

Note: The degree of parallelization is equal to the maximum number of instances of the frame-processor. If you want to use parallelization, make sure to increase the number of max instances!

To be able to combine all the processed frames into a single video again with the video-compiler, it of course needs access to the full list of processed frames. That is exactly what the collect subrequests operator does: it collects all the individual subrequests and forwards them to the video-compiler as a single list.

Splitting the video into frames

To split the video into frames we can use OpenCV. I simply create a list that will hold all the frames, use OpenCV to read the individual frames from the video, and add them to the list. My code looks like this:

import cv2

class Deployment:

    def __init__(self, base_directory, context):
        print("Initialising My Deployment")

    def request(self, data):
        print("Processing request for video splitter")

        video = cv2.VideoCapture(data["video"])

        frame_nr = 0
        splitted_frames = []

        # Read frames one by one until the end of the video is reached
        while True:
            success, frame = video.read()
            if not success:
                break

            # Write the frame to disk and add it to the output list
            frame_name = f'frame_{frame_nr}.png'
            cv2.imwrite(frame_name, frame)
            splitted_frames.append({'frame': frame_name})
            frame_nr += 1

        video.release()

        return splitted_frames

The deployment has the following input/output definition:

  • Input:
    • video of type blob
  • Output:
    • frame of type blob

Please note that I return a list of dictionaries, as opposed to the single dictionary you would normally return from a UbiOps deployment. The reason is that I want to use the create subrequests operator to split this list into individual subrequests. Every dictionary in the list still needs to be formatted according to the output format of the deployment, which in my case is a single output field called frame. I do not need to change the output data type to an array of some kind, as long as each dictionary in my list is formatted correctly.
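For illustration, this is roughly what the return value looks like for a three-frame video (the file names are example values):

# Sketch of the video-splitter's return value: a list of dictionaries,
# each formatted according to the deployment's single output field 'frame'
[
    {'frame': 'frame_0.png'},
    {'frame': 'frame_1.png'},
    {'frame': 'frame_2.png'}
]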

Processing a frame

Our second deployment should process a video frame to detect hands and draw them on the frame. I again use OpenCV, but this time in combination with MediaPipe for its built-in hand detection functions. My code looks like this:

import os

import cv2
import mediapipe

class Deployment:

    def __init__(self, base_directory, context):
        print("Initialising My Deployment")

    def request(self, data):
        print("Processing request for frame processor")
        drawingModule = mediapipe.solutions.drawing_utils
        handsModule = mediapipe.solutions.hands
        frame = cv2.imread(data["frame"])

        with handsModule.Hands() as hands:
            # Detect hands
            results = hands.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

            # Draw hands if we detected any
            if results.multi_hand_landmarks is not None:
                for handLandmarks in results.multi_hand_landmarks:
                    drawingModule.draw_landmarks(frame, handLandmarks, handsModule.HAND_CONNECTIONS)

            # Get the frame number from a filename like 'frame_12.png'
            frame_name = os.path.basename(data["frame"])
            frame_nr = frame_name[len('frame_'):-len('.png')]
            processed_frame_name = f'processed_frame_{frame_nr}.png'
            cv2.imwrite(processed_frame_name, frame)

        return {'processed_frame': processed_frame_name}

There's nothing too special happening in this code, and it could of course be swapped out for any kind of image processing code that you need for your use case.

The input/output definition of this deployment is:

  • Input:
    • frame of type blob
  • Output:
    • processed_frame of type blob

Since we want this frame processor to work through frames in parallel, it is very important to set the maximum number of instances of this deployment to a number higher than 1; I used 10.
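If you prefer to configure this programmatically rather than via the WebApp, here is a minimal sketch using the UbiOps Python client; the project, deployment, and version names below are placeholders for your own setup:

import ubiops

# Sketch: raising the maximum number of instances of the frame-processor
# via the UbiOps Python client. All names below are placeholders.
configuration = ubiops.Configuration()
configuration.api_key['Authorization'] = 'Token <YOUR_API_TOKEN>'
configuration.host = 'https://api.ubiops.com/v2.1'

api = ubiops.CoreApi(ubiops.ApiClient(configuration))
api.deployment_versions_update(
    project_name='my-project',
    deployment_name='frame-processor',
    version='v1',
    data=ubiops.DeploymentVersionUpdate(maximum_instances=10)
)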

Compiling the processed video

Now that we have some frame processing code, it's time to put all these processed frames back together into a full video. My code looks like this:

import os

import cv2

class Deployment:

    def __init__(self, base_directory, context):
        print("Initialising My Deployment")

    def requests(self, data):
        print("Processing request for video_compiler")

        # Collect the file paths of all processed frames
        input_array = [item['processed_frame'] for item in data]

        # Sort the frames based on the frame number in a filename
        # like 'processed_frame_12.png'
        input_array.sort(key=lambda x: int(os.path.basename(x)[len('processed_frame_'):-len('.png')]))

        # Read each frame into an image array
        frame_array = [cv2.imread(frame_path) for frame_path in input_array]

        # Use the dimensions of the first frame for the output video
        height, width, layers = frame_array[0].shape
        size = (width, height)

        # Write each frame to a 30 fps video
        out = cv2.VideoWriter('processed_video.avi', cv2.VideoWriter_fourcc(*'DIVX'), 30, size)
        for frame in frame_array:
            out.write(frame)
        out.release()

        return {'processed_video': 'processed_video.avi'}

You might notice that I used a function called requests instead of the standard request function. This is because I want to work with the full list of frames and compile them together in one go. If the API detects the requests function and there are subrequests being sent to that deployment, the API sends those subrequests over as a full list instead of one item at a time. If I were to use the standard request method, I would only have access to one processed frame at a time in my code, which wouldn't be very helpful. As it is, the data parameter passed in is a list of dictionaries, each containing a processed frame.
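For illustration, with three processed frames the data parameter passed to requests would look roughly like this (the file names are example values):

# Sketch of the data parameter of requests(): one dictionary per subrequest,
# each formatted according to the frame-processor's output field
data = [
    {'processed_frame': 'processed_frame_0.png'},
    {'processed_frame': 'processed_frame_1.png'},
    {'processed_frame': 'processed_frame_2.png'}
]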

The input/output definition of this deployment is:

  • Input:
    • processed_frame of type blob
  • Output:
    • processed_video of type blob

 

Putting everything together in a UbiOps pipeline

Now that we have our three main deployments, it’s time to put them together in a pipeline. We need a pipeline with the following input/output definition:

  • Input:
    • video of type blob
  • Output:
    • processed_video of type blob

We can start by going to edit mode and adding all our deployments onto our pipeline canvas.

 

Now that we have all our deployments in, we can work from left to right and add the connections and operators we need as we go. First up is connecting pipeline start to the video-splitter object. After that we can add the create subrequests operator from the sidebar (you can see the operators by going to the operators tab). Once it is added to your pipeline you will be prompted to provide a source object, in our case the video-splitter object. You can also adjust the batch size, which dictates how many items are grouped into each subrequest and thus how many batches there are. As a rule of thumb you can set the batch size equal to the number of expected items divided by the maximum number of instances. In our case I had about 200 frames and max instances set to 10, so I used a batch size of 20.
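Written out as a quick sanity check, the rule of thumb looks like this (the frame count is an example value):

import math

expected_frames = 200  # example: number of frames returned by the video-splitter
max_instances = 10     # maximum number of instances of the frame-processor
batch_size = math.ceil(expected_frames / max_instances)
print(batch_size)  # -> 20, so roughly one batch per available instance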

When you now click save you should see your operator on the canvas, properly connected to the video-splitter. You can connect the operator to the frame-processor via drag and drop, and then your pipeline should look like this:

 

We also need to collect all the subrequests again before passing the processed frames to the video-compiler. To do this we add a collect subrequests operator. When you add it to your canvas you will need to provide a destination object, in this case the video-compiler. When you press save you should see it on your canvas, correctly connected to the video-compiler. You still need to add a connection between the frame-processor and the collect subrequests operator, and a final connection from the video-compiler to pipeline end so that the processed video is output properly.

 

With this final step out of the way your pipeline is now complete! Just hit save and exit the editor and it’s ready for use.

Using the pipeline

To use your newly created video processing pipeline, you can create a request with a small input video in which hands are visible. I created a quick recording with my webcam to test it, and it worked well!
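If you prefer to create the request programmatically instead of via the WebApp, here is a rough sketch using the UbiOps Python client and its blob endpoints; the project and pipeline names below are placeholders for your own setup:

import ubiops

# Rough sketch: creating a pipeline request with a video blob as input.
# All names below are placeholders.
configuration = ubiops.Configuration()
configuration.api_key['Authorization'] = 'Token <YOUR_API_TOKEN>'
configuration.host = 'https://api.ubiops.com/v2.1'

api = ubiops.CoreApi(ubiops.ApiClient(configuration))

# Upload the input video as a blob and pass its id as the 'video' input field
blob = api.blobs_create(project_name='my-project', file='hand_video.mp4')
request = api.pipeline_requests_create(
    project_name='my-project',
    pipeline_name='video-processing-pipeline',
    data={'video': blob.id}
)
print(request)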

 

I hope this gave you inspiration on what to do with the create and collect subrequests operators. If you have any questions, do not hesitate to reach out to us via email or Slack!

Make sure you check out the release notes for the other operators we added.

The full deployment package for each of the deployments can be downloaded here:

 
