design.md 14 KB

设计文档

概述

本设计实现 CloudShell 扫描器功能,允许用户在 AWS CloudShell 环境中运行独立的 Python 脚本扫描 AWS 资源,并将结果上传到系统生成报告。

核心组件

  1. CloudShell 扫描脚本 (cloudshell_scanner.py) - 独立的单文件 Python 脚本
  2. 前端上传组件 - React 组件,支持 JSON 文件上传和验证
  3. 后端上传 API - Flask 端点,处理 JSON 数据上传和任务创建
  4. 数据处理服务 - 将上传的 JSON 数据转换为报告

架构

flowchart TB
    subgraph CloudShell["AWS CloudShell"]
        Scanner["cloudshell_scanner.py"]
        JSON["scan_result.json"]
        Scanner --> JSON
    end
    
    subgraph Frontend["前端 (React)"]
        Upload["上传组件"]
        Validate["JSON 验证"]
        Upload --> Validate
    end
    
    subgraph Backend["后端 (Flask)"]
        API["上传 API"]
        Task["任务创建"]
        Worker["Celery Worker"]
        Report["报告生成器"]
        API --> Task
        Task --> Worker
        Worker --> Report
    end
    
    JSON -.->|用户下载| Upload
    Validate --> API
    Report --> Download["报告下载"]

组件和接口

1. CloudShell 扫描脚本

文件结构

cloudshell_scanner.py  # 单体文件,包含所有扫描逻辑

命令行接口

# 扫描所有区域
python cloudshell_scanner.py

# 扫描指定区域
python cloudshell_scanner.py --regions us-east-1,ap-northeast-1

# 指定输出文件
python cloudshell_scanner.py --output my_scan.json

# 扫描指定服务
python cloudshell_scanner.py --services ec2,vpc,rds

核心类

class CloudShellScanner:
    """CloudShell 环境下的 AWS 资源扫描器"""
    
    SUPPORTED_SERVICES: List[str]  # 支持的服务列表
    GLOBAL_SERVICES: List[str]     # 全局服务列表
    
    def __init__(self):
        """初始化扫描器,自动获取 CloudShell 凭证"""
        pass
    
    def get_account_id(self) -> str:
        """获取当前 AWS 账户 ID"""
        pass
    
    def list_regions(self) -> List[str]:
        """列出所有可用区域"""
        pass
    
    def scan_resources(
        self,
        regions: List[str],
        services: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """扫描指定区域和服务的资源"""
        pass
    
    def export_json(self, result: Dict[str, Any], output_path: str) -> None:
        """导出扫描结果为 JSON 文件"""
        pass

2. JSON 数据格式

interface ScanData {
  metadata: {
    account_id: string;
    scan_timestamp: string;  // ISO 8601 格式
    regions_scanned: string[];
    services_scanned: string[];
    scanner_version: string;
    total_resources: number;
    total_errors: number;
  };
  resources: {
    [service: string]: ResourceData[];
  };
  errors: ErrorData[];
}

interface ResourceData {
  account_id: string;
  region: string;
  service: string;
  resource_type: string;
  resource_id: string;
  name: string;
  attributes: Record<string, any>;
}

interface ErrorData {
  service: string;
  region: string;
  error: string;
  error_type: string;
  details: Record<string, any> | null;
}

3. 前端上传组件

组件结构

frontend/src/components/
├── Upload/
│   ├── JsonUploader.tsx      # JSON 文件上传组件
│   └── ScanDataValidator.ts  # JSON 数据验证工具

JsonUploader 组件接口

interface JsonUploaderProps {
  onUploadSuccess: (data: ScanData, file: File) => void;
  onUploadError: (error: string) => void;
  maxFileSize?: number;  // 默认 50MB
}

4. 后端上传 API

新增端点

POST /api/tasks/upload-scan

请求体 (multipart/form-data):

{
  "scan_data": "<JSON 文件>",
  "project_metadata": {
    "clientName": "客户名称",
    "projectName": "项目名称",
    "bdManager": "BD 经理",
    "bdManagerEmail": "bd@example.com",
    "solutionsArchitect": "解决方案架构师",
    "solutionsArchitectEmail": "sa@example.com",
    "cloudEngineer": "云工程师",
    "cloudEngineerEmail": "ce@example.com"
  },
  "network_diagram": "<可选:网络拓扑图>"
}

响应:

{
  "message": "任务创建成功",
  "task": {
    "id": 123,
    "name": "CloudShell Scan - 2024-01-15",
    "status": "pending",
    "source": "upload"
  }
}

5. 数据处理服务

新增服务类

class ScanDataProcessor:
    """处理上传的扫描数据"""
    
    def validate_scan_data(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
        """验证扫描数据结构
        
        Returns:
            (is_valid, error_messages)
        """
        pass
    
    def convert_to_scan_result(self, data: Dict[str, Any]) -> ScanResult:
        """将 JSON 数据转换为 ScanResult 对象"""
        pass

数据模型

Task 模型扩展

在现有 Task 模型中添加字段:

class Task(db.Model):
    # ... 现有字段 ...
    
    # 新增字段
    source = db.Column(db.String(20), default='credential')  # 'credential' 或 'upload'
    scan_data_path = db.Column(db.String(500), nullable=True)  # 上传的 JSON 文件路径

数据库迁移

# migrations/versions/xxx_add_task_source_field.py
def upgrade():
    op.add_column('task', sa.Column('source', sa.String(20), default='credential'))
    op.add_column('task', sa.Column('scan_data_path', sa.String(500), nullable=True))

def downgrade():
    op.drop_column('task', 'source')
    op.drop_column('task', 'scan_data_path')

正确性属性

正确性属性是一种特征或行为,应该在系统的所有有效执行中保持为真——本质上是关于系统应该做什么的形式化陈述。属性作为人类可读规范和机器可验证正确性保证之间的桥梁。

Property 1: JSON 数据往返一致性

对于任意 有效的 ScanResult 对象,将其序列化为 JSON 然后反序列化,应该产生与原始对象等价的数据结构。

验证: 需求 2.4, 2.5

Property 2: JSON 结构完整性

对于任意 CloudShell_Scanner 生成的 Scan_Data,必须包含以下所有字段:

  • metadata.account_id
  • metadata.scan_timestamp
  • metadata.regions_scanned
  • metadata.services_scanned
  • resources(按服务类型组织)
  • errors(错误列表)

验证: 需求 2.1, 2.2, 2.3

Property 3: 区域扫描约束

对于任意 指定的区域列表,CloudShell_Scanner 扫描的资源应该只来自这些指定的区域。

验证: 需求 1.3

Property 4: 服务类型一致性

对于任意 CloudShell_Scanner 实例,其 SUPPORTED_SERVICES 列表应该与现有 AWSScanner 的 SUPPORTED_SERVICES 列表完全相同。

验证: 需求 1.5

Property 5: 错误容错性

对于任意 扫描过程中的服务失败,CloudShell_Scanner 应该记录错误并继续扫描其他服务,最终结果应该包含成功扫描的资源和失败的错误记录。

验证: 需求 1.8

Property 6: JSON 验证完整性

对于任意 上传的 JSON 数据,如果缺少必要字段,Upload_Handler 应该返回包含所有缺失字段名称的错误信息。

验证: 需求 3.4, 4.2, 6.2

Property 7: 无效 JSON 错误处理

对于任意 无效的 JSON 格式输入,系统应该返回明确的错误信息而不是崩溃。

验证: 需求 3.3, 3.5, 4.5

Property 8: 报告生成一致性

对于任意 有效的 Scan_Data,通过上传方式生成的报告应该与通过凭证扫描方式生成的报告具有相同的结构和格式。

验证: 需求 5.1, 5.2

错误处理

CloudShell 扫描脚本错误处理

错误类型 处理方式
凭证无效 显示错误信息,提示用户检查 IAM 权限
区域不可用 跳过该区域,记录警告,继续扫描其他区域
服务 API 错误 记录错误详情,继续扫描其他服务
网络超时 重试 3 次,使用指数退避策略
权限不足 记录缺少的权限,继续扫描有权限的资源

后端上传处理错误处理

错误类型 HTTP 状态码 响应
JSON 解析失败 400 {"error": "无效的 JSON 格式", "details": "..."}
缺少必要字段 400 {"error": "缺少必要字段", "missing_fields": [...]}
文件过大 413 {"error": "文件大小超过限制", "max_size": "50MB"}
服务器错误 500 {"error": "服务器内部错误", "request_id": "..."}

测试策略

双重测试方法

本功能采用单元测试和属性测试相结合的方式:

  • 单元测试: 验证特定示例、边界情况和错误条件
  • 属性测试: 验证跨所有输入的通用属性

属性测试配置

  • 使用 Hypothesis 库进行 Python 属性测试
  • 使用 fast-check 库进行 TypeScript 属性测试
  • 每个属性测试至少运行 100 次迭代
  • 每个属性测试必须引用设计文档中的属性编号

测试覆盖范围

组件 单元测试 属性测试
CloudShell 扫描脚本 各服务扫描方法 Property 2, 3, 4, 5
JSON 序列化/反序列化 边界情况 Property 1
前端 JSON 验证 特定错误场景 Property 6, 7
后端上传处理 API 端点测试 Property 6, 7
报告生成 格式验证 Property 8

测试标签格式

# Feature: cloudshell-scanner, Property 1: JSON 数据往返一致性
@given(scan_result=scan_result_strategy())
def test_json_round_trip(scan_result):
    ...

扩展性:添加新服务支持

概述

CloudShell 扫描脚本设计为易于扩展,添加新的 AWS 服务只需要以下步骤:

步骤 1:在 CloudShell 扫描脚本中添加服务

cloudshell_scanner.py 中:

class CloudShellScanner:
    # 1. 在 SUPPORTED_SERVICES 列表中添加新服务
    SUPPORTED_SERVICES = [
        'vpc', 'subnet', 'ec2', ...,
        'new_service'  # 添加新服务标识
    ]
    
    # 2. 如果是全局服务,添加到 GLOBAL_SERVICES
    GLOBAL_SERVICES = ['cloudfront', 'route53', ..., 'new_global_service']
    
    # 3. 在 _get_scanner_method 中注册扫描方法
    def _get_scanner_method(self, service: str):
        scanner_methods = {
            ...
            'new_service': self._scan_new_service,
        }
        return scanner_methods.get(service)
    
    # 4. 实现扫描方法
    def _scan_new_service(self, account_id: str, region: str) -> List[Dict]:
        """扫描新服务资源"""
        client = boto3.client('new-service', region_name=region)
        resources = []
        
        try:
            response = client.list_resources()  # 根据实际 API 调整
            for item in response.get('Resources', []):
                resources.append({
                    'account_id': account_id,
                    'region': region,
                    'service': 'new_service',
                    'resource_type': 'NewResourceType',
                    'resource_id': item['ResourceId'],
                    'name': item.get('Name', ''),
                    'attributes': {
                        # 添加服务特定属性
                        'attribute1': item.get('Attribute1'),
                        'attribute2': item.get('Attribute2'),
                    }
                })
        except Exception as e:
            # 错误会被上层捕获并记录
            raise
        
        return resources

步骤 2:在后端 AWSScanner 中同步添加(保持一致性)

backend/app/scanners/aws_scanner.py 中进行相同的修改,确保两个扫描器支持相同的服务。

步骤 3:在报告生成器中添加服务配置

backend/app/services/report_generator.py 中:

SERVICE_CONFIG = {
    ...
    'new_service': {
        'layout': TableLayout.HORIZONTAL,  # 或 VERTICAL
        'title': 'New Service',
        'columns': ['Name', 'ID', 'Attribute1', 'Attribute2'],
    },
}

# 添加到服务顺序
SERVICE_ORDER = [..., 'new_service']

# 添加到服务分组
SERVICE_GROUPS = {
    ...
    'new_service': 'NewServiceGroup',
}

步骤 4:更新测试

确保新服务在以下测试中被覆盖:

  • 服务类型一致性测试(Property 4)
  • JSON 结构完整性测试(Property 2)

维护检查清单

添加新服务时,请确保:

  • CloudShell 扫描脚本中添加了服务
  • 后端 AWSScanner 中添加了相同的服务
  • 报告生成器中配置了服务的表格布局
  • 两个扫描器的 SUPPORTED_SERVICES 列表保持一致
  • 添加了必要的 IAM 权限说明
  • 更新了相关文档

服务扫描方法模板

def _scan_service_template(self, account_id: str, region: str) -> List[Dict]:
    """
    服务扫描方法模板
    
    Args:
        account_id: AWS 账户 ID
        region: 区域名称
    
    Returns:
        资源列表,每个资源包含标准字段
    """
    client = boto3.client('service-name', region_name=region)
    resources = []
    
    # 使用分页器处理大量资源
    paginator = client.get_paginator('list_resources')
    
    for page in paginator.paginate():
        for item in page.get('Resources', []):
            resources.append({
                'account_id': account_id,
                'region': region,
                'service': 'service_key',
                'resource_type': 'ResourceType',
                'resource_id': item['Id'],
                'name': self._get_name_from_tags(item.get('Tags', [])),
                'attributes': {
                    # 服务特定属性
                }
            })
    
    return resources