
Overview

The CGIAR Risk Intelligence Tool uses a multi-agent AI pipeline built on AWS Bedrock to automate risk assessment. The system orchestrates specialized AI agents, each handling a specific phase of the analysis workflow.

Architecture

Processing Flow

  1. Document Upload → PDF files uploaded to S3
  2. AWS Textract → Extracts text and tables from documents
  3. Parser Agent → Structures extracted data into risk categories
  4. Gap Detector → Identifies missing or incomplete fields
  5. Risk Analysis Agent → Scores all 7 risk categories with subcategories
  6. Report Generator → Creates comprehensive PDF with traffic-light indicators
All AI operations run asynchronously as background jobs. The frontend polls job status for completion.
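The polling loop the frontend runs can be sketched as follows. This is a minimal illustration, not the actual client code: the `Job` shape, option names, and the injected `fetchJob` function are assumptions for the sketch.

```typescript
// Hypothetical job-polling helper; Job fields and defaults are illustrative.
type JobStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED';

interface Job {
  id: string;
  status: JobStatus;
  progress: number;
  error?: string;
}

async function pollJob(
  fetchJob: (jobId: string) => Promise<Job>, // injected so the loop is testable
  jobId: string,
  { intervalMs = 3000, maxAttempts = 200 } = {}
): Promise<Job> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const job = await fetchJob(jobId);
    // Stop polling on any terminal status
    if (job.status === 'COMPLETED' || job.status === 'FAILED') return job;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Job ${jobId} did not finish within ${maxAttempts} polls`);
}
```

In production the `fetchJob` argument would wrap a `GET` to the job-status endpoint.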

AWS Bedrock Integration

Foundation Models

All agents use Claude 3.5 Sonnet v2 from Anthropic:
import { AgentSection } from '../enums/agent-section.enum';

export const BEDROCK_MODELS: Record<
  AgentSection,
  { modelId: string; knowledgeBaseId?: string }
> = {
  [AgentSection.PARSER]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
  [AgentSection.GAP_DETECTOR]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
  [AgentSection.RISK_ANALYSIS]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
  [AgentSection.REPORT_GENERATION]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
};

Resilience Features

A circuit breaker prevents cascading failures by opening after 3 consecutive failures:
this.circuitBreaker = new CircuitBreaker({
  failureThreshold: 3,
  resetTimeoutMs: 60_000,
  isFailure: (err) => (err as Error)?.name !== 'ValidationException',
});
  • Open: Rejects requests immediately for 60 seconds
  • Half-Open: Allows one test request after timeout
  • Closed: Normal operation
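The three states above form a small state machine; a minimal sketch of it (the real `CircuitBreaker` class may differ, and all names here are illustrative):

```typescript
// Minimal circuit-breaker state machine: CLOSED → OPEN → HALF_OPEN → CLOSED.
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class SimpleCircuitBreaker {
  private state: BreakerState = 'CLOSED';
  private failures = 0;
  private openedAt = 0;

  constructor(
    private failureThreshold: number,
    private resetTimeoutMs: number,
    private now: () => number = Date.now // injectable clock for testing
  ) {}

  getState(): BreakerState {
    // OPEN transitions to HALF_OPEN once the reset timeout elapses
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'HALF_OPEN';
    }
    return this.state;
  }

  canRequest(): boolean {
    // Reject immediately while OPEN; allow the single test request in HALF_OPEN
    return this.getState() !== 'OPEN';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  recordFailure(): void {
    this.failures++;
    // A failed test request in HALF_OPEN re-opens the breaker immediately
    if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = this.now();
      this.failures = 0;
    }
  }
}
```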
A retry wrapper automatically retries throttled requests:
await withRetry(
  () => this.client.send(new InvokeModelCommand(...)),
  {
    maxAttempts: 3,
    isRetryable: (err) =>
      (err as Error)?.name === 'ThrottlingException' ||
      (err as Error)?.name === 'ServiceUnavailableException',
  }
);
  • Retries up to 3 times for throttling/service errors
  • Exponential backoff between attempts
  • Fails fast for validation errors
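One possible shape for `withRetry` that matches the three bullets above; the project's actual helper may differ, and the `baseDelayMs` default is an assumption:

```typescript
// Sketch of a retry wrapper with exponential backoff and a retryability check.
interface RetryOptions {
  maxAttempts: number;
  isRetryable: (err: unknown) => boolean;
  baseDelayMs?: number;
}

async function withRetry<T>(
  fn: () => Promise<T>,
  { maxAttempts, isRetryable, baseDelayMs = 500 }: RetryOptions
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      // Fail fast on non-retryable errors, or once attempts are exhausted
      if (attempt >= maxAttempts || !isRetryable(err)) throw err;
      // Exponential backoff: base, 2x base, 4x base, ...
      const delay = baseDelayMs * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```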

Agent Pipeline

1. Parser Agent

Purpose: Structures raw Textract output into organized risk category data

Input:
  • Extracted text content from AWS Textract
  • Extracted tables with headers and rows
  • Assessment metadata (company name, type, country)
Processing:
interface ParseDocumentInput {
  assessmentId: string;
  documentId: string;
  s3Key: string;
}

// Job Type: PARSE_DOCUMENT
const jobId = await jobsService.create(
  JobType.PARSE_DOCUMENT,
  { assessmentId, documentId, s3Key },
  userId
);
AWS Textract Analysis:
async analyzeDocument(s3Bucket: string, s3Key: string): Promise<ExtractionResult> {
  // Step 1: Start async analysis job
  const textractJobId = await this.startAnalysis(s3Bucket, s3Key);

  // Step 2: Poll with exponential backoff (2s → 4s → 8s → 16s → 30s)
  const pages = await this.pollUntilComplete(textractJobId);

  // Collect all blocks across pages, then split them by type
  const blocks = pages.flatMap((page) => page.Blocks ?? []);
  const lineBlocks = blocks.filter((b) => b.BlockType === 'LINE');
  const tableBlocks = blocks.filter((b) => b.BlockType === 'TABLE');

  // Step 3: Extract LINE blocks for text content
  const textContent = lineBlocks
    .map((b) => b.Text ?? '')
    .filter(Boolean)
    .join('\n');

  // Step 4: Extract TABLE blocks with cells
  const tables: ExtractedTable[] = tableBlocks.map((table) => {
    const cells = getCellBlocks(table);
    const grid = buildTableGrid(cells);
    return {
      page: table.Page ?? 1,
      headers: grid[0],
      rows: grid.slice(1), // body rows, excluding the header row
      rowCount: grid.length, // total rows, including the header
      columnCount: grid[0]?.length ?? 0,
    };
  });

  return { pages: pages.length, textContent, tables, metadata };
}
Output: Structured data ready for gap detection

Document Status Transitions:
PENDING_UPLOAD → UPLOADED → PARSING → PARSED
                                    ↓
                                 FAILED (on error)
Textract jobs can take several minutes for large PDFs. The frontend should poll document status every 3-5 seconds with a maximum timeout of 10 minutes.
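The capped backoff schedule quoted in the code comment above (2s → 4s → 8s → 16s → 30s) reduces to a one-line delay function; this sketch assumes the base and cap values from that comment:

```typescript
// Delay before Textract poll number `attempt` (0-based), doubling up to a cap.
function textractPollDelayMs(attempt: number, baseMs = 2_000, capMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, capMs);
}
```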

2. Gap Detector Agent

Purpose: Identifies missing or incomplete data fields that need user verification

Input:
  • Parsed document data
  • Assessment metadata
  • List of required fields per risk category
Processing:
interface GapDetectionInput {
  assessmentId: string;
}

// Job Type: GAP_DETECTION (auto-chained after PARSE_DOCUMENT)
Gap Field States:

MISSING — the field has no extracted value; the user must provide the data manually:
{
  "field": "revenue_2025",
  "label": "FY2025 Revenue",
  "extractedValue": null,
  "status": "MISSING",
  "isMandatory": true
}
Output: Gap fields created across all 7 risk categories (5 fields each = 35 total)

Assessment Status Transition:
DRAFT → ANALYZING → ACTION_REQUIRED (if gaps detected)
                  → COMPLETE (if no gaps)
Gap detection is triggered for GUIDED_INTERVIEW and MANUAL_ENTRY intake modes after data submission, and for UPLOAD mode after document parsing.
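The status decision above can be sketched as a small pure function. The `GapField` shape follows the JSON example on this page, but the non-`MISSING` status values and the rule that only mandatory gaps block completion are assumptions for illustration:

```typescript
// Hypothetical status decision after gap detection; names are illustrative.
interface GapField {
  field: string;
  status: 'MISSING' | 'EXTRACTED' | 'VERIFIED'; // non-MISSING values assumed
  isMandatory: boolean;
}

type AssessmentStatus = 'ACTION_REQUIRED' | 'COMPLETE';

function nextStatus(gaps: GapField[]): AssessmentStatus {
  // Only mandatory fields with no extracted value require user action
  const hasOpenGaps = gaps.some((g) => g.status === 'MISSING' && g.isMandatory);
  return hasOpenGaps ? 'ACTION_REQUIRED' : 'COMPLETE';
}
```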

3. Risk Analysis Agent

Purpose: Generates risk scores, narratives, and recommendations for all categories

Input:
  • Verified gap field data
  • Assessment metadata
  • Historical risk benchmarks (future enhancement)
Processing:
interface RiskAnalysisInput {
  assessmentId: string;
}

// Job Type: RISK_ANALYSIS
const jobId = await jobsService.create(
  JobType.RISK_ANALYSIS,
  { assessmentId },
  userId
);
For Each Risk Category:
Step 1: Score Subcategories

AI analyzes data and assigns scores (0-100) to 5 subcategories:
const subcategories = [
  {
    name: 'Revenue Stability',
    indicator: 'Year-over-year revenue variance',
    score: 42,
    level: 'MODERATE',
    evidence: 'Revenue declined 8% in FY2025 but stabilized in Q4',
    mitigation: 'Diversify customer base and develop new revenue streams'
  },
  // ... 4 more subcategories
];
Step 2: Calculate Category Score

Aggregate subcategory scores (default: equal weights):
const categoryScore = Math.round(
  subcategories.reduce((sum, sub) => sum + sub.score, 0) / subcategories.length
);
Step 3: Assign Risk Level

Map score to traffic-light level:
const level = 
  categoryScore < 25 ? 'LOW' :
  categoryScore < 50 ? 'MODERATE' :
  categoryScore < 75 ? 'HIGH' : 'CRITICAL';
Step 4: Generate Narrative

AI creates contextual risk narrative:
const narrative = `The ${category} risk level is ${level} based on analysis of ${subcategories.map(s => s.name).join(', ')}.`;
Step 5: Create Recommendations

Generate 2-3 prioritized recommendations:
const recommendations = [
  {
    text: 'Develop a 3-year financial sustainability plan...',
    priority: 'HIGH',
    order: 0
  },
  {
    text: 'Implement quarterly financial health monitoring...',
    priority: 'MEDIUM',
    order: 1
  }
];
Output: Complete risk scores for all 7 categories + overall assessment score

Assessment Status Transition:
ACTION_REQUIRED → COMPLETE (progress: 90)
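The overall assessment score is not spelled out on this page; a plausible sketch, assuming it reuses the equal-weight average and traffic-light thresholds shown in steps 2 and 3 above:

```typescript
// Hypothetical overall-score aggregation over the 7 category scores.
type RiskLevel = 'LOW' | 'MODERATE' | 'HIGH' | 'CRITICAL';

function overallScore(categoryScores: number[]): { score: number; level: RiskLevel } {
  // Equal-weight average, rounded, mirroring the per-category calculation
  const score = Math.round(
    categoryScores.reduce((sum, s) => sum + s, 0) / categoryScores.length
  );
  // Same traffic-light thresholds as the per-category mapping
  const level: RiskLevel =
    score < 25 ? 'LOW' : score < 50 ? 'MODERATE' : score < 75 ? 'HIGH' : 'CRITICAL';
  return { score, level };
}
```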

4. Report Generation Agent

Purpose: Creates PDF report with visualizations and traffic-light indicators

Input:
  • Complete risk score data
  • Assessment metadata
  • Recommendations
Processing:
interface ReportGenerationInput {
  assessmentId: string;
}

// Job Type: REPORT_GENERATION
const jobId = await jobsService.create(
  JobType.REPORT_GENERATION,
  { assessmentId },
  userId
);
See: Report Generation for detailed documentation

Asynchronous Job Processing

All AI operations run as background jobs:
// API Lambda creates job and invokes Worker Lambda
const jobId = await jobsService.create(
  JobType.PARSE_DOCUMENT,
  { assessmentId, documentId, s3Key },
  userId
);

// Returns immediately with job ID
return { jobId };

Retry Logic

Jobs retry up to 3 times on failure:
if (attempts >= maxAttempts) {
  await updateStatus(jobId, JobStatus.FAILED, undefined, errorMsg);
  // Notify handler of permanent failure
  await handler.onFailure(documentId, error);
} else {
  // Reset to PENDING for retry
  await updateStatus(jobId, JobStatus.PENDING);
}
Jobs can automatically trigger dependent jobs:
// After PARSE_DOCUMENT completes, auto-chain GAP_DETECTION
if (job.type === JobType.PARSE_DOCUMENT && job.status === 'COMPLETED') {
  const gapJobId = await this.create(
    JobType.GAP_DETECTION,
    { assessmentId: input.assessmentId },
    job.createdById
  );
  await this.processJob(gapJobId);
}

Prompt Management

AI agents use versioned prompts managed through the Prompt CMS:
// Public endpoint for runtime prompt retrieval
GET /api/prompts/section/parser

{
  "id": "prompt-123",
  "section": "PARSER",
  "systemPrompt": "You are an expert document analyst...",
  "userPromptTemplate": "Analyze the following document and extract...",
  "tone": "Professional and analytical",
  "outputFormat": "JSON with structured fields",
  "version": 5
}
See: Prompt Management documentation for versioning, comments, and change tracking
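At runtime the retrieved `userPromptTemplate` is interpolated with assessment data before invoking the model. The `{{placeholder}}` syntax below is an assumption for illustration; the Prompt CMS may use a different templating convention:

```typescript
// Hypothetical template renderer; unknown placeholders are left untouched.
function renderTemplate(template: string, vars: Record<string, string>): string {
  return template.replace(/\{\{(\w+)\}\}/g, (match, key) =>
    key in vars ? vars[key] : match
  );
}
```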

Code Example: Complete AI Pipeline

import { useEffect, useState } from 'react';
import { useJob } from '@/hooks/use-job';

function AssessmentAnalysis({ assessmentId }: { assessmentId: string }) {
  const [jobId, setJobId] = useState<string | null>(null);

  // Step 1: Trigger risk analysis and remember the returned job ID
  const triggerAnalysis = async () => {
    const { jobId } = await fetch(
      `/api/assessments/${assessmentId}/analyze`,
      { method: 'POST' }
    ).then((r) => r.json());

    setJobId(jobId);
  };

  // Step 2: Poll job status with custom hook
  const { job, isLoading, error } = useJob(jobId, {
    pollInterval: 3000,
    maxAttempts: 200, // ~10 minutes
  });

  // Step 3: Handle completion
  useEffect(() => {
    if (job?.status === 'COMPLETED') {
      // Refresh assessment data to get new scores
      refreshAssessment();
    } else if (job?.status === 'FAILED') {
      toast.error(`Analysis failed: ${job.error}`);
    }
  }, [job?.status]);

  return (
    <div>
      <button onClick={triggerAnalysis} disabled={isLoading}>
        Run Analysis
      </button>
      {isLoading && <ProgressBar progress={job?.progress ?? 0} />}
      {job?.status === 'COMPLETED' && <RiskScoresDashboard />}
    </div>
  );
}

Best Practices

Monitor Token Usage

Track Bedrock token consumption per job type to optimize costs and identify inefficient prompts

Implement Timeouts

Set reasonable polling timeouts (10 min for Textract, 5 min for AI jobs) to prevent infinite loops

Handle Partial Results

Design UIs to show progress and partial results rather than blocking on job completion

Version Prompts

Always version prompts before deploying to production. Use the Prompt CMS change tracking.

Risk Scoring

Understand how AI-generated scores map to risk levels

Assessment Workflow

See how AI agents fit into the assessment lifecycle

Report Generation

Learn about PDF report creation with AI-generated content