Complete Basler Integration

Hi,

I read your article: How to Set Up a Basler Camera on a Jetson.

I would be curious to hear what the recommended inference workflow looks like.

I know my integration works, but I would like a second opinion on whether this approach is recommended.

What I especially don't like is converting the stream to individual frames instead of using the Roboflow streaming API. (No, I do not want to use GStreamer.)

# camera.py
'''
A simple program for grabbing video from a Basler camera and converting it to an OpenCV image.
Tested on a Basler acA1300-200uc (USB3, Linux 64-bit, Python 3.5).
'''
from pypylon import pylon
import cv2
import time
from annotate import infer

# Connecting to the first available camera
camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())

# Grabbing continuously (video) with minimal delay
camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
converter = pylon.ImageFormatConverter()

# Converting output to OpenCV BGR format
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned

while camera.IsGrabbing():
    start_time = time.time()  # start time of the loop

    # Wait (up to 5 s) for the next grabbed frame
    grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)

    if grabResult.GrabSucceeded():
        # Access the image data
        image = converter.Convert(grabResult)
        img = image.GetArray()

        # Run inference and draw annotations on the frame
        img = infer(img)

        cv2.namedWindow('title', cv2.WINDOW_NORMAL)
        cv2.imshow('title', img)

        print("FPS: ", round(1.0 / (time.time() - start_time),2))  # FPS = 1 / time to process loop
        if cv2.waitKey(1) == ord('q'):
            break

    grabResult.Release()

# Releasing the resource
camera.StopGrabbing()

cv2.destroyAllWindows()

# annotate.py
import supervision as sv
import cv2
import json
from inference import get_model
from inference_sdk import InferenceHTTPClient


# Load the Roboflow config once at import time rather than on every frame
with open('roboflow_config.json') as f:
    config = json.load(f)

ROBOFLOW_API_KEY = config["ROBOFLOW_API_KEY"]
ROBOFLOW_MODEL = config["ROBOFLOW_MODEL"]
ROBOFLOW_SIZE = config["ROBOFLOW_SIZE"]

FRAMERATE = config["FRAMERATE"]
BUFFER = config["BUFFER"]

# Reuse a single client for local inference instead of recreating it per frame
client = InferenceHTTPClient(
    api_url="http://localhost:9001",
    api_key=ROBOFLOW_API_KEY,
)


# Infer via the Roboflow Infer API and return the annotated image
def infer(img: cv2.typing.MatLike) -> cv2.typing.MatLike:
    # Local inference against the inference server on port 9001
    results = client.infer(img, model_id=ROBOFLOW_MODEL)

    # Alternative: in-process inference via the inference package
    # (loads the model weights locally; slower on the first call)
    # model = get_model(ROBOFLOW_MODEL, ROBOFLOW_API_KEY)
    # results = model.infer(img)[0]

    detections = sv.Detections.from_inference(results)
    if len(detections) == 0:
        return img

    box_annotator = sv.BoxAnnotator()
    label_annotator = sv.LabelAnnotator()
    labels = [
        f"{class_name} {confidence:.2f}"
        for class_name, confidence
        in zip(detections['class_name'], detections.confidence)
    ]

    annotated_image = box_annotator.annotate(
        scene=img, detections=detections)
    annotated_image = label_annotator.annotate(
        scene=annotated_image, detections=detections, labels=labels)

    return annotated_image


if __name__ == '__main__':
    # Main loop; re-infers on the test image until you press "q"
    while True:
        # Predict on a local image
        img = cv2.imread("YOUR_IMAGE.jpg")

        # Synchronously get a prediction from the Roboflow Infer API
        annotated = infer(img)

        # Display the inference results
        cv2.imshow('image', annotated)

        # On "q" keypress, exit
        if cv2.waitKey(1) == ord('q'):
            break

    # Release resources when finished
    cv2.destroyAllWindows()

Hi Claus!

Happy to assist here. The way you are grabbing frames from the Basler camera and converting them in camera.py above looks good to me.

And then you're calling infer() on each frame.

Are you asking specifically about how to do this without an infer() call for each frame, using the Roboflow InferencePipeline instead?
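
If so, the shape I have in mind is roughly the following. This is a minimal sketch using a generic webcam source; "YOUR_MODEL/1" and "YOUR_API_KEY" are placeholders, and pointing it at the Basler feed would still need a custom source, so treat it as a starting point rather than a drop-in:

# pipeline_sketch.py
from inference import InferencePipeline
from inference.core.interfaces.stream.sinks import render_boxes

# InferencePipeline manages the capture thread, frame buffering, and
# dispatch internally; on_prediction is invoked with each model result
# and its source frame.
pipeline = InferencePipeline.init(
    model_id="YOUR_MODEL/1",     # placeholder model id
    api_key="YOUR_API_KEY",      # placeholder API key
    video_reference=0,           # webcam index; also accepts RTSP URLs / video files
    on_prediction=render_boxes,  # built-in sink that draws detections in a window
)

pipeline.start()
pipeline.join()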

Thanks!

Reed

Hi Reed,

Are you asking specifically about how to do this without an infer() call for each frame, using the Roboflow InferencePipeline instead?

Exactly, for two reasons:

  • I am actually working with live-stream inference (surveillance cameras, etc.), so pulling individual frames myself seems like a bad design
  • I was hoping for stream optimization on your end (async frame buffering, parallelization, etc.); roughly what I'm hoping to write is sketched below
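
I don't know whether inference supports plugging in a custom source like this, but what I'd hope to write is something shaped like the following. It is purely a speculative sketch reusing my pypylon code from camera.py; BaslerSource is my own made-up adapter, not an existing API:

# basler_source.py
from pypylon import pylon


class BaslerSource:
    """cv2.VideoCapture-shaped adapter around a Basler camera, so a
    streaming pipeline could do its own buffering/parallelization."""

    def __init__(self):
        # Connect to the first available camera and start grabbing
        self.camera = pylon.InstantCamera(
            pylon.TlFactory.GetInstance().CreateFirstDevice())
        self.camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
        # Convert grabbed buffers to OpenCV's BGR format
        self.converter = pylon.ImageFormatConverter()
        self.converter.OutputPixelFormat = pylon.PixelType_BGR8packed
        self.converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned

    def isOpened(self) -> bool:
        return self.camera.IsGrabbing()

    def read(self):
        # Mirror cv2.VideoCapture.read(): return (ok, frame)
        result = self.camera.RetrieveResult(
            5000, pylon.TimeoutHandling_ThrowException)
        try:
            if not result.GrabSucceeded():
                return False, None
            return True, self.converter.Convert(result).GetArray()
        finally:
            result.Release()

    def release(self):
        self.camera.StopGrabbing()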

cheers,
Claus