Documentation Index
Fetch the complete documentation index at: https://docs.doclo.ai/llms.txt
Use this file to discover all available pages before exploring further.
The createFlow() function provides a fluent API for building document processing pipelines. This guide covers all flow construction patterns.
Basic Flow Structure
Every flow starts with createFlow() and ends with .build():
import { createFlow, parse, extract } from '@doclo/flows';
const flow = createFlow()
.step('parse', parse({ provider: ocrProvider }))
.step('extract', extract({ provider: llmProvider, schema }))
.build();
const result = await flow.run({ base64: documentData });
Adding Steps
Sequential Steps
Use .step() to add sequential processing stages:
createFlow()
.step('parse', parse({ provider: ocrProvider }))
.step('extract', extract({ provider: llmProvider, schema }))
.step('output', output({ name: 'result' }))
.build();
Each step has:
- id: Unique identifier for the step
- node: The processing node to execute
- name (optional): Display name for observability
.step('parse', parse({ provider }), 'Parse Document')
Step Output Flow
Data flows automatically between steps:
Input → parse → DocumentIR → extract → ExtractedData → output → FinalOutput
Each step receives the output of the previous step. The final output is the result of the last step.
Conditional Routing
Use .conditional() to route documents based on data:
const flow = createFlow()
.step('categorize', categorize({
provider: vlmProvider,
categories: ['invoice', 'receipt', 'contract']
}))
.conditional('extract', (data) => {
// data.category contains the classification result
switch (data.category) {
case 'invoice':
return extract({ provider, schema: invoiceSchema });
case 'receipt':
return extract({ provider, schema: receiptSchema });
case 'contract':
return extract({ provider, schema: contractSchema });
default:
return extract({ provider, schema: genericSchema });
}
})
.build();
Conditional Rules
Conditionals must return a node, not a promise or executed result:
// Correct - return a node
.conditional('parse', (data) => {
if (data.quality === 'high') {
return parse({ provider: fastProvider });
}
return parse({ provider: accurateProvider });
})
// INCORRECT - don't call .run()
.conditional('parse', (data) => {
return someFlow.build().run(data); // This will error!
})
Accessing Previous Step Results
The conditional function receives a second context parameter with access to all previous step artifacts:
.step('categorize', categorize({ provider, categories }))
.step('parse', parse({ provider }))
.conditional('extract', (data, context) => {
// Access categorize result from context
const category = context?.artifacts.categorize?.category;
return extract({ provider, schema: SCHEMAS[category] });
})
Parallel Processing with forEach
Use .forEach() to process arrays in parallel:
const flow = createFlow()
.step('split', split({
provider: vlmProvider,
schemas: {
invoice: invoiceSchema,
receipt: receiptSchema
}
}))
.forEach('process', (doc) =>
// Each item processed in parallel
createFlow()
.step('extract', extract({
provider: vlmProvider,
schema: doc.schema
}))
)
.step('combine', combine({ strategy: 'concatenate' }))
.build();
forEach Requirements
- Previous step must output an array (e.g., from
split node)
- The callback receives each array item
- Returns a new
Flow for processing that item
- Results are collected and passed to the next step
Split Document Items
When using split before forEach, each item is a SplitDocument:
interface SplitDocument {
type: string; // Document type ('invoice', 'receipt', etc.)
schema: object; // Schema for this document
pages: number[]; // Page numbers
input: FlowInput; // Input for the child flow
}
The flow automatically extracts .input when passing to child flows.
Output Nodes
Use .output() to explicitly control what data is returned:
// Return specific fields
createFlow()
.step('extract', extract({ provider, schema: fullSchema }))
.output({
name: 'summary',
transform: 'pick',
fields: ['id', 'amount', 'date']
})
.build();
Multiple Outputs
Create flows with multiple named outputs:
const flow = createFlow()
.step('extract', extract({ provider, schema }))
.output({ name: 'full_data' })
.step('summarize', extract({ provider, schema: summarySchema }))
.output({ name: 'summary' })
.build();
const result = await flow.run(input);
// result.outputs.full_data
// result.outputs.summary
Restrict accepted input formats:
// Only accept PDFs
const pdfFlow = createFlow()
.acceptFormats(['application/pdf'])
.step('parse', parse({ provider }))
.step('extract', extract({ provider, schema }))
.build();
// Throws FlowInputValidationError if input is not a PDF
await pdfFlow.run({ base64: imageData });
Supported MIME types:
application/pdf
image/jpeg
image/png
image/webp
image/gif
image/tiff
Flow Options
Configure flow behavior at creation:
const flow = createFlow({
// Observability hooks
observability: {
onFlowStart: (ctx) => { /* ... */ },
onFlowEnd: (ctx) => { /* ... */ },
onFlowError: (ctx) => { /* ... */ },
onStepStart: (ctx) => { /* ... */ },
onStepEnd: (ctx) => { /* ... */ },
onStepError: (ctx) => { /* ... */ },
onBatchStart: (ctx) => { /* ... */ },
onBatchEnd: (ctx) => { /* ... */ },
},
// Custom metadata (passed to all hooks)
metadata: {
environment: 'production',
customerId: 'cust_123'
},
// Input validation
inputValidation: {
acceptedFormats: ['application/pdf']
}
});
Progress Callbacks
Monitor execution with callbacks:
const result = await flow.run(input, {
onStepStart: (stepId, stepIndex, stepType) => {
console.log(`[${stepIndex}] Starting ${stepId} (${stepType})`);
},
onStepComplete: (stepId, stepIndex, stepType, durationMs) => {
console.log(`[${stepIndex}] ${stepId} completed in ${durationMs}ms`);
},
onStepError: (stepId, stepIndex, stepType, error) => {
console.error(`[${stepIndex}] ${stepId} failed:`, error);
}
});
Flow Validation
Validate flows before execution:
const flow = createFlow()
.step('parse', parse({ provider }))
.step('extract', extract({ provider, schema }))
.build();
const validation = flow.validate();
console.log('Valid:', validation.valid);
console.log('Errors:', validation.errors);
console.log('Warnings:', validation.warnings);
Validation Errors
interface FlowValidationError {
stepId: string;
stepIndex: number;
stepType: string;
message: string;
}
Common validation errors:
- Empty flow (no steps)
- Duplicate step IDs
- Missing node configuration
- Invalid conditional functions
- Type incompatibility between steps
Validation Warnings
Warnings don’t prevent execution but indicate potential issues:
forEach not preceded by array-producing step
- Inefficient patterns (e.g.,
parse → VLM extract where parse is ignored)
Error Handling
Flows wrap errors with context:
import { FlowExecutionError } from '@doclo/core';
try {
await flow.run(input);
} catch (error) {
if (error instanceof FlowExecutionError) {
console.error('Failed at step:', error.stepId);
console.error('Step index:', error.stepIndex);
console.error('Completed steps:', error.completedSteps);
console.error('Partial results:', error.artifacts);
console.error('Original error:', error.cause);
}
}
Accessing Partial Results
When a flow fails, you can still access completed step results:
try {
await flow.run(input);
} catch (error) {
if (error instanceof FlowExecutionError) {
// Get results from steps that completed before failure
const parseResult = error.artifacts?.parse;
if (parseResult) {
console.log('Parsed document:', parseResult);
}
}
}
Complete Example
import { createFlow, parse, categorize, extract, split, combine } from '@doclo/flows';
import { output } from '@doclo/nodes';
import { createVLMProvider, createOCRProvider } from '@doclo/providers-llm';
// Create providers
const vlmProvider = createVLMProvider({
provider: 'google',
model: 'google/gemini-2.5-flash',
apiKey: process.env.OPENROUTER_API_KEY!,
via: 'openrouter'
});
const ocrProvider = createOCRProvider({
provider: 'reducto',
apiKey: process.env.REDUCTO_API_KEY!
});
// Define schemas
const invoiceSchema = {
type: 'object',
properties: {
invoiceNumber: { type: 'string' },
date: { type: 'string' },
total: { type: 'number' },
vendor: { type: 'string' },
lineItems: {
type: 'array',
items: {
type: 'object',
properties: {
description: { type: 'string' },
quantity: { type: 'number' },
amount: { type: 'number' }
}
}
}
}
};
// Build the flow
const documentFlow = createFlow({
observability: {
onStepEnd: (ctx) => {
console.log(`${ctx.stepId}: ${ctx.duration}ms, $${ctx.cost?.toFixed(4)}`);
}
},
metadata: { flowName: 'invoice-processor' }
})
.acceptFormats(['application/pdf', 'image/jpeg', 'image/png'])
.step('parse', parse({ provider: ocrProvider }))
.step('extract', extract({
provider: vlmProvider,
schema: invoiceSchema,
consensus: { runs: 3, strategy: 'majority' }
}))
.output({
name: 'invoice',
transform: 'pick',
fields: ['invoiceNumber', 'date', 'total', 'vendor']
})
.build();
// Run the flow
const result = await documentFlow.run({ base64: pdfData });
console.log('Invoice:', result.output);
console.log('Total cost:', result.aggregated.totalCostUSD);
Next Steps
Flow Registry
Register and reuse flows
Pre-built Flows
Ready-to-use flow templates