Large documents like contracts, reports, and multi-page invoices can exceed LLM context limits or result in poor extraction quality. This guide covers strategies for processing documents of any size.

Prerequisites

  • Node.js 18+
  • A Doclo API key or provider API keys
  • A multi-page document (PDF)

When to Use Chunking

Consider chunking when any of the following apply (a quick pre-check sketch follows this list):
  • Documents exceed 20 pages
  • Extraction quality degrades on long documents
  • You need page-level citations or references
  • Processing time exceeds acceptable limits
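
As a quick pre-check, you can count pages up front and only route long documents through a chunked flow. The sketch below uses pdf-lib for page counting, which is not part of Doclo, and the 20-page threshold is just the rule of thumb above:
import { PDFDocument } from 'pdf-lib';
import { readFileSync } from 'fs';

// Hypothetical pre-check (pdf-lib is an external dependency, not part of Doclo):
// count pages and only use the chunked flow past a ~20-page threshold.
async function needsChunking(filePath: string, threshold = 20): Promise<boolean> {
  const pdf = await PDFDocument.load(readFileSync(filePath));
  return pdf.getPageCount() >= threshold;
}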

Strategy Overview

The chunk step supports four strategies, each covered in detail below:
  • page: split into groups of pages; best when page boundaries matter
  • section: split at headings and chapters; best for structured documents
  • recursive: split at natural text boundaries; best for unstructured text and RAG pipelines
  • fixed: split at exact character or token intervals; best for embedding generation

Basic Page-Based Chunking

Split a document by pages and process each chunk:
import { createFlow, parse, chunk, extract, combine } from '@doclo/flows';
import { createVLMProvider } from '@doclo/providers-llm';
import { createOCRProvider } from '@doclo/providers-datalab';

const ocrProvider = createOCRProvider({
  endpoint: 'https://www.datalab.to/api/v1/marker',
  apiKey: process.env.DATALAB_API_KEY!
});

const vlmProvider = createVLMProvider({
  provider: 'google',
  model: 'google/gemini-2.5-flash',
  apiKey: process.env.OPENROUTER_API_KEY!,
  via: 'openrouter'
});

const schema = {
  type: 'object',
  properties: {
    items: {
      type: 'array',
      items: {
        type: 'object',
        properties: {
          title: { type: 'string' },
          content: { type: 'string' },
          pageNumber: { type: 'number' }
        }
      }
    }
  }
};

const flow = createFlow()
  .step('parse', parse({ provider: ocrProvider }))
  .step('chunk', chunk({
    strategy: 'page',
    pagesPerChunk: 5,
    combineShortPages: true
  }))
  .forEach('extract', () =>
    createFlow()
      .step('extract', extract({
        provider: vlmProvider,
        schema: schema,
        inputMode: 'ir'
      }))
  )
  .step('combine', combine({ strategy: 'merge' }))
  .build();

Chunking Strategies

Page Strategy

Process documents in page groups:
chunk({
  strategy: 'page',
  pagesPerChunk: 5,       // Pages per chunk
  combineShortPages: true, // Merge pages with little content
  minPageContent: 100      // Min chars to keep a page separate
})
Best for:
  • Documents where page boundaries matter (contracts, reports)
  • Maintaining page-level references
  • Consistent chunk sizes

Section Strategy

Split by document sections (headers, chapters):
chunk({
  strategy: 'section',
  maxSize: 2000,  // Max chars per chunk
  minSize: 100    // Min chars per chunk
})
Best for:
  • Structured documents with clear headings
  • Technical documentation
  • Reports with distinct sections

Recursive Strategy

Split by natural text boundaries:
chunk({
  strategy: 'recursive',
  maxSize: 1000,
  overlap: 100,
  separators: ['\n\n', '\n', '. ', ' ']
})
Best for:
  • Unstructured text
  • Articles and narratives
  • RAG pipelines

Fixed Strategy

Split at exact intervals:
chunk({
  strategy: 'fixed',
  size: 512,
  unit: 'characters',  // or 'tokens'
  overlap: 50
})
Best for:
  • Embedding generation
  • Uniform processing requirements

Overlap for Context Continuity

When content spans chunk boundaries, use overlap to ensure complete extraction:
chunk({
  strategy: 'recursive',
  maxSize: 1000,
  overlap: 200  // 200 characters repeated between chunks
})
This ensures sentences or data points split across chunks appear fully in at least one chunk.
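To see the boundary arithmetic, here is a rough illustration; it approximates a plain sliding window, whereas the actual recursive splitter also honors the separator list, so real chunk edges will vary:
// Illustration only: with maxSize 1000 and overlap 200, each chunk starts
// 800 characters after the previous one, so the last 200 characters of one
// chunk reappear at the start of the next.
const maxSize = 1000;
const overlap = 200;
const stride = maxSize - overlap;                              // 800
const chunkSpans = [0, 1, 2].map(i => [i * stride, i * stride + maxSize]);
// => [[0, 1000], [800, 1800], [1600, 2600]]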

Processing Multi-Document Files

For files containing multiple logical documents (e.g., a PDF with several invoices), use split instead of chunk:
import { createFlow, split, extract, combine } from '@doclo/flows';

const multiInvoiceFlow = createFlow()
  .step('split', split({
    provider: vlmProvider,
    schemas: {
      invoice: invoiceSchema
    },
    splitStrategy: 'document'  // Split into separate logical documents
  }))
  .forEach('extract', (doc) =>
    createFlow()
      .step('extract', extract({
        provider: vlmProvider,
        schema: doc.schema
      }))
  )
  .step('combine', combine({ strategy: 'concatenate' }))
  .build();

const result = await multiInvoiceFlow.run({ base64 });
// result.output is array of extracted invoices

Parallel Processing with forEach

The forEach step processes chunks in parallel:
.forEach('extract', (chunkData) =>
  createFlow()
    .step('extract', extract({
      provider: vlmProvider,
      schema: schema
    }))
)
Each chunk is processed independently, and results are collected for the combine step.

Accessing Chunk Metadata

Within forEach, you have access to chunk information:
.forEach('extract', (chunkData, index, context) => {
  console.log(`Processing chunk ${index}`);
  console.log(`Pages: ${chunkData.pageNumbers?.join(', ')}`);

  return createFlow()
    .step('extract', extract({
      provider: vlmProvider,
      schema: schema,
      additionalInstructions: `This is pages ${chunkData.pageNumbers?.join(', ')} of the document.`
    }));
})

Combining Results

Choose a combine strategy based on your data:

Merge (Default)

Merges results intelligently based on their type:
combine({ strategy: 'merge' })

// Arrays get flattened
// [{ items: [1, 2] }, { items: [3, 4] }] → { items: [1, 2, 3, 4] }

// Objects get merged
// [{ a: 1 }, { b: 2 }] → { a: 1, b: 2 }

Concatenate

Keep all results as an array:
combine({ strategy: 'concatenate' })

// Always returns array
// [{ page: 1 }, { page: 2 }] → [{ page: 1 }, { page: 2 }]

First/Last

Return first or last non-null result:
combine({ strategy: 'first' })  // First successful extraction
combine({ strategy: 'last' })   // Last successful extraction

Complete Example: Contract Processing

Extract clauses from a multi-page contract:
import { createFlow, parse, chunk, extract, combine } from '@doclo/flows';
import { createVLMProvider } from '@doclo/providers-llm';
import { createOCRProvider } from '@doclo/providers-datalab';
import { readFileSync } from 'fs';

// Types
interface ContractClause {
  clauseNumber: string;
  title: string;
  content: string;
  pageNumber: number;
  type: 'obligation' | 'right' | 'definition' | 'general';
}

interface ContractExtraction {
  clauses: ContractClause[];
}

// Schema
const clauseSchema = {
  type: 'object',
  properties: {
    clauses: {
      type: 'array',
      items: {
        type: 'object',
        properties: {
          clauseNumber: { type: 'string', description: 'Clause number (e.g., 1.1, 2.3)' },
          title: { type: 'string', description: 'Clause title or heading' },
          content: { type: 'string', description: 'Full clause text' },
          pageNumber: { type: 'number', description: 'Page where clause appears' },
          type: {
            type: 'string',
            enum: ['obligation', 'right', 'definition', 'general'],
            description: 'Clause type classification'
          }
        },
        required: ['clauseNumber', 'content', 'type']
      }
    }
  }
};

// Providers
const ocrProvider = createOCRProvider({
  endpoint: 'https://www.datalab.to/api/v1/marker',
  apiKey: process.env.DATALAB_API_KEY!
});

const vlmProvider = createVLMProvider({
  provider: 'anthropic',
  model: 'anthropic/claude-sonnet-4.5',
  apiKey: process.env.OPENROUTER_API_KEY!,
  via: 'openrouter'
});

// Build flow
const contractFlow = createFlow({
  observability: {
    onBatchStart: (ctx) => {
      console.log(`Processing ${ctx.totalItems} chunks...`);
    },
    onBatchItemEnd: (ctx) => {
      console.log(`Chunk ${ctx.itemIndex + 1} complete`);
    },
    onBatchEnd: (ctx) => {
      console.log(`All chunks processed: ${ctx.successfulItems} success, ${ctx.failedItems} failed`);
    }
  }
})
  .step('parse', parse({ provider: ocrProvider }))
  .step('chunk', chunk({
    strategy: 'section',  // Split by sections/headers
    maxSize: 3000,
    minSize: 200
  }))
  .forEach('extract', () =>
    createFlow()
      .step('extract', extract<ContractExtraction>({
        provider: vlmProvider,
        schema: clauseSchema,
        inputMode: 'ir',
        additionalInstructions: `
          Extract all contract clauses from this section.
          Include the exact page number where each clause starts.
          Classify each clause by its primary purpose.
        `
      }))
  )
  .step('combine', combine({ strategy: 'merge' }))
  .build();

// Process contract
async function processContract(filePath: string) {
  const fileBuffer = readFileSync(filePath);
  const base64 = `data:application/pdf;base64,${fileBuffer.toString('base64')}`;

  const result = await contractFlow.run({ base64 });

  console.log('\n--- Contract Analysis ---');
  console.log('Total clauses found:', result.output.clauses?.length ?? 0);

  // Group by type
  const byType = result.output.clauses?.reduce((acc, clause) => {
    acc[clause.type] = (acc[clause.type] || 0) + 1;
    return acc;
  }, {} as Record<string, number>);

  console.log('By type:', byType);
  console.log('\n--- Metrics ---');
  console.log('Duration:', result.aggregated.totalDurationMs, 'ms');
  console.log('Cost: $', result.aggregated.totalCostUSD.toFixed(4));

  return result.output;
}

processContract('./contract.pdf').catch(console.error);

Progress Tracking

Monitor chunked processing with observability hooks:
const flow = createFlow({
  observability: {
    onStepStart: (ctx) => {
      if (ctx.stepId === 'chunk') {
        console.log('Chunking document...');
      }
    },
    onStepEnd: (ctx) => {
      if (ctx.stepId === 'chunk') {
        console.log(`Created ${ctx.output.totalChunks} chunks`);
      }
    },
    onBatchStart: (ctx) => {
      console.log(`Starting parallel processing of ${ctx.totalItems} chunks`);
    },
    onBatchItemEnd: (ctx) => {
      const progress = ((ctx.itemIndex + 1) / ctx.totalItems * 100).toFixed(0);
      console.log(`Progress: ${progress}% (${ctx.itemIndex + 1}/${ctx.totalItems})`);
    }
  }
});

Memory Considerations

For very large documents:
  1. Stream processing: Process chunks sequentially if memory is constrained
  2. Reduce pagesPerChunk: Smaller chunks use less memory per operation
  3. Use IR-only mode: Skip visual processing to reduce memory usage
// Memory-efficient configuration
chunk({
  strategy: 'page',
  pagesPerChunk: 2,  // Smaller chunks
  combineShortPages: false
})

// Use text-only extraction
extract({
  provider: llmProvider,  // LLM instead of VLM
  schema: schema,
  inputMode: 'ir'  // Text only, no images
})

Error Handling in Chunked Flows

Handle failures gracefully:
import { FlowExecutionError } from '@doclo/core';

try {
  const result = await contractFlow.run({ base64 });
} catch (error) {
  if (error instanceof FlowExecutionError) {
    console.error('Failed at step:', error.stepId);

    // Access partial results from successful chunks
    const partialResults = error.artifacts?.extract;
    if (partialResults) {
      console.log('Partial results:', partialResults.length, 'chunks completed');
    }
  }
}

Using Doclo Cloud

Process large documents via Doclo Cloud with async execution:
import { DocloClient } from '@doclo/client';

const client = new DocloClient({
  apiKey: process.env.DOCLO_API_KEY!
});

// Start async execution for large documents
const execution = await client.flows.run('contract-extraction-flow', {
  input: {
    document: { base64, filename: 'contract.pdf', mimeType: 'application/pdf' }
  },
  webhookUrl: 'https://your-app.com/api/webhook'  // Get notified when done
});

console.log('Processing started:', execution.id);

// Or poll for completion
const result = await client.runs.waitForCompletion(execution.id, {
  interval: 5000,   // Check every 5 seconds
  timeout: 600000   // Wait up to 10 minutes
});

console.log('Clauses extracted:', result.output.clauses.length);
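
If you rely on the webhook instead of polling, a minimal receiver might look like the sketch below; the payload field names are assumptions, so check the webhook reference for the exact shape:
import express from 'express';
import { DocloClient } from '@doclo/client';

const client = new DocloClient({ apiKey: process.env.DOCLO_API_KEY! });
const app = express();
app.use(express.json());

app.post('/api/webhook', (req, res) => {
  // Acknowledge immediately so the notification isn't retried.
  res.sendStatus(200);

  const runId = req.body.id;  // assumed field name for the execution id
  client.runs
    .waitForCompletion(runId, { interval: 1000, timeout: 60000 })
    .then((result) => {
      console.log('Clauses extracted:', result.output.clauses?.length ?? 0);
    })
    .catch(console.error);
});

app.listen(3000);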

Next Steps