Process Picamera2 stream in `inference` with `workflows`

Hello Support Team Roboflow,

I am a new student who wants to try image recognition using a Raspberry Pi. I completed all the training modules and tried to deploy to the Raspberry Pi by copying the code given to me for video, below:

# Import the InferencePipeline object
from inference import InferencePipeline
import cv2

def my_sink(result, video_frame):
    if result.get("output_image"):  # Display an image from the workflow response
        cv2.imshow("Workflow Image", result["output_image"].numpy_image)
        cv2.waitKey(1)
    print(result)  # do something with the predictions of each frame

# Initialize a pipeline object
pipeline = InferencePipeline.init_with_workflow(
    api_key="Deleted as it is private",
    workspace_name="engineering-k6o7f",
    workflow_id="create-and-run-example",
    video_reference=0,  # Path to video, device id (int, usually 0 for built-in webcams), or RTSP stream url
    max_fps=30,
    on_prediction=my_sink
)
pipeline.start()  # start the pipeline
pipeline.join()   # wait for the pipeline thread to finish

But I am getting the error below, which mentions "WARN:0@8.419] global cap_v4l.cpp:1938 getProperty VIDEOIO(V4L2:/dev/video0): Unable to get camera FPS". I don't know why or how to fix it; I tried multiple ways but still nothing happens. Error below:

I made sure the hardware is fully working, from the camera to the Raspberry Pi getting enough volts and amps, but it still shows me the same error. What is the best way to solve this issue? Or maybe I am missing some software that needs to be installed before running the code?

Also, the webcam is working when I test it and it takes photos, but when I run the above code nothing shows. So I was thinking that maybe if I solve the above issue the webcam window will pop up automatically.

Universe public: Material Segregation Object Detection Dataset and Pre-Trained Model by Engineering
Project type: Object detection
OS/browser: Raspberry Pi 5
Camera used: Pi Camera Module 3

Regards
Selim

Hi Selim,

Can you try to run the following and confirm if you get the same cap_v4l warning:

import cv2

# Open the default camera via OpenCV's V4L2 backend
cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print("Error: Could not open camera")
    exit()

# Query the FPS property; the backend returns -1 when it cannot read it
fps = cap.get(cv2.CAP_PROP_FPS)
print(f"FPS: {fps}")

cap.release()

Thanks,
Grzegorz

Dear Mr. Grzegorz,

Thanks a lot for your fast response.

Yes, it is showing the same error, and this time it shows FPS: -1.0 (screenshot below).

Just FYI, this is a new Raspberry Pi; I just did the full setup yesterday morning, my time.

Thanks,

I don’t have an RPI5 on my desk so I can only speculate - can you confirm if this issue and this issue are related to your problem?

In the RPI5 documentation the authors mention some OS-level tools (like rpicam-vid), and also using libcamera. Based on a Google search it seems libcamera is not yet supported by OpenCV as a backend.
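(A quick, generic way to check which capture backends your OpenCV build actually includes is to print its build information and look at the "Video I/O" section:)

import cv2

# Print the compiled-in configuration; the "Video I/O" section lists
# which capture backends (V4L2, GStreamer, FFmpeg, ...) are available
print(cv2.getBuildInformation())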

I think you have the two options below (there might be more, so please do not consider this list to exhaust the subject):

  1. use a non-OpenCV package (i.e. python3-picamera2, suggested by the RPI5 docs) and write a frame producer (I think this option would be preferable since you would have all frame handling managed from the script) (also see the frame producer base class and example of frame producer)
  2. use rpicam-vid to read frames and, if it supports passing the stream to stdout, pipe that output to your Python program; add a frame producer for handling frames that you can then plug into the inference pipeline (see the sketch after this list)
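A rough, untested sketch of option 2 (this assumes rpicam-vid on your image supports writing raw YUV420 to stdout via "--codec yuv420" and "-o -", so treat it as a starting point only):

import subprocess

import cv2
import numpy as np

W, H = 640, 480
FRAME_BYTES = W * H * 3 // 2  # YUV420 stores 1.5 bytes per pixel

# Spawn rpicam-vid and stream raw YUV420 frames to stdout forever (-t 0)
proc = subprocess.Popen(
    ["rpicam-vid", "-t", "0", "--codec", "yuv420",
     "--width", str(W), "--height", str(H), "--nopreview", "-o", "-"],
    stdout=subprocess.PIPE,
)

while True:
    raw = proc.stdout.read(FRAME_BYTES)
    if not raw or len(raw) < FRAME_BYTES:
        break  # stream ended
    # Reinterpret the raw bytes as an I420 image and convert to BGR
    yuv = np.frombuffer(raw, dtype=np.uint8).reshape(H * 3 // 2, W)
    frame = cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR_I420)
    cv2.imshow("rpicam-vid", frame)
    if cv2.waitKey(1) == ord("q"):
        break

proc.terminate()
cv2.destroyAllWindows()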

Dear Mr. Grzegorz,

I tried my best to understand that. What I got is that I will need to use picamera2 in my code, as it is the more compatible option.

Actually, the following code runs perfectly:

import cv2
from picamera2 import Picamera2
from ultralytics import YOLO

# Set up the camera with Picam
picam2 = Picamera2()
picam2.preview_configuration.main.size = (1280, 1280)
picam2.preview_configuration.main.format = "RGB888"
picam2.preview_configuration.align()
picam2.configure("preview")
picam2.start()

# Load YOLOv8
model = YOLO("yolov8n.pt")

while True:
    # Capture a frame from the camera
    frame = picam2.capture_array()
    
    # Run YOLO model on the captured frame and store the results
    results = model(frame)
    
    # Output the visual detection data, we will draw this on our camera preview window
    annotated_frame = results[0].plot()
    
    # Get the inference time (reported in milliseconds) and derive FPS
    inference_time = results[0].speed['inference']
    fps = 1000 / inference_time
    text = f'FPS: {fps:.1f}'

    # Define font and position
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_size = cv2.getTextSize(text, font, 1, 2)[0]
    text_x = annotated_frame.shape[1] - text_size[0] - 10  # 10 pixels from the right
    text_y = text_size[1] + 10  # 10 pixels from the top

    # Draw the text on the annotated frame
    cv2.putText(annotated_frame, text, (text_x, text_y), font, 1, (255, 255, 255), 2, cv2.LINE_AA)

    # Display the resulting frame
    cv2.imshow("Camera", annotated_frame)

    # Exit the program if q is pressed
    if cv2.waitKey(1) == ord("q"):
        break

# Close all windows
cv2.destroyAllWindows()

Which, if I am not mistaken, uses picamera2. But I want the Roboflow code since it has my custom-trained data, unlike this pre-trained YOLOv8. Well, I will try to read more, because until now nothing is working in my favor.

Nice!

OK, let me provide you with something you can use as a starting point:

from functools import partial
from typing import Dict, Optional, Tuple

import cv2 as cv
from inference.core.interfaces.camera.entities import (
    SourceProperties,
    VideoFrameProducer,
)
import numpy as np
from picamera2 import Picamera2
import supervision as sv

from inference.core.interfaces.stream.inference_pipeline import InferencePipeline


class Picamera2FrameProducer(VideoFrameProducer):
    def __init__(
        self,
    ):
        self._camera = Picamera2()
        self._camera.preview_configuration.main.size = (1280, 1280)
        self._camera.preview_configuration.main.format = "RGB888"
        self._camera.preview_configuration.align()
        self._camera.configure("preview")
        self._camera.start()

    def grab(self) -> bool:
        # capture_metadata() returns a metadata dict, not a (status, _) tuple;
        # use it here only as a cheap "is the camera delivering?" check
        metadata = self._camera.capture_metadata()
        return metadata is not None

    def retrieve(self) -> Tuple[bool, Optional[np.ndarray]]:
        # capture_array() returns the frame directly (it raises on failure)
        frame = self._camera.capture_array()
        return frame is not None, frame

    def release(self):
        self._camera.close()

    def isOpened(self) -> bool:
        return self._camera.is_open

    def discover_source_properties(self) -> SourceProperties:
        # capture_array() returns the frame directly; its shape gives the size
        frame = self._camera.capture_array()

        h, w, *_ = frame.shape

        return SourceProperties(
            width=w,
            height=h,
            total_frames=-1,
            is_file=False,
            fps=1,  # I have no RPI5 on my desk so can't test how to obtain fps
            is_reconnectable=False,
        )

    def initialize_source_properties(self, properties: Dict[str, float]):
        pass


picamera2_producer = partial(
    Picamera2FrameProducer,
)


box_annotator = sv.BoundingBoxAnnotator()
label_annotator = sv.LabelAnnotator()


def my_sink(result, video_frame):
    if result.get("output_image"):
        cv.imshow("Workflow Image", result["output_image"].numpy_image)
        cv.waitKey(1)
        print(result)



inference_pipeline = InferencePipeline.init_with_workflow(
    api_key="Deleted as it is private",
    workspace_name="engineering-k6o7f",
    workflow_id="create-and-run-example",
    video_reference=picamera2_producer,
    max_fps=30,
    on_prediction=my_sink
)

inference_pipeline.start()
inference_pipeline.join()

The above example should stream frames from your camera into the inference pipeline; you might need to work on it more, since I wrote it without access to an RPI5 🙂
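If you want a real fps value instead of the hard-coded 1 in discover_source_properties, one untested idea: libcamera metadata usually carries a FrameDuration entry (microseconds per frame) that picamera2 exposes through capture_metadata(), so a helper on the class above could look like:

    def _estimate_fps(self) -> float:
        # FrameDuration is microseconds per frame (assumption: this
        # libcamera metadata key is reported by your camera stack)
        metadata = self._camera.capture_metadata()
        frame_duration_us = metadata.get("FrameDuration")
        if frame_duration_us:
            return 1_000_000 / frame_duration_us
        return 30.0  # fall back to a sensible default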

PS: I allowed myself to update the title of this issue to make it easier for other people to find; hope it’s OK.

Hope this helps,
Grzegorz

Dear Mr. Grzegorz,

I used your code and got an error. I worked past that until I got to the following code:

from functools import partial
from typing import Dict, Optional, Tuple
import cv2 as cv
from inference.core.interfaces.camera.entities import (
    SourceProperties,
    VideoFrameProducer,
)
import numpy as np
from picamera2 import Picamera2
import supervision as sv
from inference.core.interfaces.stream.inference_pipeline import InferencePipeline


class Picamera2FrameProducer(VideoFrameProducer):
    def __init__(self):
        # Initialize Picamera2
        self._camera = Picamera2()
        self._camera.preview_configuration.main.size = (1280, 1280)
        self._camera.preview_configuration.main.format = "RGB888"
        self._camera.preview_configuration.align()
        self._camera.configure("preview")
        self._camera.start()

    def grab(self) -> bool:
        # capture_metadata() returns a dict; treat any result as success
        metadata = self._camera.capture_metadata()
        return metadata is not None

    def retrieve(self) -> Tuple[bool, Optional[np.ndarray]]:
        try:
            frame = self._camera.capture_array()
            return True, frame
        except Exception as e:
            print(f"Failed to capture frame: {e}")
            return False, None

    def release(self):
        self._camera.close()

    def isOpened(self) -> bool:
        return self._camera.is_open

    def discover_source_properties(self) -> SourceProperties:
        try:
            frame = self._camera.capture_array()
            h, w, *_ = frame.shape
            fps = 30  # Default FPS for now, as Picamera2 FPS query might differ
            return SourceProperties(
                width=w,
                height=h,
                total_frames=-1,
                is_file=False,
                fps=fps,
                is_reconnectable=False,
            )
        except Exception as e:
            raise RuntimeError(f"Error discovering source properties: {e}")

    def initialize_source_properties(self, properties: Dict[str, float]):
        pass


# Partial producer for Picamera2
picamera2_producer = partial(Picamera2FrameProducer)

# Annotators
box_annotator = sv.BoundingBoxAnnotator()
label_annotator = sv.LabelAnnotator()


# Custom sink to handle predictions
def my_sink(result, video_frame):
    if result.get("output_image"):
        # Display output image
        cv.imshow("Workflow Image", result["output_image"].numpy_image)
        cv.waitKey(1)
        print(result)


def main():
    # Replace with your API details
    api_key = "XD"
    workspace_name = "engineering-k6o7f"
    workflow_id = "XD"

    try:
        # Initialize InferencePipeline
        inference_pipeline = InferencePipeline.init_with_workflow(
            api_key=api_key,
            workspace_name=workspace_name,
            workflow_id=workflow_id,
            video_reference=picamera2_producer,
            max_fps=30,
            on_prediction=my_sink
        )

        print("Inference pipeline started successfully.")
        inference_pipeline.start()
        inference_pipeline.join()

    except Exception as e:
        print(f"Error running inference pipeline: {e}")
    finally:
        # Cleanup
        cv.destroyAllWindows()


if __name__ == "__main__":
    main()

It showed me the following error, but I think this is related to the server connection, right?

Since the local server for the workflow seems down, is that the reason it is causing these issues?

And regarding the change in title, it’s up to you; I guess you can do that so more people can search easily for their issues. 😀

Regards
Selim

Hi Selim,

The error you got is due to inputs not being passed to a block. One block requires an input that is expected to be a string. So either you have not linked the block to the appropriate value, or you did not provide the parameter through workflows_parameters in init_with_workflow. Without seeing the workflow it’s difficult to guess the exact problem, but please check whether, e.g., you filled in all model ID input boxes and any other box marked as compulsory.
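For reference, this is roughly how a value can be fed to a workflow input from the pipeline side (the input name "model_id" below is only an example; it must match whatever input name your workflow declares):

inference_pipeline = InferencePipeline.init_with_workflow(
    api_key="YOUR_API_KEY",
    workspace_name="engineering-k6o7f",
    workflow_id="create-and-run-example",
    video_reference=picamera2_producer,
    # Values for workflow inputs that are not wired to the video frame;
    # keys must match the input names declared in the workflow definition
    workflows_parameters={"model_id": "material-segregation/1"},
    on_prediction=my_sink,
)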

Hope this helps,
Grzegorz

Dear Grzegorz,

Thank you for your comment; I have done it. It is working now; it was my mistake on the server side and I fixed it. Now everything seems to be working, even the code below:

from picamera2 import Picamera2
import cv2
from roboflow import Roboflow
import os
# Initialize Picamera2
picam2 = Picamera2()
# Configure the camera to capture video (for webcam-like functionality)
picam2.configure(picam2.create_video_configuration())
# Start the camera
picam2.start()
# Initialize Roboflow with your API key
rf = Roboflow(api_key=os.getenv("ROBOFLOW_API_KEY"))
# Access the specific project and model version
workspace = rf.workspace("engineering-k6o7f")
project = workspace.project("material-segregation")
model = project.version(1).model
# Create a window to display the webcam feed
cv2.namedWindow("Webcam Feed - Material Segregation", cv2.WINDOW_NORMAL)
while True:
    # Capture a frame from the camera
    frame = picam2.capture_array()
    # Check if the frame was successfully captured
    if frame is not None:
        # Convert the frame from RGB to BGR (OpenCV uses BGR)
        frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # Perform inference on the current frame using the Roboflow model
        predictions = model.predict(frame_bgr, confidence=40, overlap=30).json()
        # Print out the predictions to inspect the structure
        print(predictions)
        # Draw bounding boxes and labels on the frame
        for prediction in predictions["predictions"]:
            print(prediction)  # Print each prediction to check its structure
            
            # Check which keys are available in the prediction
            if "x" in prediction and "y" in prediction and "width" in prediction and "height" in prediction:
                # Roboflow reports x/y as the box center, so shift by half
                # the width/height to get the top-left corner
                width, height = prediction["width"], prediction["height"]
                x1 = prediction["x"] - width / 2
                y1 = prediction["y"] - height / 2
                x2, y2 = x1 + width, y1 + height
            else:
                # Handle alternative formats (e.g., xmin, ymin, xmax, ymax)
                x1, y1, x2, y2 = prediction["xmin"], prediction["ymin"], prediction["xmax"], prediction["ymax"]
            # Print bounding box coordinates to check the types and values
            print(f"Bounding Box: x1={x1}, y1={y1}, x2={x2}, y2={y2}")
            # Ensure that the values are integers before passing to OpenCV
            x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
            label = prediction["class"]
            confidence = prediction["confidence"]
            # Draw bounding box and label on image
            cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame_bgr, f"{label} ({confidence*100:.2f}%)", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # Display the resulting frame with predictions
        cv2.imshow("Webcam Feed - Material Segregation", frame_bgr)
    # Break the loop if the user presses the 'q' key
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
# Release the camera and close all OpenCV windows
picam2.stop()
cv2.destroyAllWindows()

This code runs well and even starts my custom object detection. There is only one small thing: the FPS is very low, around 1.5, and I don't know how to fix that other than lowering the image size. But thanks, this is my first trained model working on a Raspberry Pi 5 with a Camera Module 3.

Regards
Selim


Hi Selim,

Great to hear the solution works!

However, I see you are not using the frame producer approach. With that approach, inference would selectively drop some frames without even unpacking them, which should result in good FPS (at the cost of not all frames being processed, but some tradeoffs need to be made when running on slower hardware). This approach works even without workflows; the inference pipeline can run on a pure model too. Replace init_with_workflow with something like the code below:

inference_pipeline = InferencePipeline.init(
    video_reference=picamera2_producer,
    model_id="your model",
    on_prediction=my_sink,
)
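(For model_id, the usual format is the project slug plus a version number, so based on the project used earlier in this thread it would presumably be "material-segregation/1".)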

Also, I would encourage you to look into your workflow definition. I looked into it and I see your second step runs inference using a classification model, and its model ID was missing; I think that was the reason your workflow was failing.

Hope this helps,
Grzegorz

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.