Flows are the foundation of document processing in Doclo. A flow is a pipeline of nodes that transform documents step by step, from raw input to structured output.

What is a Flow?

A flow connects processing nodes in sequence:
import { createFlow, parse, extract } from '@docloai/flows';

const flow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema }))
  .build();

const result = await flow.run({ base64: documentData });
Each step receives the output of the previous step. The flow handles data transformation, error propagation, and metrics collection automatically.
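If a step fails, the error propagates out of flow.run(); a minimal handling sketch, assuming a failing step rejects the returned promise (the error shape here is illustrative):
try {
  const result = await flow.run({ base64: documentData });
  console.log(result.output);
} catch (err) {
  // Hypothetical handling; inspect the thrown error for step details.
  console.error('Flow failed:', err instanceof Error ? err.message : String(err));
}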

Core Concepts

Steps

Steps are sequential processing stages:
createFlow()
  .step('parse', parse({ provider }))      // Step 1: Parse document
  .step('extract', extract({ ... }))       // Step 2: Extract data
  .step('output', output())                // Step 3: Format output
  .build();

Conditional Routing

Route documents to different nodes based on data:
createFlow()
  .step('categorize', categorize({ provider, categories: ['invoice', 'receipt'] }))
  .conditional('extract', (data) => {
    if (data.category === 'invoice') {
      return extract({ provider, schema: invoiceSchema });
    }
    return extract({ provider, schema: receiptSchema });
  })
  .build();
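The invoiceSchema and receiptSchema above are defined elsewhere in your code; a minimal, hypothetical sketch assuming the extract node accepts Zod-style schemas (check the extract node documentation for the exact schema format):
import { z } from 'zod';

// Hypothetical schemas for the conditional example above; field names are
// illustrative and Zod is an assumption, not a documented requirement.
const invoiceSchema = z.object({
  invoiceNumber: z.string(),
  total: z.number(),
  dueDate: z.string(),
});

const receiptSchema = z.object({
  merchant: z.string(),
  total: z.number(),
});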

Parallel Processing (forEach)

Process arrays of items in parallel:
createFlow()
  .step('split', split({ provider, schemas: { invoice: invoiceSchema } }))
  .forEach('process', (doc) =>
    createFlow()
      .step('extract', extract({ provider, schema: doc.schema }))
  )
  .step('combine', combine())
  .build();

Flow Result

Every flow run returns a FlowResult:
interface FlowResult<T> {
  output: T;                              // Final output data
  metrics: StepMetric[];                  // Per-step performance metrics
  aggregated: AggregatedMetrics;          // Summary metrics
  artifacts: Record<string, unknown>;     // Intermediate step outputs
}
Access individual step results via artifacts:
const result = await flow.run(input);

// Access final output
console.log(result.output);

// Access intermediate results
console.log(result.artifacts.parse);    // Parse node output
console.log(result.artifacts.extract);  // Extract node output

// Check performance
console.log(result.aggregated.totalCostUSD);
console.log(result.aggregated.totalMs);

Validation

Flows validate configuration before execution:
const flow = createFlow()
  .step('parse', parse({ provider }))
  .step('extract', extract({ provider, schema }))
  .build();

const validation = flow.validate();

if (!validation.valid) {
  console.error('Flow errors:', validation.errors);
}

if (validation.warnings.length > 0) {
  console.warn('Flow warnings:', validation.warnings);
}
The validator checks for:
  • Missing or invalid nodes
  • Duplicate step IDs
  • Type compatibility between connected steps
  • Efficiency anti-patterns
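For example, a duplicate step ID should surface as a validation error; a hedged sketch (error contents are illustrative, and depending on the implementation build() may throw earlier):
const duplicateFlow = createFlow()
  .step('extract', extract({ provider, schema }))
  .step('extract', extract({ provider, schema }))  // duplicate step ID
  .build();

const check = duplicateFlow.validate();
console.log(check.valid);   // expected: false
console.log(check.errors);  // expected to mention the duplicated 'extract' ID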

Input Formats

Flows accept multiple input formats:
// Base64 data URL
await flow.run({ base64: 'data:application/pdf;base64,...' });

// HTTP URL
await flow.run({ url: 'https://example.com/document.pdf' });

// Raw base64 string
await flow.run({ base64: 'JVBERi0xLjQ...' });

// With format restriction
const pdfOnlyFlow = createFlow()
  .acceptFormats(['application/pdf'])
  .step('parse', parse({ provider }))
  .build();

// Throws FlowInputValidationError if input is not a PDF
await pdfOnlyFlow.run({ base64: jpegData });

Progress Callbacks

Monitor flow execution with callbacks:
const result = await flow.run(input, {
  onStepStart: (stepId, index, type) => {
    console.log(`Starting ${stepId}...`);
  },
  onStepComplete: (stepId, index, type, durationMs) => {
    console.log(`${stepId} completed in ${durationMs}ms`);
  },
  onStepError: (stepId, index, type, error) => {
    console.error(`${stepId} failed:`, error.message);
  }
});
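These callbacks compose with ordinary application code; for example, a small sketch that collects per-step durations into a map using only the signatures shown above:
const timings: Record<string, number> = {};

const result = await flow.run(input, {
  onStepComplete: (stepId, index, type, durationMs) => {
    timings[stepId] = durationMs;  // record how long each step took
  },
});

console.log(timings);  // e.g. { parse: 1180, extract: 3420 } (values illustrative)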

Observability

Add hooks for monitoring and tracing:
const flow = createFlow({
  observability: {
    onFlowStart: (ctx) => console.log('Flow started:', ctx.executionId),
    onStepStart: (ctx) => console.log('Step:', ctx.stepId),
    onStepEnd: (ctx) => console.log('Done:', ctx.stepId, ctx.duration + 'ms'),
    onFlowEnd: (ctx) => console.log('Complete:', ctx.stats.totalCost),
    onFlowError: (ctx) => console.error('Failed:', ctx.error),
  },
  metadata: {
    environment: 'production',
    userId: 'user_123'
  }
});

Common Patterns

Basic Extraction

const flow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema }))
  .build();

VLM Direct (No OCR)

const flow = createFlow()
  .step('extract', extract({ provider: vlmProvider, schema }))
  .build();

Multi-Document Processing

const flow = createFlow()
  .step('split', split({ provider, schemas }))
  .forEach('process', (doc) =>
    createFlow()
      .step('extract', extract({ provider, schema: doc.schema }))
  )
  .step('combine', combine({ strategy: 'concatenate' }))
  .build();

Conditional Schema Selection

const flow = createFlow()
  .step('categorize', categorize({ provider, categories }))
  .conditional('extract', (data, context) => {
    return extract({ provider, schema: SCHEMAS[data.category] });
  })
  .build();
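SCHEMAS above is an ordinary lookup object keyed by category; a minimal sketch (category names and schema variables are illustrative):
// Hypothetical category-to-schema map consumed by the conditional step above.
const SCHEMAS = {
  invoice: invoiceSchema,
  receipt: receiptSchema,
};

const categories = Object.keys(SCHEMAS);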

Next Steps