Complete Basler Integration

Hi,

I read your article: How to Set Up a Basler Camera on a Jetson.

I would be curious to see what the recommended inference workflow looks like.

I know my integration works, but I would like some consultation on whether this is the recommended approach.

What I especially don't like is the conversion to individual frames instead of using the Roboflow streaming API. (No, I do not want to use GStreamer.)

# camera.py
'''
A simple program for grabbing video from a Basler camera and converting it to an OpenCV image.
Tested on a Basler acA1300-200uc (USB3, Linux 64-bit, Python 3.5).

'''
from pypylon import pylon
import cv2
import time
from annotate import infer

# Connecting to the first available camera
camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())

# Grabbing continuously (video) with minimal delay
camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
converter = pylon.ImageFormatConverter()

# Converting to OpenCV BGR format
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned

while camera.IsGrabbing():
    start_time = time.time()  # start time of the loop

    # Grab the next frame (waits up to 5 seconds, raises on timeout)
    grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)

    if grabResult.GrabSucceeded():
        # Access the image data
        image = converter.Convert(grabResult)
        img = image.GetArray()

        # Run inference on the frame and draw annotations
        img = infer(img)

        cv2.namedWindow('title', cv2.WINDOW_NORMAL)
        cv2.imshow('title', img)

        print("FPS: ", round(1.0 / (time.time() - start_time),2))  # FPS = 1 / time to process loop
        if cv2.waitKey(1) == ord('q'):
            break

    grabResult.Release()

# Releasing the resource
camera.StopGrabbing()

cv2.destroyAllWindows()

# annotate.py
import supervision as sv
import cv2
import json
from inference import get_model
from inference_sdk import InferenceHTTPClient


# Infer via the Roboflow Infer API and return the result
def infer(img: cv2.typing.MatLike) -> cv2.typing.MatLike:
    with open('roboflow_config.json') as f:
        config = json.load(f)

        ROBOFLOW_API_KEY = config["ROBOFLOW_API_KEY"]
        ROBOFLOW_MODEL = config["ROBOFLOW_MODEL"]
        ROBOFLOW_SIZE = config["ROBOFLOW_SIZE"]

        FRAMERATE = config["FRAMERATE"]
        BUFFER = config["BUFFER"]

    # Local inference against a Roboflow Inference server running on localhost:9001
    client = InferenceHTTPClient(
        api_url="http://localhost:9001",
        api_key=ROBOFLOW_API_KEY,
    )
    results = client.infer(img, model_id=ROBOFLOW_MODEL)

    # Alternative: load and run the model in-process with the inference package
    # model = get_model(ROBOFLOW_MODEL, ROBOFLOW_API_KEY)
    # results = model.infer(img)[0]

    detections = sv.Detections.from_inference(results)
    if len(detections) == 0:
        return img

    box_annotator = sv.BoxAnnotator()
    label_annotator = sv.LabelAnnotator()
    labels = [
        f"{class_name} {confidence:.2f}"
        for class_name, confidence
        in zip(detections['class_name'], detections.confidence)
    ]

    annotated_image = box_annotator.annotate(
        scene=img, detections=detections)
    annotated_image = label_annotator.annotate(
        scene=annotated_image, detections=detections, labels=labels)

    return annotated_image


if __name__ == '__main__':
    # Standalone test: repeatedly infer on a local image until "q" is pressed
    while True:
        # On "q" keypress, exit
        if cv2.waitKey(1) == ord('q'):
            break

        # Synchronously get a prediction for a local image from the Roboflow Infer API
        img = cv2.imread("YOUR_IMAGE.jpg")
        annotated = infer(img)

        # And display the inference results
        cv2.imshow('image', annotated)

    # Release resources when finished
    cv2.destroyAllWindows()

Hi Claus!

Happy to assist here. The way you are grabbing frames from the Basler camera and converting them in camera.py above looks good to me.

You're then calling infer() on each frame.

Are you asking specifically how to do this without making an infer() call for each frame yourself, and instead use the Roboflow InferencePipeline?
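
Just so we are talking about the same thing, the basic InferencePipeline usage looks roughly like this (a minimal sketch; the model ID, API key, and video reference are placeholders):

# Minimal InferencePipeline sketch (placeholder model ID, API key, and video source)
from inference import InferencePipeline
from inference.core.interfaces.stream.sinks import render_boxes

pipeline = InferencePipeline.init(
    model_id="your-project/1",          # placeholder model ID
    api_key="YOUR_ROBOFLOW_API_KEY",    # placeholder API key
    video_reference=0,                  # webcam index, video file path, or RTSP URL
    on_prediction=render_boxes,         # built-in sink that draws predictions on each frame
)
pipeline.start()
pipeline.join()

The video_reference argument is where a Basler/pypylon source would have to plug in.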

Thanks!

Reed

Hi Reed,

Are you asking specifically how to do this without making an infer() call for each frame yourself, and instead use the Roboflow InferencePipeline?

Exactly, for two reasons:

  • I am actually working with live-stream inference (surveillance-camera-style footage), so handling individual frames myself seems like bad design
  • I was hoping for stream optimization on your end (async frame buffering, parallelization, etc.) - roughly the kind of decoupling sketched below
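
To make the second point concrete, this is roughly the decoupling I have in mind, reusing my pypylon setup and my existing annotate.infer() (just a rough sketch; the buffer size and names are placeholders, not anything from your API):

# Rough sketch: one thread grabs frames from the Basler camera into a one-slot buffer,
# the main loop runs inference on the newest frame, so slow inference never blocks the camera.
import queue
import threading

from pypylon import pylon
from annotate import infer

frame_buffer = queue.Queue(maxsize=1)
stop_event = threading.Event()

def grab_frames():
    camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
    camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
    converter = pylon.ImageFormatConverter()
    converter.OutputPixelFormat = pylon.PixelType_BGR8packed
    converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned
    while camera.IsGrabbing() and not stop_event.is_set():
        result = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
        if result.GrabSucceeded():
            img = converter.Convert(result).GetArray()
            # Drop the stale frame, if any, and buffer the newest one
            try:
                frame_buffer.get_nowait()
            except queue.Empty:
                pass
            frame_buffer.put(img)
        result.Release()
    camera.StopGrabbing()

def run_inference():
    while not stop_event.is_set():
        img = frame_buffer.get()
        annotated = infer(img)  # same per-frame call as in annotate.py
        # ... display or publish the annotated frame here ...

threading.Thread(target=grab_frames, daemon=True).start()
try:
    run_inference()
finally:
    stop_event.set()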

cheers,
Claus

Hey @Reed-Johnson,

I’m facing a similar problem - I would like to use the Roboflow InferencePipeline, but it doesn’t seem to support image capture from a USB 3.0 Basler camera (specifically a daA1440-220uc).

From what I understand, video_reference effectively takes the same kinds of inputs as cv2.VideoCapture to initiate the inference pipeline, but that doesn’t seem to be compatible with frames coming from pypylon’s continuous grabbing.

Do you have any documents or guidance on how to do this? The only options I’ve seen are using GStreamer to create a local RTSP feed, or switching to a different Basler camera so I can set it up with an IP address on the local network.

We do have a Basler integration into InferencePipeline as part of our Inference for Manufacturing package; it’s a part of our enterprise plans. Please reach out to our sales team for a demo.

Perfect, thanks Brad - I’ll reach out to sales.