DEVELOPMENT_GUIDE.md 12 KB

AWS 服务扫描器开发指南

本文档为开发人员提供在维护和扩展 AWS 服务扫描器时应遵循的规范和最佳实践。

目录

  1. 架构概述
  2. 添加新服务扫描器
  3. 代码规范
  4. 错误处理
  5. 测试要求
  6. 常见问题

架构概述

目录结构

backend/app/scanners/
├── base.py                    # 抽象基类和数据结构定义
├── utils.py                   # 通用工具函数(重试逻辑等)
├── credentials.py             # AWS 凭证管理
├── aws_scanner.py             # AWS 扫描器主实现
└── services/                  # 各服务扫描器实现
    ├── __init__.py
    ├── vpc.py                 # VPC 相关服务
    ├── ec2.py                 # EC2 相关服务
    ├── elb.py                 # 负载均衡相关服务
    ├── database.py            # 数据库服务(RDS、ElastiCache)
    ├── compute.py             # 计算和存储服务(EKS、Lambda、S3)
    ├── global_services.py     # 全局服务(CloudFront、Route53、ACM、WAF)
    └── monitoring.py          # 监控服务(CloudWatch、SNS、EventBridge)

核心数据结构

@dataclass
class ResourceData:
    account_id: str           # AWS 账户 ID
    region: str               # 区域(全局服务使用 'global')
    service: str              # 服务名称(如 'ec2', 'vpc')
    resource_type: str        # 资源类型(如 'Instance', 'VPC')
    resource_id: str          # 资源唯一标识符(ARN 或 ID)
    name: str                 # 资源名称
    attributes: Dict[str, Any]  # 服务特定属性

添加新服务扫描器

步骤 1:确定服务分类

根据服务类型选择合适的文件:

服务类型 文件 示例
VPC 网络相关 vpc.py VPC, Subnet, Security Group
EC2 计算相关 ec2.py EC2 Instance, Elastic IP
负载均衡相关 elb.py ALB, NLB, Target Group
数据库相关 database.py RDS, ElastiCache
计算和存储 compute.py EKS, Lambda, S3
全局服务 global_services.py CloudFront, Route53, WAF
监控管理 monitoring.py CloudWatch, SNS, EventBridge

如果新服务不属于现有分类,可以创建新的服务文件。

步骤 2:实现扫描方法

在对应的服务文件中添加静态方法:

from app.scanners.base import ResourceData
from app.scanners.utils import retry_with_backoff

class YourServiceScanner:
    """Scanner for your AWS resources"""
    
    @staticmethod
    @retry_with_backoff()  # 必须添加重试装饰器
    def scan_your_resource(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        扫描 Your Resource 资源。
        
        Attributes (horizontal/vertical layout):
            列出所有返回的属性字段
        """
        resources = []
        client = session.client('your-service')
        
        try:
            # 使用分页器处理大量数据
            paginator = client.get_paginator('list_resources')
            for page in paginator.paginate():
                for item in page.get('Resources', []):
                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='your_service',
                        resource_type='Your Resource',
                        resource_id=item.get('Arn', item.get('Id')),
                        name=item.get('Name', ''),
                        attributes={
                            'Attribute1': item.get('Attr1', ''),
                            'Attribute2': item.get('Attr2', ''),
                        }
                    ))
        except Exception as e:
            logger.warning(f"Failed to scan your resources: {str(e)}")
        
        return resources

步骤 3:注册到主扫描器

aws_scanner.py 中完成以下修改:

3.1 添加到支持的服务列表

class AWSScanner(CloudProviderScanner):
    SUPPORTED_SERVICES = [
        # ... 现有服务
        'your_service',  # 添加新服务
    ]
    
    # 如果是全局服务,还需添加到这里
    GLOBAL_SERVICES = ['cloudfront', 'route53', 'waf', 's3', 'your_global_service']

3.2 添加扫描方法映射

def _get_scanner_method(self, service: str) -> Optional[Callable]:
    scanner_methods = {
        # ... 现有映射
        'your_service': self._scan_your_service,
    }
    return scanner_methods.get(service)

3.3 实现包装方法

def _scan_your_service(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """Scan Your Service"""
    from app.scanners.services.your_module import YourServiceScanner
    return YourServiceScanner.scan_your_resource(session, account_id, region)

代码规范

命名规范

类型 规范 示例
服务名称 小写下划线 security_group, nat_gateway
资源类型 首字母大写空格分隔 Security Group, NAT Gateway
扫描方法 scan_ 前缀 scan_vpcs, scan_ec2_instances
类名 大驼峰 + Scanner 后缀 VPCServiceScanner

属性字段规范

attributes={
    # 使用人类可读的键名
    'Instance ID': instance_id,        # ✓ 正确
    'instance_id': instance_id,        # ✗ 避免
    
    # 列表转为逗号分隔字符串
    'Security Groups': ', '.join(sg_ids) if sg_ids else 'N/A',
    
    # 空值处理
    'Public IP': public_ip or 'N/A',
    
    # 数值转字符串
    'Port': str(port),
    'Memory (MB)': str(memory_size),
}

全局服务处理

全局服务(如 CloudFront、Route53)需要特殊处理:

@staticmethod
@retry_with_backoff()
def scan_global_resource(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    # 全局服务始终使用 us-east-1
    client = session.client('cloudfront', region_name='us-east-1')
    
    # region 参数设为 'global'
    resources.append(ResourceData(
        account_id=account_id,
        region='global',  # 重要:全局服务使用 'global'
        service='cloudfront',
        # ...
    ))

分页处理

始终使用分页器处理可能返回大量数据的 API:

# ✓ 正确:使用分页器
paginator = client.get_paginator('describe_instances')
for page in paginator.paginate():
    for reservation in page.get('Reservations', []):
        # 处理数据

# ✗ 避免:直接调用可能截断数据
response = client.describe_instances()

从 Tags 获取名称

使用统一的辅助方法:

@staticmethod
def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
    """Extract Name tag value from tags list"""
    if not tags:
        return default
    for tag in tags:
        if tag.get('Key') == 'Name':
            return tag.get('Value', default)
    return default

# 使用示例
name = self._get_name_from_tags(resource.get('Tags', []), resource['ResourceId'])

错误处理

重试机制

所有扫描方法必须使用 @retry_with_backoff() 装饰器:

from app.scanners.utils import retry_with_backoff

@staticmethod
@retry_with_backoff()  # 默认重试 3 次,指数退避
def scan_resources(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    # ...

重试配置(在 utils.py 中定义):

  • 最大重试次数:3
  • 基础延迟:1 秒
  • 最大延迟:30 秒
  • 退避系数:2.0

异常处理

try:
    # API 调用
    response = client.describe_resources()
except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code', '')
    # 记录警告但不中断扫描
    logger.warning(f"Failed to scan resources in {region}: {error_code}")
except Exception as e:
    logger.warning(f"Unexpected error scanning resources: {str(e)}")

# 始终返回已收集的资源,即使部分失败
return resources

不可重试的错误

以下错误不会触发重试:

  • AccessDenied - 权限不足
  • UnauthorizedAccess - 未授权
  • InvalidParameterValue - 参数错误
  • ValidationError - 验证错误

测试要求

单元测试

为每个新扫描器编写单元测试:

# tests/test_scanners/test_your_service.py
import pytest
from unittest.mock import Mock, patch
from app.scanners.services.your_module import YourServiceScanner

class TestYourServiceScanner:
    
    @patch('boto3.Session')
    def test_scan_your_resource_success(self, mock_session):
        # 模拟 API 响应
        mock_client = Mock()
        mock_session.client.return_value = mock_client
        mock_client.get_paginator.return_value.paginate.return_value = [
            {'Resources': [{'Id': 'res-123', 'Name': 'test'}]}
        ]
        
        # 执行扫描
        resources = YourServiceScanner.scan_your_resource(
            mock_session, 'account-123', 'us-east-1'
        )
        
        # 验证结果
        assert len(resources) == 1
        assert resources[0].resource_id == 'res-123'
    
    @patch('boto3.Session')
    def test_scan_your_resource_handles_error(self, mock_session):
        mock_client = Mock()
        mock_session.client.return_value = mock_client
        mock_client.get_paginator.side_effect = Exception("API Error")
        
        # 应该返回空列表而不是抛出异常
        resources = YourServiceScanner.scan_your_resource(
            mock_session, 'account-123', 'us-east-1'
        )
        
        assert resources == []

集成测试

使用真实 AWS 凭证进行集成测试(在 CI/CD 中可选):

@pytest.mark.integration
def test_scan_with_real_credentials():
    # 需要配置真实的 AWS 凭证
    pass

常见问题

Q: 如何处理需要额外 API 调用获取详情的资源?

# 先列出资源,再获取详情
for item in list_response.get('Items', []):
    try:
        detail = client.describe_resource(ResourceId=item['Id'])
        # 使用详情数据
    except Exception as e:
        logger.debug(f"Failed to get details for {item['Id']}: {str(e)}")
        # 使用基本数据继续

Q: 如何处理区域特定的服务?

某些服务(如 ACM)在特定区域有特殊用途:

# ACM 证书:扫描选定区域 + us-east-1(用于 CloudFront)
acm_regions = list(regions)
if 'us-east-1' not in acm_regions:
    acm_regions.append('us-east-1')

Q: 如何跳过已删除/无效的资源?

for resource in response.get('Resources', []):
    # 跳过已删除的资源
    if resource.get('State') in ['deleted', 'deleting', 'failed']:
        continue
    # 处理有效资源

Q: 如何处理嵌套资源?

某些资源需要展开为多条记录(如安全组规则):

for sg in security_groups:
    for rule in sg.get('IpPermissions', []):
        # 每条规则创建一个 ResourceData
        resources.append(ResourceData(...))
    
    # 如果没有规则,仍然记录安全组本身
    if not sg.get('IpPermissions'):
        resources.append(ResourceData(...))

检查清单

添加新服务扫描器前,请确认:

  • 选择了正确的服务文件
  • 方法使用了 @staticmethod@retry_with_backoff() 装饰器
  • 方法签名符合规范:(session, account_id, region) -> List[ResourceData]
  • 使用分页器处理列表 API
  • 正确处理全局服务(region='global',使用 us-east-1)
  • 属性键名使用人类可读格式
  • 空值使用 'N/A' 或空字符串
  • 异常被捕获并记录,不会中断扫描
  • aws_scanner.py 中注册了新服务
  • 编写了单元测试

参考资料