Flows are the foundation of document processing in Doclo. A flow is a pipeline of nodes that transform documents step by step, from raw input to structured output.
What is a Flow?
A flow connects processing nodes in sequence:
import { createFlow, parse, extract } from '@doclo/flows';

const flow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema }))
  .build();

const result = await flow.run({ base64: documentData });
Each step receives the output of the previous step. The flow handles data transformation, error propagation, and metrics collection automatically.
Core Concepts
Steps
Steps are sequential processing stages:
createFlow()
  .step('parse', parse({ provider }))    // Step 1: Parse document
  .step('extract', extract({ ... }))     // Step 2: Extract data
  .step('output', output())              // Step 3: Format output
  .build();
Conditional Routing
Route documents to different nodes based on data:
createFlow()
  .step('categorize', categorize({ provider, categories: ['invoice', 'receipt'] }))
  .conditional('extract', (data) => {
    if (data.category === 'invoice') {
      return extract({ provider, schema: invoiceSchema });
    }
    return extract({ provider, schema: receiptSchema });
  })
  .build();
Parallel Processing (forEach)
Process arrays of items in parallel:
createFlow()
  .step('split', split({ provider, schemas: { invoice: invoiceSchema } }))
  .forEach('process', (doc) =>
    createFlow()
      .step('extract', extract({ provider, schema: doc.schema }))
  )
  .step('combine', combine())
  .build();
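Conceptually, the forEach stage fans each split document out to its own sub-flow and waits for all of them to finish. A plain-TypeScript analogue of that fan-out (an illustrative sketch, not the library's actual implementation):

```typescript
// Run an async worker over each item concurrently and
// collect the results in the original item order.
async function forEachParallel<T, R>(
  items: T[],
  worker: (item: T) => Promise<R>
): Promise<R[]> {
  return Promise.all(items.map(worker));
}
```

Because `Promise.all` preserves input order, results line up with the split documents even though the workers run concurrently.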
Flow Result
Every flow returns a FlowResult:
interface FlowResult<T> {
  output: T;                           // Final output data
  metrics: StepMetric[];               // Per-step performance metrics
  aggregated: AggregatedMetrics;       // Summary metrics
  artifacts: Record<string, unknown>;  // Intermediate step outputs
}
Access individual step results via artifacts:
const result = await flow.run(input);

// Access final output
console.log(result.output);

// Access intermediate results
console.log(result.artifacts.parse);    // Parse node output
console.log(result.artifacts.extract);  // Extract node output

// Check performance
console.log(result.aggregated.totalCostUSD);
console.log(result.aggregated.totalMs);
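The aggregated summary is just a fold over the per-step metrics. A minimal sketch of that relationship, assuming `StepMetric` carries `durationMs` and `costUSD` fields (these shapes are illustrative, not confirmed by the Doclo API):

```typescript
// Illustrative shapes; the real StepMetric/AggregatedMetrics may differ.
interface StepMetric {
  stepId: string;
  durationMs: number;
  costUSD: number;
}

interface AggregatedMetrics {
  totalMs: number;
  totalCostUSD: number;
}

// Fold per-step metrics into a flow-level summary.
function aggregate(metrics: StepMetric[]): AggregatedMetrics {
  return metrics.reduce(
    (acc, m) => ({
      totalMs: acc.totalMs + m.durationMs,
      totalCostUSD: acc.totalCostUSD + m.costUSD,
    }),
    { totalMs: 0, totalCostUSD: 0 }
  );
}
```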
Validation
Flows validate configuration before execution:
const flow = createFlow()
  .step('parse', parse({ provider }))
  .step('extract', extract({ provider, schema }))
  .build();

const validation = flow.validate();
if (!validation.valid) {
  console.error('Flow errors:', validation.errors);
}
if (validation.warnings.length > 0) {
  console.warn('Flow warnings:', validation.warnings);
}
The validator checks for:
Missing or invalid nodes
Duplicate step IDs
Type compatibility between connected steps
Efficiency anti-patterns
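For illustration, the duplicate-ID check can be sketched as a standalone function. This mirrors the kind of problem `flow.validate()` reports; it is not the library's actual implementation:

```typescript
// Return the step IDs that appear more than once in a flow definition.
function findDuplicateStepIds(stepIds: string[]): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();
  for (const id of stepIds) {
    if (seen.has(id)) duplicates.add(id);
    seen.add(id);
  }
  return [...duplicates];
}
```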
Input Formats
Flows accept multiple input formats:
// Base64 data URL
await flow.run({ base64: 'data:application/pdf;base64,...' });

// HTTP URL
await flow.run({ url: 'https://example.com/document.pdf' });

// Raw base64 string
await flow.run({ base64: 'JVBERi0xLjQ...' });

// With format restriction
const pdfOnlyFlow = createFlow()
  .acceptFormats(['application/pdf'])
  .step('parse', parse({ provider }))
  .build();

// Throws FlowInputValidationError if input is not a PDF
await pdfOnlyFlow.run({ base64: jpegData });
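A format restriction like `acceptFormats` can be approximated by inspecting the input before running the flow. A minimal sketch (not the library's internal check): a data URL states its MIME type explicitly, and a PDF encoded as raw base64 always starts with `JVBERi`, because that is `%PDF` in base64.

```typescript
// Best-effort MIME detection for the two base64 input shapes shown above.
function detectMime(base64: string): string | undefined {
  // Data URL: the MIME type is stated explicitly.
  const match = base64.match(/^data:([^;,]+)[;,]/);
  if (match) return match[1];
  // Raw base64: '%PDF' encodes to 'JVBERi'.
  if (base64.startsWith('JVBERi')) return 'application/pdf';
  return undefined;
}
```

A pre-check like this can fail fast with a clear message instead of surfacing a `FlowInputValidationError` mid-run.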
Progress Callbacks
Monitor flow execution with callbacks:
const result = await flow.run(input, {
  onStepStart: (stepId, index, type) => {
    console.log(`Starting ${stepId}...`);
  },
  onStepComplete: (stepId, index, type, durationMs) => {
    console.log(`${stepId} completed in ${durationMs}ms`);
  },
  onStepError: (stepId, index, type, error) => {
    console.error(`${stepId} failed:`, error.message);
  }
});
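The callbacks can also feed an accumulator instead of the console. A sketch that records per-step durations, assuming the callback signatures shown above (treat them as illustrative):

```typescript
// Collect per-step timings through the progress callbacks.
function makeTimingCollector() {
  const timings: Record<string, number> = {};
  return {
    timings,
    callbacks: {
      onStepComplete: (stepId: string, _index: number, _type: string, durationMs: number) => {
        timings[stepId] = durationMs;
      },
    },
  };
}
```

Pass `collector.callbacks` as the second argument to `flow.run` and read `collector.timings` after the flow completes.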
Observability
Add hooks for monitoring and tracing:
const flow = createFlow({
  observability: {
    onFlowStart: (ctx) => console.log('Flow started:', ctx.executionId),
    onStepStart: (ctx) => console.log('Step:', ctx.stepId),
    onStepEnd: (ctx) => console.log('Done:', ctx.stepId, ctx.duration + 'ms'),
    onFlowEnd: (ctx) => console.log('Complete:', ctx.stats.totalCost),
    onFlowError: (ctx) => console.error('Failed:', ctx.error),
  },
  metadata: {
    environment: 'production',
    userId: 'user_123'
  }
});
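The same hooks can write to any sink, for example an in-memory trace buffer for tests. A sketch, using only the context fields shown in the example above (`stepId`, `duration`); the full context shape is an assumption:

```typescript
// An in-memory trace sink for the observability hooks.
type TraceEvent = { kind: string; detail: string };

function makeTracer() {
  const events: TraceEvent[] = [];
  return {
    events,
    observability: {
      onStepStart: (ctx: { stepId: string }) => {
        events.push({ kind: 'step:start', detail: ctx.stepId });
      },
      onStepEnd: (ctx: { stepId: string; duration: number }) => {
        events.push({ kind: 'step:end', detail: `${ctx.stepId}:${ctx.duration}ms` });
      },
    },
  };
}
```

Pass `tracer.observability` into `createFlow({ observability: ... })` and inspect `tracer.events` afterwards.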
Common Patterns
OCR + LLM Extraction
const flow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('extract', extract({ provider: llmProvider, schema }))
  .build();
VLM Direct (No OCR)
const flow = createFlow()
  .step('extract', extract({ provider: vlmProvider, schema }))
  .build();
Multi-Document Processing
const flow = createFlow()
  .step('split', split({ provider, schemas }))
  .forEach('process', (doc) =>
    createFlow()
      .step('extract', extract({ provider, schema: doc.schema }))
  )
  .step('combine', combine({ strategy: 'concatenate' }))
  .build();
Conditional Schema Selection
const flow = createFlow()
  .step('categorize', categorize({ provider, categories }))
  .conditional('extract', (data, context) => {
    return extract({ provider, schema: SCHEMAS[data.category] });
  })
  .build();
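The `SCHEMAS` lookup above is assumed to be a plain record keyed by category. A defensive sketch with a fallback for categories the categorizer returns that have no registered schema (all names here are hypothetical):

```typescript
// Hypothetical schema registry keyed by document category.
const SCHEMAS: Record<string, { name: string }> = {
  invoice: { name: 'invoiceSchema' },
  receipt: { name: 'receiptSchema' },
};

// Pick a schema for a category, falling back to a generic one
// so an unexpected category does not pass undefined to extract().
function selectSchema(category: string, fallback = { name: 'genericSchema' }) {
  return SCHEMAS[category] ?? fallback;
}
```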
Next Steps
Creating Flows: Detailed guide to building custom flows
Flow Registry: Register and reuse flows
Pre-built Flows: Ready-to-use flow templates
Nodes Reference: Available processing nodes