Process Picamera2 stream in `inference` with `workflows`

Dear Mr.Grzegorz

i used your code which is the following and i got the following error

i work past that till i got to the following code

from functools import partial
from typing import Any, Dict, Optional, Tuple
import cv2 as cv
from inference.core.interfaces.camera.entities import (
    SourceProperties,
    VideoFrameProducer,
)
import numpy as np
from picamera2 import Picamera2
import supervision as sv
from inference.core.interfaces.stream.inference_pipeline import InferencePipeline


class Picamera2FrameProducer(VideoFrameProducer):
    def __init__(self):
        # Initialize Picamera2
        self._camera = Picamera2()
        self._camera.preview_configuration.main.size = (1280, 1280)
        self._camera.preview_configuration.main.format = "RGB888"
        self._camera.preview_configuration.align()
        self._camera.configure("preview")
        self._camera.start()

    def grab(self) -> bool:
        status, _ = self._camera.capture_metadata()
        return status

    def retrieve(self) -> Tuple[bool, Optional[np.ndarray]]:
        try:
            frame = self._camera.capture_array()
            return True, frame
        except Exception as e:
            print(f"Failed to capture frame: {e}")
            return False, None

    def release(self):
        self._camera.close()

    def isOpened(self) -> bool:
        return self._camera.is_open

    def discover_source_properties(self) -> SourceProperties:
        try:
            frame = self._camera.capture_array()
            h, w, *_ = frame.shape
            fps = 30  # Default FPS for now, as Picamera2 FPS query might differ
            return SourceProperties(
                width=w,
                height=h,
                total_frames=-1,
                is_file=False,
                fps=fps,
                is_reconnectable=False,
            )
        except Exception as e:
            raise RuntimeError(f"Error discovering source properties: {e}")

    def initialize_source_properties(self, properties: Dict[str, float]):
        pass


# Partial producer for Picamera2
picamera2_producer = partial(Picamera2FrameProducer)

# Annotators
box_annotator = sv.BoundingBoxAnnotator()
label_annotator = sv.LabelAnnotator()


# Custom sink to handle predictions
def my_sink(result, video_frame):
    if result.get("output_image"):
        # Display output image
        cv.imshow("Workflow Image", result["output_image"].numpy_image)
        cv.waitKey(1)
        print(result)


def main():
    # Replace with your API details
    api_key = "XD"
    workspace_name = "engineering-k6o7f"
    workflow_id = "XD"

    try:
        # Initialize InferencePipeline
        inference_pipeline = InferencePipeline.init_with_workflow(
            api_key=api_key,
            workspace_name=workspace_name,
            workflow_id=workflow_id,
            video_reference=picamera2_producer,
            max_fps=30,
            on_prediction=my_sink
        )

        print("Inference pipeline started successfully.")
        inference_pipeline.start()
        inference_pipeline.join()

    except Exception as e:
        print(f"Error running inference pipeline: {e}")
    finally:
        # Cleanup
        cv.destroyAllWindows()


if __name__ == "__main__":
    main()

it Showed me the following Error but this is i think it related to the server connection Right ?

as from the workflow the local server seems down is that reason why it cuzing this issues ?

and regarding the change in Title it up to you i guess u can do that so more ppl can search easily about there issues. :grin:

Regards
Selim