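The createFlow() function provides a fluent API for building document processing pipelines. This guide covers the flow construction patterns: sequential steps, conditional routing, parallel processing, outputs, input validation, flow validation, and error handling.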

Basic Flow Structure

Every flow starts with createFlow() and ends with .build():
import { createFlow, parse, extract } from '@docloai/flows';

const flow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema }))
  .build();

const result = await flow.run({ base64: documentData });

Adding Steps

Sequential Steps

Use .step() to add sequential processing stages:
createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema }))
  .step('output', output({ name: 'result' }))
  .build();
Each step has:
  • id: Unique identifier for the step
  • node: The processing node to execute
  • name (optional): Display name for observability
.step('parse', parse({ provider }), 'Parse Document')

Step Output Flow

Data flows automatically between steps:
Input → parse → DocumentIR → extract → ExtractedData → output → FinalOutput
Each step receives the output of the previous step. The final output is the result of the last step.
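
To make the chaining concrete, here is a minimal, self-contained sketch of the idea. It is not how @docloai/flows is implemented; Step, runSequential, and the toy steps are hypothetical names used only to show each step consuming the previous step's output while results are recorded by step id.

```typescript
// Hypothetical stand-in for a step: an id plus an async function.
type Step = { id: string; run: (input: unknown) => Promise<unknown> };

// Runs steps in order, feeding each step the previous step's output.
async function runSequential(steps: Step[], input: unknown) {
  const artifacts: Record<string, unknown> = {};
  let current = input;
  for (const step of steps) {
    current = await step.run(current);
    artifacts[step.id] = current; // keep every intermediate result by step id
  }
  // The final output is whatever the last step returned.
  return { output: current, artifacts };
}

// Toy steps standing in for parse and extract.
const toySteps: Step[] = [
  { id: 'parse', run: async (raw) => ({ text: String(raw).trim() }) },
  {
    id: 'extract',
    run: async (doc) => ({ length: (doc as { text: string }).text.length }),
  },
];
```

Calling runSequential(toySteps, rawInput) resolves to the last step's result as output, with the intermediate parse result still available under artifacts.parse.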

Conditional Routing

Use .conditional() to route documents based on data:
const flow = createFlow()
  .step('categorize', categorize({
    provider: vlmProvider,
    categories: ['invoice', 'receipt', 'contract']
  }))
  .conditional('extract', (data) => {
    // data.category contains the classification result
    switch (data.category) {
      case 'invoice':
        return extract({ provider, schema: invoiceSchema });
      case 'receipt':
        return extract({ provider, schema: receiptSchema });
      case 'contract':
        return extract({ provider, schema: contractSchema });
      default:
        return extract({ provider, schema: genericSchema });
    }
  })
  .build();

Conditional Rules

Conditionals must return a node, not a promise or executed result:
// Correct - return a node
.conditional('parse', (data) => {
  if (data.quality === 'high') {
    return parse({ provider: fastProvider });
  }
  return parse({ provider: accurateProvider });
})

// INCORRECT - don't call .run()
.conditional('parse', (data) => {
  return someFlow.build().run(data);  // This will error!
})

Accessing Previous Step Results

The conditional function receives a second parameter, context, which exposes the artifacts of all previous steps:
.step('categorize', categorize({ provider, categories }))
.step('parse', parse({ provider }))
.conditional('extract', (data, context) => {
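  // Access the categorize result from context; fall back to a generic schema
  // when the category is missing or has no dedicated schema
  const category = context?.artifacts.categorize?.category;
  return extract({ provider, schema: SCHEMAS[category] ?? genericSchema });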
})
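
A category read from context can be missing or unrecognized, so one option is a small lookup helper with an explicit fallback. The sketch below is self-contained; SCHEMAS, GENERIC_SCHEMA, and schemaFor are illustrative names, and the schema contents are placeholders rather than anything defined by the library.

```typescript
type JsonSchema = { type: string; properties?: Record<string, unknown> };

// Placeholder schemas for illustration only.
const GENERIC_SCHEMA: JsonSchema = { type: 'object' };

const SCHEMAS: Record<string, JsonSchema> = {
  invoice: { type: 'object', properties: { total: { type: 'number' } } },
  receipt: { type: 'object', properties: { merchant: { type: 'string' } } },
};

// Returns the schema for a category, or the generic schema when the
// category is undefined or has no entry of its own.
function schemaFor(category: string | undefined): JsonSchema {
  const found =
    category !== undefined &&
    Object.prototype.hasOwnProperty.call(SCHEMAS, category)
      ? SCHEMAS[category]
      : undefined;
  return found ?? GENERIC_SCHEMA;
}
```

Inside a conditional, the returned node would then be built from schemaFor(category). The own-property check keeps inherited keys such as 'toString' from being mistaken for a category.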

Parallel Processing with forEach

Use .forEach() to process arrays in parallel:
const flow = createFlow()
  .step('split', split({
    provider: vlmProvider,
    schemas: {
      invoice: invoiceSchema,
      receipt: receiptSchema
    }
  }))
  .forEach('process', (doc) =>
    // Each item processed in parallel
    createFlow()
      .step('extract', extract({
        provider: vlmProvider,
        schema: doc.schema
      }))
  )
  .step('combine', combine({ strategy: 'concatenate' }))
  .build();

forEach Requirements

  • The previous step must output an array (e.g., from a split node)
  • The callback receives each array item
  • The callback returns a new flow for processing that item
  • Results are collected and passed to the next step
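
Conceptually, these requirements resemble mapping over an array and awaiting every result. The sketch below illustrates that idea with Promise.all. It is an assumption about the semantics, not the library's implementation, and forEachParallel is a hypothetical name; whether the real forEach preserves order or limits concurrency is not stated in this guide.

```typescript
// Processes every item concurrently and collects the results.
async function forEachParallel<T, R>(
  items: T[],
  processItem: (item: T) => Promise<R>,
): Promise<R[]> {
  if (!Array.isArray(items)) {
    // Mirrors the requirement that the previous step outputs an array.
    throw new TypeError('forEach expects the previous step to output an array');
  }
  // All items start at once; Promise.all resolves with results in input order
  // and rejects as soon as any single item fails.
  return Promise.all(items.map((item) => processItem(item)));
}
```

In this sketch a single failing item rejects the whole batch; a variant built on Promise.allSettled would keep the successful results instead.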

Split Document Items

When using split before forEach, each item is a SplitDocument:
interface SplitDocument {
  type: string;           // Document type ('invoice', 'receipt', etc.)
  schema: object;         // Schema for this document
  pages: number[];        // Page numbers
  input: FlowInput;       // Input for the child flow
}
The flow automatically extracts .input when passing to child flows.

Output Nodes

Use .output() to explicitly control what data is returned:
// Return specific fields
createFlow()
  .step('extract', extract({ provider, schema: fullSchema }))
  .output({
    name: 'summary',
    transform: 'pick',
    fields: ['id', 'amount', 'date']
  })
  .build();
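
The 'pick' transform presumably keeps only the listed fields. As an illustration of that assumed behavior (not the library's code), a plain TypeScript equivalent could look like this; pickFields and the sample record are hypothetical.

```typescript
// Returns a new object containing only the requested fields of `data`.
function pickFields<T extends Record<string, unknown>, K extends keyof T>(
  data: T,
  fields: readonly K[],
): Pick<T, K> {
  const picked = {} as Pick<T, K>;
  for (const field of fields) {
    // Skip fields that are not actually present on the input.
    if (Object.prototype.hasOwnProperty.call(data, field)) {
      picked[field] = data[field];
    }
  }
  return picked;
}

// Sample extracted record (made-up values).
const summary = pickFields(
  { id: 'inv_1', amount: 42.5, date: '2024-01-31', vendor: 'Acme' },
  ['id', 'amount', 'date'],
);
```

Here summary holds id, amount, and date, while vendor is dropped.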

Multiple Outputs

Create flows with multiple named outputs:
const flow = createFlow()
  .step('extract', extract({ provider, schema }))
  .output({ name: 'full_data' })
  .step('summarize', extract({ provider, schema: summarySchema }))
  .output({ name: 'summary' })
  .build();

const result = await flow.run(input);
// result.outputs.full_data
// result.outputs.summary

Input Validation

Restrict accepted input formats:
// Only accept PDFs
const pdfFlow = createFlow()
  .acceptFormats(['application/pdf'])
  .step('parse', parse({ provider }))
  .step('extract', extract({ provider, schema }))
  .build();

// Throws FlowInputValidationError if input is not a PDF
await pdfFlow.run({ base64: imageData });
Supported MIME types:
  • application/pdf
  • image/jpeg
  • image/png
  • image/webp
  • image/gif
  • image/tiff
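
Because the input is passed as base64, it can be useful to check the format before calling run(). The sketch below guesses the MIME type from well-known file signatures as they appear in base64. It is a pre-flight helper under stated assumptions, not a description of how the library validates input; sniffMimeType and isAccepted are hypothetical names, and the RIFF prefix is only a hint for WebP since other RIFF formats share it.

```typescript
// Base64 prefixes of common file signatures (magic bytes).
const BASE64_SIGNATURES: Array<[string, string]> = [
  ['JVBERi0', 'application/pdf'], // "%PDF-"
  ['/9j/', 'image/jpeg'],         // FF D8 FF
  ['iVBORw0KGgo', 'image/png'],   // 89 "PNG" 0D 0A 1A 0A
  ['R0lGOD', 'image/gif'],        // "GIF8"
  ['UklGR', 'image/webp'],        // "RIFF" container (hint only)
  ['SUkqA', 'image/tiff'],        // "II*\0", little-endian TIFF
  ['TU0AK', 'image/tiff'],        // "MM\0*", big-endian TIFF
];

// Guesses the MIME type of a base64 payload, or returns undefined.
function sniffMimeType(base64: string): string | undefined {
  // Tolerate data URLs such as "data:application/pdf;base64,JVBERi0..."
  const payload = base64.replace(/^data:[^,]*,/, '');
  for (const [prefix, mime] of BASE64_SIGNATURES) {
    if (payload.startsWith(prefix)) return mime;
  }
  return undefined;
}

// True when the sniffed type is one of the accepted formats.
function isAccepted(base64: string, accepted: string[]): boolean {
  const mime = sniffMimeType(base64);
  return mime !== undefined && accepted.includes(mime);
}
```

A caller could use isAccepted(documentData, ['application/pdf']) to reject an image early with a friendlier message than a thrown validation error.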

Flow Options

Configure flow behavior at creation:
const flow = createFlow({
  // Observability hooks
  observability: {
    onFlowStart: (ctx) => { /* ... */ },
    onFlowEnd: (ctx) => { /* ... */ },
    onFlowError: (ctx) => { /* ... */ },
    onStepStart: (ctx) => { /* ... */ },
    onStepEnd: (ctx) => { /* ... */ },
    onStepError: (ctx) => { /* ... */ },
    onBatchStart: (ctx) => { /* ... */ },
    onBatchEnd: (ctx) => { /* ... */ },
  },

  // Custom metadata (passed to all hooks)
  metadata: {
    environment: 'production',
    customerId: 'cust_123'
  },

  // Input validation
  inputValidation: {
    acceptedFormats: ['application/pdf']
  }
});

Progress Callbacks

Monitor execution with callbacks:
const result = await flow.run(input, {
  onStepStart: (stepId, stepIndex, stepType) => {
    console.log(`[${stepIndex}] Starting ${stepId} (${stepType})`);
  },

  onStepComplete: (stepId, stepIndex, stepType, durationMs) => {
    console.log(`[${stepIndex}] ${stepId} completed in ${durationMs}ms`);
  },

  onStepError: (stepId, stepIndex, stepType, error) => {
    console.error(`[${stepIndex}] ${stepId} failed:`, error);
  }
});
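
Instead of logging, the same callbacks can collect timings for later reporting. The sketch below assumes only the callback signatures shown above; createTimingCallbacks and its return shape are hypothetical.

```typescript
type StepTiming = { stepId: string; stepType: string; durationMs: number };

// Builds a callbacks object that records step durations and failures.
function createTimingCallbacks() {
  const timings: StepTiming[] = [];
  const failures: string[] = [];
  return {
    timings,
    failures,
    callbacks: {
      onStepComplete: (
        stepId: string,
        _stepIndex: number,
        stepType: string,
        durationMs: number,
      ) => {
        timings.push({ stepId, stepType, durationMs });
      },
      onStepError: (
        stepId: string,
        _stepIndex: number,
        _stepType: string,
        error: unknown,
      ) => {
        const reason = error instanceof Error ? error.message : String(error);
        failures.push(`${stepId}: ${reason}`);
      },
    },
    // Sum of the durations of all completed steps.
    totalMs: () => timings.reduce((sum, t) => sum + t.durationMs, 0),
  };
}
```

The callbacks property would be passed as the second argument to flow.run(), and totalMs() read once the run has settled.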

Flow Validation

Validate flows before execution:
const flow = createFlow()
  .step('parse', parse({ provider }))
  .step('extract', extract({ provider, schema }))
  .build();

const validation = flow.validate();

console.log('Valid:', validation.valid);
console.log('Errors:', validation.errors);
console.log('Warnings:', validation.warnings);

Validation Errors

interface FlowValidationError {
  stepId: string;
  stepIndex: number;
  stepType: string;
  message: string;
}
Common validation errors:
  • Empty flow (no steps)
  • Duplicate step IDs
  • Missing node configuration
  • Invalid conditional functions
  • Type incompatibility between steps
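
One way to use the validation result is to fail fast with a readable message before running the flow. The sketch below repeats the FlowValidationError interface from above so that it is self-contained, and assumes only the valid and errors fields; formatValidationErrors and assertValid are hypothetical helpers.

```typescript
interface FlowValidationError {
  stepId: string;
  stepIndex: number;
  stepType: string;
  message: string;
}

// Turns each validation error into a one-line description.
function formatValidationErrors(errors: FlowValidationError[]): string[] {
  return errors.map(
    (e) => `[${e.stepIndex}] ${e.stepId} (${e.stepType}): ${e.message}`,
  );
}

// Throws a single Error that lists every validation problem.
function assertValid(validation: {
  valid: boolean;
  errors: FlowValidationError[];
}): void {
  if (!validation.valid) {
    const lines = formatValidationErrors(validation.errors).join('\n');
    throw new Error(`Flow validation failed:\n${lines}`);
  }
}
```

Calling assertValid(flow.validate()) at startup would surface problems such as duplicate step IDs before any document is processed.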

Validation Warnings

Warnings don’t prevent execution but indicate potential issues:
  • forEach not preceded by array-producing step
  • Inefficient patterns (e.g., parse → VLM extract where parse is ignored)

Error Handling

Flows wrap step errors in a FlowExecutionError that carries execution context:
import { FlowExecutionError } from '@docloai/core';

try {
  await flow.run(input);
} catch (error) {
  if (error instanceof FlowExecutionError) {
    console.error('Failed at step:', error.stepId);
    console.error('Step index:', error.stepIndex);
    console.error('Completed steps:', error.completedSteps);
    console.error('Partial results:', error.artifacts);
    console.error('Original error:', error.cause);
  }
}

Accessing Partial Results

When a flow fails, you can still access completed step results:
try {
  await flow.run(input);
} catch (error) {
  if (error instanceof FlowExecutionError) {
    // Get results from steps that completed before failure
    const parseResult = error.artifacts?.parse;
    if (parseResult) {
      console.log('Parsed document:', parseResult);
    }
  }
}
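
When several artifacts are worth salvaging, a small helper keeps the catch block short. The sketch below is self-contained and therefore checks the error's shape instead of using instanceof FlowExecutionError. FlowFailure, isFlowFailure, and salvage are hypothetical names, and typing completedSteps as a string array is an assumption.

```typescript
// Shape of the fields this guide shows on a failed flow's error.
interface FlowFailure {
  stepId: string;
  stepIndex: number;
  completedSteps: string[]; // assumed to be a list of step ids
  artifacts?: Record<string, unknown>;
}

// Structural check; real code can use `instanceof FlowExecutionError`.
function isFlowFailure(error: unknown): error is FlowFailure {
  if (typeof error !== 'object' || error === null) return false;
  const e = error as Record<string, unknown>;
  return (
    typeof e.stepId === 'string' &&
    typeof e.stepIndex === 'number' &&
    Array.isArray(e.completedSteps)
  );
}

// Returns the artifacts of the wanted steps that finished before the failure.
function salvage(error: unknown, wanted: string[]): Record<string, unknown> {
  if (!isFlowFailure(error) || !error.artifacts) return {};
  const found: Record<string, unknown> = {};
  for (const id of wanted) {
    if (Object.prototype.hasOwnProperty.call(error.artifacts, id)) {
      found[id] = error.artifacts[id];
    }
  }
  return found;
}
```

In a catch block, salvage(error, ['parse']) returns the parse artifact when it exists and an empty object for unrelated errors.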

Complete Example

import { createFlow, parse, extract } from '@docloai/flows';
import { createVLMProvider, createOCRProvider } from '@docloai/providers-llm';

// Create providers
const vlmProvider = createVLMProvider({
  provider: 'google',
  model: 'google/gemini-2.5-flash',
  apiKey: process.env.OPENROUTER_API_KEY!,
  via: 'openrouter'
});

const ocrProvider = createOCRProvider({
  provider: 'reducto',
  apiKey: process.env.REDUCTO_API_KEY!
});

// Define schemas
const invoiceSchema = {
  type: 'object',
  properties: {
    invoiceNumber: { type: 'string' },
    date: { type: 'string' },
    total: { type: 'number' },
    vendor: { type: 'string' },
    lineItems: {
      type: 'array',
      items: {
        type: 'object',
        properties: {
          description: { type: 'string' },
          quantity: { type: 'number' },
          amount: { type: 'number' }
        }
      }
    }
  }
};

// Build the flow
const documentFlow = createFlow({
  observability: {
    onStepEnd: (ctx) => {
      // cost may be undefined for steps that do not report one
      console.log(`${ctx.stepId}: ${ctx.duration}ms, $${(ctx.cost ?? 0).toFixed(4)}`);
    }
  },
  metadata: { flowName: 'invoice-processor' }
})
  .acceptFormats(['application/pdf', 'image/jpeg', 'image/png'])
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({
    provider: vlmProvider,
    schema: invoiceSchema,
    consensus: { runs: 3, strategy: 'majority' }
  }))
  .output({
    name: 'invoice',
    transform: 'pick',
    fields: ['invoiceNumber', 'date', 'total', 'vendor']
  })
  .build();

// Run the flow
const result = await documentFlow.run({ base64: pdfData });

console.log('Invoice:', result.output);
console.log('Total cost:', result.aggregated.totalCostUSD);
