Some workflows need to do several things at once: look up company data from three APIs simultaneously, enrich 50 leads in a batch, or run quality checks in parallel after generating content. executeInParallel handles this by running multiple jobs concurrently, with an optional concurrency cap, and returning all results in the order you submitted them.

Basic Usage

Wrap each job in an arrow function and pass them as an array. Results come back in the same order you submitted them, with each result telling you whether the job succeeded or failed:
workflow.ts
import { workflow, executeInParallel } from '@outputai/core';
import { lookupCompany, searchNews } from './steps.js';
import { CompanyResearchInput, CompanyResearchOutput } from './types.js';

export default workflow({
  name: 'company_research',
  description: 'Research a company from multiple sources in parallel',
  inputSchema: CompanyResearchInput,
  outputSchema: CompanyResearchOutput,
  fn: async (input) => {
    const results = await executeInParallel({
      jobs: [
        () => lookupCompany(input.companyDomain),
        () => searchNews(input.companyName)
      ]
    });

    const [companyResult, newsResult] = results;
    return {
      company: companyResult.ok ? companyResult.result : null,
      articles: newsResult.ok ? newsResult.result : []
    };
  }
});

// types.ts
// import { z } from '@outputai/core';
//
// export const CompanyResearchInput = z.object({
//   companyDomain: z.string(),
//   companyName: z.string()
// });
//
// export const CompanyResearchOutput = z.object({
//   company: z.any(),
//   articles: z.array(z.any())
// });

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| jobs | `Array<() => Promise<T> \| T>` | Array of functions that return a step, evaluator, or child workflow call (or any value). Do not pass promises directly. |
| concurrency | `number` | Maximum number of jobs running at once. Default: Infinity (no limit). |
| onJobCompleted | `(result: ParallelJobResult<T>) => void` | Optional callback invoked as each job completes, in completion order (not submission order). |

Result Type

The function returns Promise<Array<ParallelJobResult<T>>>, sorted by original job index (so results stay in the same order you submitted them). Each element is a discriminated union:
| When | Shape | Fields |
| --- | --- | --- |
| Success | `{ ok: true; result: T; index: number }` | result is the job’s return value |
| Failure | `{ ok: false; error: unknown; index: number }` | error is the thrown value |
Handling successes and failures:
const results = await executeInParallel({
  jobs: input.domains.map(domain => () => enrichCompany(domain))
});

const enriched = results.filter((r): r is { ok: true; result: Company; index: number } => r.ok).map(r => r.result);
const failed = results.filter(r => !r.ok);
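
If the success/failure split is needed in several workflows, it can be factored into a small helper. The sketch below is illustrative only: `partitionResults`, `JobSuccess`, and `JobFailure` are hypothetical names, and the `ParallelJobResult` type is redeclared locally from the shapes in the table above so the snippet stands alone without the library.

```typescript
// Local stand-ins for the documented result shapes (assumption: they mirror
// the table above; they are not imported from @outputai/core here).
type JobSuccess<T> = { ok: true; result: T; index: number };
type JobFailure = { ok: false; error: unknown; index: number };
type ParallelJobResult<T> = JobSuccess<T> | JobFailure;

// Hypothetical helper: split results into successful values and failure records.
function partitionResults<T>(
  results: ReadonlyArray<ParallelJobResult<T>>
): { values: T[]; failures: JobFailure[] } {
  const values: T[] = [];
  const failures: JobFailure[] = [];
  for (const r of results) {
    if (r.ok) {
      values.push(r.result); // narrowed to JobSuccess<T>
    } else {
      failures.push(r); // narrowed to JobFailure
    }
  }
  return { values, failures };
}

// Usage with hand-written sample data.
const sample: ParallelJobResult<string>[] = [
  { ok: true, result: 'acme.com', index: 0 },
  { ok: false, error: new Error('timeout'), index: 1 },
  { ok: true, result: 'globex.com', index: 2 }
];
const { values, failures } = partitionResults(sample);
```

Because each failure keeps its `index`, the caller can still map it back to the input that produced it.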

Concurrency Limit

When you’re calling an external API that has rate limits, use concurrency to cap how many jobs run at once:
const results = await executeInParallel({
  jobs: input.domains.map(domain => () => enrichCompany(domain)),
  concurrency: 3
});
With concurrency: 3, at most three enrichment calls run at a time. When one finishes, the next starts.
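
To make the "when one finishes, the next starts" behavior concrete, here is a minimal sketch of a worker-pool scheduler with the same observable semantics. It is not the @outputai/core implementation; `runWithLimit` and `makeJob` are hypothetical names used only for illustration, and the result type is redeclared locally.

```typescript
type ParallelJobResult<T> =
  | { ok: true; result: T; index: number }
  | { ok: false; error: unknown; index: number };

// Hypothetical stand-in, NOT the library's code: runs jobs with at most
// `concurrency` in flight and returns results in submission order.
async function runWithLimit<T>(
  jobs: Array<() => Promise<T> | T>,
  concurrency: number = Infinity
): Promise<Array<ParallelJobResult<T>>> {
  const results: Array<ParallelJobResult<T>> = new Array(jobs.length);
  let next = 0; // index of the next job that has not started yet

  // Each worker pulls the next unstarted job as soon as its current one settles.
  const worker = async (): Promise<void> => {
    while (next < jobs.length) {
      const index = next++;
      try {
        results[index] = { ok: true, result: await jobs[index](), index };
      } catch (error) {
        results[index] = { ok: false, error, index };
      }
    }
  };

  const workerCount = Math.min(concurrency, jobs.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Demo: track how many jobs are in flight at the same time.
let inFlight = 0;
let maxInFlight = 0;
const makeJob = (value: number) => async (): Promise<number> => {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await Promise.resolve(); // yield so other jobs get a chance to start
  inFlight--;
  return value * 2;
};

const demo = runWithLimit([1, 2, 3, 4, 5, 6, 7].map((n) => makeJob(n)), 3);
```

Each worker holds at most one job at a time, so the number of workers is the concurrency cap. Writing each result at its own `index` is what keeps the output in submission order regardless of which job finishes first.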

onJobCompleted Callback

onJobCompleted fires once per job in completion order (fastest first), not submission order. Use it for logging progress or streaming partial results:
const results = await executeInParallel({
  jobs: leads.map(lead => () => enrichLead(lead)),
  onJobCompleted: (result) => {
    if (result.ok) {
      // Log progress as each lead completes
    } else {
      // Log failure at result.index
    }
  }
});
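
One way to fill in the callback is a small progress tracker. The sketch below is an assumption-laden illustration: `createProgressTracker` is a hypothetical helper, the `log` sink is injected rather than tied to any particular logger, and the result type is redeclared locally. The callback is invoked by hand here to simulate jobs finishing out of order.

```typescript
type ParallelJobResult<T> =
  | { ok: true; result: T; index: number }
  | { ok: false; error: unknown; index: number };

// Hypothetical helper: builds an onJobCompleted handler that counts completions
// and records the submission index of every failed job.
function createProgressTracker<T>(total: number, log: (line: string) => void) {
  let done = 0;
  const failedIndexes: number[] = [];
  return {
    onJobCompleted: (result: ParallelJobResult<T>): void => {
      done++;
      if (!result.ok) {
        failedIndexes.push(result.index);
      }
      log(`${done}/${total} done (job ${result.index} ${result.ok ? 'ok' : 'failed'})`);
    },
    failedIndexes
  };
}

// Simulate three jobs completing in the order 2, 0, 1.
const lines: string[] = [];
const tracker = createProgressTracker<string>(3, (line) => lines.push(line));
tracker.onJobCompleted({ ok: true, result: 'c', index: 2 });
tracker.onJobCompleted({ ok: false, error: new Error('rate limited'), index: 0 });
tracker.onJobCompleted({ ok: true, result: 'b', index: 1 });
```

In a workflow, `tracker.onJobCompleted` would be passed as the `onJobCompleted` option. Because the callback fires in completion order, `result.index` is the only reliable link back to the original input.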

Examples

Batch enrichment with rate limiting

Enrich a list of leads, capping concurrency to respect API rate limits:
workflow.ts
import { workflow, executeInParallel } from '@outputai/core';
import { enrichLead } from './steps.js';
import { BatchInput, BatchOutput } from './types.js';

export default workflow({
  name: 'batch_enrich',
  description: 'Enrich leads in parallel with rate limiting',
  inputSchema: BatchInput,
  outputSchema: BatchOutput,
  fn: async (input) => {
    const results = await executeInParallel({
      jobs: input.leads.map(lead => () => enrichLead(lead)),
      concurrency: 5
    });

    const enriched = [];
    const failedIds = [];
    results.forEach((r) => {
      if (r.ok) {
        enriched.push(r.result);
      } else {
        failedIds.push(input.leads[r.index].id);
      }
    });

    return { enriched, failedIds };
  }
});

// types.ts
// import { z } from '@outputai/core';
//
// export const BatchInput = z.object({
//   leads: z.array(z.object({ id: z.string(), domain: z.string() }))
// });
//
// export const BatchOutput = z.object({
//   enriched: z.array(z.any()),
//   failedIds: z.array(z.string())
// });

Gather data from multiple sources

Run different steps in one batch; results come back in job order:
const results = await executeInParallel({
  jobs: [
    () => lookupCompany(input.domain),
    () => searchLinkedIn(input.companyName),
    () => searchNews(input.companyName)
  ]
});

const [companyResult, linkedInResult, newsResult] = results;
const company = companyResult.ok ? companyResult.result : null;
const linkedIn = linkedInResult.ok ? linkedInResult.result : null;
const news = newsResult.ok ? newsResult.result : [];
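
The repeated `x.ok ? x.result : fallback` pattern can be pulled into a one-line helper. This is only a sketch: `resultOr` is a hypothetical name, not part of @outputai/core, and the sample results are hand-written with a locally redeclared result type.

```typescript
type ParallelJobResult<T> =
  | { ok: true; result: T; index: number }
  | { ok: false; error: unknown; index: number };

// Hypothetical helper: the job's value on success, otherwise the given fallback.
function resultOr<T, F>(r: ParallelJobResult<T>, fallback: F): T | F {
  return r.ok ? r.result : fallback;
}

// Hand-written sample results standing in for real job output.
const companyResult: ParallelJobResult<{ name: string }> = {
  ok: true,
  result: { name: 'Acme' },
  index: 0
};
const newsResult: ParallelJobResult<string[]> = {
  ok: false,
  error: new Error('news API down'),
  index: 2
};

const company = resultOr(companyResult, null); // the successful value
const news = resultOr(newsResult, []);         // falls back to an empty list
```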

Child workflows in parallel

Run multiple child workflows with a concurrency cap:
import enrichCompanyWorkflow from '../enrich_company/workflow.js';

const results = await executeInParallel({
  jobs: input.domains.map(domain => () => enrichCompanyWorkflow({ domain })),
  concurrency: 4
});

// Keep successful results only; flatMap narrows the union without a type predicate
const enriched = results.flatMap(r => (r.ok ? [r.result] : []));

Important: Wrap in arrow functions

Jobs must be functions that return the step, evaluator, or child workflow call. Do not pass promises:
// Correct: each job is a function
executeInParallel({
  jobs: [
    () => enrichLead(data1),
    () => enrichLead(data2)
  ]
});

// Wrong: passing promises directly
executeInParallel({
  jobs: [
    enrichLead(data1),  // Don't do this
    enrichLead(data2)
  ]
});
Passing promises would start all work immediately and break determinism; wrapping in functions lets executeInParallel control when each job runs and respect the concurrency limit.
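
The difference is easy to observe with a plain async function. In this sketch `fakeEnrichLead` is a hypothetical stand-in for a real step. Calling it starts the work immediately, while wrapping the call in an arrow function defers it until something invokes the function.

```typescript
let started = 0;

// Hypothetical stand-in for a step: counts how many times work has begun.
async function fakeEnrichLead(id: string): Promise<string> {
  started++; // runs synchronously at call time, before any await
  return `enriched:${id}`;
}

// Passing promises: both calls have already started by the time the array exists.
const eager = [fakeEnrichLead('a'), fakeEnrichLead('b')];
const startedAfterEager = started;

// Passing functions: nothing new starts until a scheduler calls them.
const lazy = [() => fakeEnrichLead('c'), () => fakeEnrichLead('d')];
const startedAfterLazy = started;

// A scheduler decides when to invoke each job.
const running = lazy.map((job) => job());
const startedAfterInvoke = started;
```

Here `startedAfterEager` and `startedAfterLazy` are both 2, and the count only reaches 4 once the functions are invoked. That deferred start is what lets a scheduler apply a concurrency limit.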