A flow is a composable pipeline of document processing operations. Flows define the sequence of steps to transform raw documents into structured data.

What is a Flow?

Flows connect nodes together to form processing pipelines:
  • Linear execution chain: Steps run sequentially by default
  • Type-safe data passing: TypeScript checks that each step's output type matches the next step's expected input
  • Stateful context: Each step can access outputs from previous steps
  • Built-in metrics: Automatic cost and duration tracking
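
The four properties above can be illustrated with a toy pipeline. This is a minimal sketch, not Doclo's implementation: `MiniFlow`, `MiniContext`, and `StepFn` are hypothetical names, steps are synchronous for brevity, and only duration is tracked (no cost).

```typescript
// Toy model of a linear, typed pipeline. Hypothetical names; not the Doclo API.
interface MiniContext {
  artifacts: Record<string, unknown>;      // outputs of completed steps, keyed by step id
  metrics: { step: string; ms: number }[]; // one timing entry per completed step
}

type StepFn<In, Out> = (input: In, ctx: MiniContext) => Out;

class MiniFlow<In, Out> {
  private readonly runSteps: (input: In, ctx: MiniContext) => Out;

  private constructor(runSteps: (input: In, ctx: MiniContext) => Out) {
    this.runSteps = runSteps;
  }

  static create<T>(): MiniFlow<T, T> {
    return new MiniFlow<T, T>((input) => input);
  }

  // Appending a step changes the flow's output type, so a step whose
  // input type does not match the previous output fails to compile.
  step<Next>(id: string, fn: StepFn<Out, Next>): MiniFlow<In, Next> {
    const previous = this.runSteps;
    return new MiniFlow<In, Next>((input, ctx) => {
      const value = previous(input, ctx); // run all earlier steps first
      const start = Date.now();
      const result = fn(value, ctx);
      ctx.artifacts[id] = result;         // make the output visible to later steps
      ctx.metrics.push({ step: id, ms: Date.now() - start });
      return result;
    });
  }

  run(input: In): { output: Out; context: MiniContext } {
    const context: MiniContext = { artifacts: {}, metrics: [] };
    return { output: this.runSteps(input, context), context };
  }
}

const miniResult = MiniFlow.create<string>()
  .step('parse', (text) => text.split(/\s+/)) // string -> string[]
  .step('count', (words) => words.length)     // string[] -> number
  .run('total due 42 USD');

console.log(miniResult.output); // 4
```

Swapping the two steps would be rejected by the compiler, because `count` expects the `string[]` that `parse` produces.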

Flow Builder Pattern

Flows use a fluent API for declarative pipeline definition:
import { createFlow, parse, extract } from '@doclo/flows';
import { output } from '@doclo/nodes';

const flow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema, inputMode: 'ir' }))
  .build();

const result = await flow.run({ base64: pdfDataUrl });

Key Methods

| Method | Description |
| --- | --- |
| createFlow() | Initialize a new flow |
| .step(id, node) | Add a sequential processing step |
| .conditional(id, fn) | Add conditional branching based on data |
| .forEach(id, fn) | Process arrays in parallel |
| .build() | Compile the flow into an executable |

Execution Patterns

Sequential Processing

By default, each step runs after the previous one completes:
createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema, inputMode: 'ir' }))
  .build();

Conditional Routing

Route documents based on previous step output:
createFlow()
  .step('categorize', categorize({
    provider: vlmProvider,
    categories: ['invoice', 'receipt', 'contract']
  }))
  .conditional('extract', (data) => {
    if (data.category === 'invoice') {
      return extract({ provider, schema: invoiceSchema });
    }
    return extract({ provider, schema: genericSchema });
  })
  .build();
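
As the number of categories grows, the routing decision can be pulled out of the callback into a small lookup that is easy to unit test. The sketch below is an illustration only: `SchemaStub`, `schemaByCategory`, and `selectSchemaStub` are hypothetical names, and the stub objects stand in for real extraction schemas.

```typescript
// Hypothetical stand-in for a real extraction schema.
interface SchemaStub {
  name: string;
  fields: string[];
}

const invoiceSchemaStub: SchemaStub = { name: 'invoice', fields: ['invoiceNumber', 'total'] };
const genericSchemaStub: SchemaStub = { name: 'generic', fields: ['title', 'summary'] };

// Category -> schema lookup; categories without an entry fall back to the generic schema.
const schemaByCategory = new Map<string, SchemaStub>([
  ['invoice', invoiceSchemaStub],
]);

function selectSchemaStub(data: { category: string }): SchemaStub {
  return schemaByCategory.get(data.category) ?? genericSchemaStub;
}

console.log(selectSchemaStub({ category: 'invoice' }).name); // invoice
console.log(selectSchemaStub({ category: 'receipt' }).name); // generic
```

Assuming the data passed to the callback carries a `category` string as in the example above, the callback could then shrink to a single `extract` call that uses the selected schema.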

Parallel Processing

Process arrays of documents in parallel:
createFlow()
  .step('split', split({
    provider: vlmProvider,
    schemas: { invoice: invoiceSchema, receipt: receiptSchema }
  }))
  .forEach('process', (doc) =>
    createFlow()
      .step('extract', extract({ provider, schema }))
  )
  .step('combine', combine())
  .build();
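
To make the fan-out idea concrete, here is a minimal sketch of processing an array concurrently with `Promise.all`. It is not how Doclo schedules `forEach` internally (the page does not specify that); `processAll`, `fakeExtract`, and `sleep` are hypothetical helpers.

```typescript
// Run one async worker per item concurrently; results keep the input order.
async function processAll<In, Out>(
  items: readonly In[],
  worker: (item: In, index: number) => Promise<Out>
): Promise<Out[]> {
  return Promise.all(items.map((item, index) => worker(item, index)));
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Stand-in for a per-document sub-flow. Later items finish first here,
// yet Promise.all still returns results in input order.
async function fakeExtract(doc: string, index: number): Promise<string> {
  await sleep((3 - index) * 5);
  return `${doc}:extracted`;
}

const fanOut = processAll(['invoice', 'receipt', 'contract'], fakeExtract);
fanOut.then((results) => console.log(results));
```

`Promise.all` rejects as soon as any worker fails; `Promise.allSettled` is the usual alternative when partial results are acceptable.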

Flow Results

Every flow execution returns:
interface FlowResult<T> {
  output: T;                    // Final extracted data
  aggregated: {                 // Pre-calculated totals
    totalDurationMs: number;
    totalCostUSD: number;
    totalInputTokens: number;
    totalOutputTokens: number;
    stepCount: number;
    byProvider: Record<string, ProviderMetrics>;
  };
  metrics: StepMetric[];        // Per-step metrics
  artifacts: Record<string, unknown>;  // Intermediate outputs
}
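
The relationship between `metrics` and `aggregated` can be sketched as a simple fold over the per-step entries. This is an assumption-laden illustration: the `step`, `ms`, `costUSD`, and `provider` fields mirror the usage shown under Accessing Results, while `inputTokens`, `outputTokens`, and the `ProviderTotals` shape are hypothetical.

```typescript
interface StepMetricSketch {
  step: string;
  provider: string;
  ms: number;
  costUSD: number;
  inputTokens?: number;  // assumed field name
  outputTokens?: number; // assumed field name
}

// Assumed shape; the real ProviderMetrics type is not shown on this page.
interface ProviderTotals {
  durationMs: number;
  costUSD: number;
  callCount: number;
}

interface AggregatedSketch {
  totalDurationMs: number;
  totalCostUSD: number;
  totalInputTokens: number;
  totalOutputTokens: number;
  stepCount: number;
  byProvider: Record<string, ProviderTotals>;
}

function aggregateMetrics(metrics: StepMetricSketch[]): AggregatedSketch {
  const totals: AggregatedSketch = {
    totalDurationMs: 0,
    totalCostUSD: 0,
    totalInputTokens: 0,
    totalOutputTokens: 0,
    stepCount: metrics.length,
    byProvider: {},
  };
  for (const m of metrics) {
    totals.totalDurationMs += m.ms;
    totals.totalCostUSD += m.costUSD;
    totals.totalInputTokens += m.inputTokens ?? 0;
    totals.totalOutputTokens += m.outputTokens ?? 0;
    // Accumulate a per-provider subtotal alongside the grand totals.
    const perProvider = totals.byProvider[m.provider] ?? { durationMs: 0, costUSD: 0, callCount: 0 };
    perProvider.durationMs += m.ms;
    perProvider.costUSD += m.costUSD;
    perProvider.callCount += 1;
    totals.byProvider[m.provider] = perProvider;
  }
  return totals;
}

const sampleTotals = aggregateMetrics([
  { step: 'parse', provider: 'ocr', ms: 100, costUSD: 0.25 },
  { step: 'extract', provider: 'llm', ms: 300, costUSD: 0.5, inputTokens: 1000, outputTokens: 200 },
]);
console.log(sampleTotals.totalCostUSD); // 0.75
```

Summing durations this way assumes sequential steps; with parallel branches the wall-clock time would be lower than the sum.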

Accessing Results

const result = await flow.run({ base64: pdf });

// Final output
console.log(result.output);

// Aggregated metrics
console.log(`Total cost: $${result.aggregated.totalCostUSD}`);
console.log(`Duration: ${result.aggregated.totalDurationMs}ms`);

// Per-step metrics
result.metrics.forEach(m => {
  console.log(`${m.step}: ${m.ms}ms, $${m.costUSD}, ${m.provider}`);
});

// Intermediate outputs
const parseResult = result.artifacts['parse'];
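
Because `artifacts` is typed as `Record<string, unknown>`, each value has to be narrowed before use. One way to do that without an unchecked cast is a type guard, sketched below. `getArtifact`, `CategoryArtifact`, and `isCategoryArtifact` are hypothetical helpers, and the `{ category: string }` shape is an assumption about what a categorize step stores.

```typescript
// Return the artifact for a step only if it passes the supplied type guard.
function getArtifact<T>(
  artifacts: Record<string, unknown>,
  step: string,
  isExpected: (value: unknown) => value is T
): T | undefined {
  const value = artifacts[step];
  return isExpected(value) ? value : undefined;
}

// Assumed shape of a categorize step's output.
interface CategoryArtifact {
  category: string;
}

function isCategoryArtifact(value: unknown): value is CategoryArtifact {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { category?: unknown }).category === 'string'
  );
}

const sampleArtifacts: Record<string, unknown> = {
  assess: { category: 'low' },
  parse: 42, // deliberately the wrong shape
};

const assessed = getArtifact(sampleArtifacts, 'assess', isCategoryArtifact);
const wrongShape = getArtifact(sampleArtifacts, 'parse', isCategoryArtifact);

console.log(assessed?.category); // low
console.log(wrongShape);         // undefined
```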

Flow Context

Each step has access to the flow context:
interface FlowContext {
  artifacts: Record<string, unknown>;  // Previous step outputs
  metrics: StepMetric[];               // Execution metrics so far
}

Use context in conditional logic:
.conditional('extract', (data, context) => {
  const parseProvider = context.metrics.find(m => m.step === 'parse')?.provider;
  // Choose strategy based on parse provider
})

Flow Composition

Reusing Flows

Flows can be defined once and reused:
const invoiceFlow = createFlow()
  .step('extract', extract({ provider, schema: invoiceSchema }))
  .build();

// Use in different contexts
const result1 = await invoiceFlow.run({ base64: pdf1 });
const result2 = await invoiceFlow.run({ url: 'https://...' });
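
Reuse pays off most when the same flow handles a batch of inputs. The sketch below runs one flow object over several inputs and sums the reported cost. `RunnableSketch` is a hypothetical minimal interface that assumes only the `run` method and the `aggregated.totalCostUSD` field described on this page; `stubFlow` is a fake flow so the example runs without any provider.

```typescript
// Minimal view of a built flow: just what this helper needs.
interface RunnableSketch<I, O> {
  run(input: I): Promise<{ output: O; aggregated: { totalCostUSD: number } }>;
}

// Run the same flow once per input, collecting outputs and total cost.
async function runBatch<I, O>(
  flow: RunnableSketch<I, O>,
  inputs: readonly I[]
): Promise<{ outputs: O[]; totalCostUSD: number }> {
  const outputs: O[] = [];
  let totalCostUSD = 0;
  for (const input of inputs) {
    const result = await flow.run(input); // same flow object, new input each time
    outputs.push(result.output);
    totalCostUSD += result.aggregated.totalCostUSD;
  }
  return { outputs, totalCostUSD };
}

// Stub flow: reports the input length and a fixed cost per run.
const stubFlow: RunnableSketch<{ base64: string }, { length: number }> = {
  async run(input) {
    return { output: { length: input.base64.length }, aggregated: { totalCostUSD: 0.25 } };
  },
};

const batch = runBatch(stubFlow, [{ base64: 'aaaa' }, { base64: 'bb' }]);
batch.then((b) => console.log(b.totalCostUSD)); // 0.5
```

The loop runs inputs one at a time, which keeps provider load predictable; running them concurrently is a separate design choice.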

Nested Flows

Use flows inside forEach:
const documentFlow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema, inputMode: 'ir' }));

createFlow()
  .step('split', split({
    provider: vlmProvider,
    schemas: { invoice: invoiceSchema, receipt: receiptSchema }
  }))
  .forEach('process', () => documentFlow)
  .step('combine', combine())
  .build();

Common Flow Patterns

Simple Extraction

createFlow()
  .step('extract', extract({ provider: vlmProvider, schema }))
  .build();

OCR + Extraction

createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema, inputMode: 'ir' }))
  .build();

Multi-Document Bundle

createFlow()
  .step('split', split({
    provider: vlmProvider,
    schemas: { invoice: invoiceSchema, receipt: receiptSchema }
  }))
  .forEach('process', () =>
    createFlow()
      .step('parse', parse({ provider: ocrProvider }))
      .step('extract', extract({ provider: llmProvider, schema, inputMode: 'ir' }))
  )
  .step('combine', combine())
  .build();

Quality-Based Routing

Route based on document quality assessment:
createFlow()
  .step('assess', categorize({
    provider: vlmProvider,
    categories: ['low', 'high']
  }))
  .conditional('parse', (data) => {
    if (data.category === 'high') {
      // High quality → skip OCR, use VLM directly
      return extract({ provider: vlmProvider, schema });
    } else {
      // Low quality → use accurate OCR first
      return parse({ provider: ocrProvider });
    }
  })
  .conditional('extract', (data, context) => {
    // For low quality docs that went through OCR
    const category = (context?.artifacts?.['assess'] as { category: string })?.category;
    if (category === 'low') {
      return extract({ provider: llmProvider, schema, inputMode: 'ir' });
    }
    // High quality already extracted in previous step
    return output();
  })
  .build();

For complex multi-step conditional paths, use trigger with nested flows:
const ocrPath = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema, inputMode: 'ir' }))
  .build();

createFlow()
  .step('assess', categorize({
    provider: vlmProvider,
    categories: ['low', 'high']
  }))
  .conditional('process', (data) => {
    if (data.category === 'high') {
      return extract({ provider: vlmProvider, schema });
    } else {
      return trigger({ flow: ocrPath });
    }
  })
  .build();
