Hi @Clint_Suson, I just asked Claude Code to create a Python script that runs batch processing on an image (by URL), following the https://docs.roboflow.com/deploy/batch-processing/api-reference docs. It seems to be working as expected; could you try it?
The script it produced is below, along with the terminal output and platform screenshots.
python3 batch_workflow.py
=== Batch Workflow Job ===
Workspace: my-workspace
Workflow: lpr-workflow
Batch ID: lpr-bench-1774875695
Job ID: lpr-job-1774875695
[1/5] Uploading image...
Uploaded image to batch 'lpr-bench-1774875695': 200
[2/5] Verifying batch count...
Batch 'lpr-bench-1774875695' item count: {'status': 'ok', 'count': 1}
[3/5] Starting batch processing job...
Started job 'lpr-job-1774875695': {'status': 'ok'}
[4/5] Polling job status...
Job 'lpr-job-1774875695' status: ok progress: ?
Job 'lpr-job-1774875695' status: ok progress: ?
.....
[not sure if polling makes sense here; see the polling sketch after the script]
Here's the batch job in app.roboflow.com:
Here's the script that Claude Code produced:
import requests
import time
import sys

API_KEY = "MY_API_KEY"          # Replace with your API Key
WORKSPACE = "my-workspace-id"   # Replace with your workspace
WORKFLOW_ID = "lpr-workflow"    # Replace with your workflow id
IMAGE_URL = "https://storage.googleapis.com/com-roboflow-marketing/docs/cars-highway.png"

BASE_STAGING = "https://api.roboflow.com/data-staging/v1/external"
BASE_JOBS = "https://api.roboflow.com/batch-processing/v1/external"


def upload_image(batch_id: str) -> None:
    """Upload a single image URL to the batch via the single-image endpoint."""
    url = f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/upload/image"
    resp = requests.post(
        url,
        params={"api_key": API_KEY, "fileName": "cars-highway.png"},
        files={"file": ("cars-highway.png", requests.get(IMAGE_URL).content, "image/png")},
    )
    resp.raise_for_status()
    print(f"Uploaded image to batch '{batch_id}': {resp.status_code}")


def get_batch_count(batch_id: str) -> int:
    """Return the number of items currently staged in the batch."""
    url = f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/count"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    data = resp.json()
    print(f"Batch '{batch_id}' item count: {data}")
    # The endpoint returns a JSON object (e.g. {'status': 'ok', 'count': 1}),
    # so extract the numeric count instead of returning the whole dict.
    return data.get("count", 0)


def start_job(job_id: str, batch_id: str) -> dict:
    """Create a batch processing job that runs the workflow over the staged batch."""
    url = f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}"
    body = {
        "type": "simple-image-processing-v1",
        "jobInput": {
            "type": "staging-batch-input-v1",
            "batchId": batch_id,
        },
        "computeConfiguration": {
            "type": "compute-configuration-v2",
            "machineType": "gpu",
            "workersPerMachine": 4,
        },
        "processingSpecification": {
            "type": "workflows-processing-specification-v1",
            "workspace": WORKSPACE,
            "workflowId": WORKFLOW_ID,
            "aggregationFormat": "jsonl",
        },
        "processingTimeoutSeconds": 3600,
    }
    resp = requests.post(url, params={"api_key": API_KEY}, json=body)
    resp.raise_for_status()
    data = resp.json()
    print(f"Started job '{job_id}': {data}")
    return data


def get_job_status(job_id: str) -> dict:
    url = f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    return resp.json()


def poll_until_done(job_id: str, interval: int = 5) -> dict:
    """Poll the job endpoint until it reports a terminal state.

    NOTE: the field names below ("jobStatus", "progress") are guesses and may
    not match the actual response payload of the batch-processing API.
    """
    while True:
        status = get_job_status(job_id)
        job_status = status.get("jobStatus", status.get("status", "unknown"))
        progress = status.get("progress", "?")
        print(f" Job '{job_id}' status: {job_status} progress: {progress}")
        if job_status in ("completed", "failed"):
            return status
        time.sleep(interval)


def get_job_stages(job_id: str) -> list:
    url = f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}/stages"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    return resp.json()


def list_output_files(batch_id: str) -> list:
    url = f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/list"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    return resp.json()


def download_results(batch_id: str) -> None:
    """Download every file listed in the given output batch to the working directory."""
    files = list_output_files(batch_id)
    items = files if isinstance(files, list) else files.get("files", files.get("items", []))
    for f in items:
        download_url = f.get("downloadURL") or f.get("url")
        name = f.get("fileName", "result")
        if not download_url:
            # Skip entries without a usable download link.
            continue
        print(f" Downloading {name} ...")
        resp = requests.get(download_url)
        resp.raise_for_status()
        with open(name, "wb") as fh:
            fh.write(resp.content)
        print(f" Saved {name} ({len(resp.content)} bytes)")


def main():
    ts = int(time.time())
    batch_id = f"lpr-bench-{ts}"
    job_id = f"lpr-job-{ts}"

    print("=== Batch Workflow Job ===")
    print(f"Workspace: {WORKSPACE}")
    print(f"Workflow: {WORKFLOW_ID}")
    print(f"Batch ID: {batch_id}")
    print(f"Job ID: {job_id}")
    print()

    # 1. Upload image
    print("[1/5] Uploading image...")
    upload_image(batch_id)
    print()

    # 2. Verify count
    print("[2/5] Verifying batch count...")
    count = get_batch_count(batch_id)
    if count == 0:
        print("WARNING: batch count is 0, waiting for indexing...")
        time.sleep(5)
        count = get_batch_count(batch_id)
    print()

    # 3. Start job
    print("[3/5] Starting batch processing job...")
    start_job(job_id, batch_id)
    print()

    # 4. Poll for completion
    print("[4/5] Polling job status...")
    final = poll_until_done(job_id)
    print(f"\nFinal status: {final}")
    print()

    # 5. Get results
    print("[5/5] Fetching results...")
    stages = get_job_stages(job_id)
    print(f"Stages: {stages}")
    for stage in (stages if isinstance(stages, list) else stages.get("stages", [])):
        output_batch = stage.get("outputBatchId") or stage.get("outputBatch")
        if output_batch:
            print(f"\nOutput batch for stage '{stage.get('stageId', '?')}': {output_batch}")
            download_results(output_batch)

    print("\nDone!")


if __name__ == "__main__":
    main()
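On the bracketed note about polling: polling a job-status endpoint is the usual pattern for asynchronous batch jobs, but the loop in the script keeps reading the "ok" envelope value from the response, so it never sees "completed"/"failed" and never shows real progress (hence the "progress: ?" lines above). Below is a minimal, hypothetical variation that reuses the script's get_job_status / get_job_stages helpers, adds a timeout, and prints the raw responses each iteration so the correct status and progress fields can be identified; the field names and the "completed"/"failed" check are assumptions I have not confirmed against the batch-processing API.

import time

def poll_with_stages(job_id: str, interval: int = 10, timeout: int = 3600) -> dict:
    """Poll the job and its stages until a terminal state or timeout (hypothetical helper)."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = get_job_status(job_id)    # GET {BASE_JOBS}/{WORKSPACE}/jobs/{job_id}
        stages = get_job_stages(job_id)    # GET {BASE_JOBS}/{WORKSPACE}/jobs/{job_id}/stages
        print(f"job:    {status}")
        print(f"stages: {stages}")
        # ASSUMPTION: the terminal state is reported as "completed"/"failed" in a
        # "jobStatus" or "status" field; adjust once the real payload is known.
        state = str(status.get("jobStatus", status.get("status", ""))).lower()
        if state in ("completed", "failed"):
            return status
        time.sleep(interval)
    raise TimeoutError(f"Job '{job_id}' did not reach a terminal state within {timeout}s")

If this works, step 4 in main() would just become final = poll_with_stages(job_id) instead of poll_until_done(job_id).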