Hi @Clint_Suson ,
Script below works for me, could you try it out? It's uploading a single image into the batch, and starts a batch processing job for it.
Thanks, Erik
"""Upload one image into a Roboflow data-staging batch, run a batch-processing
job over it with a Workflow, wait for completion, and download the results.

Endpoints used (external API):
  - data staging:     {BASE_STAGING}/{workspace}/batches/...
  - batch processing: {BASE_JOBS}/{workspace}/jobs/...
"""
import time

import requests

API_KEY = "MY_API_KEY"
WORKSPACE = "my-workspace"
WORKFLOW_ID = "my-workflow"
IMAGE_URL = "https://storage.googleapis.com/com-roboflow-marketing/docs/cars-highway.png"
BASE_STAGING = "https://api.roboflow.com/data-staging/v1/external"
BASE_JOBS = "https://api.roboflow.com/batch-processing/v1/external"

# Per-request timeout (seconds) so a hung connection cannot stall the script
# forever (requests has no default timeout).
TIMEOUT = 60

# Timestamp-suffixed IDs keep reruns from colliding with earlier batches/jobs.
ts = int(time.time())
batch_id = f"my-batch-{ts}"
job_id = f"my-job-{ts}"

# 1. Upload an image to the batch.
# Fetch the source image first and fail fast on a bad status — otherwise an
# HTML error page would silently be uploaded as the "image" bytes.
image_resp = requests.get(IMAGE_URL, timeout=TIMEOUT)
image_resp.raise_for_status()

resp = requests.post(
    f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/upload/image",
    params={"api_key": API_KEY, "fileName": "cars-highway.png"},
    files={"file": ("image.png", image_resp.content, "image/png")},
    timeout=TIMEOUT,
)
resp.raise_for_status()

# Give staging a moment to register the upload before querying the count.
time.sleep(3)

# 1.1 Check number of files in batch.
resp = requests.get(
    f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/count",
    params={"api_key": API_KEY},
    timeout=TIMEOUT,
)
resp.raise_for_status()
print("Total files in batch: ", resp.json())

# 2. Create a job that runs the Workflow over the staged batch.
resp = requests.post(
    f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}",
    params={"api_key": API_KEY},
    json={
        "type": "simple-image-processing-v1",
        "jobInput": {
            "type": "staging-batch-input-v1",
            "batchId": batch_id,
        },
        "computeConfiguration": {
            "type": "compute-configuration-v2",
            "machineType": "gpu",
            "workersPerMachine": 4,
        },
        "processingSpecification": {
            "type": "workflows-processing-specification-v1",
            "workspace": WORKSPACE,
            "workflowId": WORKFLOW_ID,
            "aggregationFormat": "jsonl",
        },
        "processingTimeoutSeconds": 3600,
    },
    timeout=TIMEOUT,
)
resp.raise_for_status()

# 3. Poll until the job reaches a terminal state (polls once a minute).
while True:
    resp = requests.get(
        f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}",
        params={"api_key": API_KEY},
        timeout=TIMEOUT,
    )
    resp.raise_for_status()
    data = resp.json()
    # NOTE(review): some responses wrap the record under a "job" key; fall
    # back to the top-level payload otherwise. Schema assumed from the
    # original script — confirm against the current API.
    job = data.get("job", data)
    if job.get("isTerminal", False):
        break
    time.sleep(60)

# 4. Download results. Each results batch may be split into named parts;
# list the files of every part and save each one locally.
notification = job.get("lastNotification", {})
results_batches = notification.get("resultsBatches", [])
for result_batch_id in results_batches:
    # Best-effort parts lookup: on failure (or an empty list) fall back to a
    # single None "part" so the /list call below still runs without partName.
    resp = requests.get(
        f"{BASE_STAGING}/{WORKSPACE}/batches/{result_batch_id}/parts",
        params={"api_key": API_KEY},
        timeout=TIMEOUT,
    )
    parts = resp.json() if resp.ok else []
    if not parts:
        parts = [None]
    for part in parts:
        # Parts may come back as plain strings or dicts carrying "partName".
        if isinstance(part, str):
            part_name = part
        elif isinstance(part, dict):
            part_name = part.get("partName")
        else:
            part_name = None
        params = {"api_key": API_KEY}
        if part_name:
            params["partName"] = part_name
        resp = requests.get(
            f"{BASE_STAGING}/{WORKSPACE}/batches/{result_batch_id}/list",
            params=params,
            timeout=TIMEOUT,
        )
        resp.raise_for_status()
        files = resp.json()
        # Normalise the observed list-response shapes to a list of file dicts.
        if isinstance(files, list):
            items = files
        else:
            items = files.get(
                "filesMetadata", files.get("files", files.get("items", []))
            )
        for f in items:
            download_url = f.get("downloadURL") or f.get("url")
            name = f.get("fileName", "result")
            # Skip entries without a usable URL instead of crashing on
            # requests.get(None) with MissingSchema.
            if not download_url:
                continue
            resp = requests.get(download_url, timeout=TIMEOUT)
            resp.raise_for_status()
            with open(name, "wb") as fh:
                fh.write(resp.content)