
AI Threat Modeling Mastery

Master systematic approaches to identify, analyze, and mitigate threats in AI systems. Learn industry-standard methodologies like STRIDE, attack trees, and comprehensive risk assessment for robust AI security.

  • 78% of AI attacks preventable
  • 5x ROI on threat modeling
  • 99% threat coverage achievable
  • 85% risk reduction rate

Core Threat Modeling Methodologies

STRIDE Methodology for AI Systems

Microsoft's STRIDE framework adapted for AI systems, providing systematic threat identification across six categories: Spoofing, Tampering, Repudiation, Information Disclosure, Denial of Service, and Elevation of Privilege.

AI-STRIDE Framework Implementation

import uuid
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum

class ThreatCategory(Enum):
    SPOOFING = "spoofing"
    TAMPERING = "tampering"
    REPUDIATION = "repudiation"
    INFORMATION_DISCLOSURE = "information_disclosure"
    DENIAL_OF_SERVICE = "denial_of_service"
    ELEVATION_OF_PRIVILEGE = "elevation_of_privilege"

class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

@dataclass
class AIThreat:
    id: str
    category: ThreatCategory
    title: str
    description: str
    ai_component: str
    attack_vector: str
    impact: str
    likelihood: str
    severity: Severity
    mitigation_strategies: List[str]
    detection_methods: List[str]
    
class AIStrideAnalyzer:
    def __init__(self):
        self.threats: List[AIThreat] = []
        self.ai_components = [
            'model_training', 'data_pipeline', 'inference_engine',
            'api_gateway', 'model_storage', 'feedback_loop',
            'monitoring_system', 'user_interface'
        ]
        
        # AI-specific threat templates
        self.threat_templates = {
            ThreatCategory.SPOOFING: {
                'model_impersonation': {
                    'title': 'Model Impersonation Attack',
                    'description': 'Attacker deploys fake AI model to impersonate legitimate service',
                    'attack_vector': 'Model substitution, API endpoint spoofing',
                    'impact': 'Malicious outputs, data harvesting, reputation damage'
                },
                'data_source_spoofing': {
                    'title': 'Training Data Source Spoofing',
                    'description': 'Malicious data sources masquerading as legitimate',
                    'attack_vector': 'Compromised data feeds, DNS spoofing',
                    'impact': 'Model poisoning, biased outputs'
                }
            },
            ThreatCategory.TAMPERING: {
                'model_poisoning': {
                    'title': 'Model Poisoning Attack',
                    'description': 'Malicious modification of training data or model parameters',
                    'attack_vector': 'Data injection, gradient manipulation',
                    'impact': 'Compromised model behavior, backdoor insertion'
                },
                'adversarial_examples': {
                    'title': 'Adversarial Input Tampering',
                    'description': 'Crafted inputs designed to fool AI models',
                    'attack_vector': 'Pixel manipulation, feature perturbation',
                    'impact': 'Misclassification, system bypass'
                }
            },
            ThreatCategory.REPUDIATION: {
                'model_decision_denial': {
                    'title': 'AI Decision Repudiation',
                    'description': 'Lack of audit trails for AI decision-making',
                    'attack_vector': 'Missing logs, tampered audit trails',
                    'impact': 'Legal liability, compliance violations'
                }
            },
            ThreatCategory.INFORMATION_DISCLOSURE: {
                'model_extraction': {
                    'title': 'Model Extraction Attack',
                    'description': 'Unauthorized access to model architecture and parameters',
                    'attack_vector': 'API queries, side-channel analysis',
                    'impact': 'IP theft, competitive disadvantage'
                },
                'training_data_leak': {
                    'title': 'Training Data Inference',
                    'description': 'Extracting sensitive training data through model queries',
                    'attack_vector': 'Membership inference, model inversion',
                    'impact': 'Privacy violations, regulatory penalties'
                }
            },
            ThreatCategory.DENIAL_OF_SERVICE: {
                'inference_overload': {
                    'title': 'Inference Resource Exhaustion',
                    'description': 'Overwhelming AI system with expensive computations',
                    'attack_vector': 'Adversarial queries, resource-intensive inputs',
                    'impact': 'Service unavailability, financial losses'
                }
            },
            ThreatCategory.ELEVATION_OF_PRIVILEGE: {
                'prompt_injection': {
                    'title': 'Prompt Injection Privilege Escalation',
                    'description': 'Manipulating AI prompts to gain unauthorized access',
                    'attack_vector': 'Crafted prompts, system instruction override',
                    'impact': 'Unauthorized data access, system control'
                }
            }
        }
    
    def analyze_component(self, component: str) -> List[AIThreat]:
        """Generate STRIDE threats for a specific AI component"""
        component_threats = []
        
        for category in ThreatCategory:
            threats = self._generate_threats_for_category(component, category)
            component_threats.extend(threats)
        
        return component_threats
    
    def _generate_threats_for_category(self, component: str, category: ThreatCategory) -> List[AIThreat]:
        """Generate threats for a specific category and component"""
        threats = []
        
        if category in self.threat_templates:
            for threat_key, template in self.threat_templates[category].items():
                # Customize threat based on component
                customized_description = f"{template['description']} targeting {component}"
                
                threat = AIThreat(
                    id=str(uuid.uuid4()),
                    category=category,
                    title=template['title'],
                    description=customized_description,
                    ai_component=component,
                    attack_vector=template['attack_vector'],
                    impact=template['impact'],
                    likelihood=self._assess_likelihood(component, category),
                    severity=self._assess_severity(component, category),
                    mitigation_strategies=self._get_mitigations(component, category),
                    detection_methods=self._get_detection_methods(component, category)
                )
                threats.append(threat)
        
        return threats
    
    def _assess_likelihood(self, component: str, category: ThreatCategory) -> str:
        """Assess likelihood based on component and threat category"""
        likelihood_matrix = {
            ('model_training', ThreatCategory.TAMPERING): 'High',
            ('inference_engine', ThreatCategory.SPOOFING): 'Medium',
            ('api_gateway', ThreatCategory.DENIAL_OF_SERVICE): 'High',
            ('data_pipeline', ThreatCategory.INFORMATION_DISCLOSURE): 'Medium'
        }
        
        return likelihood_matrix.get((component, category), 'Medium')
    
    def _assess_severity(self, component: str, category: ThreatCategory) -> Severity:
        """Assess severity based on component and threat category"""
        if category in [ThreatCategory.ELEVATION_OF_PRIVILEGE, ThreatCategory.TAMPERING]:
            return Severity.CRITICAL
        elif category in [ThreatCategory.INFORMATION_DISCLOSURE, ThreatCategory.DENIAL_OF_SERVICE]:
            return Severity.HIGH
        else:
            return Severity.MEDIUM
    
    def _get_mitigations(self, component: str, category: ThreatCategory) -> List[str]:
        """Get mitigation strategies for component and threat category"""
        mitigations = {
            ThreatCategory.SPOOFING: [
                'Model signing and verification',
                'Certificate-based authentication',
                'Cryptographic model sealing'
            ],
            ThreatCategory.TAMPERING: [
                'Input validation and sanitization',
                'Model integrity checks',
                'Differential privacy techniques'
            ],
            ThreatCategory.REPUDIATION: [
                'Comprehensive audit logging',
                'Digital signatures for decisions',
                'Immutable decision trails'
            ],
            ThreatCategory.INFORMATION_DISCLOSURE: [
                'Access controls and authorization',
                'Data anonymization',
                'Query rate limiting'
            ],
            ThreatCategory.DENIAL_OF_SERVICE: [
                'Resource quotas and throttling',
                'Input complexity analysis',
                'Distributed inference architecture'
            ],
            ThreatCategory.ELEVATION_OF_PRIVILEGE: [
                'Prompt sanitization',
                'Context isolation',
                'Least privilege principles'
            ]
        }
        
        return mitigations.get(category, [])
    
    def _get_detection_methods(self, component: str, category: ThreatCategory) -> List[str]:
        """Get detection methods for component and threat category"""
        detection_methods = {
            ThreatCategory.SPOOFING: [
                'Model fingerprinting',
                'Behavioral analysis',
                'Certificate validation'
            ],
            ThreatCategory.TAMPERING: [
                'Statistical analysis of outputs',
                'Model drift detection',
                'Input anomaly detection'
            ],
            ThreatCategory.INFORMATION_DISCLOSURE: [
                'Query pattern analysis',
                'Response correlation monitoring',
                'Access pattern analysis'
            ]
        }
        
        return detection_methods.get(category, ['System monitoring', 'Log analysis'])
    
    def generate_threat_model(self) -> Dict[str, Any]:
        """Generate complete STRIDE threat model for AI system"""
        all_threats = []
        
        for component in self.ai_components:
            component_threats = self.analyze_component(component)
            all_threats.extend(component_threats)
        
        # Organize by severity
        threats_by_severity = {}
        for severity in Severity:
            threats_by_severity[severity.value] = [
                threat for threat in all_threats if threat.severity == severity
            ]
        
        return {
            'total_threats': len(all_threats),
            'threats_by_component': self._group_by_component(all_threats),
            'threats_by_category': self._group_by_category(all_threats),
            'threats_by_severity': threats_by_severity,
            'risk_score': self._calculate_risk_score(all_threats),
            'recommendations': self._generate_recommendations(all_threats)
        }
    
    def _group_by_component(self, threats: List[AIThreat]) -> Dict[str, int]:
        """Group threats by AI component"""
        component_counts = {}
        for threat in threats:
            component_counts[threat.ai_component] = component_counts.get(threat.ai_component, 0) + 1
        return component_counts
    
    def _group_by_category(self, threats: List[AIThreat]) -> Dict[str, int]:
        """Group threats by STRIDE category"""
        category_counts = {}
        for threat in threats:
            category_counts[threat.category.value] = category_counts.get(threat.category.value, 0) + 1
        return category_counts
    
    def _calculate_risk_score(self, threats: List[AIThreat]) -> float:
        """Calculate overall risk score"""
        severity_weights = {
            Severity.CRITICAL: 4,
            Severity.HIGH: 3,
            Severity.MEDIUM: 2,
            Severity.LOW: 1
        }
        
        total_score = sum(severity_weights[threat.severity] for threat in threats)
        max_possible = len(threats) * 4
        
        return (total_score / max_possible) * 100 if max_possible > 0 else 0
    
    def _generate_recommendations(self, threats: List[AIThreat]) -> List[str]:
        """Generate prioritized recommendations"""
        critical_threats = [t for t in threats if t.severity == Severity.CRITICAL]
        high_threats = [t for t in threats if t.severity == Severity.HIGH]
        
        recommendations = []
        
        if critical_threats:
            recommendations.append(f"Immediately address {len(critical_threats)} critical threats")
        
        if high_threats:
            recommendations.append(f"Prioritize {len(high_threats)} high-severity threats")
            
        recommendations.extend([
            "Implement comprehensive monitoring and detection",
            "Establish incident response procedures",
            "Regular threat model updates and reviews"
        ])
        
        return recommendations
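
A minimal usage sketch for the analyzer above; the component name and print formatting are illustrative, and the dictionary keys follow the generate_threat_model return value shown in the listing:

# Minimal usage sketch for AIStrideAnalyzer (illustrative component and output)
analyzer = AIStrideAnalyzer()

# Analyze a single component...
inference_threats = analyzer.analyze_component('inference_engine')
print(f"Inference engine threats: {len(inference_threats)}")

# ...or generate the full STRIDE threat model
threat_model = analyzer.generate_threat_model()
print(f"Total threats: {threat_model['total_threats']}")
print(f"Overall risk score: {threat_model['risk_score']:.1f}/100")
for recommendation in threat_model['recommendations']:
    print(f"- {recommendation}")
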
STRIDE Categories
  • Spoofing - Identity impersonation
  • Tampering - Unauthorized modification
  • Repudiation - Denial of actions
  • Information Disclosure - Data leaks
  • Denial of Service - Availability attacks
  • Elevation of Privilege - Access escalation
AI-Specific Adaptations
  • Model poisoning and backdoors
  • Adversarial example generation
  • Training data extraction
  • Model stealing attacks
  • Prompt injection vulnerabilities

Attack Trees for AI Systems

Hierarchical diagrams that represent attack paths against AI systems, helping visualize how attackers might combine multiple techniques to achieve their objectives.

Attack Tree Builder Framework

import networkx as nx
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class AttackType(Enum):
    AND = "and"  # All child attacks must succeed
    OR = "or"    # Any child attack can succeed

class AttackComplexity(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    VERY_HIGH = 4

@dataclass
class AttackNode:
    id: str
    name: str
    description: str
    attack_type: AttackType
    complexity: AttackComplexity
    cost: float  # Estimated cost for attacker
    probability: float  # Success probability
    detection_difficulty: AttackComplexity
    impact_level: int  # 1-10 scale
    prerequisites: List[str]
    mitigations: List[str]
    children: Optional[List['AttackNode']] = None
    
    def __post_init__(self):
        if self.children is None:
            self.children = []

class AIAttackTreeBuilder:
    def __init__(self):
        self.attack_graph = nx.DiGraph()
        self.nodes: Dict[str, AttackNode] = {}
        
        # Common AI attack patterns
        self.ai_attack_patterns = {
            'model_compromise': {
                'name': 'Compromise AI Model',
                'description': 'Gain unauthorized control over AI model behavior',
                'attack_type': AttackType.OR,
                'complexity': AttackComplexity.HIGH,
                'cost': 10000,
                'probability': 0.3,
                'impact_level': 9
            },
            'data_poisoning': {
                'name': 'Training Data Poisoning',
                'description': 'Inject malicious data into training pipeline',
                'attack_type': AttackType.AND,
                'complexity': AttackComplexity.MEDIUM,
                'cost': 2000,
                'probability': 0.6,
                'impact_level': 8
            },
            'model_extraction': {
                'name': 'Model Extraction Attack',
                'description': 'Steal proprietary model through API queries',
                'attack_type': AttackType.OR,
                'complexity': AttackComplexity.MEDIUM,
                'cost': 1000,
                'probability': 0.7,
                'impact_level': 6
            }
        }
    
    def create_attack_tree(self, root_objective: str) -> AttackNode:
        """Create comprehensive attack tree for AI system"""
        
        # Root objective
        root = AttackNode(
            id="root",
            name=root_objective,
            description=f"Primary attack objective: {root_objective}",
            attack_type=AttackType.OR,
            complexity=AttackComplexity.VERY_HIGH,
            cost=50000,
            probability=0.1,
            detection_difficulty=AttackComplexity.LOW,
            impact_level=10,
            prerequisites=[],
            mitigations=[
                "Comprehensive security monitoring",
                "Multi-layered defense strategy",
                "Regular security assessments"
            ]
        )
        
        # Build specific attack paths based on objective
        if "compromise" in root_objective.lower():
            self._build_model_compromise_tree(root)
        elif "steal" in root_objective.lower() or "extract" in root_objective.lower():
            self._build_model_extraction_tree(root)
        elif "poison" in root_objective.lower():
            self._build_data_poisoning_tree(root)
        else:
            self._build_generic_ai_attack_tree(root)
        
        self.nodes[root.id] = root
        return root
    
    def _build_model_compromise_tree(self, root: AttackNode):
        """Build attack tree for model compromise objective"""
        
        # Level 1: Primary attack vectors
        backdoor_attack = AttackNode(
            id="backdoor_insertion",
            name="Insert Model Backdoor",
            description="Embed hidden triggers in model behavior",
            attack_type=AttackType.AND,
            complexity=AttackComplexity.HIGH,
            cost=15000,
            probability=0.4,
            detection_difficulty=AttackComplexity.VERY_HIGH,
            impact_level=9,
            prerequisites=["Access to training pipeline"],
            mitigations=[
                "Training data validation",
                "Model behavior monitoring",
                "Differential privacy"
            ]
        )
        
        adversarial_attack = AttackNode(
            id="adversarial_manipulation",
            name="Adversarial Input Manipulation",
            description="Craft inputs to manipulate model outputs",
            attack_type=AttackType.OR,
            complexity=AttackComplexity.MEDIUM,
            cost=5000,
            probability=0.8,
            detection_difficulty=AttackComplexity.MEDIUM,
            impact_level=6,
            prerequisites=["Model access", "Domain knowledge"],
            mitigations=[
                "Input validation",
                "Adversarial training",
                "Output monitoring"
            ]
        )
        
        # Level 2: Supporting attacks for backdoor insertion
        data_source_compromise = AttackNode(
            id="compromise_data_source",
            name="Compromise Training Data Source",
            description="Gain access to training data repositories",
            attack_type=AttackType.OR,
            complexity=AttackComplexity.MEDIUM,
            cost=3000,
            probability=0.6,
            detection_difficulty=AttackComplexity.LOW,
            impact_level=7,
            prerequisites=["Network access"],
            mitigations=[
                "Access controls",
                "Data integrity monitoring",
                "Network segmentation"
            ]
        )
        
        insider_access = AttackNode(
            id="insider_threat",
            name="Malicious Insider Access",
            description="Leverage insider access to training systems",
            attack_type=AttackType.AND,
            complexity=AttackComplexity.LOW,
            cost=1000,
            probability=0.3,
            detection_difficulty=AttackComplexity.HIGH,
            impact_level=9,
            prerequisites=["Insider recruitment", "System access"],
            mitigations=[
                "Background checks",
                "Access monitoring",
                "Segregation of duties"
            ]
        )
        
        # Level 3: Specific techniques
        credential_theft = AttackNode(
            id="steal_credentials",
            name="Steal System Credentials",
            description="Obtain authentication credentials for data systems",
            attack_type=AttackType.OR,
            complexity=AttackComplexity.MEDIUM,
            cost=500,
            probability=0.7,
            detection_difficulty=AttackComplexity.MEDIUM,
            impact_level=5,
            prerequisites=["Target identification"],
            mitigations=[
                "Multi-factor authentication",
                "Credential rotation",
                "Anomaly detection"
            ]
        )
        
        # Build tree structure
        root.children = [backdoor_attack, adversarial_attack]
        backdoor_attack.children = [data_source_compromise, insider_access]
        data_source_compromise.children = [credential_theft]
        
        # Add nodes to tracking
        for node in [backdoor_attack, adversarial_attack, data_source_compromise, 
                    insider_access, credential_theft]:
            self.nodes[node.id] = node
    
    def calculate_attack_probability(self, node: AttackNode) -> float:
        """Calculate probability of attack success considering tree structure"""
        if not node.children:
            return node.probability
        
        if node.attack_type == AttackType.AND:
            # All child attacks must succeed; assuming independence, multiply probabilities
            child_probs = [self.calculate_attack_probability(child) for child in node.children]
            combined_success = 1.0
            for prob in child_probs:
                combined_success *= prob
            return combined_success * node.probability
        else:  # OR gate
            # At least one child attack must succeed
            child_probs = [self.calculate_attack_probability(child) for child in node.children]
            combined_failure = 1.0
            for prob in child_probs:
                combined_failure *= (1 - prob)
            return (1 - combined_failure) * node.probability
    
    def calculate_attack_cost(self, node: AttackNode) -> float:
        """Calculate expected cost of attack path"""
        if not node.children:
            return node.cost
        
        if node.attack_type == AttackType.AND:
            # Must execute all child attacks
            child_costs = [self.calculate_attack_cost(child) for child in node.children]
            return sum(child_costs) + node.cost
        else:  # OR gate
            # Choose cheapest successful path
            child_costs = [self.calculate_attack_cost(child) for child in node.children]
            return min(child_costs) + node.cost
    
    def identify_critical_paths(self, root: AttackNode) -> List[List[str]]:
        """Identify most critical attack paths"""
        critical_paths = []
        
        def traverse_paths(node: AttackNode, current_path: List[str]):
            current_path.append(node.id)
            
            if not node.children:
                # Leaf node - complete path
                if node.probability > 0.5 and node.impact_level >= 7:
                    critical_paths.append(current_path.copy())
            else:
                for child in node.children:
                    traverse_paths(child, current_path)
            
            current_path.pop()
        
        traverse_paths(root, [])
        return critical_paths
    
    def generate_mitigation_priorities(self, root: AttackNode) -> List[Dict[str, Any]]:
        """Generate prioritized mitigation recommendations"""
        mitigations = {}
        
        def collect_mitigations(node: AttackNode):
            node_risk = node.probability * node.impact_level
            for mitigation in node.mitigations:
                if mitigation not in mitigations:
                    mitigations[mitigation] = {
                        'total_risk_reduction': 0,
                        'applicable_nodes': [],
                        'cost_effectiveness': 0
                    }
                
                mitigations[mitigation]['total_risk_reduction'] += node_risk
                mitigations[mitigation]['applicable_nodes'].append(node.id)
            
            for child in node.children:
                collect_mitigations(child)
        
        collect_mitigations(root)
        
        # Sort by risk reduction potential
        sorted_mitigations = sorted(
            mitigations.items(),
            key=lambda x: x[1]['total_risk_reduction'],
            reverse=True
        )
        
        return [
            {
                'mitigation': name,
                'risk_reduction': data['total_risk_reduction'],
                'coverage': len(data['applicable_nodes']),
                'priority': 'High' if data['total_risk_reduction'] > 20 else 'Medium'
            }
            for name, data in sorted_mitigations[:10]
        ]
    
    def export_tree_analysis(self, root: AttackNode) -> Dict[str, Any]:
        """Export comprehensive attack tree analysis"""
        return {
            'root_objective': root.name,
            'total_attack_probability': self.calculate_attack_probability(root),
            'minimum_attack_cost': self.calculate_attack_cost(root),
            'critical_paths': self.identify_critical_paths(root),
            'mitigation_priorities': self.generate_mitigation_priorities(root),
            'risk_score': root.probability * root.impact_level,
            'node_count': len(self.nodes),
            'recommendations': [
                "Focus on highest-impact mitigations first",
                "Monitor critical attack paths continuously",
                "Regular attack tree updates as threats evolve",
                "Implement defense-in-depth strategies"
            ]
        }
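
    # NOTE: The remaining tree builders referenced by create_attack_tree are
    # sketched here as minimal placeholders (not part of the original listing)
    # so the class runs end to end; real trees would mirror
    # _build_model_compromise_tree in depth.
    def _build_model_extraction_tree(self, root: AttackNode):
        """Placeholder: attach a single extraction node built from the pattern library"""
        pattern = self.ai_attack_patterns['model_extraction']
        node = AttackNode(
            id="model_extraction",
            name=pattern['name'],
            description=pattern['description'],
            attack_type=pattern['attack_type'],
            complexity=pattern['complexity'],
            cost=pattern['cost'],
            probability=pattern['probability'],
            detection_difficulty=AttackComplexity.MEDIUM,
            impact_level=pattern['impact_level'],
            prerequisites=["API access"],
            mitigations=["Query rate limiting", "Model watermarking"]
        )
        root.children = [node]
        self.nodes[node.id] = node
    
    def _build_data_poisoning_tree(self, root: AttackNode):
        """Placeholder: attach a single poisoning node built from the pattern library"""
        pattern = self.ai_attack_patterns['data_poisoning']
        node = AttackNode(
            id="data_poisoning",
            name=pattern['name'],
            description=pattern['description'],
            attack_type=pattern['attack_type'],
            complexity=pattern['complexity'],
            cost=pattern['cost'],
            probability=pattern['probability'],
            detection_difficulty=AttackComplexity.HIGH,
            impact_level=pattern['impact_level'],
            prerequisites=["Write access to data pipeline"],
            mitigations=["Data provenance checks", "Outlier detection"]
        )
        root.children = [node]
        self.nodes[node.id] = node
    
    def _build_generic_ai_attack_tree(self, root: AttackNode):
        """Placeholder: fall back to the model-compromise tree for unrecognized objectives"""
        self._build_model_compromise_tree(root)
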
Attack Tree Benefits
  • Visual attack path representation
  • Quantitative risk assessment
  • Critical path identification
  • Mitigation prioritization
  • Cost-benefit analysis
Key Components
  • AND/OR gate logic
  • Probability calculations (see the sketch below)
  • Cost estimations
  • Impact assessments
  • Detection difficulty ratings
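
A brief usage sketch of the builder above, showing how the AND/OR probability and cost roll-ups surface in the exported analysis; the objective string and printed fields are illustrative:

# Minimal usage sketch for AIAttackTreeBuilder (illustrative objective and output)
builder = AIAttackTreeBuilder()
root = builder.create_attack_tree("Compromise production AI model")

analysis = builder.export_tree_analysis(root)
print(f"Overall attack probability: {analysis['total_attack_probability']:.2f}")
print(f"Minimum attack cost: ${analysis['minimum_attack_cost']:,.0f}")
for item in analysis['mitigation_priorities'][:3]:
    print(f"{item['priority']}: {item['mitigation']} "
          f"(risk reduction {item['risk_reduction']:.1f})")
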

AI Risk Assessment Matrix

Comprehensive risk assessment framework for AI systems, combining threat likelihood, impact severity, and AI-specific risk factors to prioritize security investments and mitigation strategies.

AI Risk Assessment Engine

import numpy as np
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum

class RiskCategory(Enum):
    TECHNICAL = "technical"
    OPERATIONAL = "operational"
    REPUTATIONAL = "reputational"
    COMPLIANCE = "compliance"
    FINANCIAL = "financial"

class AIRiskFactor(Enum):
    MODEL_COMPLEXITY = "model_complexity"
    DATA_SENSITIVITY = "data_sensitivity"
    DEPLOYMENT_SCALE = "deployment_scale"
    USER_IMPACT = "user_impact"
    REGULATORY_EXPOSURE = "regulatory_exposure"

@dataclass
class RiskScenario:
    id: str
    name: str
    description: str
    category: RiskCategory
    threat_sources: List[str]
    vulnerabilities: List[str]
    likelihood_score: float  # 1-5 scale
    impact_score: float      # 1-5 scale
    ai_risk_factors: Dict[AIRiskFactor, float]
    existing_controls: List[str]
    residual_risk: float
    
class AIRiskAssessment:
    def __init__(self):
        self.risk_scenarios: List[RiskScenario] = []
        self.risk_matrix = np.zeros((5, 5))  # 5x5 likelihood x impact matrix
        
        # AI-specific risk factor weights
        self.ai_factor_weights = {
            AIRiskFactor.MODEL_COMPLEXITY: 0.25,
            AIRiskFactor.DATA_SENSITIVITY: 0.30,
            AIRiskFactor.DEPLOYMENT_SCALE: 0.20,
            AIRiskFactor.USER_IMPACT: 0.15,
            AIRiskFactor.REGULATORY_EXPOSURE: 0.10
        }
        
        # Risk tolerance thresholds
        self.risk_thresholds = {
            'critical': 20,
            'high': 15,
            'medium': 10,
            'low': 5
        }
    
    def assess_ai_system_risk(self, system_profile: Dict[str, Any]) -> Dict[str, Any]:
        """Comprehensive risk assessment for AI system"""
        
        # Generate risk scenarios based on system profile
        scenarios = self._generate_risk_scenarios(system_profile)
        
        # Calculate risk scores
        risk_analysis = {
            'system_profile': system_profile,
            'total_scenarios': len(scenarios),
            'risk_by_category': self._categorize_risks(scenarios),
            'critical_risks': self._identify_critical_risks(scenarios),
            'risk_heat_map': self._generate_risk_heatmap(scenarios),
            'overall_risk_score': self._calculate_overall_risk(scenarios),
            'recommendations': self._generate_risk_recommendations(scenarios),
            'mitigation_priorities': self._prioritize_mitigations(scenarios)
        }
        
        return risk_analysis
    
    def _generate_risk_scenarios(self, system_profile: Dict[str, Any]) -> List[RiskScenario]:
        """Generate relevant risk scenarios based on system characteristics"""
        scenarios = []
        
        # Model-specific risks
        if system_profile.get('model_type') in ['llm', 'generative']:
            scenarios.extend(self._generate_generative_ai_risks(system_profile))
        
        if system_profile.get('handles_pii', False):
            scenarios.extend(self._generate_privacy_risks(system_profile))
        
        if system_profile.get('public_facing', False):
            scenarios.extend(self._generate_public_facing_risks(system_profile))
        
        # Always include common AI risks
        scenarios.extend(self._generate_common_ai_risks(system_profile))
        
        return scenarios
    
    def _generate_generative_ai_risks(self, profile: Dict[str, Any]) -> List[RiskScenario]:
        """Generate risks specific to generative AI systems"""
        risks = []
        
        # Harmful content generation
        harmful_content_risk = RiskScenario(
            id="gen_ai_harmful_content",
            name="Harmful Content Generation",
            description="AI system generates inappropriate, biased, or harmful content",
            category=RiskCategory.REPUTATIONAL,
            threat_sources=["Malicious users", "Unintended model behavior", "Training data bias"],
            vulnerabilities=["Insufficient content filtering", "Inadequate training data curation"],
            likelihood_score=self._assess_likelihood(profile, "harmful_content"),
            impact_score=4.0,
            ai_risk_factors={
                AIRiskFactor.MODEL_COMPLEXITY: 4.0,
                AIRiskFactor.DATA_SENSITIVITY: 3.0,
                AIRiskFactor.USER_IMPACT: 5.0,
                AIRiskFactor.REGULATORY_EXPOSURE: 4.0
            },
            existing_controls=profile.get('content_controls', []),
            residual_risk=0.0
        )
        
        # Prompt injection
        prompt_injection_risk = RiskScenario(
            id="prompt_injection",
            name="Prompt Injection Attack",
            description="Malicious prompts manipulate AI behavior to bypass safety measures",
            category=RiskCategory.TECHNICAL,
            threat_sources=["External attackers", "Malicious users"],
            vulnerabilities=["Insufficient input validation", "Lack of prompt sanitization"],
            likelihood_score=self._assess_likelihood(profile, "prompt_injection"),
            impact_score=3.5,
            ai_risk_factors={
                AIRiskFactor.MODEL_COMPLEXITY: 3.0,
                AIRiskFactor.DEPLOYMENT_SCALE: 4.0,
                AIRiskFactor.USER_IMPACT: 3.0
            },
            existing_controls=profile.get('input_controls', []),
            residual_risk=0.0
        )
        
        risks.extend([harmful_content_risk, prompt_injection_risk])
        return risks
    
    def _generate_privacy_risks(self, profile: Dict[str, Any]) -> List[RiskScenario]:
        """Generate privacy-related risks"""
        risks = []
        
        data_leakage_risk = RiskScenario(
            id="training_data_leakage",
            name="Training Data Leakage",
            description="Sensitive training data exposed through model outputs",
            category=RiskCategory.COMPLIANCE,
            threat_sources=["Model inversion attacks", "Membership inference"],
            vulnerabilities=["Insufficient differential privacy", "Model overfitting"],
            likelihood_score=self._assess_likelihood(profile, "data_leakage"),
            impact_score=4.5,
            ai_risk_factors={
                AIRiskFactor.DATA_SENSITIVITY: 5.0,
                AIRiskFactor.REGULATORY_EXPOSURE: 5.0,
                AIRiskFactor.USER_IMPACT: 4.0
            },
            existing_controls=profile.get('privacy_controls', []),
            residual_risk=0.0
        )
        
        risks.append(data_leakage_risk)
        return risks
    
    def _assess_likelihood(self, profile: Dict[str, Any], risk_type: str) -> float:
        """Assess likelihood based on system profile and risk type"""
        base_likelihood = {
            'harmful_content': 3.0,
            'prompt_injection': 3.5,
            'data_leakage': 2.5,
            'model_theft': 2.0,
            'adversarial_attack': 3.0
        }
        
        likelihood = base_likelihood.get(risk_type, 2.5)
        
        # Adjust based on system characteristics
        if profile.get('public_facing', False):
            likelihood += 0.5
        
        if profile.get('high_value_target', False):
            likelihood += 1.0
        
        if len(profile.get('security_controls', [])) > 5:
            likelihood -= 0.5
        
        return min(5.0, max(1.0, likelihood))
    
    def _calculate_ai_adjusted_risk(self, scenario: RiskScenario) -> float:
        """Calculate risk score adjusted for AI-specific factors"""
        base_risk = scenario.likelihood_score * scenario.impact_score
        
        # Calculate AI factor adjustment
        ai_adjustment = 0
        for factor, score in scenario.ai_risk_factors.items():
            weight = self.ai_factor_weights[factor]
            ai_adjustment += (score - 3.0) * weight  # 3.0 is baseline
        
        # Apply adjustment (can increase or decrease risk)
        adjusted_risk = base_risk * (1 + ai_adjustment)
        
        # Factor in existing controls
        control_reduction = min(0.4, len(scenario.existing_controls) * 0.05)
        final_risk = adjusted_risk * (1 - control_reduction)
        
        scenario.residual_risk = final_risk
        return final_risk
    
    def _identify_critical_risks(self, scenarios: List[RiskScenario]) -> List[Dict[str, Any]]:
        """Identify and rank critical risks"""
        critical_risks = []
        
        for scenario in scenarios:
            risk_score = self._calculate_ai_adjusted_risk(scenario)
            
            if risk_score >= self.risk_thresholds['critical']:
                critical_risks.append({
                    'scenario': scenario.name,
                    'risk_score': risk_score,
                    'category': scenario.category.value,
                    'likelihood': scenario.likelihood_score,
                    'impact': scenario.impact_score,
                    'key_factors': [
                        factor.value for factor, score in scenario.ai_risk_factors.items()
                        if score >= 4.0
                    ],
                    'immediate_actions': self._get_immediate_actions(scenario)
                })
        
        # Sort by risk score
        critical_risks.sort(key=lambda x: x['risk_score'], reverse=True)
        return critical_risks
    
    def _generate_risk_heatmap(self, scenarios: List[RiskScenario]) -> Dict[str, Any]:
        """Generate risk heatmap data"""
        heatmap_data = np.zeros((5, 5))
        scenario_mapping = {}
        
        for scenario in scenarios:
            likelihood_bucket = min(4, int(scenario.likelihood_score) - 1)
            impact_bucket = min(4, int(scenario.impact_score) - 1)
            
            heatmap_data[likelihood_bucket][impact_bucket] += 1
            
            cell_key = f"{likelihood_bucket}_{impact_bucket}"
            if cell_key not in scenario_mapping:
                scenario_mapping[cell_key] = []
            scenario_mapping[cell_key].append(scenario.name)
        
        return {
            'matrix': heatmap_data.tolist(),
            'scenario_mapping': scenario_mapping,
            'risk_distribution': {
                'critical': int(np.sum(heatmap_data[3:, 3:])),
                'high': int(np.sum(heatmap_data[2:, 2:]) - np.sum(heatmap_data[3:, 3:])),
                'medium': int(np.sum(heatmap_data[1:, 1:]) - np.sum(heatmap_data[2:, 2:])),
                'low': int(np.sum(heatmap_data) - np.sum(heatmap_data[1:, 1:]))
            }
        }
    
    def _prioritize_mitigations(self, scenarios: List[RiskScenario]) -> List[Dict[str, Any]]:
        """Prioritize mitigation strategies across all scenarios"""
        mitigation_impact = {}
        
        for scenario in scenarios:
            risk_score = self._calculate_ai_adjusted_risk(scenario)
            
            # Common AI mitigations
            common_mitigations = [
                "Input validation and sanitization",
                "Output monitoring and filtering",
                "Adversarial training",
                "Differential privacy implementation",
                "Regular model auditing",
                "Access controls and authentication"
            ]
            
            for mitigation in common_mitigations:
                if mitigation not in mitigation_impact:
                    mitigation_impact[mitigation] = {
                        'total_risk_reduction': 0,
                        'applicable_scenarios': 0,
                        'implementation_cost': self._estimate_mitigation_cost(mitigation),
                        'roi_score': 0
                    }
                
                # Estimate risk reduction for this mitigation
                reduction = self._estimate_risk_reduction(scenario, mitigation)
                mitigation_impact[mitigation]['total_risk_reduction'] += risk_score * reduction
                mitigation_impact[mitigation]['applicable_scenarios'] += 1
        
        # Calculate ROI scores and sort
        for mitigation_data in mitigation_impact.values():
            mitigation_data['roi_score'] = (
                mitigation_data['total_risk_reduction'] / mitigation_data['implementation_cost']
            )
        
        sorted_mitigations = sorted(
            mitigation_impact.items(),
            key=lambda x: x[1]['roi_score'],
            reverse=True
        )
        
        return [
            {
                'mitigation': name,
                'risk_reduction': data['total_risk_reduction'],
                'roi_score': data['roi_score'],
                'priority': 'High' if data['roi_score'] > 2.0 else 'Medium',
                'implementation_cost': data['implementation_cost']
            }
            for name, data in sorted_mitigations[:10]
        ]
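
    # NOTE: The helpers below are minimal placeholder implementations (not part
    # of the original listing) so assess_ai_system_risk runs end to end; tune
    # them to your own scenario catalog and cost model.
    def _generate_public_facing_risks(self, profile: Dict[str, Any]) -> List[RiskScenario]:
        """Placeholder: no additional scenarios beyond the common set"""
        return []
    
    def _generate_common_ai_risks(self, profile: Dict[str, Any]) -> List[RiskScenario]:
        """Placeholder: no additional scenarios beyond the profile-specific ones"""
        return []
    
    def _categorize_risks(self, scenarios: List[RiskScenario]) -> Dict[str, int]:
        """Count scenarios per risk category"""
        counts: Dict[str, int] = {}
        for scenario in scenarios:
            counts[scenario.category.value] = counts.get(scenario.category.value, 0) + 1
        return counts
    
    def _calculate_overall_risk(self, scenarios: List[RiskScenario]) -> float:
        """Average AI-adjusted risk across all scenarios"""
        if not scenarios:
            return 0.0
        return sum(self._calculate_ai_adjusted_risk(s) for s in scenarios) / len(scenarios)
    
    def _generate_risk_recommendations(self, scenarios: List[RiskScenario]) -> List[str]:
        """Placeholder: generic recommendations keyed to the critical-risk count"""
        critical = [
            s for s in scenarios
            if self._calculate_ai_adjusted_risk(s) >= self.risk_thresholds['critical']
        ]
        recommendations = ["Review and update the risk register quarterly"]
        if critical:
            recommendations.insert(0, f"Remediate {len(critical)} critical scenarios first")
        return recommendations
    
    def _get_immediate_actions(self, scenario: RiskScenario) -> List[str]:
        """Placeholder: immediate actions derived from listed vulnerabilities"""
        return [f"Add compensating controls for: {v}" for v in scenario.vulnerabilities[:2]]
    
    def _estimate_mitigation_cost(self, mitigation: str) -> float:
        """Placeholder: flat relative cost per mitigation (arbitrary units)"""
        return 5.0
    
    def _estimate_risk_reduction(self, scenario: RiskScenario, mitigation: str) -> float:
        """Placeholder: assume each mitigation removes 20% of scenario risk"""
        return 0.2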

MITRE ATT&CK for AI Systems

Attack Tactics

  • Initial Access via API endpoints
  • Execution through prompt injection
  • Persistence via model backdoors
  • Defense Evasion using adversarial examples
  • Collection of training data

Techniques

  • T1566 - Phishing for ML credentials
  • T1190 - Exploit public-facing ML APIs
  • T1055 - Process injection in ML pipelines
  • T1020 - Automated exfiltration of models
  • T1485 - Data destruction in training sets
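
A minimal sketch of how these technique IDs could be attached to the STRIDE threats generated earlier. The category-to-technique mapping is illustrative, not an official ATT&CK or ATLAS mapping, and assumes the ThreatCategory and AIThreat classes from the AI-STRIDE listing:

from dataclasses import asdict

# Hypothetical mapping from STRIDE categories to ATT&CK technique IDs (illustrative)
ATTACK_TECHNIQUE_MAP = {
    ThreatCategory.SPOOFING: ["T1566"],
    ThreatCategory.TAMPERING: ["T1055", "T1485"],
    ThreatCategory.INFORMATION_DISCLOSURE: ["T1020", "T1190"],
    ThreatCategory.DENIAL_OF_SERVICE: ["T1190"],
}

def attach_attack_ids(threats):
    """Annotate each AIThreat with candidate ATT&CK technique IDs"""
    annotated = []
    for threat in threats:
        record = asdict(threat)
        record["attack_techniques"] = ATTACK_TECHNIQUE_MAP.get(threat.category, [])
        annotated.append(record)
    return annotated
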

Threat Modeling Best Practices

Continuous Assessment

Threat modeling is not a one-time activity. Regularly update threat models as your AI system evolves, new threats emerge, and attack techniques advance.

Cross-Functional Teams

Include AI researchers, security professionals, domain experts, and business stakeholders in threat modeling exercises for comprehensive coverage.

Quantitative Analysis

Use quantitative methods to prioritize threats based on likelihood, impact, and cost. This enables data-driven security investment decisions.
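
A toy example of this kind of ranking, with hypothetical likelihoods, loss figures, and mitigation costs:

# Illustrative quantitative prioritization: rank threats by expected loss,
# then by mitigation cost-effectiveness (all figures are hypothetical)
threats = [
    {"name": "Prompt injection", "likelihood": 0.7, "impact": 80000, "mitigation_cost": 15000},
    {"name": "Model extraction", "likelihood": 0.4, "impact": 120000, "mitigation_cost": 30000},
    {"name": "Training data poisoning", "likelihood": 0.2, "impact": 250000, "mitigation_cost": 40000},
]

for t in threats:
    t["expected_loss"] = t["likelihood"] * t["impact"]
    t["roi"] = t["expected_loss"] / t["mitigation_cost"]

for t in sorted(threats, key=lambda t: t["roi"], reverse=True):
    print(f"{t['name']}: expected loss ${t['expected_loss']:,.0f}, ROI {t['roi']:.1f}x")
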

Integration with SDLC

Integrate threat modeling into your AI development lifecycle. Conduct initial assessments during design and update models with each significant change.
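
One way to enforce this, sketched below with hypothetical file paths, is a lightweight build check that flags the threat model document as stale whenever model code changes after the last review:

# Illustrative SDLC gate: fail a build step if the threat model document is
# older than the most recent change to model code (paths are hypothetical)
import os
import sys

THREAT_MODEL = "docs/threat_model.json"
MODEL_SOURCES = ["src/model", "src/pipeline"]

def latest_mtime(path):
    """Most recent modification time under a file or directory (0.0 if missing)"""
    if os.path.isfile(path):
        return os.path.getmtime(path)
    return max(
        (os.path.getmtime(os.path.join(root, f))
         for root, _, files in os.walk(path) for f in files),
        default=0.0,
    )

if latest_mtime(THREAT_MODEL) < max(latest_mtime(p) for p in MODEL_SOURCES):
    sys.exit("Threat model is stale relative to model code -- review required.")
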

Implementation Checklist

Essential Activities

  • System architecture documentation
  • Asset inventory and classification
  • STRIDE analysis completed
  • Attack trees constructed
  • Risk assessment matrix populated

Advanced Capabilities

  • Automated threat model updates
  • Quantitative risk scoring
  • MITRE ATT&CK mapping
  • Threat intelligence integration
  • Executive reporting dashboards