本设计实现 CloudShell 扫描器功能,允许用户在 AWS CloudShell 环境中运行独立的 Python 脚本扫描 AWS 资源,并将结果上传到系统生成报告。
cloudshell_scanner.py) - 独立的单文件 Python 脚本flowchart TB
subgraph CloudShell["AWS CloudShell"]
Scanner["cloudshell_scanner.py"]
JSON["scan_result.json"]
Scanner --> JSON
end
subgraph Frontend["前端 (React)"]
Upload["上传组件"]
Validate["JSON 验证"]
Upload --> Validate
end
subgraph Backend["后端 (Flask)"]
API["上传 API"]
Task["任务创建"]
Worker["Celery Worker"]
Report["报告生成器"]
API --> Task
Task --> Worker
Worker --> Report
end
JSON -.->|用户下载| Upload
Validate --> API
Report --> Download["报告下载"]
cloudshell_scanner.py # 单体文件,包含所有扫描逻辑
# 扫描所有区域
python cloudshell_scanner.py
# 扫描指定区域
python cloudshell_scanner.py --regions us-east-1,ap-northeast-1
# 指定输出文件
python cloudshell_scanner.py --output my_scan.json
# 扫描指定服务
python cloudshell_scanner.py --services ec2,vpc,rds
class CloudShellScanner:
"""CloudShell 环境下的 AWS 资源扫描器"""
SUPPORTED_SERVICES: List[str] # 支持的服务列表
GLOBAL_SERVICES: List[str] # 全局服务列表
def __init__(self):
"""初始化扫描器,自动获取 CloudShell 凭证"""
pass
def get_account_id(self) -> str:
"""获取当前 AWS 账户 ID"""
pass
def list_regions(self) -> List[str]:
"""列出所有可用区域"""
pass
def scan_resources(
self,
regions: List[str],
services: Optional[List[str]] = None
) -> Dict[str, Any]:
"""扫描指定区域和服务的资源"""
pass
def export_json(self, result: Dict[str, Any], output_path: str) -> None:
"""导出扫描结果为 JSON 文件"""
pass
interface ScanData {
metadata: {
account_id: string;
scan_timestamp: string; // ISO 8601 格式
regions_scanned: string[];
services_scanned: string[];
scanner_version: string;
total_resources: number;
total_errors: number;
};
resources: {
[service: string]: ResourceData[];
};
errors: ErrorData[];
}
interface ResourceData {
account_id: string;
region: string;
service: string;
resource_type: string;
resource_id: string;
name: string;
attributes: Record<string, any>;
}
interface ErrorData {
service: string;
region: string;
error: string;
error_type: string;
details: Record<string, any> | null;
}
frontend/src/components/
├── Upload/
│ ├── JsonUploader.tsx # JSON 文件上传组件
│ └── ScanDataValidator.ts # JSON 数据验证工具
interface JsonUploaderProps {
onUploadSuccess: (data: ScanData, file: File) => void;
onUploadError: (error: string) => void;
maxFileSize?: number; // 默认 50MB
}
POST /api/tasks/upload-scan
请求体 (multipart/form-data):
{
"scan_data": "<JSON 文件>",
"project_metadata": {
"clientName": "客户名称",
"projectName": "项目名称",
"bdManager": "BD 经理",
"bdManagerEmail": "bd@example.com",
"solutionsArchitect": "解决方案架构师",
"solutionsArchitectEmail": "sa@example.com",
"cloudEngineer": "云工程师",
"cloudEngineerEmail": "ce@example.com"
},
"network_diagram": "<可选:网络拓扑图>"
}
响应:
{
"message": "任务创建成功",
"task": {
"id": 123,
"name": "CloudShell Scan - 2024-01-15",
"status": "pending",
"source": "upload"
}
}
class ScanDataProcessor:
"""处理上传的扫描数据"""
def validate_scan_data(self, data: Dict[str, Any]) -> Tuple[bool, List[str]]:
"""验证扫描数据结构
Returns:
(is_valid, error_messages)
"""
pass
def convert_to_scan_result(self, data: Dict[str, Any]) -> ScanResult:
"""将 JSON 数据转换为 ScanResult 对象"""
pass
在现有 Task 模型中添加字段:
class Task(db.Model):
# ... 现有字段 ...
# 新增字段
source = db.Column(db.String(20), default='credential') # 'credential' 或 'upload'
scan_data_path = db.Column(db.String(500), nullable=True) # 上传的 JSON 文件路径
# migrations/versions/xxx_add_task_source_field.py
def upgrade():
op.add_column('task', sa.Column('source', sa.String(20), default='credential'))
op.add_column('task', sa.Column('scan_data_path', sa.String(500), nullable=True))
def downgrade():
op.drop_column('task', 'source')
op.drop_column('task', 'scan_data_path')
正确性属性是一种特征或行为,应该在系统的所有有效执行中保持为真——本质上是关于系统应该做什么的形式化陈述。属性作为人类可读规范和机器可验证正确性保证之间的桥梁。
对于任意 有效的 ScanResult 对象,将其序列化为 JSON 然后反序列化,应该产生与原始对象等价的数据结构。
验证: 需求 2.4, 2.5
对于任意 CloudShell_Scanner 生成的 Scan_Data,必须包含以下所有字段:
验证: 需求 2.1, 2.2, 2.3
对于任意 指定的区域列表,CloudShell_Scanner 扫描的资源应该只来自这些指定的区域。
验证: 需求 1.3
对于任意 CloudShell_Scanner 实例,其 SUPPORTED_SERVICES 列表应该与现有 AWSScanner 的 SUPPORTED_SERVICES 列表完全相同。
验证: 需求 1.5
对于任意 扫描过程中的服务失败,CloudShell_Scanner 应该记录错误并继续扫描其他服务,最终结果应该包含成功扫描的资源和失败的错误记录。
验证: 需求 1.8
对于任意 上传的 JSON 数据,如果缺少必要字段,Upload_Handler 应该返回包含所有缺失字段名称的错误信息。
验证: 需求 3.4, 4.2, 6.2
对于任意 无效的 JSON 格式输入,系统应该返回明确的错误信息而不是崩溃。
验证: 需求 3.3, 3.5, 4.5
对于任意 有效的 Scan_Data,通过上传方式生成的报告应该与通过凭证扫描方式生成的报告具有相同的结构和格式。
验证: 需求 5.1, 5.2
| 错误类型 | 处理方式 |
|---|---|
| 凭证无效 | 显示错误信息,提示用户检查 IAM 权限 |
| 区域不可用 | 跳过该区域,记录警告,继续扫描其他区域 |
| 服务 API 错误 | 记录错误详情,继续扫描其他服务 |
| 网络超时 | 重试 3 次,使用指数退避策略 |
| 权限不足 | 记录缺少的权限,继续扫描有权限的资源 |
| 错误类型 | HTTP 状态码 | 响应 |
|---|---|---|
| JSON 解析失败 | 400 | {"error": "无效的 JSON 格式", "details": "..."} |
| 缺少必要字段 | 400 | {"error": "缺少必要字段", "missing_fields": [...]} |
| 文件过大 | 413 | {"error": "文件大小超过限制", "max_size": "50MB"} |
| 服务器错误 | 500 | {"error": "服务器内部错误", "request_id": "..."} |
本功能采用单元测试和属性测试相结合的方式:
| 组件 | 单元测试 | 属性测试 |
|---|---|---|
| CloudShell 扫描脚本 | 各服务扫描方法 | Property 2, 3, 4, 5 |
| JSON 序列化/反序列化 | 边界情况 | Property 1 |
| 前端 JSON 验证 | 特定错误场景 | Property 6, 7 |
| 后端上传处理 | API 端点测试 | Property 6, 7 |
| 报告生成 | 格式验证 | Property 8 |
# Feature: cloudshell-scanner, Property 1: JSON 数据往返一致性
@given(scan_result=scan_result_strategy())
def test_json_round_trip(scan_result):
...
CloudShell 扫描脚本设计为易于扩展,添加新的 AWS 服务只需要以下步骤:
在 cloudshell_scanner.py 中:
class CloudShellScanner:
# 1. 在 SUPPORTED_SERVICES 列表中添加新服务
SUPPORTED_SERVICES = [
'vpc', 'subnet', 'ec2', ...,
'new_service' # 添加新服务标识
]
# 2. 如果是全局服务,添加到 GLOBAL_SERVICES
GLOBAL_SERVICES = ['cloudfront', 'route53', ..., 'new_global_service']
# 3. 在 _get_scanner_method 中注册扫描方法
def _get_scanner_method(self, service: str):
scanner_methods = {
...
'new_service': self._scan_new_service,
}
return scanner_methods.get(service)
# 4. 实现扫描方法
def _scan_new_service(self, account_id: str, region: str) -> List[Dict]:
"""扫描新服务资源"""
client = boto3.client('new-service', region_name=region)
resources = []
try:
response = client.list_resources() # 根据实际 API 调整
for item in response.get('Resources', []):
resources.append({
'account_id': account_id,
'region': region,
'service': 'new_service',
'resource_type': 'NewResourceType',
'resource_id': item['ResourceId'],
'name': item.get('Name', ''),
'attributes': {
# 添加服务特定属性
'attribute1': item.get('Attribute1'),
'attribute2': item.get('Attribute2'),
}
})
except Exception as e:
# 错误会被上层捕获并记录
raise
return resources
在 backend/app/scanners/aws_scanner.py 中进行相同的修改,确保两个扫描器支持相同的服务。
在 backend/app/services/report_generator.py 中:
SERVICE_CONFIG = {
...
'new_service': {
'layout': TableLayout.HORIZONTAL, # 或 VERTICAL
'title': 'New Service',
'columns': ['Name', 'ID', 'Attribute1', 'Attribute2'],
},
}
# 添加到服务顺序
SERVICE_ORDER = [..., 'new_service']
# 添加到服务分组
SERVICE_GROUPS = {
...
'new_service': 'NewServiceGroup',
}
确保新服务在以下测试中被覆盖:
添加新服务时,请确保:
def _scan_service_template(self, account_id: str, region: str) -> List[Dict]:
"""
服务扫描方法模板
Args:
account_id: AWS 账户 ID
region: 区域名称
Returns:
资源列表,每个资源包含标准字段
"""
client = boto3.client('service-name', region_name=region)
resources = []
# 使用分页器处理大量资源
paginator = client.get_paginator('list_resources')
for page in paginator.paginate():
for item in page.get('Resources', []):
resources.append({
'account_id': account_id,
'region': region,
'service': 'service_key',
'resource_type': 'ResourceType',
'resource_id': item['Id'],
'name': self._get_name_from_tags(item.get('Tags', [])),
'attributes': {
# 服务特定属性
}
})
return resources