# AWS 服务扫描器开发指南 本文档为开发人员提供在维护和扩展 AWS 服务扫描器时应遵循的规范和最佳实践。 ## 目录 1. [架构概述](#架构概述) 2. [添加新服务扫描器](#添加新服务扫描器) 3. [代码规范](#代码规范) 4. [错误处理](#错误处理) 5. [测试要求](#测试要求) 6. [常见问题](#常见问题) --- ## 架构概述 ### 目录结构 ``` backend/app/scanners/ ├── base.py # 抽象基类和数据结构定义 ├── utils.py # 通用工具函数(重试逻辑等) ├── credentials.py # AWS 凭证管理 ├── aws_scanner.py # AWS 扫描器主实现 └── services/ # 各服务扫描器实现 ├── __init__.py ├── vpc.py # VPC 相关服务 ├── ec2.py # EC2 相关服务 ├── elb.py # 负载均衡相关服务 ├── database.py # 数据库服务(RDS、ElastiCache) ├── compute.py # 计算和存储服务(EKS、Lambda、S3) ├── global_services.py # 全局服务(CloudFront、Route53、ACM、WAF) └── monitoring.py # 监控服务(CloudWatch、SNS、EventBridge) ``` ### 核心数据结构 ```python @dataclass class ResourceData: account_id: str # AWS 账户 ID region: str # 区域(全局服务使用 'global') service: str # 服务名称(如 'ec2', 'vpc') resource_type: str # 资源类型(如 'Instance', 'VPC') resource_id: str # 资源唯一标识符(ARN 或 ID) name: str # 资源名称 attributes: Dict[str, Any] # 服务特定属性 ``` --- ## 添加新服务扫描器 ### 步骤 1:确定服务分类 根据服务类型选择合适的文件: | 服务类型 | 文件 | 示例 | |---------|------|------| | VPC 网络相关 | `vpc.py` | VPC, Subnet, Security Group | | EC2 计算相关 | `ec2.py` | EC2 Instance, Elastic IP | | 负载均衡相关 | `elb.py` | ALB, NLB, Target Group | | 数据库相关 | `database.py` | RDS, ElastiCache | | 计算和存储 | `compute.py` | EKS, Lambda, S3 | | 全局服务 | `global_services.py` | CloudFront, Route53, WAF | | 监控管理 | `monitoring.py` | CloudWatch, SNS, EventBridge | 如果新服务不属于现有分类,可以创建新的服务文件。 ### 步骤 2:实现扫描方法 在对应的服务文件中添加静态方法: ```python from app.scanners.base import ResourceData from app.scanners.utils import retry_with_backoff class YourServiceScanner: """Scanner for your AWS resources""" @staticmethod @retry_with_backoff() # 必须添加重试装饰器 def scan_your_resource(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ 扫描 Your Resource 资源。 Attributes (horizontal/vertical layout): 列出所有返回的属性字段 """ resources = [] client = session.client('your-service') try: # 使用分页器处理大量数据 paginator = client.get_paginator('list_resources') for page in paginator.paginate(): for item in page.get('Resources', []): resources.append(ResourceData( account_id=account_id, region=region, service='your_service', resource_type='Your Resource', resource_id=item.get('Arn', item.get('Id')), name=item.get('Name', ''), attributes={ 'Attribute1': item.get('Attr1', ''), 'Attribute2': item.get('Attr2', ''), } )) except Exception as e: logger.warning(f"Failed to scan your resources: {str(e)}") return resources ``` ### 步骤 3:注册到主扫描器 在 `aws_scanner.py` 中完成以下修改: #### 3.1 添加到支持的服务列表 ```python class AWSScanner(CloudProviderScanner): SUPPORTED_SERVICES = [ # ... 现有服务 'your_service', # 添加新服务 ] # 如果是全局服务,还需添加到这里 GLOBAL_SERVICES = ['cloudfront', 'route53', 'waf', 's3', 'your_global_service'] ``` #### 3.2 添加扫描方法映射 ```python def _get_scanner_method(self, service: str) -> Optional[Callable]: scanner_methods = { # ... 现有映射 'your_service': self._scan_your_service, } return scanner_methods.get(service) ``` #### 3.3 实现包装方法 ```python def _scan_your_service(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """Scan Your Service""" from app.scanners.services.your_module import YourServiceScanner return YourServiceScanner.scan_your_resource(session, account_id, region) ``` --- ## 代码规范 ### 命名规范 | 类型 | 规范 | 示例 | |------|------|------| | 服务名称 | 小写下划线 | `security_group`, `nat_gateway` | | 资源类型 | 首字母大写空格分隔 | `Security Group`, `NAT Gateway` | | 扫描方法 | `scan_` 前缀 | `scan_vpcs`, `scan_ec2_instances` | | 类名 | 大驼峰 + Scanner 后缀 | `VPCServiceScanner` | ### 属性字段规范 ```python attributes={ # 使用人类可读的键名 'Instance ID': instance_id, # ✓ 正确 'instance_id': instance_id, # ✗ 避免 # 列表转为逗号分隔字符串 'Security Groups': ', '.join(sg_ids) if sg_ids else 'N/A', # 空值处理 'Public IP': public_ip or 'N/A', # 数值转字符串 'Port': str(port), 'Memory (MB)': str(memory_size), } ``` ### 全局服务处理 全局服务(如 CloudFront、Route53)需要特殊处理: ```python @staticmethod @retry_with_backoff() def scan_global_resource(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: # 全局服务始终使用 us-east-1 client = session.client('cloudfront', region_name='us-east-1') # region 参数设为 'global' resources.append(ResourceData( account_id=account_id, region='global', # 重要:全局服务使用 'global' service='cloudfront', # ... )) ``` ### 分页处理 始终使用分页器处理可能返回大量数据的 API: ```python # ✓ 正确:使用分页器 paginator = client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page.get('Reservations', []): # 处理数据 # ✗ 避免:直接调用可能截断数据 response = client.describe_instances() ``` ### 从 Tags 获取名称 使用统一的辅助方法: ```python @staticmethod def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str: """Extract Name tag value from tags list""" if not tags: return default for tag in tags: if tag.get('Key') == 'Name': return tag.get('Value', default) return default # 使用示例 name = self._get_name_from_tags(resource.get('Tags', []), resource['ResourceId']) ``` --- ## 错误处理 ### 重试机制 所有扫描方法必须使用 `@retry_with_backoff()` 装饰器: ```python from app.scanners.utils import retry_with_backoff @staticmethod @retry_with_backoff() # 默认重试 3 次,指数退避 def scan_resources(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: # ... ``` 重试配置(在 `utils.py` 中定义): - 最大重试次数:3 - 基础延迟:1 秒 - 最大延迟:30 秒 - 退避系数:2.0 ### 异常处理 ```python try: # API 调用 response = client.describe_resources() except ClientError as e: error_code = e.response.get('Error', {}).get('Code', '') # 记录警告但不中断扫描 logger.warning(f"Failed to scan resources in {region}: {error_code}") except Exception as e: logger.warning(f"Unexpected error scanning resources: {str(e)}") # 始终返回已收集的资源,即使部分失败 return resources ``` ### 不可重试的错误 以下错误不会触发重试: - `AccessDenied` - 权限不足 - `UnauthorizedAccess` - 未授权 - `InvalidParameterValue` - 参数错误 - `ValidationError` - 验证错误 --- ## 测试要求 ### 单元测试 为每个新扫描器编写单元测试: ```python # tests/test_scanners/test_your_service.py import pytest from unittest.mock import Mock, patch from app.scanners.services.your_module import YourServiceScanner class TestYourServiceScanner: @patch('boto3.Session') def test_scan_your_resource_success(self, mock_session): # 模拟 API 响应 mock_client = Mock() mock_session.client.return_value = mock_client mock_client.get_paginator.return_value.paginate.return_value = [ {'Resources': [{'Id': 'res-123', 'Name': 'test'}]} ] # 执行扫描 resources = YourServiceScanner.scan_your_resource( mock_session, 'account-123', 'us-east-1' ) # 验证结果 assert len(resources) == 1 assert resources[0].resource_id == 'res-123' @patch('boto3.Session') def test_scan_your_resource_handles_error(self, mock_session): mock_client = Mock() mock_session.client.return_value = mock_client mock_client.get_paginator.side_effect = Exception("API Error") # 应该返回空列表而不是抛出异常 resources = YourServiceScanner.scan_your_resource( mock_session, 'account-123', 'us-east-1' ) assert resources == [] ``` ### 集成测试 使用真实 AWS 凭证进行集成测试(在 CI/CD 中可选): ```python @pytest.mark.integration def test_scan_with_real_credentials(): # 需要配置真实的 AWS 凭证 pass ``` --- ## 常见问题 ### Q: 如何处理需要额外 API 调用获取详情的资源? ```python # 先列出资源,再获取详情 for item in list_response.get('Items', []): try: detail = client.describe_resource(ResourceId=item['Id']) # 使用详情数据 except Exception as e: logger.debug(f"Failed to get details for {item['Id']}: {str(e)}") # 使用基本数据继续 ``` ### Q: 如何处理区域特定的服务? 某些服务(如 ACM)在特定区域有特殊用途: ```python # ACM 证书:扫描选定区域 + us-east-1(用于 CloudFront) acm_regions = list(regions) if 'us-east-1' not in acm_regions: acm_regions.append('us-east-1') ``` ### Q: 如何跳过已删除/无效的资源? ```python for resource in response.get('Resources', []): # 跳过已删除的资源 if resource.get('State') in ['deleted', 'deleting', 'failed']: continue # 处理有效资源 ``` ### Q: 如何处理嵌套资源? 某些资源需要展开为多条记录(如安全组规则): ```python for sg in security_groups: for rule in sg.get('IpPermissions', []): # 每条规则创建一个 ResourceData resources.append(ResourceData(...)) # 如果没有规则,仍然记录安全组本身 if not sg.get('IpPermissions'): resources.append(ResourceData(...)) ``` --- ## 检查清单 添加新服务扫描器前,请确认: - [ ] 选择了正确的服务文件 - [ ] 方法使用了 `@staticmethod` 和 `@retry_with_backoff()` 装饰器 - [ ] 方法签名符合规范:`(session, account_id, region) -> List[ResourceData]` - [ ] 使用分页器处理列表 API - [ ] 正确处理全局服务(region='global',使用 us-east-1) - [ ] 属性键名使用人类可读格式 - [ ] 空值使用 'N/A' 或空字符串 - [ ] 异常被捕获并记录,不会中断扫描 - [ ] 在 `aws_scanner.py` 中注册了新服务 - [ ] 编写了单元测试 --- ## 参考资料 - [boto3 文档](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) - [AWS API 参考](https://docs.aws.amazon.com/index.html) - 项目需求文档:`.kiro/specs/aws-resource-scanner/requirements.md`