## Overview
The CGIAR Risk Intelligence Tool uses a multi-agent AI pipeline built on AWS Bedrock to automate risk assessment. The system orchestrates specialized AI agents, each handling a specific phase of the analysis workflow.
## Architecture

### Processing Flow

1. **Document Upload** → PDF files uploaded to S3
2. **AWS Textract** → Extracts text and tables from documents
3. **Parser Agent** → Structures extracted data into risk categories
4. **Gap Detector** → Identifies missing or incomplete fields
5. **Risk Analysis Agent** → Scores all 7 risk categories with subcategories
6. **Report Generator** → Creates a comprehensive PDF with traffic-light indicators
All AI operations run asynchronously as background jobs. The frontend polls job status for completion.
## AWS Bedrock Integration

### Foundation Models

All agents use Claude 3.5 Sonnet v2 from Anthropic (`bedrock.config.ts`):

```typescript
import { AgentSection } from '../enums/agent-section.enum';

export const BEDROCK_MODELS: Record<
  AgentSection,
  { modelId: string; knowledgeBaseId?: string }
> = {
  [AgentSection.PARSER]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
  [AgentSection.GAP_DETECTOR]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
  [AgentSection.RISK_ANALYSIS]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
  [AgentSection.REPORT_GENERATION]: {
    modelId: 'anthropic.claude-3-5-sonnet-20241022-v2:0',
  },
};
```

Models are invoked through `BedrockService.invokeModel`.
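The invocation body itself is not shown in the source. As a sketch, the request payload would follow the Anthropic Messages format that Claude models on Bedrock accept; `buildClaudeRequest` below is a hypothetical helper, not part of the codebase:

```typescript
// Hypothetical helper: builds the JSON body for an InvokeModelCommand
// targeting a Claude model on Bedrock (Anthropic Messages API format).
interface ClaudeRequest {
  anthropic_version: string;
  max_tokens: number;
  system?: string;
  messages: { role: 'user' | 'assistant'; content: string }[];
}

function buildClaudeRequest(
  systemPrompt: string,
  userPrompt: string,
  maxTokens = 4096
): ClaudeRequest {
  return {
    anthropic_version: 'bedrock-2023-05-31',
    max_tokens: maxTokens,
    system: systemPrompt,
    messages: [{ role: 'user', content: userPrompt }],
  };
}

// The serialized result would become the `body` of an InvokeModelCommand.
const payload = JSON.stringify(
  buildClaudeRequest('You are an expert document analyst...', 'Analyze this document.')
);
```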
### Resilience Features

**Circuit breaker.** Prevents cascading failures by opening after 3 consecutive failures:

```typescript
this.circuitBreaker = new CircuitBreaker({
  failureThreshold: 3,
  resetTimeoutMs: 60_000,
  isFailure: (err) => !((err as Error)?.name === 'ValidationException'),
});
```

- **Open**: Rejects requests immediately for 60 seconds
- **Half-Open**: Allows one test request after the timeout
- **Closed**: Normal operation
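The `CircuitBreaker` implementation is not shown in the source; a minimal sketch of the three-state machine described above, with names and details assumed, could look like this:

```typescript
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

// Minimal sketch of a circuit breaker: counts consecutive failures,
// opens at the threshold, and allows one probe after the reset timeout.
class CircuitBreaker {
  private state: BreakerState = 'CLOSED';
  private failures = 0;
  private openedAt = 0;

  constructor(
    private opts: {
      failureThreshold: number;
      resetTimeoutMs: number;
      isFailure?: (err: unknown) => boolean;
    }
  ) {}

  async exec<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'OPEN') {
      if (Date.now() - this.openedAt < this.opts.resetTimeoutMs) {
        throw new Error('Circuit open: request rejected');
      }
      this.state = 'HALF_OPEN'; // allow one test request
    }
    try {
      const result = await fn();
      this.state = 'CLOSED'; // success closes the circuit
      this.failures = 0;
      return result;
    } catch (err) {
      if (this.opts.isFailure?.(err) ?? true) {
        this.failures += 1;
        if (this.state === 'HALF_OPEN' || this.failures >= this.opts.failureThreshold) {
          this.state = 'OPEN';
          this.openedAt = Date.now();
        }
      }
      throw err;
    }
  }

  get currentState(): BreakerState {
    return this.state;
  }
}
```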
**Retry with backoff.** Automatically retries throttled requests:

```typescript
await withRetry(
  () => this.client.send(new InvokeModelCommand(/* ... */)),
  {
    maxAttempts: 3,
    isRetryable: (err) =>
      (err as Error)?.name === 'ThrottlingException' ||
      (err as Error)?.name === 'ServiceUnavailableException',
  }
);
```

- Retries up to 3 times for throttling/service errors
- Exponential backoff between attempts
- Fails fast for validation errors
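`withRetry` itself is not shown; a minimal sketch with exponential backoff, consistent with the behavior listed above (the base delay is an assumption), could be:

```typescript
interface RetryOptions {
  maxAttempts: number;
  isRetryable: (err: unknown) => boolean;
  baseDelayMs?: number; // assumed default; not specified in the source
}

// Retries `fn` on retryable errors, doubling the delay between attempts.
async function withRetry<T>(fn: () => Promise<T>, opts: RetryOptions): Promise<T> {
  const baseDelay = opts.baseDelayMs ?? 1000;
  let lastError: unknown;
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Fail fast on non-retryable errors (e.g. ValidationException)
      if (!opts.isRetryable(err) || attempt === opts.maxAttempts - 1) throw err;
      // Exponential backoff: baseDelay, 2x, 4x, ...
      await new Promise((r) => setTimeout(r, baseDelay * 2 ** attempt));
    }
  }
  throw lastError;
}
```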
## Agent Pipeline

### 1. Parser Agent

**Purpose:** Structures raw Textract output into organized risk category data

**Input:**

- Extracted text content from AWS Textract
- Extracted tables with headers and rows
- Assessment metadata (company name, type, country)

**Processing:**

```typescript
interface ParseDocumentInput {
  assessmentId: string;
  documentId: string;
  s3Key: string;
}

// Job Type: PARSE_DOCUMENT
const jobId = await jobsService.create(
  JobType.PARSE_DOCUMENT,
  { assessmentId, documentId, s3Key },
  userId
);
```
**AWS Textract Analysis** (`TextractService.analyzeDocument`):

```typescript
async analyzeDocument(s3Bucket: string, s3Key: string): Promise<ExtractionResult> {
  // Step 1: Start async analysis job
  const textractJobId = await this.startAnalysis(s3Bucket, s3Key);

  // Step 2: Poll with exponential backoff (2s → 4s → 8s → 16s → 30s)
  const pages = await this.pollUntilComplete(textractJobId);

  // Step 3: Extract LINE blocks for text content
  const textContent = lineBlocks
    .map((b) => b.Text ?? '')
    .filter(Boolean)
    .join('\n');

  // Step 4: Extract TABLE blocks with cells
  const tables: ExtractedTable[] = tableBlocks.map((table) => {
    const cells = getCellBlocks(table);
    const grid = buildTableGrid(cells);
    return {
      page: table.Page ?? 1,
      headers: grid[0],
      rows: grid,
      rowCount: grid.length,
      columnCount: grid[0]?.length ?? 0,
    };
  });

  return { pages: pageCount, textContent, tables, metadata };
}
```
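The backoff schedule used while polling Textract (2s → 4s → 8s → 16s, capped at 30s) can be computed with a small helper. This is an illustrative sketch, not the actual `pollUntilComplete` internals:

```typescript
// Delay before polling attempt `attempt` (0-based): doubles each time, capped at 30s.
function pollDelayMs(attempt: number, baseMs = 2000, capMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, capMs);
}

// First five delays: 2s, 4s, 8s, 16s, then capped at 30s.
const schedule = [0, 1, 2, 3, 4].map((a) => pollDelayMs(a));
```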
**Output:** Structured data ready for gap detection

**Document Status Transitions:**

```
PENDING_UPLOAD → UPLOADED → PARSING → PARSED
                               ↓
                             FAILED (on error)
```
Textract jobs can take several minutes for large PDFs. The frontend should poll document status every 3-5 seconds with a maximum timeout of 10 minutes.
### 2. Gap Detector Agent

**Purpose:** Identifies missing or incomplete data fields that need user verification

**Input:**

- Parsed document data
- Assessment metadata
- List of required fields per risk category

**Processing:**

```typescript
interface GapDetectionInput {
  assessmentId: string;
}

// Job Type: GAP_DETECTION (auto-chained after PARSE_DOCUMENT)
```
**Gap Field States:**

**MISSING**: Field has no extracted value. The user must provide data manually.

```json
{
  "field": "revenue_2025",
  "label": "FY2025 Revenue",
  "extractedValue": null,
  "status": "MISSING",
  "isMandatory": true
}
```

**PARTIAL**: Field has an extracted value but needs verification or is incomplete.

```json
{
  "field": "operating_margin",
  "label": "Operating Margin %",
  "extractedValue": "~12% (estimated)",
  "status": "PARTIAL",
  "isMandatory": true
}
```

**VERIFIED**: The user has reviewed and confirmed (or corrected) the extracted value.

```json
{
  "field": "total_employees",
  "label": "Total Employees",
  "extractedValue": "45",
  "correctedValue": "47",
  "status": "VERIFIED",
  "isMandatory": false
}
```
**Output:** Gap fields created across all 7 risk categories (5 fields each = 35 total)

**Assessment Status Transition:**

```
DRAFT → ANALYZING → ACTION_REQUIRED  (if gaps detected)
                  → COMPLETE         (if no gaps)
```
Gap detection is triggered for GUIDED_INTERVIEW and MANUAL_ENTRY intake modes after data submission, and for UPLOAD mode after document parsing.
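The three states can be captured in a small type plus classifier. This is an illustrative sketch consistent with the examples above; `classifyGapField` is hypothetical, not part of the source:

```typescript
type GapStatus = 'MISSING' | 'PARTIAL' | 'VERIFIED';

interface GapField {
  field: string;
  label: string;
  extractedValue: string | null;
  correctedValue?: string;
  status: GapStatus;
  isMandatory: boolean;
}

// Hypothetical classifier: derives a gap status from the extraction result.
// Per the states above, any unverified extraction still counts as PARTIAL.
function classifyGapField(
  extractedValue: string | null,
  userVerified: boolean
): GapStatus {
  if (userVerified) return 'VERIFIED';
  if (extractedValue === null || extractedValue.trim() === '') return 'MISSING';
  return 'PARTIAL'; // has a value but still needs user verification
}
```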
### 3. Risk Analysis Agent

**Purpose:** Generates risk scores, narratives, and recommendations for all categories

**Input:**

- Verified gap field data
- Assessment metadata
- Historical risk benchmarks (future enhancement)

**Processing:**

```typescript
interface RiskAnalysisInput {
  assessmentId: string;
}

// Job Type: RISK_ANALYSIS
const jobId = await jobsService.create(
  JobType.RISK_ANALYSIS,
  { assessmentId },
  userId
);
```
**For each risk category:**

1. **Score subcategories.** The AI analyzes the data and assigns scores (0-100) to 5 subcategories:

```typescript
const subcategories = [
  {
    name: 'Revenue Stability',
    indicator: 'Year-over-year revenue variance',
    score: 42,
    level: 'MODERATE',
    evidence: 'Revenue declined 8% in FY2025 but stabilized in Q4',
    mitigation: 'Diversify customer base and develop new revenue streams',
  },
  // ... 4 more subcategories
];
```

2. **Calculate the category score.** Aggregate subcategory scores (default: equal weights):

```typescript
const categoryScore = Math.round(
  subcategories.reduce((sum, sub) => sum + sub.score, 0) / subcategories.length
);
```

3. **Assign a risk level.** Map the score to a traffic-light level:

```typescript
const level =
  categoryScore < 25 ? 'LOW' :
  categoryScore < 50 ? 'MODERATE' :
  categoryScore < 75 ? 'HIGH' : 'CRITICAL';
```

4. **Generate a narrative.** The AI creates a contextual risk narrative:

```typescript
const narrative = `The ${category} risk level is ${level} based on analysis of ${subcategories.map((s) => s.name).join(', ')}.`;
```

5. **Create recommendations.** Generate 2-3 prioritized recommendations:

```typescript
const recommendations = [
  {
    text: 'Develop a 3-year financial sustainability plan...',
    priority: 'HIGH',
    order: 0,
  },
  {
    text: 'Implement quarterly financial health monitoring...',
    priority: 'MEDIUM',
    order: 1,
  },
];
```
**Output:** Complete risk scores for all 7 categories plus an overall assessment score

**Assessment Status Transition:**

```
ACTION_REQUIRED → COMPLETE  (progress: 90)
```
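How the overall assessment score is aggregated is not shown in the source. Assuming it mirrors the category-level logic (equal weights across the 7 category scores, mapped through the same traffic-light thresholds), a sketch would be:

```typescript
// Sketch: overall score as the equally-weighted mean of the category scores,
// mapped to the same traffic-light thresholds used per category.
function overallAssessment(categoryScores: number[]): { score: number; level: string } {
  const score = Math.round(
    categoryScores.reduce((sum, s) => sum + s, 0) / categoryScores.length
  );
  const level =
    score < 25 ? 'LOW' :
    score < 50 ? 'MODERATE' :
    score < 75 ? 'HIGH' : 'CRITICAL';
  return { score, level };
}
```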
### 4. Report Generation Agent

**Purpose:** Creates a PDF report with visualizations and traffic-light indicators

**Input:**

- Complete risk score data
- Assessment metadata
- Recommendations

**Processing:**

```typescript
interface ReportGenerationInput {
  assessmentId: string;
}

// Job Type: REPORT_GENERATION
const jobId = await jobsService.create(
  JobType.REPORT_GENERATION,
  { assessmentId },
  userId
);
```
See: Report Generation for detailed documentation
## Asynchronous Job Processing

All AI operations run as background jobs.

### Creating a Job

```typescript
// API Lambda creates job and invokes Worker Lambda
const jobId = await jobsService.create(
  JobType.PARSE_DOCUMENT,
  { assessmentId, documentId, s3Key },
  userId
);

// Returns immediately with job ID
return { jobId };
```

### Job Status Lifecycle

Jobs start as `PENDING`, end as `COMPLETED` on success, and are marked `FAILED` only after retries are exhausted; a retryable failure resets the job to `PENDING` (see Retry Logic below).

### Polling Job Status

The frontend polls job status until the job reaches a terminal state; the `useJob` hook in the complete pipeline example below wraps this loop.
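Stripped to its essentials, such a polling loop can be sketched as follows. `pollJob` and the `PROCESSING` state name are illustrative assumptions; `fetchJob` is injected so the loop stays transport-agnostic (and testable):

```typescript
interface Job {
  id: string;
  status: 'PENDING' | 'PROCESSING' | 'COMPLETED' | 'FAILED';
  progress?: number;
  error?: string;
}

// Sketch: polls until the job reaches a terminal state or attempts run out.
async function pollJob(
  jobId: string,
  fetchJob: (id: string) => Promise<Job>,
  { intervalMs = 3000, maxAttempts = 200 } = {}
): Promise<Job> {
  for (let i = 0; i < maxAttempts; i++) {
    const job = await fetchJob(jobId);
    if (job.status === 'COMPLETED' || job.status === 'FAILED') return job;
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  throw new Error(`Job ${jobId} did not finish within ${maxAttempts} attempts`);
}
```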
### Retry Logic

Jobs retry up to 3 times on failure:

```typescript
if (attempts >= maxAttempts) {
  await updateStatus(jobId, JobStatus.FAILED, undefined, errorMsg);
  // Notify handler of permanent failure
  await handler.onFailure(documentId, error);
} else {
  // Reset to PENDING for retry
  await updateStatus(jobId, JobStatus.PENDING);
}
```

### Job Chaining

Jobs can automatically trigger dependent jobs:

```typescript
// After PARSE_DOCUMENT completes, auto-chain GAP_DETECTION
if (job.type === JobType.PARSE_DOCUMENT && job.status === 'COMPLETED') {
  const gapJobId = await this.create(
    JobType.GAP_DETECTION,
    { assessmentId: input.assessmentId },
    job.createdById
  );
  await this.processJob(gapJobId);
}
```
## Prompt Management

AI agents use versioned prompts managed through the Prompt CMS, retrieved at runtime via the public endpoint `GET /api/prompts/section/:section`:

```
GET /api/prompts/section/parser
```

```json
{
  "id": "prompt-123",
  "section": "PARSER",
  "systemPrompt": "You are an expert document analyst...",
  "userPromptTemplate": "Analyze the following document and extract...",
  "tone": "Professional and analytical",
  "outputFormat": "JSON with structured fields",
  "version": 5
}
```

### Variable Injection

Runtime values from the assessment are injected into `userPromptTemplate` before the prompt is sent to the model.
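The exact injection mechanism isn't documented here. A common approach, sketched below with a hypothetical `{{variable}}` placeholder syntax (an assumption, not confirmed by the Prompt CMS docs), substitutes assessment data into the template at runtime:

```typescript
// Sketch: fills {{placeholders}} in a prompt template with runtime values.
// Unknown placeholders are left untouched rather than silently blanked.
function injectVariables(template: string, vars: Record<string, string>): string {
  return template.replace(/\{\{(\w+)\}\}/g, (match, name) =>
    name in vars ? vars[name] : match
  );
}

const prompt = injectVariables(
  'Analyze {{companyName}} ({{country}}) and extract risk data.',
  { companyName: 'Acme Research', country: 'Kenya' }
);
// → 'Analyze Acme Research (Kenya) and extract risk data.'
```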
See: Prompt Management documentation for versioning, comments, and change tracking
## Code Example: Complete AI Pipeline

```tsx
import { useEffect, useState } from 'react';
import { useJob } from '@/hooks/use-job';
// ProgressBar, RiskScoresDashboard, toast, and refreshAssessment
// are assumed to be imported/defined elsewhere in the app.

function AssessmentAnalysis({ assessmentId }: { assessmentId: string }) {
  const [jobId, setJobId] = useState<string | null>(null);

  // Step 1: Trigger risk analysis
  const triggerAnalysis = async () => {
    const { jobId } = await fetch(
      `/api/assessments/${assessmentId}/analyze`,
      { method: 'POST' }
    ).then((r) => r.json());
    setJobId(jobId);
  };

  // Step 2: Poll job status with custom hook
  const { job, isLoading } = useJob(jobId, {
    pollInterval: 3000,
    maxAttempts: 200, // ~10 minutes
  });

  // Step 3: Handle completion
  useEffect(() => {
    if (job?.status === 'COMPLETED') {
      // Refresh assessment data to get new scores
      refreshAssessment();
    } else if (job?.status === 'FAILED') {
      toast.error(`Analysis failed: ${job.error}`);
    }
  }, [job?.status]);

  return (
    <div>
      <button onClick={triggerAnalysis}>Run analysis</button>
      {isLoading && <ProgressBar progress={job?.progress ?? 0} />}
      {job?.status === 'COMPLETED' && <RiskScoresDashboard />}
    </div>
  );
}
```
## Best Practices

- **Monitor token usage.** Track Bedrock token consumption per job type to optimize costs and identify inefficient prompts.
- **Implement timeouts.** Set reasonable polling timeouts (10 min for Textract, 5 min for AI jobs) to prevent infinite loops.
- **Handle partial results.** Design UIs to show progress and partial results rather than blocking on job completion.
- **Version prompts.** Always version prompts before deploying to production; use the Prompt CMS change tracking.
## Related Resources

- **Risk Scoring**: Understand how AI-generated scores map to risk levels
- **Assessment Workflow**: See how AI agents fit into the assessment lifecycle
- **Report Generation**: Learn about PDF report creation with AI-generated content