"message":"\"value\" does not match response from creating a batch job via REST-API

IDescribe your question/issue here! (delete this when you post)

I’m trying to submit a batch processing job to execute my workflow with the following POST request: curl ‘https://api.roboflow.com/batch-processing/v1/external/hostview/jobs/testjob-11?api_key=xxxxxx’
–request POST
–header ‘Content-Type: application/json’
–data ‘{
“type”: “simple-image-processing-v1”,
“jobInput”: {
“type”: “staging-batch-input-v1”,
“batchId”: “test-1”
},
“computeConfiguration”: {
“type”: “compute-configuration-v2”,
“machineType”: “gpu”,
“workersPerMachine”: 4
},
“processingTimeoutSeconds”: 3600,
“processingSpecification”: {
“type”: “workflows-processing-specification-v1”,
“workspace”: “hostview”,
“workflowId”: “missing-product-detection-2”,
“aggregationFormat”: “jsonl”
},
“notificationsUrl”: “”
}’

and getting a 400 error response with body “{“message”:““value” does not match any of the allowed types”}”.

What am I missing in my request?

Hi @Clint_Suson , I just asked claude code to create py script to run batch processing on some image (URL), and to follow https://docs.roboflow.com/deploy/batch-processing/api-reference\ docs. Seems like it’s working as expected, could you try that?

Created script can be found below, with terminal output & platform screenshots.

python3 batch_workflow.py
=== Batch Workflow Job ===
Workspace:  my-workspace
Workflow:   lpr-workflow
Batch ID:   lpr-bench-1774875695
Job ID:     lpr-job-1774875695

[1/5] Uploading image...
Uploaded image to batch 'lpr-bench-1774875695': 200

[2/5] Verifying batch count...
Batch 'lpr-bench-1774875695' item count: {'status': 'ok', 'count': 1}

[3/5] Starting batch processing job...
Started job 'lpr-job-1774875695': {'status': 'ok'}

[4/5] Polling job status...
  Job 'lpr-job-1774875695' status: ok  progress: ?
  Job 'lpr-job-1774875695' status: ok  progress: ?
.....
 [not sure if polling makes sense here]

Here’s the batch job in the app.roboflow.com:

Here’s the script that claude code produced:

import requests
import time
import sys

API_KEY = "MY_API_KEY" # Replace with your API Key
WORKSPACE = "my-workspace-id" # Replace with your workspace
WORKFLOW_ID = "lpr-workflow" # Replace with your workflow id
IMAGE_URL = "https://storage.googleapis.com/com-roboflow-marketing/docs/cars-highway.png"

BASE_STAGING = "https://api.roboflow.com/data-staging/v1/external"
BASE_JOBS = "https://api.roboflow.com/batch-processing/v1/external"


def upload_image(batch_id: str) -> None:
    """Upload a single image URL to the batch via single-image endpoint."""
    url = f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/upload/image"
    resp = requests.post(
        url,
        params={"api_key": API_KEY, "fileName": "cars-highway.png"},
        files={"file": ("cars-highway.png", requests.get(IMAGE_URL).content, "image/png")},
    )
    resp.raise_for_status()
    print(f"Uploaded image to batch '{batch_id}': {resp.status_code}")


def get_batch_count(batch_id: str) -> int:
    url = f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/count"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    count = resp.json()
    print(f"Batch '{batch_id}' item count: {count}")
    return count


def start_job(job_id: str, batch_id: str) -> dict:
    url = f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}"
    body = {
        "type": "simple-image-processing-v1",
        "jobInput": {
            "type": "staging-batch-input-v1",
            "batchId": batch_id,
        },
        "computeConfiguration": {
            "type": "compute-configuration-v2",
            "machineType": "gpu",
            "workersPerMachine": 4,
        },
        "processingSpecification": {
            "type": "workflows-processing-specification-v1",
            "workspace": WORKSPACE,
            "workflowId": WORKFLOW_ID,
            "aggregationFormat": "jsonl",
        },
        "processingTimeoutSeconds": 3600,
    }
    resp = requests.post(url, params={"api_key": API_KEY}, json=body)
    resp.raise_for_status()
    data = resp.json()
    print(f"Started job '{job_id}': {data}")
    return data


def get_job_status(job_id: str) -> dict:
    url = f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    return resp.json()


def poll_until_done(job_id: str, interval: int = 5) -> dict:
    while True:
        status = get_job_status(job_id)
        job_status = status.get("jobStatus", status.get("status", "unknown"))
        progress = status.get("progress", "?")
        print(f"  Job '{job_id}' status: {job_status}  progress: {progress}")
        if job_status in ("completed", "failed"):
            return status
        time.sleep(interval)


def get_job_stages(job_id: str) -> list:
    url = f"{BASE_JOBS}/{WORKSPACE}/jobs/{job_id}/stages"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    return resp.json()


def list_output_files(batch_id: str) -> list:
    url = f"{BASE_STAGING}/{WORKSPACE}/batches/{batch_id}/list"
    resp = requests.get(url, params={"api_key": API_KEY})
    resp.raise_for_status()
    return resp.json()


def download_results(batch_id: str) -> None:
    files = list_output_files(batch_id)
    items = files if isinstance(files, list) else files.get("files", files.get("items", []))
    for f in items:
        download_url = f.get("downloadURL") or f.get("url")
        name = f.get("fileName", "result")
        print(f"  Downloading {name} ...")
        resp = requests.get(download_url)
        resp.raise_for_status()
        with open(name, "wb") as fh:
            fh.write(resp.content)
        print(f"  Saved {name} ({len(resp.content)} bytes)")


def main():
    ts = int(time.time())
    batch_id = f"lpr-bench-{ts}"
    job_id = f"lpr-job-{ts}"

    print(f"=== Batch Workflow Job ===")
    print(f"Workspace:  {WORKSPACE}")
    print(f"Workflow:   {WORKFLOW_ID}")
    print(f"Batch ID:   {batch_id}")
    print(f"Job ID:     {job_id}")
    print()

    # 1. Upload image
    print("[1/5] Uploading image...")
    upload_image(batch_id)
    print()

    # 2. Verify count
    print("[2/5] Verifying batch count...")
    count = get_batch_count(batch_id)
    if count == 0:
        print("WARNING: batch count is 0, waiting for indexing...")
        time.sleep(5)
        count = get_batch_count(batch_id)
    print()

    # 3. Start job
    print("[3/5] Starting batch processing job...")
    start_job(job_id, batch_id)
    print()

    # 4. Poll for completion
    print("[4/5] Polling job status...")
    final = poll_until_done(job_id)
    print(f"\nFinal status: {final}")
    print()

    # 5. Get results
    print("[5/5] Fetching results...")
    stages = get_job_stages(job_id)
    print(f"Stages: {stages}")
    for stage in (stages if isinstance(stages, list) else stages.get("stages", [])):
        output_batch = stage.get("outputBatchId") or stage.get("outputBatch")
        if output_batch:
            print(f"\nOutput batch for stage '{stage.get('stageId', '?')}': {output_batch}")
            download_results(output_batch)

    print("\nDone!")


if __name__ == "__main__":
    main()

Hi Erik,

I tried your python script and it works but when I tried to submit a similar request using the API from the API Reference | Roboflow Docs to start a batch processing job, I would get the same error. When I remove the “notificationUrl” element from the POST body, I got a successful job start “200 response”. I also got the same error when I added : “notificationUrl”: “”, in the body of your python script. Looks like the “notificationUrl” is not supported? or what’s the correct JSON body for adding a webhook URL in the post request? thx!

Hi Erik,

I figured out the root caused which is the “notificationUrl” should be “notificationsURL” and roboflow’s documentation needs to be updated. I had to track it down in the inference cli python scripts. Now, how can I pass additional header information to my notificaion webhook endpoint? is the a document link somewhere?

Hi @Clint_Suson ,

We apologize for docs typo (you are correct), I will fix that asap.

Custom webhook headers are not yet supported. The only header sent is Authorization: Bearer rf_{workspace_id}. Could you authenticate the webhook request against that bearer token?

Thanks, Erik

HI Erik,

Thanks for the update. Can you also pls update your documentation to reflect this. Btw, I looked at your webhook sink module in workflow and it support custom webhook headers. Are these 2 modules don’t share the same code base?

Can you also make the error message more specific, “message: “value” does not match” doesn’t really help us debug the problem.

thx!

Hi @Clint_Suson ,

Already updated with both changes:)

No, they’re completely separate, workflows are for building CV pipelines, running batch jobs (under the hood) isn’t really CV-related.

Good idea, let me check what we can do about that!

Thanks, Erik