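The AWS Resource Scanning Report Tool is a full-stack web application with a separated frontend and backend. The backend uses the Python Flask framework and the frontend uses React. Scan tasks run in Worker processes, scans can cover multiple accounts and regions in parallel, and the system generates Word reports that follow the template format.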
graph TB
subgraph Frontend["Frontend (React)"]
UI[User Interface]
Auth[Authentication]
TaskMgmt[Task Management]
ReportView[Report Viewer]
AdminPanel[Admin Panel]
end
subgraph Backend["Backend (Flask)"]
API[REST API]
AuthService[Auth Service]
TaskService[Task Service]
CredentialService[Credential Service]
ReportService[Report Service]
WorkerManager[Worker Manager]
end
subgraph MessageQueue["Message Queue"]
Redis[(Redis)]
end
subgraph Workers["Celery Workers"]
Worker1[Worker 1]
Worker2[Worker 2]
WorkerN[Worker N]
end
subgraph Scanner["Scanner Module"]
AWSScanner[AWS Scanner]
CloudProvider[Cloud Provider Interface]
end
subgraph Storage["Storage"]
DB[(Database)]
FileStore[File Storage]
end
subgraph AWS["AWS Cloud"]
AWSServices[AWS Services]
end
UI --> API
API --> AuthService
API --> TaskService
API --> CredentialService
API --> ReportService
API --> WorkerManager
TaskService --> Redis
Redis --> Worker1
Redis --> Worker2
Redis --> WorkerN
Worker1 --> AWSScanner
Worker2 --> AWSScanner
WorkerN --> AWSScanner
AWSScanner --> CloudProvider
AWSScanner --> AWSServices
AuthService --> DB
TaskService --> DB
CredentialService --> DB
ReportService --> DB
ReportService --> FileStore
Worker1 --> Redis
Worker2 --> Redis
WorkerN --> Redis
interface LoginRequest {
username: string;
password: string;
}
interface LoginResponse {
token: string;
user: User;
}
interface User {
id: number;
username: string;
email: string;
role: 'admin' | 'power_user' | 'user';
}
interface ScanTask {
id: number;
name: string;
status: 'pending' | 'running' | 'completed' | 'failed';
progress: number;
createdAt: string;
completedAt?: string;
createdBy: number;
accounts: string[];
regions: string[];
projectMetadata: ProjectMetadata;
errorLogs?: ErrorLog[];
}
interface ProjectMetadata {
clientName: string;
projectName: string;
bdManager: string;
solutionsArchitect: string;
cloudEngineer: string;
cloudEngineerEmail: string;
networkDiagram?: File;
}
interface CreateTaskRequest {
name: string;
credentialIds: number[];
regions: string[];
projectMetadata: ProjectMetadata;
}
interface Report {
id: number;
taskId: number;
fileName: string;
fileSize: number;
createdAt: string;
downloadUrl: string;
}
{
"data": [...],
"pagination": {
"page": 1,
"page_size": 20,
"total": 100,
"total_pages": 5
}
}
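A minimal sketch of how a list endpoint could build this envelope, assuming the query arguments arrive as a string mapping such as Flask's `request.args`. The helper names `parse_pagination` and `paginate` are hypothetical; the defaults (page 1, page_size 20, at most 100) are the ones this design specifies for list APIs.

```python
import math
from typing import Any, Dict, List, Mapping, Tuple

DEFAULT_PAGE = 1
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 100


def parse_pagination(args: Mapping[str, str]) -> Tuple[int, int]:
    """Read page/page_size from query args, falling back to defaults and clamping."""
    def to_int(value: Any, default: int) -> int:
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    page = max(to_int(args.get('page'), DEFAULT_PAGE), 1)
    page_size = to_int(args.get('page_size'), DEFAULT_PAGE_SIZE)
    page_size = min(max(page_size, 1), MAX_PAGE_SIZE)  # keep within 1..100
    return page, page_size


def paginate(items: List[Any], total: int, page: int, page_size: int) -> Dict[str, Any]:
    """Wrap one page of items in the list-API response envelope."""
    return {
        'data': items,
        'pagination': {
            'page': page,
            'page_size': page_size,
            'total': total,
            'total_pages': math.ceil(total / page_size),
        },
    }
```

Clamping instead of rejecting out-of-range values keeps list views forgiving; returning a validation error would be an equally valid design choice.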
Default pagination parameters for all list APIs:
page: page number, default 1
page_size: items per page, default 20, maximum 100
POST /api/auth/login - User login
POST /api/auth/logout - User logout
GET /api/auth/me - Get current user info
POST /api/auth/refresh - Refresh token
GET /api/users - List users (pagination: page, page_size; search: search)
POST /api/users/create - Create user
POST /api/users/update - Update user
POST /api/users/delete - Delete user
POST /api/users/assign-credentials - Assign credentials to a user
GET /api/credentials - List credentials (pagination: page, page_size)
POST /api/credentials/create - Create credential
POST /api/credentials/update - Update credential
POST /api/credentials/delete - Delete credential
POST /api/credentials/validate - Validate credential
GET /api/credentials/base-role - Get base Assume Role configuration
POST /api/credentials/base-role - Update base Assume Role configuration
GET /api/tasks - List tasks (pagination: page, page_size; filter: status)
POST /api/tasks/create - Create task
GET /api/tasks/detail - Get task details (query param: id)
POST /api/tasks/delete - Delete task
GET /api/tasks/logs - Get task logs (query param: id; pagination: page, page_size)
GET /api/reports - List reports (pagination: page, page_size; filter: task_id)
GET /api/reports/detail - Get report details (query param: id)
GET /api/reports/download - Download report (query param: id)
POST /api/reports/delete - Delete report
GET /api/workers - List Celery worker status
GET /api/workers/stats - Get worker statistics
POST /api/workers/purge - Purge tasks from the queue
from celery import Celery
# Celery configuration
celery_app = Celery(
'aws_scanner',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_time_limit=3600,  # hard time limit: 1 hour
worker_prefetch_multiplier=1,  # each worker process reserves only one task at a time
task_acks_late=True,  # acknowledge the message only after the task finishes
)
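from typing import Any, Dict, List
from celery import shared_task
# update_task_status, update_task_progress, scan_region, generate_report and
# log_task_error are application helpers defined elsewhere in the backend.
@shared_task(bind=True, max_retries=3)
def scan_aws_resources(
    self,
    task_id: int,
    credential_ids: List[int],
    regions: List[str],
    project_metadata: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Run an AWS resource scan task.
    Args:
        task_id: Task ID in the database
        credential_ids: List of AWS credential IDs
        regions: List of regions to scan
        project_metadata: Project metadata
    Returns:
        Scan status and the path of the generated report
    """
    try:
        # Mark the task as running
        update_task_status(task_id, 'running')
        # Run the scan
        results = {}
        # Guard against division by zero (task creation already requires >= 1 account and region)
        total_steps = max(len(credential_ids) * len(regions), 1)
        current_step = 0
        for cred_id in credential_ids:
            for region in regions:
                # Scan resources for one account/region pair
                resources = scan_region(cred_id, region)
                results[f"{cred_id}_{region}"] = resources
                # Update progress in the Celery result backend and in the database
                current_step += 1
                progress = int((current_step / total_steps) * 100)
                self.update_state(
                    state='PROGRESS',
                    meta={'progress': progress, 'current': current_step, 'total': total_steps}
                )
                update_task_progress(task_id, progress)
        # Generate the report
        report_path = generate_report(task_id, results, project_metadata)
        # Mark the task as completed
        update_task_status(task_id, 'completed', report_path=report_path)
        return {'status': 'success', 'report_path': report_path}
    except Exception as e:
        # Record the error; mark the task failed only once retries are exhausted
        log_task_error(task_id, str(e))
        if self.request.retries >= self.max_retries:
            update_task_status(task_id, 'failed')
            raise
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=e, countdown=60 * 2 ** self.request.retries)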
@shared_task
def cleanup_old_reports(days: int = 30):
    """Clean up reports older than `days` days"""
    pass
@shared_task
def validate_credentials(credential_id: int) -> bool:
    """Check whether an AWS credential is valid"""
    pass
from typing import Any, Dict
from celery.result import AsyncResult
def get_task_status(celery_task_id: str) -> Dict[str, Any]:
    """Get a Celery task's status, mapped onto the Task.status values"""
    result = AsyncResult(celery_task_id)
    if result.state == 'PENDING':
        # Celery also reports PENDING for task IDs it does not know about
        return {'status': 'pending', 'progress': 0}
    elif result.state in ('STARTED', 'RETRY'):
        # STARTED is reported because task_track_started=True
        return {'status': 'running', 'progress': 0}
    elif result.state == 'PROGRESS':
        info = result.info or {}
        return {
            'status': 'running',
            'progress': info.get('progress', 0),
            'current': info.get('current', 0),
            'total': info.get('total', 0)
        }
    elif result.state == 'SUCCESS':
        return {'status': 'completed', 'result': result.result}
    elif result.state == 'FAILURE':
        return {'status': 'failed', 'error': str(result.result)}
    else:
        return {'status': result.state}
from abc import ABC, abstractmethod
from typing import Any, Dict, List
import boto3
class CloudProviderScanner(ABC):
    """Abstract base class for cloud provider scanners"""
    @abstractmethod
    def get_credentials(self, credential_config: dict) -> Any:
        """Obtain provider credentials"""
        pass
    @abstractmethod
    def list_regions(self) -> List[str]:
        """List available regions"""
        pass
    @abstractmethod
    def scan_resources(self, regions: List[str], services: List[str]) -> Dict[str, List[dict]]:
        """Scan resources"""
        pass
class AWSScanner(CloudProviderScanner):
    """AWS scanner implementation"""
    def __init__(self, credential_type: str, credential_config: dict):
        self.credential_type = credential_type  # 'assume_role' or 'access_key'
        self.credential_config = credential_config
    def get_credentials(self, credential_config: dict) -> boto3.Session:
        """Return an AWS Session"""
        pass
    def list_regions(self) -> List[str]:
        """List AWS regions"""
        pass
    def scan_resources(self, regions: List[str], services: List[str]) -> Dict[str, List[dict]]:
        """Scan AWS resources in parallel"""
        pass
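The `scan_resources` docstring promises a parallel scan. One way to do that, sketched under assumptions: a caller-supplied `scan_one(region, service)` function returns a list of resource dicts, and `scan_in_parallel` is a hypothetical helper built on the standard library's `ThreadPoolExecutor`. Each failing (region, service) pair is recorded instead of aborting the whole scan, which matches the error-isolation property later in this document.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Tuple

# scan_one(region, service) -> list of resource dicts; supplied by the caller.
ScanFn = Callable[[str, str], List[dict]]


def scan_in_parallel(
    scan_one: ScanFn,
    regions: List[str],
    services: List[str],
    max_workers: int = 8,
) -> Tuple[Dict[str, List[dict]], Dict[Tuple[str, str], str]]:
    """Scan every (region, service) pair concurrently.

    Returns resources grouped by service plus a map of failed pairs to error
    messages, so one failing service does not abort the whole scan.
    """
    resources: Dict[str, List[dict]] = {service: [] for service in services}
    errors: Dict[Tuple[str, str], str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(scan_one, region, service): (region, service)
            for region in regions
            for service in services
        }
        # Results are merged on the calling thread, so no locking is needed
        for future in as_completed(futures):
            region, service = futures[future]
            try:
                resources[service].extend(future.result())
            except Exception as exc:  # isolate per-service failures
                errors[(region, service)] = str(exc)
    return resources, errors
```

Resource order within a service follows completion order, so a report generator should sort before rendering. If `scan_one` uses boto3, note that the boto3 documentation treats clients as generally safe to share across threads but `Session` objects as not, so create a session per thread or build clients up front.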
from datetime import datetime
from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, Integer, String, Text
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()  # shared declarative base for all models
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    role = Column(Enum('admin', 'power_user', 'user'), default='user')
    created_at = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True)
    credentials = relationship('UserCredential', back_populates='user')
    tasks = relationship('Task', back_populates='created_by_user')
class AWSCredential(Base):
    __tablename__ = 'aws_credentials'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    credential_type = Column(Enum('assume_role', 'access_key'), nullable=False)
    account_id = Column(String(12), nullable=False)
    # For assume_role
    role_arn = Column(String(255))
    external_id = Column(String(255))
    # For access_key (encrypted)
    access_key_id = Column(String(255))
    secret_access_key_encrypted = Column(Text)
    created_at = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True)
    users = relationship('UserCredential', back_populates='credential')
class UserCredential(Base):
    __tablename__ = 'user_credentials'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'), nullable=False)
    credential_id = Column(Integer, ForeignKey('aws_credentials.id'), nullable=False)
    assigned_at = Column(DateTime, default=datetime.utcnow)
    user = relationship('User', back_populates='credentials')
    credential = relationship('AWSCredential', back_populates='users')
class BaseAssumeRoleConfig(Base):
    __tablename__ = 'base_assume_role_config'
    id = Column(Integer, primary_key=True)
    access_key_id = Column(String(255), nullable=False)
    secret_access_key_encrypted = Column(Text, nullable=False)
    # Refreshed automatically whenever the row is updated
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Task(Base):
    __tablename__ = 'tasks'
    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    status = Column(Enum('pending', 'running', 'completed', 'failed'), default='pending')
    progress = Column(Integer, default=0)
    created_by = Column(Integer, ForeignKey('users.id'), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    celery_task_id = Column(String(100))  # Celery task ID, used to query task status
    # Task configuration (JSON)
    credential_ids = Column(Text)  # JSON array
    regions = Column(Text)  # JSON array
    project_metadata = Column(Text)  # JSON object
    created_by_user = relationship('User', back_populates='tasks')
    logs = relationship('TaskLog', back_populates='task')
    report = relationship('Report', back_populates='task', uselist=False)
class TaskLog(Base):
    __tablename__ = 'task_logs'
    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
    level = Column(Enum('info', 'warning', 'error'), default='info')
    message = Column(Text, nullable=False)
    details = Column(Text)  # JSON for stack trace, etc.
    created_at = Column(DateTime, default=datetime.utcnow)
    task = relationship('Task', back_populates='logs')
class Report(Base):
    __tablename__ = 'reports'
    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey('tasks.id'), nullable=False)
    file_name = Column(String(255), nullable=False)
    file_path = Column(String(500), nullable=False)
    file_size = Column(Integer)
    created_at = Column(DateTime, default=datetime.utcnow)
    task = relationship('Task', back_populates='report')
class Worker(Base):
    __tablename__ = 'workers'
    id = Column(Integer, primary_key=True)
    worker_id = Column(String(100), unique=True, nullable=False)  # Celery worker hostname
    status = Column(Enum('online', 'offline'), default='offline')
    active_tasks = Column(Integer, default=0)
    processed_tasks = Column(Integer, default=0)
    last_heartbeat = Column(DateTime)
    registered_at = Column(DateTime, default=datetime.utcnow)
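from dataclasses import dataclass
# Unified data format for scan results
@dataclass
class ResourceData:
    """Base class for resource data"""
    account_id: str
    region: str
    service: str
    resource_type: str
    resource_id: str
    name: str
    attributes: dict  # service-specific attributes
# Table layout types
class TableLayout:
    HORIZONTAL = 'horizontal'  # horizontal table: column headers on top, one row per resource (e.g. VPC, Subnet)
    VERTICAL = 'vertical'  # vertical table: attribute names in the left column, values in the right; one table per resource (e.g. EC2, RDS)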
# Attribute definitions and table layout for each service (must match sample-reports exactly)
SERVICE_CONFIG = {
# ===== VPC resources =====
'vpc': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'VPC': ['Region', 'Name', 'ID', 'CIDR'],
}
},
'subnet': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Subnet': ['Name', 'ID', 'AZ', 'CIDR'],
}
},
'route_table': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Route Table': ['Name', 'ID', 'Subnet Associations'],
}
},
'internet_gateway': {
'layout': TableLayout.VERTICAL,
'resources': {
'Internet Gateway': ['Name'], # one table per IGW, showing only Name
}
},
'nat_gateway': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'NAT Gateway': ['Name', 'ID', 'Public IP', 'Private IP'],
}
},
'security_group': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Security Group': ['Name', 'ID', 'Protocol', 'Port range', 'Source'],
}
},
'vpc_endpoint': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Endpoint': ['Name', 'ID', 'VPC', 'Service Name', 'Type'],
}
},
'vpc_peering': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'VPC Peering': ['Name', 'Peering Connection ID', 'Requester VPC', 'Accepter VPC'],
}
},
'customer_gateway': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Customer Gateway': ['Name', 'Customer Gateway ID', 'IP Address'],
}
},
'virtual_private_gateway': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Virtual Private Gateway': ['Name', 'Virtual Private Gateway ID', 'VPC'],
}
},
'vpn_connection': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'VPN Connection': ['Name', 'VPN ID', 'Routes'],
}
},
# ===== EC2 resources =====
'ec2': {
'layout': TableLayout.VERTICAL,
'resources': {
'Instance': ['Name', 'Instance ID', 'Instance Type', 'AZ', 'AMI',
'Public IP', 'Public DNS', 'Private IP', 'VPC ID', 'Subnet ID',
'Key', 'Security Groups', 'EBS Type', 'EBS Size', 'Encryption',
'Other Requirement'],
}
},
'elastic_ip': {
'layout': TableLayout.VERTICAL,
'resources': {
'Elastic IP': ['Name'], # one row per EIP, showing only Name
}
},
# ===== Auto Scaling =====
'autoscaling': {
'layout': TableLayout.VERTICAL,
'resources': {
'Auto Scaling Group': ['Name', 'Launch Template', 'AMI', 'Instance type',
'Key', 'Target Groups', 'Desired', 'Min', 'Max',
'Scaling Policy'],
}
},
# ===== ELB resources =====
'elb': {
'layout': TableLayout.VERTICAL,
'resources': {
'Load Balancer': ['Name', 'Type', 'DNS', 'Scheme', 'VPC',
'Availability Zones', 'Subnet', 'Security Groups'],
}
},
'target_group': {
'layout': TableLayout.VERTICAL,
'resources': {
'Target Group': ['Load Balancer', 'TG Name', 'Port', 'Protocol',
'Registered Instances', 'Health Check Path'],
}
},
# ===== RDS =====
'rds': {
'layout': TableLayout.VERTICAL,
'resources': {
'DB Instance': ['Region', 'Endpoint', 'DB instance ID', 'DB name',
'Master Username', 'Port', 'DB Engine', 'DB Version',
'Instance Type', 'Storage type', 'Storage', 'Multi-AZ',
'Security Group', 'Deletion Protection',
'Performance Insights Enabled', 'CloudWatch Logs'],
}
},
# ===== ElastiCache =====
'elasticache': {
'layout': TableLayout.VERTICAL,
'resources': {
'Cache Cluster': ['Cluster ID', 'Engine', 'Engine Version', 'Node Type',
'Num Nodes', 'Status'],
}
},
# ===== EKS =====
'eks': {
'layout': TableLayout.VERTICAL,
'resources': {
'Cluster': ['Cluster Name', 'Version', 'Status', 'Endpoint', 'VPC ID'],
}
},
# ===== Lambda =====
'lambda': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Function': ['Function Name', 'Runtime', 'Memory (MB)', 'Timeout (s)', 'Last Modified'],
}
},
# ===== S3 =====
's3': {
'layout': TableLayout.VERTICAL,
'resources': {
'Bucket': ['Bucket Name'], # one row per bucket, showing only the bucket name
}
},
's3_event_notification': {
'layout': TableLayout.VERTICAL,
'resources': {
'S3 event notification': ['Bucket', 'Name', 'Event Type',
'Destination type', 'Destination'],
}
},
# ===== CloudFront (Global) =====
'cloudfront': {
'layout': TableLayout.VERTICAL,
'resources': {
'Distribution': ['CloudFront ID', 'Domain Name', 'CNAME',
'Origin Domain Name', 'Origin Protocol Policy',
'Viewer Protocol Policy', 'Allowed HTTP Methods',
'Cached HTTP Methods'],
}
},
# ===== Route 53 (Global) =====
'route53': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Hosted Zone': ['Zone ID', 'Name', 'Type', 'Record Count'],
}
},
# ===== ACM (Global) =====
'acm': {
'layout': TableLayout.VERTICAL,
'resources': {
'Certificate': ['Domain name'], # one row per certificate, showing only Domain name
}
},
# ===== WAF (Global) =====
'waf': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Web ACL': ['WebACL Name', 'Scope', 'Rules Count', 'Associated Resources'],
}
},
# ===== SNS =====
'sns': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Topic': ['Topic Name', 'Topic Display Name', 'Subscription Protocol',
'Subscription Endpoint'],
}
},
# ===== CloudWatch =====
'cloudwatch': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Log Group': ['Log Group Name', 'Retention Days', 'Stored Bytes', 'KMS Encryption'],
}
},
# ===== EventBridge =====
'eventbridge': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Rule': ['Name', 'Description', 'Event Bus', 'State'],
}
},
# ===== CloudTrail =====
'cloudtrail': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Trail': ['Name', 'Multi-Region Trail', 'Log File Validation', 'KMS Encryption'],
}
},
# ===== Config =====
'config': {
'layout': TableLayout.HORIZONTAL,
'resources': {
'Config': ['Name', 'Regional Resources', 'Global Resources', 'Retention period'],
}
},
}
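A sketch of how a report generator might turn one `SERVICE_CONFIG` entry into table rows. Assumptions: each resource is a dict keyed by the display column names above, an `Account` key is present for multi-account scans, and the output is plain nested lists (rows of cells) rather than python-docx tables. `build_tables` is a hypothetical name; the layout strings match the `TableLayout` values.

```python
from typing import Dict, List

HORIZONTAL = 'horizontal'  # same value as TableLayout.HORIZONTAL
VERTICAL = 'vertical'      # same value as TableLayout.VERTICAL


def build_tables(
    layout: str,
    columns: List[str],
    resources: List[Dict[str, str]],
    multi_account: bool = False,
) -> List[List[List[str]]]:
    """Return a list of tables; each table is a list of rows of cell strings.

    HORIZONTAL -> one table: header row + one row per resource.
    VERTICAL   -> one table per resource: [attribute, value] rows.
    """
    if multi_account:
        # Multi-account scans add an Account column (or row) to every table
        columns = ['Account'] + columns
    if layout == HORIZONTAL:
        header = list(columns)
        rows = [[str(r.get(col, '')) for col in columns] for r in resources]
        return [[header] + rows]
    if layout == VERTICAL:
        return [[[col, str(r.get(col, ''))] for col in columns] for r in resources]
    raise ValueError(f'unknown layout: {layout}')
```

For example, `build_tables(SERVICE_CONFIG['vpc']['layout'], SERVICE_CONFIG['vpc']['resources']['VPC'], vpcs)` would yield a single VPC table, while an EC2 entry yields one two-column table per instance.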
A property is a characteristic or behavior that should hold true across all valid executions of a system. In essence, it is a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.
For any user with a given role (admin, power_user, user), the system should enforce the following access rules:
Validates: Requirements 1.3, 1.4, 1.5, 1.6
For any JWT token, the system should:
Validates: Requirements 1.1, 1.2
For any user creation request, the system should require all mandatory fields (username, password, email, role) and reject requests with missing fields.
Validates: Requirements 1.7
For any credential assignment to a user, the system should:
Validates: Requirements 1.8, 2.5
For any API response containing credentials, the system should mask sensitive information (Secret Access Keys) and never expose them in plaintext.
Validates: Requirements 2.7
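A minimal sketch of response-side masking, assuming credential records are serialized from the `AWSCredential` columns as a dict. The helpers `mask_secret` and `serialize_credential` are hypothetical: the encrypted secret is dropped from the response entirely, and the access key ID is masked down to its last four characters.

```python
def mask_secret(value: str, visible: int = 4, mask_char: str = '*') -> str:
    """Mask all but the last `visible` characters of a sensitive string."""
    if not value:
        return ''
    if len(value) <= visible:
        return mask_char * len(value)
    return mask_char * (len(value) - visible) + value[-visible:]


def serialize_credential(cred: dict) -> dict:
    """Copy a credential record for an API response without exposing secrets."""
    # The secret itself never leaves the backend, not even in encrypted form
    out = {k: v for k, v in cred.items() if k != 'secret_access_key_encrypted'}
    if out.get('access_key_id'):
        out['access_key_id'] = mask_secret(out['access_key_id'])
    return out
```

Dropping the field instead of masking it also avoids leaking the secret's length.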
For any sensitive data stored in the database (passwords, AWS secret keys), the system should encrypt them before storage and decrypt only when needed.
Validates: Requirements 2.3, 9.3
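The `User` model stores a `password_hash`, so for passwords a one-way salted hash fits better than reversible encryption. AWS secret keys, by contrast, must be decrypted before use and need symmetric encryption (for example Fernet from the third-party `cryptography` package, not shown here). A standard-library sketch of the password side follows; the `pbkdf2_sha256$iterations$salt$digest` storage format and the 600,000-iteration default are assumptions, and the helper names are hypothetical.

```python
import hashlib
import hmac
import os

ITERATIONS = 600_000  # assumed work factor; tune for your hardware


def hash_password(password: str, *, iterations: int = ITERATIONS) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash string suitable for password_hash."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, iterations)
    return f'pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}'


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    algorithm, iterations, salt_hex, digest_hex = stored.split('$')
    if algorithm != 'pbkdf2_sha256':
        return False
    digest = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'),
                                 bytes.fromhex(salt_hex), int(iterations))
    return hmac.compare_digest(digest.hex(), digest_hex)
```

The resulting string is well under the 255 characters allowed by `password_hash`.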
For any task creation request, the system should require selection of at least one AWS account, at least one region, and all required project metadata fields.
Validates: Requirements 3.1
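A sketch of server-side validation for a create-task request, assuming the JSON body uses the camelCase field names of the `CreateTaskRequest` and `ProjectMetadata` interfaces. `networkDiagram` is treated as optional because the interface marks it so; `validate_create_task` is a hypothetical helper that returns a list of `{field, reason}` problems, which maps naturally onto the `details` part of the error format.

```python
from typing import Any, Dict, List

# Required ProjectMetadata fields; networkDiagram is optional.
REQUIRED_METADATA = [
    'clientName', 'projectName', 'bdManager',
    'solutionsArchitect', 'cloudEngineer', 'cloudEngineerEmail',
]


def validate_create_task(payload: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return a list of {field, reason} problems; an empty list means valid."""
    errors: List[Dict[str, str]] = []
    if not str(payload.get('name') or '').strip():
        errors.append({'field': 'name', 'reason': 'required'})
    if not payload.get('credentialIds'):
        errors.append({'field': 'credentialIds', 'reason': 'select at least one AWS account'})
    if not payload.get('regions'):
        errors.append({'field': 'regions', 'reason': 'select at least one region'})
    metadata = payload.get('projectMetadata') or {}
    for key in REQUIRED_METADATA:
        # Blank strings count as missing
        if not str(metadata.get(key) or '').strip():
            errors.append({'field': f'projectMetadata.{key}', 'reason': 'required'})
    return errors
```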
For any scan task, regardless of selected regions, the system should always scan global resources (CloudFront, Route 53, ACM, WAF).
Validates: Requirements 3.2, 5.2
For any scan task with selected regions, the system should only scan regional services in those specific regions.
Validates: Requirements 5.3
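The two scoping rules above can be expressed as a scan plan. This is a sketch under assumptions: service keys follow `SERVICE_CONFIG`, the global services are scanned once per account through `us-east-1` (the region where CloudFront-scoped WAF ACLs and CloudFront ACM certificates live), and `build_scan_plan` is a hypothetical helper producing `(credential_id, region, service)` units.

```python
from typing import List, Tuple

# Global services named in the property above, keyed as in SERVICE_CONFIG
GLOBAL_SERVICES = ['cloudfront', 'route53', 'acm', 'waf']
GLOBAL_REGION = 'us-east-1'  # assumption: global APIs are called via us-east-1


def build_scan_plan(
    credential_ids: List[int],
    regions: List[str],
    regional_services: List[str],
) -> List[Tuple[int, str, str]]:
    """Expand a task into (credential_id, region, service) scan units."""
    plan: List[Tuple[int, str, str]] = []
    for cred_id in credential_ids:
        # Global resources: always scanned, regardless of the selected regions
        plan.extend((cred_id, GLOBAL_REGION, svc) for svc in GLOBAL_SERVICES)
        # Regional services: only in the regions the user selected
        for region in regions:
            plan.extend((cred_id, region, svc) for svc in regional_services)
    return plan
```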
For any scan task involving multiple AWS accounts, every resource record should include the AWS Account ID, and the report should include an Account column in all resource tables.
Validates: Requirements 3.3, 5.7
For any scan task, if a service scan encounters an error:
Validates: Requirements 4.5, 5.6, 8.2
For any generated report, services with no resources should not appear in the Implementation List section.
Validates: Requirements 4.6, 6.4
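A one-function sketch of this rule, assuming scan results are grouped by `SERVICE_CONFIG` key and that `service_order` is the template's section order (for example `list(SERVICE_CONFIG)`, since dicts preserve insertion order in Python 3.7+). The helper name is hypothetical.

```python
from typing import Dict, List


def implementation_list_services(
    results: Dict[str, List[dict]], service_order: List[str]
) -> List[str]:
    """Services to list in the report, in template order, skipping empty ones."""
    # Missing keys and empty lists are both treated as "no resources"
    return [svc for svc in service_order if results.get(svc)]
```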
For any AWS API call that fails, the system should retry with exponential backoff up to 3 times before marking it as failed.
Validates: Requirements 5.5
For any scanned resource, the system should extract all attributes defined in the service column specification.
Validates: Requirements 5.4
For any generated report, all [placeholder] markers in the template should be replaced with actual values, and no placeholders should remain in the final document.
Validates: Requirements 6.2
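A string-level sketch of placeholder filling plus the "nothing left over" check. The marker names in the usage are hypothetical, and `fill_placeholders`/`remaining_placeholders` are illustrative helpers. Unknown markers are deliberately left in place so the check can flag them.

```python
import re
from typing import Dict, List

# Matches [name] where name contains no brackets
PLACEHOLDER = re.compile(r'\[([^\[\]]+)\]')


def fill_placeholders(text: str, values: Dict[str, str]) -> str:
    """Replace [name] markers with values; unknown markers are left as-is."""
    return PLACEHOLDER.sub(lambda m: values.get(m.group(1), m.group(0)), text)


def remaining_placeholders(text: str) -> List[str]:
    """Names of markers still present after filling (should be empty)."""
    return PLACEHOLDER.findall(text)
```

Two caveats apply when this is used on a Word template. Word often splits a visible `[Client Name]` across several runs, so replacing run by run can miss markers; join a paragraph's run texts before matching. Legitimate bracketed text in the template (such as `[1]`) would also be reported by the check.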
For any generated report:
Validates: Requirements 6.3, 6.8, 6.9, 3.8
For any completed task, the generated report should be stored and accessible for download by authorized users.
Validates: Requirements 6.6, 6.7
For any error that occurs in the system, the log entry should include timestamp, context, and stack trace.
Validates: Requirements 8.1
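A sketch of a log record carrying timestamp, context and stack trace, shaped to fit the `TaskLog` columns (`level`, `message`, and a JSON-serializable `details`). The helper `build_error_log` and the context keys in the usage are hypothetical.

```python
import traceback
from datetime import datetime, timezone
from typing import Any, Dict


def build_error_log(exc: BaseException, **context: Any) -> Dict[str, Any]:
    """Build a log record with timestamp, context and stack trace."""
    return {
        'level': 'error',
        'message': f'{type(exc).__name__}: {exc}',
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'details': {
            'context': context,  # e.g. task_id, account, region, service
            'stack_trace': ''.join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
        },
    }
```

Inside an `except` block this might be called as `build_error_log(e, task_id=task_id, region=region)`, with `details` stored via `json.dumps`.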
For any task with errors, the error logs should be retrievable and displayable to the user.
Validates: Requirements 8.3
For any critical error, the system should not crash but gracefully handle the error and continue operation.
Validates: Requirements 8.5
For any database schema migration, existing data should be preserved without loss.
Validates: Requirements 9.4
For any failed task, the Celery worker should retry up to 3 times with exponential backoff before marking it as permanently failed.
Validates: Requirements 4.10
Authentication Errors
Validation Errors
AWS API Errors
System Errors
{
"error": {
"code": "ERROR_CODE",
"message": "Human-readable error message",
"details": {
"field": "specific field with error",
"reason": "detailed reason"
}
}
}
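A small helper that produces this envelope, sketched under the assumption that Flask views return a `(dict, status)` tuple, which Flask serializes to JSON. The helper name and the error codes in the usage are hypothetical; `details` is included only when a field or reason is supplied.

```python
from typing import Any, Dict, Optional, Tuple


def error_response(
    code: str,
    message: str,
    status: int,
    field: Optional[str] = None,
    reason: Optional[str] = None,
) -> Tuple[Dict[str, Any], int]:
    """Build the standard error envelope plus an HTTP status code."""
    error: Dict[str, Any] = {'code': code, 'message': message}
    if field is not None or reason is not None:
        error['details'] = {'field': field, 'reason': reason}
    return {'error': error}, status
```

For example, `return error_response('VALIDATION_ERROR', 'Invalid request', 400, field='regions', reason='select at least one region')` from a view.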
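import time
from dataclasses import dataclass
from typing import Callable, TypeVar
T = TypeVar('T')
class RetryableError(Exception):
    """Transient failure (e.g. throttling or a timeout) that is safe to retry."""
@dataclass
class RetryConfig:
    max_retries: int = 3
    base_delay: float = 1.0  # seconds
    max_delay: float = 30.0  # seconds
    exponential_base: float = 2.0
def retry_with_backoff(func: Callable[[], T], config: RetryConfig) -> T:
    # One initial attempt plus up to config.max_retries retries
    for attempt in range(config.max_retries + 1):
        try:
            return func()
        except RetryableError:
            if attempt == config.max_retries:
                raise  # retries exhausted: surface the last error
            # Delay = base_delay * exponential_base**attempt, capped at max_delay (1s, 2s, 4s by default)
            delay = min(
                config.base_delay * (config.exponential_base ** attempt),
                config.max_delay
            )
            time.sleep(delay)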
Unit tests will focus on:
Property-based tests will use hypothesis library to verify:
Integration tests will verify:
# pytest configuration
import pytest
from hypothesis import settings
# Property test settings
settings.register_profile("ci", max_examples=100)
settings.register_profile("dev", max_examples=20)
settings.load_profile("ci")
Each property test should be tagged with:
@pytest.mark.property  # register the "property" marker in pytest.ini/pyproject.toml to avoid unknown-mark warnings
def test_rbac_enforcement():
    """
    Feature: aws-resource-scanner
    Property 1: Role-Based Access Control (RBAC)
    Validates: Requirements 1.3, 1.4, 1.5, 1.6
    """
    pass