The Doclo SDK provides comprehensive observability through lifecycle hooks, metrics aggregation, and cloud integration. Monitor every step of your document processing pipelines.

Local Observability

Configure observability hooks when creating a flow:
import { createFlow, extract } from '@doclo/flows';

const flow = createFlow({
  observability: {
    onFlowStart: (ctx) => {
      console.log(`Flow ${ctx.flowId} started`);
    },
    onFlowEnd: (ctx) => {
      console.log(`Flow completed in ${ctx.duration}ms`);
      // Fall back to 'n/a' so a missing stats object does not print "$undefined"
      console.log(`Cost: $${ctx.stats?.totalCostUSD.toFixed(4) ?? 'n/a'}`);
    },
    onFlowError: (ctx) => {
      console.error(`Flow failed: ${ctx.error.message}`);
    },
    onStepStart: (ctx) => {
      console.log(`Step ${ctx.stepId} starting...`);
    },
    onStepEnd: (ctx) => {
      console.log(`Step ${ctx.stepId}: ${ctx.duration}ms, $${ctx.cost?.toFixed(4) ?? 'n/a'}`);
    },
    onStepError: (ctx) => {
      console.error(`Step ${ctx.stepId} failed: ${ctx.error.message}`);
    }
  }
})
  .step('extract', extract({ provider, schema }))
  .build();
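
The hooks above can also feed a small in-memory collector instead of the console. The following is a minimal sketch, not part of the SDK: `createStepCollector` and the local `StepEndContext` type are hypothetical, and the sketch assumes only the `stepId`, `duration`, and `cost` fields used in the example above.

```typescript
// Stand-in for the SDK's step-end context. Only the fields used here are
// modeled, and their exact types are an assumption.
interface StepEndContext {
  stepId: string;
  duration: number; // milliseconds
  cost?: number;    // USD; may be absent
}

interface StepTotals {
  calls: number;
  totalMs: number;
  totalCostUSD: number;
}

// Hypothetical helper: returns an onStepEnd hook plus a way to read totals.
function createStepCollector() {
  const totals: Record<string, StepTotals> = {};

  return {
    hooks: {
      onStepEnd: (ctx: StepEndContext): void => {
        const entry = totals[ctx.stepId] ?? { calls: 0, totalMs: 0, totalCostUSD: 0 };
        entry.calls += 1;
        entry.totalMs += ctx.duration;
        entry.totalCostUSD += ctx.cost ?? 0; // treat a missing cost as zero
        totals[ctx.stepId] = entry;
      },
    },
    // Shallow copy so callers cannot add or remove entries.
    snapshot: (): Record<string, StepTotals> => ({ ...totals }),
  };
}
```

Under these assumptions, passing `collector.hooks` as the `observability` option would accumulate per-step totals that can be read with `collector.snapshot()` after the flow finishes.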

Available Hooks

Flow-Level Hooks

| Hook | Trigger | Context |
| --- | --- | --- |
| onFlowStart | Flow execution begins | flowId, executionId, input, metadata |
| onFlowEnd | Flow completes successfully | duration, output, stats, traceContext |
| onFlowError | Flow fails | error, errorCode, failedAtStepIndex |

Step-Level Hooks

| Hook | Trigger | Context |
| --- | --- | --- |
| onStepStart | Step execution begins | stepId, stepIndex, stepType, provider, model |
| onStepEnd | Step completes successfully | duration, output, usage, cost |
| onStepError | Step fails | error, willRetry, retryAttempt |
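
The `willRetry` flag makes it possible to separate transient errors from final failures. The sketch below is a hypothetical helper (`createFailureTracker` is not an SDK function). It assumes `onStepError` also receives `stepId`, as in the logging example earlier, and it makes no assumption about whether `retryAttempt` is zero- or one-based.

```typescript
// Stand-in for the SDK's step-error context, limited to the fields used here.
interface StepErrorContext {
  stepId: string;
  error: Error;
  willRetry: boolean;
  retryAttempt: number;
}

// Hypothetical helper: counts retried errors per step and records final failures.
function createFailureTracker() {
  const retries: Record<string, number> = {};
  const finalFailures: string[] = [];

  return {
    onStepError: (ctx: StepErrorContext): void => {
      if (ctx.willRetry) {
        // The step will run again, so this is only a transient error for now.
        retries[ctx.stepId] = (retries[ctx.stepId] ?? 0) + 1;
      } else {
        // No retry is planned: record the failure with the attempt number.
        finalFailures.push(`${ctx.stepId} (attempt ${ctx.retryAttempt}): ${ctx.error.message}`);
      }
    },
    report: () => ({ retries: { ...retries }, finalFailures: finalFailures.slice() }),
  };
}
```

Alerting only on `finalFailures` keeps retried, eventually successful steps from generating noise.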

Consensus Hooks

| Hook | Trigger | Context |
| --- | --- | --- |
| onConsensusStart | Consensus voting begins | runsPlanned, strategy |
| onConsensusRunComplete | Individual run completes | runIndex, output, status |
| onConsensusComplete | All runs complete | agreement, agreedOutput, totalCost |

Batch/forEach Hooks

| Hook | Trigger | Context |
| --- | --- | --- |
| onBatchStart | forEach begins | totalItems, batchId |
| onBatchItemEnd | Single item processed | itemIndex, result, status |
| onBatchEnd | All items processed | successfulItems, failedItems |
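
The batch hooks carry enough information to report progress while a forEach is running. The following is a rough sketch with a hypothetical `createBatchProgress` helper. It tracks a single batch at a time and treats `status` as an opaque string, since the possible status values are not listed here.

```typescript
// Stand-ins for the SDK's batch contexts, limited to the fields used here.
interface BatchStartContext {
  totalItems: number;
  batchId: string;
}

interface BatchItemEndContext {
  itemIndex: number;
  status: string; // the set of possible values is not assumed
}

// Hypothetical helper: tracks how many items of the current batch are done.
function createBatchProgress() {
  let total = 0;
  let done = 0;
  let byStatus: Record<string, number> = {};

  return {
    onBatchStart: (ctx: BatchStartContext): void => {
      // Reset counters for the new batch.
      total = ctx.totalItems;
      done = 0;
      byStatus = {};
    },
    onBatchItemEnd: (ctx: BatchItemEndContext): void => {
      done += 1;
      byStatus[ctx.status] = (byStatus[ctx.status] ?? 0) + 1;
    },
    progress: () => ({
      done,
      total,
      percent: total === 0 ? 0 : Math.round((done / total) * 100),
      byStatus: { ...byStatus },
    }),
  };
}
```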

Provider-Level Hooks

| Hook | Trigger | Context |
| --- | --- | --- |
| onProviderRequest | API request sent | provider, model, input |
| onProviderResponse | API response received | output, usage, cost, httpStatusCode |
| onProviderRetry | Request will retry | error, attemptNumber, nextRetryDelay |

Cloud Observability

Send execution events to the Doclo Cloud dashboard for monitoring and analytics.

Using createCloudObservability

import { createCloudObservability } from '@doclo/client';
import { createFlow, extract } from '@doclo/flows';

// Create cloud observability transport
const cloudObs = createCloudObservability({
  client: docloClient,
  flowId: 'invoice-extraction',
  flowVersion: '1.0.0',
  mode: 'stream',           // Real-time events
  flushIntervalMs: 5000,    // Flush every 5 seconds
  includeInputs: false,     // Privacy: exclude input data
  includeOutputs: true      // Include extraction results
});

// Create flow with cloud observability
const flow = createFlow({ observability: cloudObs })
  .step('extract', extract({ provider, schema }))
  .build();

// Run the flow
const result = await flow.run(input);

// Ensure all events are sent
await cloudObs.flush();

Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| client | DocloClient | Required | Doclo client instance |
| flowId | string | Required | Flow ID for tracking |
| flowVersion | string | - | Flow version |
| mode | 'stream' \| 'batch-at-end' | 'stream' | When to send events |
| flushIntervalMs | number | 5000 | Flush interval in ms (0 = immediate) |
| batchSize | number | 50 | Events before auto-flush |
| maxRetries | number | 3 | Retry attempts for failed sends |
| retryDelayMs | number | 1000 | Base retry delay in ms |
| maxBufferSize | number | 1000 | Max buffered events |
| includeInputs | boolean | false | Include input data in events |
| includeOutputs | boolean | false | Include output data in events |
| onError | function | - | Called when events are dropped |

Transport Modes

Stream Mode (Default)

Events are sent periodically during execution for real-time dashboard updates:
createCloudObservability({
  client,
  flowId: 'my-flow',
  mode: 'stream',
  flushIntervalMs: 5000,  // Flush every 5 seconds
  batchSize: 50           // Or when 50 events buffered
});
Set flushIntervalMs: 0 for immediate per-event sending:
createCloudObservability({
  client,
  flowId: 'my-flow',
  mode: 'stream',
  flushIntervalMs: 0  // Send immediately
});
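
To make the interaction between `batchSize`, `maxBufferSize`, and `onError` concrete, here is a simplified model of a size-triggered event buffer. It is an illustration under stated assumptions, not the SDK's implementation. The `EventBuffer` class and its `send` callback are hypothetical, sending is synchronous for brevity, the timer that would call `flush()` every `flushIntervalMs` is omitted, and dropping the oldest events on overflow is an assumed policy.

```typescript
interface BufferOptions<E> {
  batchSize: number;
  maxBufferSize: number;
  send: (events: E[]) => void; // hypothetical transport callback
  onError?: (error: Error, droppedCount: number) => void;
}

class EventBuffer<E> {
  private events: E[] = [];
  private readonly opts: BufferOptions<E>;

  constructor(opts: BufferOptions<E>) {
    this.opts = opts;
  }

  // Buffer one event and flush automatically once batchSize is reached.
  push(event: E): void {
    this.events.push(event);
    if (this.events.length >= this.opts.batchSize) this.flush();
  }

  // Send everything buffered so far. On failure, keep the events for a later
  // attempt, but never hold more than maxBufferSize of them.
  flush(): void {
    if (this.events.length === 0) return;
    const batch = this.events;
    this.events = [];
    try {
      this.opts.send(batch);
    } catch (err) {
      this.events = batch.concat(this.events);
      const overflow = this.events.length - this.opts.maxBufferSize;
      if (overflow > 0) {
        this.events.splice(0, overflow); // assumed policy: drop the oldest
        const error = err instanceof Error ? err : new Error(String(err));
        this.opts.onError?.(error, overflow);
      }
    }
  }
}
```

In this model a larger `batchSize` means fewer, larger sends, while `maxBufferSize` bounds memory use when the backend is unreachable.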

Batch-at-End Mode

Events are collected and sent only when flush() is called:
const obs = createCloudObservability({
  client,
  flowId: 'my-flow',
  mode: 'batch-at-end'
});

await flow.run(input);
await obs.flush();  // Send all events at once
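
In batch-at-end mode, a flow that throws would skip the `flush()` call above, so the events from the failed run would never be sent. One way to guard against that is a `try`/`finally` wrapper. This is a sketch: `runAndFlush` is a hypothetical helper, and the two interfaces model only the `run` and `flush` methods used in this guide.

```typescript
// Minimal stand-ins for the flow and the observability transport.
interface Runnable<I, O> {
  run(input: I): Promise<O>;
}

interface Flushable {
  flush(): Promise<void>;
}

// Hypothetical helper: run the flow and flush buffered events even if the
// run rejects. The original error still propagates to the caller.
async function runAndFlush<I, O>(
  flow: Runnable<I, O>,
  obs: Flushable,
  input: I
): Promise<O> {
  try {
    return await flow.run(input);
  } finally {
    await obs.flush();
  }
}
```

Note that if `flush()` itself rejects inside `finally`, its error replaces the flow's error. Wrap the flush in its own `try`/`catch` if the flow error must take priority.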

Privacy Controls

Control what data is sent to the cloud:
createCloudObservability({
  client,
  flowId: 'my-flow',
  includeInputs: false,   // Don't send document content
  includeOutputs: true    // Send extraction results
});

Error Handling

Handle dropped events and transport errors:
createCloudObservability({
  client,
  flowId: 'my-flow',
  onError: (error, droppedCount) => {
    console.error(`Dropped ${droppedCount} events:`, error.message);
  }
});

Metrics

Every flow execution returns aggregated metrics:
const result = await flow.run(input);

console.log('Metrics:', result.aggregated);
// {
//   totalDurationMs: 2500,
//   totalCostUSD: 0.0042,
//   totalInputTokens: 1500,
//   totalOutputTokens: 200,
//   stepCount: 2
// }

// Per-step metrics
console.log('Step metrics:', result.metrics);
// [
//   { step: 'parse', ms: 1200, costUSD: 0.0010 },
//   { step: 'extract', ms: 1300, costUSD: 0.0032 }
// ]
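
Per-step metrics can be post-processed to find where time and money go. The sketch below assumes each entry has the `step`, `ms`, and `costUSD` fields shown in the sample output. `StepMetric` and `summarize` are illustrative names, not SDK exports.

```typescript
// Shape inferred from the sample output above; the SDK's own type may differ.
interface StepMetric {
  step: string;
  ms: number;
  costUSD: number;
}

// Totals the per-step numbers and picks out the most expensive step.
function summarize(metrics: StepMetric[]) {
  const totalMs = metrics.reduce((sum, m) => sum + m.ms, 0);
  const totalCostUSD = metrics.reduce((sum, m) => sum + m.costUSD, 0);
  const costliest = metrics.reduce<StepMetric | undefined>(
    (top, m) => (!top || m.costUSD > top.costUSD ? m : top),
    undefined
  );
  return { totalMs, totalCostUSD, costliestStep: costliest?.step };
}

// Using the sample values from the output above:
const summary = summarize([
  { step: 'parse', ms: 1200, costUSD: 0.0010 },
  { step: 'extract', ms: 1300, costUSD: 0.0032 },
]);
```

For the sample data, `summary.totalMs` is 2500 and `summary.costliestStep` is `'extract'`. Floating-point totals such as `totalCostUSD` are best formatted with `toFixed` before display.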

Distributed Tracing

The SDK supports W3C trace context for distributed tracing:
const flow = createFlow({
  observability: {
    onFlowStart: (ctx) => {
      // W3C trace context available
      console.log('Trace ID:', ctx.traceContext.traceId);
      console.log('Span ID:', ctx.traceContext.spanId);
    }
  }
});

Propagating Trace Context

Pass trace context from upstream services:
const result = await flow.run(input, {
  metadata: {
    traceParent: req.headers['traceparent']
  }
});
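
Incoming headers are untrusted, and in Node.js a header value can be a string, an array, or undefined, so it can be worth validating the `traceparent` value before forwarding it. The following sketch parses the four-field W3C Trace Context layout (`version-traceid-parentid-flags`). `parseTraceParent` is a hypothetical helper, and how the SDK itself validates this value is not covered here.

```typescript
interface TraceParent {
  version: string;
  traceId: string;
  parentId: string;
  sampled: boolean;
}

// version (2 hex) - trace-id (32 hex) - parent-id (16 hex) - flags (2 hex), all lowercase
const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

function parseTraceParent(header: string | string[] | undefined): TraceParent | undefined {
  const value = Array.isArray(header) ? header[0] : header;
  if (!value) return undefined;

  const match = TRACEPARENT_RE.exec(value.trim());
  if (!match) return undefined;

  const [, version, traceId, parentId, flags] = match;
  // The spec forbids version "ff" and all-zero trace or parent IDs.
  if (version === 'ff') return undefined;
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return undefined;

  // Bit 0 of the flags byte is the "sampled" flag.
  return { version, traceId, parentId, sampled: (parseInt(flags, 16) & 0x01) === 1 };
}
```

A caller could then pass the header to `flow.run` only when `parseTraceParent(req.headers['traceparent'])` returns a value, and otherwise let a new trace start.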

Combining Local and Cloud

Use both local logging and cloud observability:
const cloudObs = createCloudObservability({
  client,
  flowId: 'my-flow'
});

const flow = createFlow({
  observability: {
    ...cloudObs,
    // Add local logging on top of cloud
    onStepEnd: (ctx) => {
      cloudObs.onStepEnd?.(ctx);
      console.log(`Step ${ctx.stepId}: ${ctx.duration}ms`);
    }
  }
});
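
Wrapping each hook by hand becomes repetitive once more than one or two hooks need both behaviors. A generic merge helper is one alternative. `combineHooks` below is a hypothetical utility, not an SDK function. It only sees own enumerable properties, the same limitation as the object spread above, and it calls hooks in the order their sources are passed.

```typescript
// Hooks take different context types, so the sketch uses `any` for the context.
type Hook = (ctx: any) => void;
type HookSet = Record<string, Hook | undefined>;

// Merge several hook objects so that every hook with the same name is called.
function combineHooks(...sources: HookSet[]): Record<string, Hook> {
  const combined: Record<string, Hook> = {};

  for (const source of sources) {
    for (const name of Object.keys(source)) {
      const hook = source[name];
      if (typeof hook !== 'function') continue;

      const previous = combined[name];
      combined[name] = (ctx) => {
        if (previous) previous(ctx);
        hook.call(source, ctx); // preserve `this` for method-style hooks
      };
    }
  }

  return combined;
}
```

Assuming the cloud transport exposes its hooks as plain properties, `createFlow({ observability: combineHooks(cloudObs, localHooks) })` would send each event to the cloud and then to the local logger.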
