Design Document: AWS Resource Scanner

Overview

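The AWS Resource Scan Report Tool is a full-stack web application with a decoupled frontend and backend. The backend uses the Python Flask framework and the frontend uses React. Scan jobs run in worker processes, which scan multiple accounts and regions in parallel and generate Word reports that follow a template format.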

Technology Stack

  • Frontend: React + TypeScript + Ant Design
  • Backend: Python Flask + SQLAlchemy
  • Database: PostgreSQL (production) / SQLite3 (development/testing)
  • AWS SDK: boto3
  • Word document processing: python-docx
  • Authentication: JWT (PyJWT)
  • Task scheduling: Celery + Redis
  • Message queue: Redis (as the Celery broker and result backend)

Architecture

graph TB
    subgraph Frontend["Frontend (React)"]
        UI[User Interface]
        Auth[Auth Module]
        TaskMgmt[Task Management]
        ReportView[Report Viewer]
        AdminPanel[Admin Panel]
    end

    subgraph Backend["Backend (Flask)"]
        API[REST API]
        AuthService[Auth Service]
        TaskService[Task Service]
        CredentialService[Credential Service]
        ReportService[Report Service]
        WorkerManager[Worker Manager]
    end

    subgraph MessageQueue["Message Queue"]
        Redis[(Redis)]
    end

    subgraph Workers["Celery Workers"]
        Worker1[Worker 1]
        Worker2[Worker 2]
        WorkerN[Worker N]
    end

    subgraph Scanner["Scanner Module"]
        AWSScanner[AWS Scanner]
        CloudProvider[Cloud Provider Interface]
    end

    subgraph Storage["Storage"]
        DB[(Database)]
        FileStore[File Storage]
    end

    subgraph AWS["AWS Cloud"]
        AWSServices[AWS Services]
    end

    UI --> API
    API --> AuthService
    API --> TaskService
    API --> CredentialService
    API --> ReportService
    API --> WorkerManager

    TaskService --> Redis
    Redis --> Worker1
    Redis --> Worker2
    Redis --> WorkerN

    Worker1 --> AWSScanner
    Worker2 --> AWSScanner
    WorkerN --> AWSScanner

    AWSScanner --> CloudProvider
    AWSScanner --> AWSServices

    AuthService --> DB
    TaskService --> DB
    CredentialService --> DB
    ReportService --> DB
    ReportService --> FileStore
    
    Worker1 --> Redis
    Worker2 --> Redis
    WorkerN --> Redis

Components and Interfaces

1. Frontend Components

1.1 Authentication Module (Auth)

interface LoginRequest {
  username: string;
  password: string;
}

interface LoginResponse {
  token: string;
  user: User;
}

interface User {
  id: number;
  username: string;
  email: string;
  role: 'admin' | 'power_user' | 'user';
}

1.2 Task Management Module (TaskManagement)

interface ScanTask {
  id: number;
  name: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  progress: number;
  createdAt: string;
  completedAt?: string;
  createdBy: number;
  accounts: string[];
  regions: string[];
  projectMetadata: ProjectMetadata;
  errorLogs?: ErrorLog[];
}

interface ProjectMetadata {
  clientName: string;
  projectName: string;
  bdManager: string;
  solutionsArchitect: string;
  cloudEngineer: string;
  cloudEngineerEmail: string;
  networkDiagram?: File;
}

interface CreateTaskRequest {
  name: string;
  credentialIds: number[];
  regions: string[];
  projectMetadata: ProjectMetadata;
}

1.3 Report Module (Report)

interface Report {
  id: number;
  taskId: number;
  fileName: string;
  fileSize: number;
  createdAt: string;
  downloadUrl: string;
}

2. Backend API

2.0 Common Paginated Response Format

{
  "data": [...],
  "pagination": {
    "page": 1,
    "page_size": 20,
    "total": 100,
    "total_pages": 5
  }
}

Default pagination parameters for all list APIs:

  • page: page number, default 1
  • page_size: items per page, default 20, maximum 100
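
The envelope and the parameter limits above can be sketched as a small helper. This is a minimal in-memory illustration, not the backend's actual code: the name `paginate` and the two constants are hypothetical, and a real endpoint would more likely apply the offset and limit in the database query.

```python
from math import ceil
from typing import Any, Dict, Sequence

DEFAULT_PAGE_SIZE = 20   # documented default
MAX_PAGE_SIZE = 100      # documented maximum


def paginate(items: Sequence[Any], page: int = 1,
             page_size: int = DEFAULT_PAGE_SIZE) -> Dict[str, Any]:
    """Slice a sequence and wrap it in the common paginated envelope."""
    page = max(page, 1)                                 # clamp to the first page
    page_size = min(max(page_size, 1), MAX_PAGE_SIZE)   # keep within 1..100
    total = len(items)
    start = (page - 1) * page_size
    return {
        "data": list(items[start:start + page_size]),
        "pagination": {
            "page": page,
            "page_size": page_size,
            "total": total,
            "total_pages": ceil(total / page_size),
        },
    }
```

Clamping out-of-range values is one possible policy; rejecting them with a 400 response would be an equally reasonable choice.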

2.1 Authentication API

POST /api/auth/login          - Log in
POST /api/auth/logout         - Log out
GET  /api/auth/me             - Get the current user
POST /api/auth/refresh        - Refresh the token

2.2 User Management API (Admin)

GET  /api/users               - List users (pagination: page, page_size; search: search)
POST /api/users/create        - Create a user
POST /api/users/update        - Update a user
POST /api/users/delete        - Delete a user
POST /api/users/assign-credentials - Assign credentials to a user

2.3 Credential Management API

GET  /api/credentials         - List credentials (pagination: page, page_size)
POST /api/credentials/create  - Create a credential
POST /api/credentials/update  - Update a credential
POST /api/credentials/delete  - Delete a credential
POST /api/credentials/validate - Validate a credential
GET  /api/credentials/base-role    - Get the base Assume Role configuration
POST /api/credentials/base-role    - Update the base Assume Role configuration

2.4 Task Management API

GET  /api/tasks               - List tasks (pagination: page, page_size; filter: status)
POST /api/tasks/create        - Create a task
GET  /api/tasks/detail        - Get task details (query param: id)
POST /api/tasks/delete        - Delete a task
GET  /api/tasks/logs          - Get task logs (query param: id; pagination: page, page_size)

2.5 Report Management API

GET  /api/reports             - List reports (pagination: page, page_size; filter: task_id)
GET  /api/reports/detail      - Get report details (query param: id)
GET  /api/reports/download    - Download a report (query param: id)
POST /api/reports/delete      - Delete a report

2.6 Worker Management API (Admin)

GET  /api/workers             - List Celery worker status
GET  /api/workers/stats       - Get worker statistics
POST /api/workers/purge       - Purge queued tasks
3. Celery Task Interface

3.1 Celery Configuration

from celery import Celery

# Celery application: Redis db 0 is the broker, db 1 the result backend
celery_app = Celery(
    'aws_scanner',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,  # hard timeout of 1 hour
    worker_prefetch_multiplier=1,  # each worker fetches one task at a time
    task_acks_late=True,  # acknowledge only after the task finishes
)

3.2 Celery Task Definitions

from typing import Any, Dict, List

from celery import shared_task

# update_task_status, update_task_progress, scan_region, generate_report and
# log_task_error are application helpers assumed to be defined elsewhere.

@shared_task(bind=True, max_retries=3)
def scan_aws_resources(
    self,
    task_id: int,
    credential_ids: List[int],
    regions: List[str],
    project_metadata: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Run an AWS resource scan.

    Args:
        task_id: ID of the task in the database
        credential_ids: list of AWS credential IDs
        regions: list of regions to scan
        project_metadata: project metadata

    Returns:
        Scan status and report path
    """
    try:
        # Mark the task as running
        update_task_status(task_id, 'running')

        # Run the scan: one step per (credential, region) pair
        results = {}
        total_steps = len(credential_ids) * len(regions)
        current_step = 0

        for cred_id in credential_ids:
            for region in regions:
                # Scan resources
                resources = scan_region(cred_id, region)
                results[f"{cred_id}_{region}"] = resources

                # Update progress
                current_step += 1
                progress = int((current_step / total_steps) * 100)
                self.update_state(
                    state='PROGRESS',
                    meta={'progress': progress, 'current': current_step, 'total': total_steps}
                )
                update_task_progress(task_id, progress)

        # Generate the report
        report_path = generate_report(task_id, results, project_metadata)

        # Mark the task as completed
        update_task_status(task_id, 'completed', report_path=report_path)

        return {'status': 'success', 'report_path': report_path}

    except Exception as e:
        # Log the error; mark the task failed only once retries are exhausted
        log_task_error(task_id, str(e))
        if self.request.retries >= self.max_retries:
            update_task_status(task_id, 'failed')
            raise
        # Exponential backoff: retry after 60s, 120s, then 240s
        raise self.retry(exc=e, countdown=60 * 2 ** self.request.retries)

@shared_task
def cleanup_old_reports(days: int = 30):
    """Clean up expired reports"""
    pass

@shared_task
def validate_credentials(credential_id: int) -> bool:
    """Check that an AWS credential is valid"""
    pass

3.3 Querying Task Status

from typing import Any, Dict

from celery.result import AsyncResult

def get_task_status(celery_task_id: str) -> Dict[str, Any]:
    """Return the state of a Celery task in the API's status vocabulary"""
    result = AsyncResult(celery_task_id)

    if result.state == 'PENDING':
        return {'status': 'pending', 'progress': 0}
    elif result.state == 'PROGRESS':
        # result.info holds the meta dict passed to update_state()
        info = result.info or {}
        return {
            'status': 'running',
            'progress': info.get('progress', 0),
            'current': info.get('current', 0),
            'total': info.get('total', 0)
        }
    elif result.state == 'SUCCESS':
        return {'status': 'completed', 'result': result.result}
    elif result.state == 'FAILURE':
        return {'status': 'failed', 'error': str(result.result)}
    else:
        return {'status': result.state}

4. Scanner Interface (Extensible)

from abc import ABC, abstractmethod
from typing import List, Dict, Any

import boto3

class CloudProviderScanner(ABC):
    """Abstract base class for cloud provider scanners"""

    @abstractmethod
    def get_credentials(self, credential_config: dict) -> Any:
        """Obtain credentials for the cloud provider"""
        pass

    @abstractmethod
    def list_regions(self) -> List[str]:
        """List available regions"""
        pass

    @abstractmethod
    def scan_resources(self, regions: List[str], services: List[str]) -> Dict[str, List[dict]]:
        """Scan resources"""
        pass

class AWSScanner(CloudProviderScanner):
    """AWS scanner implementation"""

    def __init__(self, credential_type: str, credential_config: dict):
        self.credential_type = credential_type  # 'assume_role' or 'access_key'
        self.credential_config = credential_config

    def get_credentials(self, credential_config: dict) -> boto3.Session:
        """Obtain an AWS session"""
        pass

    def list_regions(self) -> List[str]:
        """List AWS regions"""
        pass

    def scan_resources(self, regions: List[str], services: List[str]) -> Dict[str, List[dict]]:
        """Scan AWS resources in parallel"""
        pass

Data Models

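Database Models

from datetime import datetime

from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Text, Boolean, Enum
from sqlalchemy.orm import declarative_base, relationship

# Declarative base class shared by all models
Base = declarative_base()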

class User(Base):
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    role = Column(Enum('admin', 'power_user', 'user', name='user_role'), default='user')  # PostgreSQL requires a named enum type
    created_at = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True)
    
    credentials = relationship('UserCredential', back_populates='user')
    tasks = relationship('Task', back_populates='created_by_user')

class AWSCredential(Base):
    __tablename__ = 'aws_credentials'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    credential_type = Column(Enum('assume_role', 'access_key', name='credential_type'), nullable=False)  # PostgreSQL requires a named enum type
    account_id = Column(String(12), nullable=False)
    # For assume_role
    role_arn = Column(String(255))
    external_id = Column(String(255))
    # For access_key (encrypted)
    access_key_id = Column(String(255))
    secret_access_key_encrypted = Column(Text)
    created_at = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True)
    
    users = relationship('UserCredential', back_populates='credential')

class UserCredential(Base):
    __tablename__ = 'user_credentials'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    credential_id = Column(Integer, ForeignKey('aws_credentials.id'), nullable=False)
    assigned_at = Column(DateTime, default=datetime.utcnow)
    
    user = relationship('User', back_populates='credentials')
    credential = relationship('AWSCredential', back_populates='users')

class BaseAssumeRoleConfig(Base):
    __tablename__ = 'base_assume_role_config'
    
    id = Column(Integer, primary_key=True)
    access_key_id = Column(String(255), nullable=False)
    secret_access_key_encrypted = Column(Text, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)  # refreshed on every update

class Task(Base):
    __tablename__ = 'tasks'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    status = Column(Enum('pending', 'running', 'completed', 'failed', name='task_status'), default='pending')
    progress = Column(Integer, default=0)
    created_by = Column(Integer, ForeignKey('users.id'), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    celery_task_id = Column(String(100))  # Celery task ID, used to query task status
    
    # Task configuration (JSON)
    credential_ids = Column(Text)  # JSON array
    regions = Column(Text)  # JSON array
    project_metadata = Column(Text)  # JSON object
    
    created_by_user = relationship('User', back_populates='tasks')
    logs = relationship('TaskLog', back_populates='task')
    report = relationship('Report', back_populates='task', uselist=False)

class TaskLog(Base):
    __tablename__ = 'task_logs'
    
    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
    level = Column(Enum('info', 'warning', 'error', name='log_level'), default='info')  # PostgreSQL requires a named enum type
    message = Column(Text, nullable=False)
    details = Column(Text)  # JSON for stack trace, etc.
    created_at = Column(DateTime, default=datetime.utcnow)
    
    task = relationship('Task', back_populates='logs')

class Report(Base):
    __tablename__ = 'reports'
    
    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
    file_name = Column(String(255), nullable=False)
    file_path = Column(String(500), nullable=False)
    file_size = Column(Integer)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    task = relationship('Task', back_populates='report')

class Worker(Base):
    __tablename__ = 'workers'
    
    id = Column(Integer, primary_key=True)
    worker_id = Column(String(100), unique=True, nullable=False)  # Celery worker hostname
    status = Column(Enum('online', 'offline', name='worker_status'), default='offline')  # PostgreSQL requires a named enum type
    active_tasks = Column(Integer, default=0)
    processed_tasks = Column(Integer, default=0)
    last_heartbeat = Column(DateTime)
    registered_at = Column(DateTime, default=datetime.utcnow)

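AWS Resource Data Structures

from dataclasses import dataclass

# Unified data format for scan results
@dataclass
class ResourceData:
    """Base class for resource data"""
    account_id: str
    region: str
    service: str
    resource_type: str
    resource_id: str
    name: str
    attributes: dict  # service-specific attributes

# Table layout types
class TableLayout:
    HORIZONTAL = 'horizontal'  # Horizontal table: column headers on top, one row per resource (e.g. VPC, Subnet)
    VERTICAL = 'vertical'      # Vertical table: attribute names on the left, values on the right, one table per resource (e.g. EC2, RDS)

# Attribute definitions and table layout for each service (identical to sample-reports)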
SERVICE_CONFIG = {
    # ===== VPC-related resources =====
    'vpc': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'VPC': ['Region', 'Name', 'ID', 'CIDR'],
        }
    },
    'subnet': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Subnet': ['Name', 'ID', 'AZ', 'CIDR'],
        }
    },
    'route_table': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Route Table': ['Name', 'ID', 'Subnet Associations'],
        }
    },
    'internet_gateway': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Internet Gateway': ['Name'],  # one table per IGW, showing only Name
        }
    },
    'nat_gateway': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'NAT Gateway': ['Name', 'ID', 'Public IP', 'Private IP'],
        }
    },
    'security_group': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Security Group': ['Name', 'ID', 'Protocol', 'Port range', 'Source'],
        }
    },
    'vpc_endpoint': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Endpoint': ['Name', 'ID', 'VPC', 'Service Name', 'Type'],
        }
    },
    'vpc_peering': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'VPC Peering': ['Name', 'Peering Connection ID', 'Requester VPC', 'Accepter VPC'],
        }
    },
    'customer_gateway': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Customer Gateway': ['Name', 'Customer Gateway ID', 'IP Address'],
        }
    },
    'virtual_private_gateway': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Virtual Private Gateway': ['Name', 'Virtual Private Gateway ID', 'VPC'],
        }
    },
    'vpn_connection': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'VPN Connection': ['Name', 'VPN ID', 'Routes'],
        }
    },
    
    # ===== EC2-related resources =====
    'ec2': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Instance': ['Name', 'Instance ID', 'Instance Type', 'AZ', 'AMI', 
                        'Public IP', 'Public DNS', 'Private IP', 'VPC ID', 'Subnet ID',
                        'Key', 'Security Groups', 'EBS Type', 'EBS Size', 'Encryption', 
                        'Other Requirement'],
        }
    },
    'elastic_ip': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Elastic IP': ['Name'],  # one row per EIP, showing only Name
        }
    },
    
    # ===== Auto Scaling =====
    'autoscaling': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Auto Scaling Group': ['Name', 'Launch Template', 'AMI', 'Instance type', 
                                  'Key', 'Target Groups', 'Desired', 'Min', 'Max', 
                                  'Scaling Policy'],
        }
    },
    
    # ===== ELB-related resources =====
    'elb': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Load Balancer': ['Name', 'Type', 'DNS', 'Scheme', 'VPC', 
                             'Availability Zones', 'Subnet', 'Security Groups'],
        }
    },
    'target_group': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Target Group': ['Load Balancer', 'TG Name', 'Port', 'Protocol', 
                           'Registered Instances', 'Health Check Path'],
        }
    },
    
    # ===== RDS =====
    'rds': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'DB Instance': ['Region', 'Endpoint', 'DB instance ID', 'DB name', 
                          'Master Username', 'Port', 'DB Engine', 'DB Version',
                          'Instance Type', 'Storage type', 'Storage', 'Multi-AZ',
                          'Security Group', 'Deletion Protection', 
                          'Performance Insights Enabled', 'CloudWatch Logs'],
        }
    },
    
    # ===== ElastiCache =====
    'elasticache': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Cache Cluster': ['Cluster ID', 'Engine', 'Engine Version', 'Node Type', 
                            'Num Nodes', 'Status'],
        }
    },
    
    # ===== EKS =====
    'eks': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Cluster': ['Cluster Name', 'Version', 'Status', 'Endpoint', 'VPC ID'],
        }
    },
    
    # ===== Lambda =====
    'lambda': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Function': ['Function Name', 'Runtime', 'Memory (MB)', 'Timeout (s)', 'Last Modified'],
        }
    },
    
    # ===== S3 =====
    's3': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Bucket': ['Bucket Name'],  # one row per bucket, showing only Name
        }
    },
    's3_event_notification': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'S3 event notification': ['Bucket', 'Name', 'Event Type', 
                                     'Destination type', 'Destination'],
        }
    },
    
    # ===== CloudFront (Global) =====
    'cloudfront': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Distribution': ['CloudFront ID', 'Domain Name', 'CNAME', 
                           'Origin Domain Name', 'Origin Protocol Policy',
                           'Viewer Protocol Policy', 'Allowed HTTP Methods', 
                           'Cached HTTP Methods'],
        }
    },
    
    # ===== Route 53 (Global) =====
    'route53': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Hosted Zone': ['Zone ID', 'Name', 'Type', 'Record Count'],
        }
    },
    
    # ===== ACM (Global) =====
    'acm': {
        'layout': TableLayout.VERTICAL,
        'resources': {
            'Certificate': ['Domain name'],  # one row per certificate, showing only Domain name
        }
    },
    
    # ===== WAF (Global) =====
    'waf': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Web ACL': ['WebACL Name', 'Scope', 'Rules Count', 'Associated Resources'],
        }
    },
    
    # ===== SNS =====
    'sns': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Topic': ['Topic Name', 'Topic Display Name', 'Subscription Protocol', 
                     'Subscription Endpoint'],
        }
    },
    
    # ===== CloudWatch =====
    'cloudwatch': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Log Group': ['Log Group Name', 'Retention Days', 'Stored Bytes', 'KMS Encryption'],
        }
    },
    
    # ===== EventBridge =====
    'eventbridge': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Rule': ['Name', 'Description', 'Event Bus', 'State'],
        }
    },
    
    # ===== CloudTrail =====
    'cloudtrail': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Trail': ['Name', 'Multi-Region Trail', 'Log File Validation', 'KMS Encryption'],
        }
    },
    
    # ===== Config =====
    'config': {
        'layout': TableLayout.HORIZONTAL,
        'resources': {
            'Config': ['Name', 'Regional Resources', 'Global Resources', 'Retention period'],
        }
    },
}
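
To illustrate how the two layouts differ when a report table is built, here is a minimal sketch. The function `build_tables` is hypothetical and works on plain lists of strings; the actual report generator would write the cells into Word tables with python-docx.

```python
from typing import Dict, List

Table = List[List[str]]  # a table is a list of rows, each a list of cell strings


def build_tables(layout: str, columns: List[str],
                 resources: List[Dict[str, str]]) -> List[Table]:
    """Arrange resources into tables according to the layout type."""
    if layout == "horizontal":
        # One table: a header row, then one row per resource.
        header = list(columns)
        rows = [[res.get(col, "") for col in columns] for res in resources]
        return [[header] + rows]
    if layout == "vertical":
        # One table per resource: each row is an [attribute, value] pair.
        return [[[col, res.get(col, "")] for col in columns] for res in resources]
    raise ValueError(f"unknown layout: {layout}")
```

With the `'vpc'` entry, two VPCs would yield a single table of three rows (header plus two data rows); with the `'ec2'` entry, two instances would yield two tables of sixteen rows each. Missing attributes are rendered as empty cells here, which is an assumption rather than a documented rule.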

Correctness Properties

A property is a characteristic or behavior that should hold true across all valid executions of a system; in essence, it is a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.

Property 1: Role-Based Access Control (RBAC)

For any user with a given role (admin, power_user, user), the system should enforce the following access rules:

  • Admin users can access all resources (users, credentials, reports, workers)
  • Power users can access all credentials and all reports
  • Regular users can only access their assigned credentials and their own reports
  • Unauthorized access attempts should return 403 Forbidden

Validates: Requirements 1.3, 1.4, 1.5, 1.6
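
The access rules of Property 1 can be sketched as a single predicate. The function name `can_access`, the resource names and the `is_owner_or_assigned` flag are illustrative assumptions; a caller would translate a `False` result into a 403 Forbidden response.

```python
ADMIN_ONLY = {"users", "workers"}          # resources reserved for admins
SHARED = {"credentials", "reports"}        # resources other roles may reach


def can_access(role: str, resource: str, *,
               is_owner_or_assigned: bool = False) -> bool:
    """Return True if the role may access the resource under Property 1."""
    if role == "admin":
        return True                        # admins can access everything
    if resource in ADMIN_ONLY:
        return False
    if role == "power_user":
        return resource in SHARED          # all credentials and all reports
    if role == "user":
        # Only credentials assigned to the user, or reports the user owns.
        return resource in SHARED and is_owner_or_assigned
    return False                           # unknown roles get nothing
```

Denying unknown roles by default keeps the check fail-closed, which suits a property test that enumerates arbitrary role strings.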

Property 2: JWT Token Validation

For any JWT token, the system should:

  • Accept valid, non-expired tokens and grant access
  • Reject expired tokens and require re-authentication
  • Reject malformed or tampered tokens

Validates: Requirements 1.1, 1.2

Property 3: User Creation Validation

For any user creation request, the system should require all mandatory fields (username, password, email, role) and reject requests with missing fields.

Validates: Requirements 1.7

Property 4: Credential Assignment Enforcement

For any credential assignment to a user, the system should:

  • Record the assignment in the database
  • Enforce the assignment during task creation (users can only use assigned credentials)

Validates: Requirements 1.8, 2.5

Property 5: Sensitive Data Masking

For any API response containing credentials, the system should mask sensitive information (Secret Access Keys) and never expose them in plaintext.

Validates: Requirements 2.7
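
One way to satisfy Property 5 is to pass every credential record through a masking step before it is serialized. The sketch below is an assumption about how that could look: `mask_credential` and the `SENSITIVE_FIELDS` set are hypothetical names, and the fixed `****` mask is just one possible policy.

```python
from typing import Any, Dict

# Field names treated as secret; the second matches the model column name.
SENSITIVE_FIELDS = {"secret_access_key", "secret_access_key_encrypted"}


def mask_credential(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the record with secret fields replaced by a mask."""
    masked = {}
    for key, value in record.items():
        if key in SENSITIVE_FIELDS and value:
            masked[key] = "****"           # never echo any part of the secret
        else:
            masked[key] = value
    return masked
```

The function returns a new dict and leaves its input untouched, so the unmasked record remains usable by the scanner.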

Property 6: Credential Encryption

For any sensitive data stored in the database, the system should protect it before storage: passwords are stored only as hashes (the password_hash column), and AWS secret keys are encrypted and decrypted only when needed.

Validates: Requirements 2.3, 9.3

Property 7: Task Creation Validation

For any task creation request, the system should require selection of at least one AWS account, at least one region, and all required project metadata fields.

Validates: Requirements 3.1
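
The validation in Property 7 could be sketched as follows. The payload keys follow the CreateTaskRequest interface; treating every non-optional ProjectMetadata field as required is an assumption, and `validate_create_task` is a hypothetical helper name.

```python
from typing import Any, Dict, List

# Non-optional fields of the ProjectMetadata interface (assumed required).
REQUIRED_METADATA = (
    "clientName", "projectName", "bdManager",
    "solutionsArchitect", "cloudEngineer", "cloudEngineerEmail",
)


def validate_create_task(payload: Dict[str, Any]) -> List[str]:
    """Return a list of validation errors; an empty list means valid."""
    errors = []
    if not payload.get("credentialIds"):
        errors.append("credentialIds: select at least one AWS account")
    if not payload.get("regions"):
        errors.append("regions: select at least one region")
    metadata = payload.get("projectMetadata") or {}
    for field in REQUIRED_METADATA:
        # Treat missing, None and whitespace-only values as absent.
        if not str(metadata.get(field) or "").strip():
            errors.append(f"projectMetadata.{field}: required")
    return errors
```

Collecting all errors instead of stopping at the first lets the API report every problem in a single 400 response.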

Property 8: Global Resource Scanning

For any scan task, regardless of selected regions, the system should always scan global resources (CloudFront, Route 53, ACM, WAF).

Validates: Requirements 3.2, 5.2

Property 9: Regional Resource Filtering

For any scan task with selected regions, the system should only scan regional services in those specific regions.

Validates: Requirements 5.3
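
Properties 8 and 9 together determine which (service, region) pairs a task scans. A minimal sketch, assuming a hypothetical `plan_scan` helper and using `None` to stand for "no region" on global services:

```python
from typing import List, Optional, Sequence, Tuple

# Global services named in Property 8.
GLOBAL_SERVICES = ("cloudfront", "route53", "acm", "waf")


def plan_scan(selected_regions: Sequence[str],
              regional_services: Sequence[str]) -> List[Tuple[str, Optional[str]]]:
    """Build the list of (service, region) pairs for one scan task."""
    # Global services are always scanned, whatever regions were selected.
    plan: List[Tuple[str, Optional[str]]] = [(svc, None) for svc in GLOBAL_SERVICES]
    # Regional services are scanned only in the selected regions.
    for region in selected_regions:
        for svc in regional_services:
            plan.append((svc, region))
    return plan
```

A property test can then assert that the four global services always appear and that no pair carries a region outside the selection.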

Property 10: Multi-Account Resource Identification

For any scan task involving multiple AWS accounts, every resource record should include the AWS Account ID, and the report should include an Account column in all resource tables.

Validates: Requirements 3.3, 5.7

Property 11: Error Resilience in Scanning

For any scan task, if a service scan encounters an error:

  • The error should be logged with full details
  • The scan should continue with other services
  • Services with errors should be excluded from the report (or marked as failed)

Validates: Requirements 4.5, 5.6, 8.2
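
The behavior in Property 11 amounts to isolating each service scan in its own try/except. The sketch below assumes a hypothetical `scan_services` function that receives one zero-argument callable per service; the error dict mirrors the message and details columns of TaskLog.

```python
import traceback
from typing import Any, Callable, Dict, List, Tuple


def scan_services(
    scanners: Dict[str, Callable[[], List[dict]]]
) -> Tuple[Dict[str, List[dict]], List[Dict[str, Any]]]:
    """Run every scanner; collect results and errors separately."""
    results: Dict[str, List[dict]] = {}
    errors: List[Dict[str, Any]] = []
    for service, scan in scanners.items():
        try:
            results[service] = scan()
        except Exception as exc:
            # Deliberately broad: one failing service must not abort the scan.
            errors.append({
                "service": service,
                "message": str(exc),
                "details": traceback.format_exc(),   # full stack trace for the log
            })
    return results, errors
```

Failed services are simply absent from `results`, which corresponds to the "excluded from the report" option; marking them as failed instead would mean carrying the error list into report generation.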

Property 12: Empty Service Exclusion

For any generated report, services with no resources should not appear in the Implementation List section.

Validates: Requirements 4.6, 6.4
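
Property 12 reduces to a filter over the scan results before tables are rendered. A minimal sketch, with `services_for_report` as a hypothetical helper name:

```python
from typing import Dict, List


def services_for_report(results: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
    """Keep only the services that returned at least one resource."""
    return {service: items for service, items in results.items() if items}
```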

Property 13: Retry Mechanism

For any AWS API call that fails, the system should retry with exponential backoff up to 3 times before marking it as failed.

Validates: Requirements 5.5

Property 14: Resource Attribute Extraction

For any scanned resource, the system should extract all attributes defined in the service column specification.

Validates: Requirements 5.4

Property 15: Template Placeholder Replacement

For any generated report, all [placeholder] markers in the template should be replaced with actual values, and no placeholders should remain in the final document.

Validates: Requirements 6.2
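
Property 15 can be checked on plain text with a regular expression. The sketch below assumes placeholders are written as `[Some Name]` and keyed by the text between the brackets; both helper names are hypothetical. In a real Word template a placeholder may be split across several runs, so the text would need to be joined per paragraph before matching.

```python
import re
from typing import Dict, List

# Matches "[...]" with no nested brackets; group 1 is the placeholder name.
PLACEHOLDER = re.compile(r"\[([^\[\]]+)\]")


def fill_placeholders(text: str, values: Dict[str, str]) -> str:
    """Replace known placeholders; leave unknown ones untouched."""
    return PLACEHOLDER.sub(lambda m: values.get(m.group(1), m.group(0)), text)


def remaining_placeholders(text: str) -> List[str]:
    """Return the names of placeholders still present in the text."""
    return PLACEHOLDER.findall(text)
```

A report would pass the property when `remaining_placeholders` returns an empty list for every paragraph and table cell of the generated document.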

Property 16: Report Content Completeness

For any generated report:

  • All services with resources should have corresponding tables
  • All project metadata fields should be present in the document
  • The Update History section should include version, date, modifier, and details

Validates: Requirements 6.3, 6.8, 6.9, 3.8

Property 17: Report Storage and Accessibility

For any completed task, the generated report should be stored and accessible for download by authorized users.

Validates: Requirements 6.6, 6.7

Property 18: Error Logging Completeness

For any error that occurs in the system, the log entry should include timestamp, context, and stack trace.

Validates: Requirements 8.1

Property 19: Task Error Display

For any task with errors, the error logs should be retrievable and displayable to the user.

Validates: Requirements 8.3

Property 20: System Error Resilience

For any critical error, the system should not crash but gracefully handle the error and continue operation.

Validates: Requirements 8.5

Property 21: Database Migration Safety

For any database schema migration, existing data should be preserved without loss.

Validates: Requirements 9.4

Property 22: Worker Task Retry

For any failed task, the Celery worker should retry up to 3 times with exponential backoff before marking it as permanently failed.

Validates: Requirements 4.10

Error Handling

Error Categories

  1. Authentication Errors

    • Invalid credentials (401 Unauthorized)
    • Expired token (401 Unauthorized)
    • Insufficient permissions (403 Forbidden)
  2. Validation Errors

    • Missing required fields (400 Bad Request)
    • Invalid field format (400 Bad Request)
    • Resource not found (404 Not Found)
  3. AWS API Errors

    • Credential validation failure
    • API rate limiting (throttling)
    • Service unavailable
    • Access denied
  4. System Errors

    • Database connection failure
    • File system errors
    • Worker communication failure

Error Response Format

{
  "error": {
    "code": "ERROR_CODE",
    "message": "Human-readable error message",
    "details": {
      "field": "specific field with error",
      "reason": "detailed reason"
    }
  }
}
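
A small helper can keep every endpoint's error body in this shape. The sketch below is an assumption, not part of the design: `error_response` is a hypothetical name, and it returns a (body, status) pair, which Flask 1.1 or later accepts directly as a view return value.

```python
from typing import Any, Dict, Optional, Tuple


def error_response(code: str, message: str, status: int,
                   details: Optional[Dict[str, Any]] = None
                   ) -> Tuple[Dict[str, Any], int]:
    """Build an error body in the documented format plus its HTTP status."""
    body = {
        "error": {
            "code": code,
            "message": message,
            "details": details or {},      # always present, possibly empty
        }
    }
    return body, status
```

For example, a missing region list could be reported with `error_response("VALIDATION_ERROR", "Missing required field", 400, {"field": "regions", "reason": "required"})`; the error code string here is illustrative.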

Retry Strategy

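import time
from dataclasses import dataclass

class RetryableError(Exception):
    """Transient failure (e.g. throttling) that is safe to retry"""

@dataclass
class RetryConfig:
    max_retries: int = 3
    base_delay: float = 1.0  # seconds
    max_delay: float = 30.0  # seconds
    exponential_base: float = 2.0

def retry_with_backoff(func, config: RetryConfig):
    # One initial attempt plus up to max_retries retries
    for attempt in range(config.max_retries + 1):
        try:
            return func()
        except RetryableError:
            if attempt == config.max_retries:
                raise
            # Delay doubles each time (1s, 2s, 4s), capped at max_delay
            delay = min(
                config.base_delay * (config.exponential_base ** attempt),
                config.max_delay
            )
            time.sleep(delay)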

Testing Strategy

Unit Tests

Unit tests will focus on:

  • Individual component functionality
  • Input validation
  • Error handling
  • Data transformation

Property-Based Tests

Property-based tests will use the Hypothesis library to verify:

  • RBAC enforcement across all role combinations
  • JWT token validation with various token states
  • Data masking for all credential types
  • Report generation with various resource combinations

Integration Tests

Integration tests will verify:

  • API endpoint functionality
  • Database operations
  • Worker communication
  • AWS API interactions (using moto for mocking)

Test Configuration

# pytest configuration
import pytest
from hypothesis import settings

# Property test settings
settings.register_profile("ci", max_examples=100)
settings.register_profile("dev", max_examples=20)
settings.load_profile("ci")

Test Tagging Format

Each property test should be tagged with:

@pytest.mark.property
def test_rbac_enforcement():
    """
    Feature: aws-resource-scanner
    Property 1: Role-Based Access Control (RBAC)
    Validates: Requirements 1.3, 1.4, 1.5, 1.6
    """
    pass