Automated Document Processing Pipelines
How to build reliable automated document processing pipelines in TypeScript for extracting, validating, and classifying customer documents at scale.
Every customer onboarding system eventually becomes a document processing system. Passports, national IDs, driver's licenses, utility bills, bank statements, proof of address letters -- the variety is staggering, and each document type has its own layout, language, and set of extractable fields. Handling this at scale requires a pipeline that is modular, fault-tolerant, and observable.
In Oasis, our document processing pipeline handles tens of thousands of documents daily. Documents arrive in various formats and quality levels, pass through classification, enhancement, extraction, and validation stages, and emerge as structured data that feeds into the KYC verification workflow. This article details the architecture of that pipeline, the design decisions we made, and the lessons we learned along the way.
Pipeline Architecture Overview
The pipeline follows a staged processing model where each stage has a single responsibility, communicates through well-defined interfaces, and can be independently scaled and monitored. Documents flow through the stages via an in-memory queue for synchronous processing or a persistent message queue for asynchronous batch processing.
interface PipelineStage<TInput, TOutput> {
name: string;
process(input: TInput): Promise<TOutput>;
validate(input: TInput): boolean;
}
interface DocumentPipelineContext {
documentId: string;
customerId: string;
rawFile: Buffer;
mimeType: string;
metadata: Record<string, string>;
stageResults: Map<string, unknown>;
}
class DocumentPipeline {
private stages: Array<PipelineStage<DocumentPipelineContext, DocumentPipelineContext>> = [];
private errorHandler: PipelineErrorHandler;
private metrics: PipelineMetrics;
constructor(errorHandler: PipelineErrorHandler, metrics: PipelineMetrics) {
this.errorHandler = errorHandler;
this.metrics = metrics;
}
addStage(
stage: PipelineStage<DocumentPipelineContext, DocumentPipelineContext>
): void {
this.stages.push(stage);
}
async execute(context: DocumentPipelineContext): Promise<PipelineResult> {
const startTime = Date.now();
let currentContext = context;
for (const stage of this.stages) {
const stageStart = Date.now();
try {
if (!stage.validate(currentContext)) {
throw new ValidationError(
`Input validation failed for stage: ${stage.name}`
);
}
currentContext = await stage.process(currentContext);
this.metrics.recordStageLatency(stage.name, Date.now() - stageStart);
this.metrics.recordStageSuccess(stage.name);
} catch (error) {
this.metrics.recordStageFailure(stage.name);
const decision = await this.errorHandler.handle(stage.name, error as Error, currentContext);
if (decision === "abort") {
return {
success: false,
documentId: context.documentId,
failedStage: stage.name,
error: (error as Error).message,
processingTimeMs: Date.now() - startTime,
};
}
}
}
return {
success: true,
documentId: context.documentId,
failedStage: null,
error: null,
processingTimeMs: Date.now() - startTime,
};
}
}
Document Classification
The first stage classifies the incoming document. A customer might upload a passport when we asked for a utility bill, or submit a bank statement in an unexpected format. Classification determines what extraction strategy to apply and catches mismatched documents early.
interface ClassificationResult {
documentType: DocumentType;
confidence: number;
detectedLanguage: string;
pageCount: number;
quality: ImageQualityAssessment;
}
interface ImageQualityAssessment {
resolution: { width: number; height: number };
blurriness: number;
brightness: number;
isReadable: boolean;
}
class DocumentClassifier implements PipelineStage<DocumentPipelineContext, DocumentPipelineContext> {
name = "document-classification";
constructor(
private classificationService: ClassificationService,
private qualityAssessor: ImageQualityService
) {}
validate(input: DocumentPipelineContext): boolean {
const supportedTypes = ["image/jpeg", "image/png", "application/pdf"];
return supportedTypes.includes(input.mimeType) && input.rawFile.length > 0;
}
async process(
context: DocumentPipelineContext
): Promise<DocumentPipelineContext> {
const quality = await this.qualityAssessor.assess(context.rawFile);
if (!quality.isReadable) {
throw new DocumentQualityError(
"Document image quality is too low for processing",
{
blurriness: quality.blurriness,
brightness: quality.brightness,
resolution: quality.resolution,
}
);
}
const classification = await this.classificationService.classify(
context.rawFile,
context.mimeType
);
if (classification.confidence < 0.7) {
throw new ClassificationUncertainError(
`Could not classify document with sufficient confidence: ${classification.confidence}`
);
}
context.stageResults.set(this.name, {
...classification,
quality,
});
return context;
}
}
Quality assessment happens before classification. There is no point in running an expensive classification model on a blurry, underexposed photograph that will inevitably fail extraction. By catching low-quality uploads early, we can prompt the customer to retake the photo immediately rather than failing several minutes into the pipeline.
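The pipeline's `execute` loop delegates failure decisions to `errorHandler.handle`, which it expects to resolve to `"abort"` when processing should stop. The article does not show `PipelineErrorHandler` itself, so the following is a minimal sketch of one plausible shape; the interface and the policy in `DefaultErrorHandler` are assumptions, not the production implementation.

```typescript
// Sketch of the error handler the pipeline's execute() loop consults.
// The only contract visible in the pipeline code is that handle() resolves
// to "abort" to stop processing; everything else here is an assumption.
type ErrorDecision = "abort" | "continue";

class DocumentQualityError extends Error {}
class ClassificationUncertainError extends Error {}

interface PipelineErrorHandler {
  handle(
    stageName: string,
    error: Error,
    context: unknown
  ): Promise<ErrorDecision>;
}

class DefaultErrorHandler implements PipelineErrorHandler {
  async handle(stageName: string, error: Error): Promise<ErrorDecision> {
    // Quality and classification failures are terminal: the customer must
    // re-upload the document, so downstream stages cannot help.
    if (
      error instanceof DocumentQualityError ||
      error instanceof ClassificationUncertainError
    ) {
      return "abort";
    }
    // Anything else (e.g. a transient OCR timeout) continues, leaving the
    // document for a later retry sweep to pick up.
    return "continue";
  }
}
```

This keeps the retry/abort policy in one place, so stages can throw typed errors without knowing how the pipeline reacts to them.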
Image Enhancement and Preprocessing
After classification, documents pass through an enhancement stage that improves image quality for downstream OCR. This stage handles rotation correction, perspective transformation, noise reduction, and contrast adjustment.
class ImageEnhancer implements PipelineStage<DocumentPipelineContext, DocumentPipelineContext> {
name = "image-enhancement";
constructor(private imageProcessor: ImageProcessingService) {}
validate(input: DocumentPipelineContext): boolean {
return input.stageResults.has("document-classification");
}
async process(
context: DocumentPipelineContext
): Promise<DocumentPipelineContext> {
const classification = context.stageResults.get(
"document-classification"
) as ClassificationResult;
const enhancementPlan = this.buildEnhancementPlan(classification);
let processedImage = context.rawFile;
for (const step of enhancementPlan) {
processedImage = await this.imageProcessor.apply(processedImage, step);
}
context.stageResults.set(this.name, {
originalSize: context.rawFile.length,
enhancedSize: processedImage.length,
appliedEnhancements: enhancementPlan.map((s) => s.name),
});
context.rawFile = processedImage;
return context;
}
private buildEnhancementPlan(
classification: ClassificationResult
): EnhancementStep[] {
const steps: EnhancementStep[] = [];
if (classification.quality.blurriness > 0.5) {
steps.push({ name: "sharpen", params: { strength: 0.8 } });
}
if (
classification.quality.brightness < 0.3 ||
classification.quality.brightness > 0.9
) {
steps.push({
name: "normalize-brightness",
params: { target: 0.6 },
});
}
steps.push({ name: "deskew", params: {} });
steps.push({ name: "remove-noise", params: { threshold: 0.2 } });
return steps;
}
}
The enhancement plan is adaptive. A well-lit, properly oriented passport photo might only need noise reduction, while a crumpled utility bill photographed at an angle needs the full treatment. Building the plan based on the quality assessment from the classification stage avoids unnecessary processing.
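To make the adaptivity concrete, here is a standalone copy of the planning rules from `buildEnhancementPlan`, reduced to raw numbers so it runs without the pipeline types, applied to two hypothetical quality profiles.

```typescript
// Standalone illustration of the planning rules above. planFor() mirrors
// buildEnhancementPlan() but takes the quality scores directly.
function planFor(blurriness: number, brightness: number): string[] {
  const steps: string[] = [];
  if (blurriness > 0.5) steps.push("sharpen");
  if (brightness < 0.3 || brightness > 0.9) steps.push("normalize-brightness");
  steps.push("deskew");
  steps.push("remove-noise");
  return steps;
}

// A sharp, well-lit scan gets only the unconditional steps:
console.log(planFor(0.1, 0.6)); // ["deskew", "remove-noise"]
// A blurry, underexposed phone photo gets the full treatment:
console.log(planFor(0.8, 0.2));
// ["sharpen", "normalize-brightness", "deskew", "remove-noise"]
```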
Data Extraction and Normalization
The extraction stage is where documents become structured data. Different document types require different extraction strategies. A passport has a machine-readable zone (MRZ) that can be parsed deterministically, while a utility bill requires OCR and field-matching heuristics.
interface ExtractionStrategy {
documentTypes: DocumentType[];
extract(
image: Buffer,
classification: ClassificationResult
): Promise<ExtractedFields>;
}
interface ExtractedFields {
fields: Map<string, ExtractedField>;
rawText: string;
confidence: number;
}
interface ExtractedField {
key: string;
value: string;
confidence: number;
boundingBox?: BoundingBox;
}
class MRZExtractionStrategy implements ExtractionStrategy {
documentTypes: DocumentType[] = ["PASSPORT"];
constructor(private mrzParser: MRZParser, private ocrService: OCRService) {}
async extract(
image: Buffer,
classification: ClassificationResult
): Promise<ExtractedFields> {
const ocrResult = await this.ocrService.extractText(image);
const mrzLines = this.findMRZLines(ocrResult.text);
if (mrzLines.length < 2) {
throw new ExtractionError("Could not locate MRZ in passport image");
}
const parsed = this.mrzParser.parse(mrzLines);
const fields = new Map<string, ExtractedField>();
fields.set("full_name", {
key: "full_name",
value: `${parsed.givenNames} ${parsed.surname}`,
confidence: 0.95,
});
fields.set("date_of_birth", {
key: "date_of_birth",
value: parsed.dateOfBirth,
confidence: 0.98,
});
fields.set("document_number", {
key: "document_number",
value: parsed.documentNumber,
confidence: 0.97,
});
fields.set("nationality", {
key: "nationality",
value: parsed.nationality,
confidence: 0.99,
});
fields.set("expiry_date", {
key: "expiry_date",
value: parsed.expiryDate,
confidence: 0.98,
});
return {
fields,
rawText: ocrResult.text,
confidence: parsed.checksumValid ? 0.97 : 0.6,
};
}
private findMRZLines(text: string): string[] {
const lines = text.split("\n").map((l) => l.trim());
return lines.filter(
(line) => line.length >= 30 && /^[A-Z0-9<]+$/.test(line)
);
}
}
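The `checksumValid` flag is what makes MRZ extraction deterministic: every critical MRZ field carries a check digit computed per ICAO Doc 9303 with repeating 7-3-1 weights. A parser such as the `mrzParser` above would validate fields with this standard algorithm; the sketch below shows the calculation on fields from the ICAO specimen passport.

```typescript
// ICAO 9303 check digit: map each character to a value (digits keep their
// value, A=10..Z=35, the filler "<" counts as 0), multiply by the repeating
// weights 7, 3, 1, sum, and take the result mod 10.
function mrzCheckDigit(field: string): number {
  const weights = [7, 3, 1];
  let sum = 0;
  for (let i = 0; i < field.length; i++) {
    const c = field[i];
    let value: number;
    if (c >= "0" && c <= "9") value = c.charCodeAt(0) - 48;
    else if (c >= "A" && c <= "Z") value = c.charCodeAt(0) - 55;
    else value = 0; // "<" filler
    sum += value * weights[i % 3];
  }
  return sum % 10;
}

// Fields from the ICAO 9303 specimen passport:
console.log(mrzCheckDigit("L898902C3")); // 6 (document number)
console.log(mrzCheckDigit("740812")); // 2 (date of birth)
```

When a computed digit disagrees with the one printed in the MRZ, the field was misread by OCR or the document was tampered with, which is why the extraction confidence drops to 0.6 in that case.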
class DataExtractionStage implements PipelineStage<DocumentPipelineContext, DocumentPipelineContext> {
name = "data-extraction";
private strategies: ExtractionStrategy[] = [];
registerStrategy(strategy: ExtractionStrategy): void {
this.strategies.push(strategy);
}
validate(input: DocumentPipelineContext): boolean {
return input.stageResults.has("image-enhancement");
}
async process(
context: DocumentPipelineContext
): Promise<DocumentPipelineContext> {
const classification = context.stageResults.get(
"document-classification"
) as ClassificationResult;
const strategy = this.strategies.find((s) =>
s.documentTypes.includes(classification.documentType)
);
if (!strategy) {
throw new Error(
`No extraction strategy for document type: ${classification.documentType}`
);
}
const extracted = await strategy.extract(context.rawFile, classification);
context.stageResults.set(this.name, extracted);
return context;
}
}
The strategy pattern allows us to add support for new document types without modifying the pipeline. When we needed to support a new national ID format for a market expansion, we wrote a new extraction strategy, registered it, and deployed. No changes to the pipeline orchestration, classification, or validation stages were required.
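The registration step can be sketched with stub strategies (the `national-id` label below is hypothetical, standing in for the real strategy class): supporting a new market's ID type touches only the registration list, while the lookup in `DataExtractionStage` stays unchanged.

```typescript
// Condensed sketch of the registry behaviour: the lookup is the same
// documentTypes.includes() search used by DataExtractionStage above.
type DocumentType = "PASSPORT" | "NATIONAL_ID" | "UTILITY_BILL";

interface StrategyStub {
  documentTypes: DocumentType[];
  label: string;
}

const strategies: StrategyStub[] = [];
strategies.push({ documentTypes: ["PASSPORT"], label: "mrz" });
// Market expansion: one new registration, no pipeline changes.
strategies.push({ documentTypes: ["NATIONAL_ID"], label: "national-id" });

function findStrategy(type: DocumentType): StrategyStub | undefined {
  return strategies.find((s) => s.documentTypes.includes(type));
}

console.log(findStrategy("NATIONAL_ID")?.label); // "national-id"
console.log(findStrategy("UTILITY_BILL")); // undefined — the real stage throws here
```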
Cross-Validation and Fraud Detection
The final stage cross-validates extracted data against the information the customer provided during registration and runs fraud detection checks on the document images themselves.
class CrossValidationStage implements PipelineStage<DocumentPipelineContext, DocumentPipelineContext> {
name = "cross-validation";
constructor(
private customerStore: CustomerStore,
private fraudDetector: DocumentFraudDetector
) {}
validate(input: DocumentPipelineContext): boolean {
return input.stageResults.has("data-extraction");
}
async process(
context: DocumentPipelineContext
): Promise<DocumentPipelineContext> {
const extracted = context.stageResults.get("data-extraction") as ExtractedFields;
const customer = await this.customerStore.findById(context.customerId);
const discrepancies: Discrepancy[] = [];
const nameField = extracted.fields.get("full_name");
if (nameField && !this.namesMatch(nameField.value, customer.fullName)) {
discrepancies.push({
field: "full_name",
documentValue: nameField.value,
providedValue: customer.fullName,
severity: "high",
});
}
const dobField = extracted.fields.get("date_of_birth");
if (dobField && dobField.value !== customer.dateOfBirth) {
discrepancies.push({
field: "date_of_birth",
documentValue: dobField.value,
providedValue: customer.dateOfBirth,
severity: "critical",
});
}
const fraudResult = await this.fraudDetector.analyze(context.rawFile);
context.stageResults.set(this.name, {
discrepancies,
fraudDetection: fraudResult,
crossValidationPassed: discrepancies.filter((d) => d.severity === "critical").length === 0,
});
return context;
}
private namesMatch(documentName: string, providedName: string): boolean {
const normalize = (name: string) =>
name.toLowerCase().replace(/[^a-z\s]/g, "").replace(/\s+/g, " ").trim();
const docNorm = normalize(documentName);
const provNorm = normalize(providedName);
if (docNorm === provNorm) return true;
const docParts = docNorm.split(" ");
const provParts = provNorm.split(" ");
const firstNameMatch = docParts[0] === provParts[0];
const lastNameMatch = docParts[docParts.length - 1] === provParts[provParts.length - 1];
return firstNameMatch && lastNameMatch;
}
}
Conclusion
A document processing pipeline is only as strong as its weakest stage. By decomposing the problem into classification, enhancement, extraction, and validation stages, each with clear interfaces and independent failure modes, you build a system that is robust, extensible, and observable.
The most valuable lesson from building this pipeline in Oasis is that early rejection saves everyone time. Catching a blurry photo at the quality assessment stage, a misclassified document at the classification stage, or a forged document at the fraud detection stage prevents wasted processing and gives the customer immediate feedback. A well-designed pipeline is not just faster; it creates a better experience for everyone involved.