How to save video with Inference Pipeline

I am try to implement a custom sink that saves the processed output of my model to a video file, similar to process_video() in the supervision library.

I attempted this by initializing a cv2.VideoWriter object and writing the annotated frames to it in the on_prediction sink, however the output file is empty and cannot be opened.

Without showing my full implementation, this is what my sinks look like:

output_video_path = "/content/out.mp4"
out_fps = 30
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for MP4
out = cv2.VideoWriter(output_video_path, fourcc, out_fps, (1280, 1920))

detector = VehicleIntrusionDetector()

def on_video_frame(video_frames: List[VideoFrame]) -> List[Any]: 
  images = [v.image for v in video_frames]

  vehicle_results = detector.vehicle_model.predict(images)
  lane_results = detector.lane_model.predict(images)
  
  return list(zip(vehicle_results, lane_results))

def on_prediction(predictions: Union[dict, List[Optional[dict]]], video_frame: Union[VideoFrame, List[Optional[VideoFrame]]]) -> None:
  if not issubclass(type(predictions), list):
      # this is required to support both sequential and batch processing with single code
      # if you use only one mode - you may create function that handles with only one type
      # of input
      predictions = [predictions]
      video_frame = [video_frame]

  for prediction, frame in zip(predictions, video_frame):
    vehicle_detections, plate_detections, license_numbers, lane_detections = detector.process_frame_prediction(prediction[0], prediction[1])
    vehicle_detections = detector.track(vehicle_detections, license_numbers)
    annotated_image = detector.annotate_frame(frame.image, vehicle_detections, plate_detections, license_numbers, lane_detections)
    cv2_imshow(annotated_image)
    
    out.write(annotated_image)

pipeline = InferencePipeline.init_with_custom_logic(
  video_reference="/content/1.mp4",
  on_video_frame=on_video_frame,
  on_prediction=on_prediction,
)

# start the pipeline
pipeline.start()
# wait for the pipeline to finish
pipeline.join()

out.release()

Any ideas on how to fix this?

Am working on a Google Colab notebook.

Hi there,

Here’s a code snippet for running a workflow on a video file and saving the annotated video. I have it running locally, might need to change a few things to run in Google Colab

# Import the InferencePipeline object
from inference import InferencePipeline
import cv2
import numpy as np
import supervision as sv
from typing import List

VIDEO_PATH = "input.mp4"
SAVE_VIDEO_PATH = "output.mp4"

# Set the heatmap as the first frame of the video
video = cv2.VideoCapture(VIDEO_PATH)
ret, ghost_image = video.read()

# Get video properties for writer
frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(video.get(cv2.CAP_PROP_FPS))

video.release()

FPS = 5
count = 0

# Initialize video writer
video_writer = cv2.VideoWriter(
    SAVE_VIDEO_PATH,
    cv2.VideoWriter_fourcc(*"mp4v"),
    FPS,
    (frame_width, frame_height * 2),
)


def my_sink(result, video_frame):
    global count
    count += 1
    if result.get(
        "keypoint_visualization"
    ):  
        frame = result[“output_image”].numpy_image
        video_writer.write(frame)
        # cv2.imshow("Workflow Image", frame)
        # cv2.waitKey(1)
    print("frames processed", count)


# initialize a pipeline object
pipeline = InferencePipeline.init_with_workflow(
    api_key=API_KEY,
    workspace_name=WORKFLOW_NAME,
    workflow_id=WORKFLOW_ID,
    video_reference=VIDEO_PATH, 
    max_fps=FPS,
    on_prediction=my_sink
)
pipeline.start()  # start the pipeline
pipeline.join()  # wait for the pipeline thread to finish

# Release the video writer
video_writer.release()