Hi,
I read your article: How to Set Up a Basler Camera on a Jetson.
I would be curious to see what the inference workflow looks like.
I know my integration works, but I would like some advice on whether this approach is recommended.
What I especially don't like is the conversion to individual frames instead of using the Roboflow streaming API. (No, I do not want to use GStreamer.)
# camera.py
'''
A simple program for grabbing video from a Basler camera and converting it to an OpenCV image.
Tested on Basler acA1300-200uc (USB3, linux 64bit , python 3.5)
'''
from pypylon import pylon
import cv2
import time
from annotate import infer
# Connect to the first available camera.
camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())

# Grab continuously (video) with minimal delay.
camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)

# Converter that turns grab results into OpenCV's BGR format.
converter = pylon.ImageFormatConverter()
converter.OutputPixelFormat = pylon.PixelType_BGR8packed
converter.OutputBitAlignment = pylon.OutputBitAlignment_MsbAligned

# Create the display window once instead of on every frame.
cv2.namedWindow('title', cv2.WINDOW_NORMAL)

try:
    while camera.IsGrabbing():
        start_time = time.time()  # start time of the loop

        grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
        try:
            if not grabResult.GrabSucceeded():
                continue

            # Access the image data as an array OpenCV can display.
            image = converter.Convert(grabResult)
            img = image.GetArray()

            # Annotation integration: run inference and draw the detections.
            img = infer(img)
            cv2.imshow('title', img)

            # FPS = 1 / time to process one loop iteration; skip the print
            # when the clock did not advance to avoid a ZeroDivisionError.
            elapsed = time.time() - start_time
            if elapsed > 0:
                print("FPS: ", round(1.0 / elapsed, 2))

            if cv2.waitKey(1) == ord('q'):
                break
        finally:
            # Always hand the grab result back, including on `break`,
            # `continue`, or an exception raised while processing the frame.
            grabResult.Release()
finally:
    # Release the resources even if grabbing or inference failed.
    camera.StopGrabbing()
    cv2.destroyAllWindows()
# annotate.py
import supervision as sv
import cv2
import json
from inference import get_model
from inference_sdk import InferenceHTTPClient
# Inference client, model id and annotators, built on first use and shared by
# every later call so that per-frame work is limited to the request itself.
_INFER_STATE = {}


def _get_infer_state():
    """Read roboflow_config.json and build the client/annotators once."""
    if not _INFER_STATE:
        with open('roboflow_config.json') as f:
            config = json.load(f)
        state = {
            # Client for the inference server at http://localhost:9001.
            "client": InferenceHTTPClient(
                api_url="http://localhost:9001",
                api_key=config["ROBOFLOW_API_KEY"],
            ),
            "model_id": config["ROBOFLOW_MODEL"],
            "box_annotator": sv.BoxAnnotator(),
            "label_annotator": sv.LabelAnnotator(),
        }
        # Publish only after everything was built, so a failure above does
        # not leave a half-initialised cache behind.
        _INFER_STATE.update(state)
    return _INFER_STATE


def infer(img: cv2.typing.MatLike) -> cv2.typing.MatLike:
    """Run inference on *img* and return the image with detections drawn.

    The API key and model id are read from ``roboflow_config.json`` the first
    time this function is called; the HTTP client and the annotators are then
    reused for subsequent calls (later edits to the config file are therefore
    not picked up until the process restarts).

    Args:
        img: Image to run the model on.

    Returns:
        ``img`` itself when nothing was detected, otherwise the image with
        boxes and "<class_name> <confidence>" labels drawn on it.
    """
    state = _get_infer_state()

    # NOTE: the original kept a commented-out alternative here, labelled
    # "remote inference (slower)":
    #   model = get_model(ROBOFLOW_MODEL, ROBOFLOW_API_KEY)
    #   results = model.infer(img)[0]
    results = state["client"].infer(img, model_id=state["model_id"])

    detections = sv.Detections.from_inference(results)
    if len(detections) == 0:
        return img

    labels = [
        f"{class_name} {confidence:.2f}"
        for class_name, confidence
        in zip(detections['class_name'], detections.confidence)
    ]

    # NOTE(review): the annotators are handed `img` directly, as before; they
    # may draw onto that array in place - pass a copy if the caller needs the
    # untouched frame.
    annotated_image = state["box_annotator"].annotate(
        scene=img, detections=detections)
    annotated_image = state["label_annotator"].annotate(
        scene=annotated_image, detections=detections, labels=labels)
    return annotated_image
if __name__ == '__main__':
    # Load the test image once; it does not change between iterations.
    # cv2.imread returns None instead of raising when the file cannot be
    # read, so fail early with a clear message rather than inside infer().
    img = cv2.imread("YOUR_IMAGE.jpg")
    if img is None:
        raise SystemExit("Could not read image file: YOUR_IMAGE.jpg")

    try:
        # Main loop; infers sequentially until you press "q"
        while True:
            # On "q" keypress, exit
            if cv2.waitKey(1) == ord('q'):
                break

            # Synchronously get a prediction for the local image. A copy is
            # passed so the loaded image stays untouched in case the
            # annotators draw on the array they are given.
            annotated = infer(img.copy())

            # And display the inference results
            cv2.imshow('image', annotated)
    finally:
        # Release resources when finished, even if inference failed.
        cv2.destroyAllWindows()