本文档为开发人员提供在维护和扩展 AWS 服务扫描器时应遵循的规范和最佳实践。
backend/app/scanners/
├── base.py # 抽象基类和数据结构定义
├── utils.py # 通用工具函数(重试逻辑等)
├── credentials.py # AWS 凭证管理
├── aws_scanner.py # AWS 扫描器主实现
└── services/ # 各服务扫描器实现
├── __init__.py
├── vpc.py # VPC 相关服务
├── ec2.py # EC2 相关服务
├── elb.py # 负载均衡相关服务
├── database.py # 数据库服务(RDS、ElastiCache)
├── compute.py # 计算和存储服务(EKS、Lambda、S3)
├── global_services.py # 全局服务(CloudFront、Route53、ACM、WAF)
└── monitoring.py # 监控服务(CloudWatch、SNS、EventBridge)
@dataclass
class ResourceData:
account_id: str # AWS 账户 ID
region: str # 区域(全局服务使用 'global')
service: str # 服务名称(如 'ec2', 'vpc')
resource_type: str # 资源类型(如 'Instance', 'VPC')
resource_id: str # 资源唯一标识符(ARN 或 ID)
name: str # 资源名称
attributes: Dict[str, Any] # 服务特定属性
根据服务类型选择合适的文件:
| 服务类型 | 文件 | 示例 |
|---|---|---|
| VPC 网络相关 | vpc.py |
VPC, Subnet, Security Group |
| EC2 计算相关 | ec2.py |
EC2 Instance, Elastic IP |
| 负载均衡相关 | elb.py |
ALB, NLB, Target Group |
| 数据库相关 | database.py |
RDS, ElastiCache |
| 计算和存储 | compute.py |
EKS, Lambda, S3 |
| 全局服务 | global_services.py |
CloudFront, Route53, WAF |
| 监控管理 | monitoring.py |
CloudWatch, SNS, EventBridge |
如果新服务不属于现有分类,可以创建新的服务文件。
在对应的服务文件中添加静态方法:
from app.scanners.base import ResourceData
from app.scanners.utils import retry_with_backoff
class YourServiceScanner:
"""Scanner for your AWS resources"""
@staticmethod
@retry_with_backoff() # 必须添加重试装饰器
def scan_your_resource(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
"""
扫描 Your Resource 资源。
Attributes (horizontal/vertical layout):
列出所有返回的属性字段
"""
resources = []
client = session.client('your-service')
try:
# 使用分页器处理大量数据
paginator = client.get_paginator('list_resources')
for page in paginator.paginate():
for item in page.get('Resources', []):
resources.append(ResourceData(
account_id=account_id,
region=region,
service='your_service',
resource_type='Your Resource',
resource_id=item.get('Arn', item.get('Id')),
name=item.get('Name', ''),
attributes={
'Attribute1': item.get('Attr1', ''),
'Attribute2': item.get('Attr2', ''),
}
))
except Exception as e:
logger.warning(f"Failed to scan your resources: {str(e)}")
return resources
在 aws_scanner.py 中完成以下修改:
class AWSScanner(CloudProviderScanner):
SUPPORTED_SERVICES = [
# ... 现有服务
'your_service', # 添加新服务
]
# 如果是全局服务,还需添加到这里
GLOBAL_SERVICES = ['cloudfront', 'route53', 'waf', 's3', 'your_global_service']
def _get_scanner_method(self, service: str) -> Optional[Callable]:
scanner_methods = {
# ... 现有映射
'your_service': self._scan_your_service,
}
return scanner_methods.get(service)
def _scan_your_service(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
"""Scan Your Service"""
from app.scanners.services.your_module import YourServiceScanner
return YourServiceScanner.scan_your_resource(session, account_id, region)
| 类型 | 规范 | 示例 |
|---|---|---|
| 服务名称 | 小写下划线 | security_group, nat_gateway |
| 资源类型 | 首字母大写空格分隔 | Security Group, NAT Gateway |
| 扫描方法 | scan_ 前缀 |
scan_vpcs, scan_ec2_instances |
| 类名 | 大驼峰 + Scanner 后缀 | VPCServiceScanner |
attributes={
# 使用人类可读的键名
'Instance ID': instance_id, # ✓ 正确
'instance_id': instance_id, # ✗ 避免
# 列表转为逗号分隔字符串
'Security Groups': ', '.join(sg_ids) if sg_ids else 'N/A',
# 空值处理
'Public IP': public_ip or 'N/A',
# 数值转字符串
'Port': str(port),
'Memory (MB)': str(memory_size),
}
全局服务(如 CloudFront、Route53)需要特殊处理:
@staticmethod
@retry_with_backoff()
def scan_global_resource(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
# 全局服务始终使用 us-east-1
client = session.client('cloudfront', region_name='us-east-1')
# region 参数设为 'global'
resources.append(ResourceData(
account_id=account_id,
region='global', # 重要:全局服务使用 'global'
service='cloudfront',
# ...
))
始终使用分页器处理可能返回大量数据的 API:
# ✓ 正确:使用分页器
paginator = client.get_paginator('describe_instances')
for page in paginator.paginate():
for reservation in page.get('Reservations', []):
# 处理数据
# ✗ 避免:直接调用可能截断数据
response = client.describe_instances()
使用统一的辅助方法:
@staticmethod
def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
"""Extract Name tag value from tags list"""
if not tags:
return default
for tag in tags:
if tag.get('Key') == 'Name':
return tag.get('Value', default)
return default
# 使用示例
name = self._get_name_from_tags(resource.get('Tags', []), resource['ResourceId'])
所有扫描方法必须使用 @retry_with_backoff() 装饰器:
from app.scanners.utils import retry_with_backoff
@staticmethod
@retry_with_backoff() # 默认重试 3 次,指数退避
def scan_resources(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
# ...
重试配置(在 utils.py 中定义):
try:
# API 调用
response = client.describe_resources()
except ClientError as e:
error_code = e.response.get('Error', {}).get('Code', '')
# 记录警告但不中断扫描
logger.warning(f"Failed to scan resources in {region}: {error_code}")
except Exception as e:
logger.warning(f"Unexpected error scanning resources: {str(e)}")
# 始终返回已收集的资源,即使部分失败
return resources
以下错误不会触发重试:
AccessDenied - 权限不足UnauthorizedAccess - 未授权InvalidParameterValue - 参数错误ValidationError - 验证错误为每个新扫描器编写单元测试:
# tests/test_scanners/test_your_service.py
import pytest
from unittest.mock import Mock, patch
from app.scanners.services.your_module import YourServiceScanner
class TestYourServiceScanner:
@patch('boto3.Session')
def test_scan_your_resource_success(self, mock_session):
# 模拟 API 响应
mock_client = Mock()
mock_session.client.return_value = mock_client
mock_client.get_paginator.return_value.paginate.return_value = [
{'Resources': [{'Id': 'res-123', 'Name': 'test'}]}
]
# 执行扫描
resources = YourServiceScanner.scan_your_resource(
mock_session, 'account-123', 'us-east-1'
)
# 验证结果
assert len(resources) == 1
assert resources[0].resource_id == 'res-123'
@patch('boto3.Session')
def test_scan_your_resource_handles_error(self, mock_session):
mock_client = Mock()
mock_session.client.return_value = mock_client
mock_client.get_paginator.side_effect = Exception("API Error")
# 应该返回空列表而不是抛出异常
resources = YourServiceScanner.scan_your_resource(
mock_session, 'account-123', 'us-east-1'
)
assert resources == []
使用真实 AWS 凭证进行集成测试(在 CI/CD 中可选):
@pytest.mark.integration
def test_scan_with_real_credentials():
# 需要配置真实的 AWS 凭证
pass
# 先列出资源,再获取详情
for item in list_response.get('Items', []):
try:
detail = client.describe_resource(ResourceId=item['Id'])
# 使用详情数据
except Exception as e:
logger.debug(f"Failed to get details for {item['Id']}: {str(e)}")
# 使用基本数据继续
某些服务(如 ACM)在特定区域有特殊用途:
# ACM 证书:扫描选定区域 + us-east-1(用于 CloudFront)
acm_regions = list(regions)
if 'us-east-1' not in acm_regions:
acm_regions.append('us-east-1')
for resource in response.get('Resources', []):
# 跳过已删除的资源
if resource.get('State') in ['deleted', 'deleting', 'failed']:
continue
# 处理有效资源
某些资源需要展开为多条记录(如安全组规则):
for sg in security_groups:
for rule in sg.get('IpPermissions', []):
# 每条规则创建一个 ResourceData
resources.append(ResourceData(...))
# 如果没有规则,仍然记录安全组本身
if not sg.get('IpPermissions'):
resources.append(ResourceData(...))
添加新服务扫描器前,请确认:
@staticmethod 和 @retry_with_backoff() 装饰器(session, account_id, region) -> List[ResourceData]aws_scanner.py 中注册了新服务.kiro/specs/aws-resource-scanner/requirements.md