"""
AWS Resource Scanner Implementation

This module implements the CloudProviderScanner interface for AWS,
providing comprehensive resource scanning capabilities across all supported services.

Requirements:
- 4.3: Process multiple accounts/regions in parallel
- 4.4: Scan all supported AWS services concurrently
- 5.5: Retry with exponential backoff up to 3 times
- 8.2: Record error details in task record
"""
- import boto3
- from botocore.exceptions import ClientError, BotoCoreError
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from typing import List, Dict, Any, Optional, Callable
- import logging
- import traceback
- from app.scanners.base import CloudProviderScanner, ResourceData, ScanResult
- from app.scanners.credentials import AWSCredentialProvider, CredentialError
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
class AWSScanner(CloudProviderScanner):
    """
    AWS Resource Scanner implementation.

    Scans AWS resources across multiple regions and services using boto3
    sessions obtained from the injected credential provider. Regions are
    scanned in parallel, and within each region the requested services are
    scanned in parallel as well. Per-service failures are captured as error
    records on the ScanResult instead of aborting the scan.
    """

    # All supported AWS services
    SUPPORTED_SERVICES = [
        'vpc', 'subnet', 'route_table', 'internet_gateway', 'nat_gateway',
        'security_group', 'vpc_endpoint', 'vpc_peering',
        'customer_gateway', 'virtual_private_gateway', 'vpn_connection',
        'ec2', 'elastic_ip',
        'autoscaling', 'elb', 'target_group',
        'rds', 'elasticache',
        'eks', 'lambda', 's3', 's3_event_notification',
        'cloudfront', 'route53', 'acm', 'waf',
        'sns', 'cloudwatch', 'eventbridge', 'cloudtrail', 'config'
    ]

    # Global services (not region-specific); scanned once, not per region
    GLOBAL_SERVICES = ['cloudfront', 'route53', 'waf', 's3', 's3_event_notification', 'cloudtrail']

    # Maximum workers for parallel scanning (applies to each thread pool)
    MAX_WORKERS = 10

    # Regions returned by list_regions() when the EC2 API call fails
    DEFAULT_REGIONS = [
        'us-east-1', 'us-east-2', 'us-west-1', 'us-west-2',
        'eu-west-1', 'eu-west-2', 'eu-west-3', 'eu-central-1',
        'ap-northeast-1', 'ap-northeast-2', 'ap-southeast-1', 'ap-southeast-2',
        'ap-south-1', 'sa-east-1', 'ca-central-1'
    ]

    # Service key -> name of the method that scans it. Kept as names (not
    # bound methods) so the table is built once for the class and subclass
    # overrides are still honoured through getattr().
    _SCANNER_METHOD_NAMES = {
        'vpc': '_scan_vpcs',
        'subnet': '_scan_subnets',
        'route_table': '_scan_route_tables',
        'internet_gateway': '_scan_internet_gateways',
        'nat_gateway': '_scan_nat_gateways',
        'security_group': '_scan_security_groups',
        'vpc_endpoint': '_scan_vpc_endpoints',
        'vpc_peering': '_scan_vpc_peering',
        'customer_gateway': '_scan_customer_gateways',
        'virtual_private_gateway': '_scan_virtual_private_gateways',
        'vpn_connection': '_scan_vpn_connections',
        'ec2': '_scan_ec2_instances',
        'elastic_ip': '_scan_elastic_ips',
        'autoscaling': '_scan_autoscaling_groups',
        'elb': '_scan_load_balancers',
        'target_group': '_scan_target_groups',
        'rds': '_scan_rds_instances',
        'elasticache': '_scan_elasticache_clusters',
        'eks': '_scan_eks_clusters',
        'lambda': '_scan_lambda_functions',
        's3': '_scan_s3_buckets',
        's3_event_notification': '_scan_s3_event_notifications',
        'cloudfront': '_scan_cloudfront_distributions',
        'route53': '_scan_route53_hosted_zones',
        'acm': '_scan_acm_certificates',
        'waf': '_scan_waf_web_acls',
        'sns': '_scan_sns_topics',
        'cloudwatch': '_scan_cloudwatch_log_groups',
        'eventbridge': '_scan_eventbridge_rules',
        'cloudtrail': '_scan_cloudtrail_trails',
        'config': '_scan_config_recorders',
    }

    def __init__(self, credential_provider: AWSCredentialProvider):
        """
        Initialize the AWS Scanner.

        Args:
            credential_provider: AWSCredentialProvider instance for authentication
        """
        self.credential_provider = credential_provider
        # Cached by get_account_id() after the first successful lookup
        self._account_id: Optional[str] = None

    @property
    def provider_name(self) -> str:
        """Return the provider identifier, 'AWS'."""
        return 'AWS'

    @property
    def supported_services(self) -> List[str]:
        """Return a copy of SUPPORTED_SERVICES, so callers cannot mutate the constant."""
        return self.SUPPORTED_SERVICES.copy()

    @property
    def global_services(self) -> List[str]:
        """Return a copy of GLOBAL_SERVICES, so callers cannot mutate the constant."""
        return self.GLOBAL_SERVICES.copy()

    def get_credentials(self, credential_config: Dict[str, Any]) -> boto3.Session:
        """
        Get a boto3 Session from the credential provider.

        Args:
            credential_config: Accepted for interface compatibility; it is not
                read here, the session comes from the provider given to __init__.

        Returns:
            The session returned by credential_provider.get_session()
        """
        return self.credential_provider.get_session()

    def list_regions(self) -> List[str]:
        """
        List available AWS regions via EC2 DescribeRegions.

        Returns:
            Region names reported by the API, or a copy of DEFAULT_REGIONS
            when the call fails for any reason (the failure is logged).
        """
        try:
            session = self.credential_provider.get_session(region_name='us-east-1')
            response = session.client('ec2').describe_regions()
            return [region['RegionName'] for region in response['Regions']]
        except Exception as e:
            # Best effort: fall back to a static list rather than fail the caller
            logger.error("Failed to list regions: %s", e)
            return list(self.DEFAULT_REGIONS)

    def validate_credentials(self) -> bool:
        """Validate AWS credentials by delegating to the credential provider."""
        return self.credential_provider.validate()

    def get_account_id(self) -> str:
        """Get the AWS account ID, caching the provider's answer once it is truthy."""
        if not self._account_id:
            self._account_id = self.credential_provider.get_account_id()
        return self._account_id

    def scan_resources(
        self,
        regions: List[str],
        services: Optional[List[str]] = None,
        progress_callback: Optional[Callable[[int, int, str], None]] = None
    ) -> ScanResult:
        """
        Scan AWS resources across specified regions and services.

        Global services are scanned once (through a us-east-1 session); the
        remaining services are scanned for every region, with regions
        processed in parallel. A failure in one region or in the global scan
        is recorded on the result and does not stop the other scans.

        Args:
            regions: List of regions to scan
            services: Optional list of services to scan (None or empty = all)
            progress_callback: Optional callback receiving
                (completed_tasks, total_tasks, message). It is invoked after
                the global scan and after every region, whether that step
                succeeded or failed. Exceptions raised by the callback are
                logged and ignored.

        Returns:
            ScanResult with all discovered resources, collected errors and
            summary metadata. success is False when errors were recorded and
            no resources were found.
        """
        result = ScanResult(success=True)
        account_id = self.get_account_id()

        # Copy the inputs so result.metadata never aliases the caller's lists
        # or the mutable class-level SUPPORTED_SERVICES constant.
        regions = list(regions) if regions else []
        services_to_scan = list(services) if services else list(self.SUPPORTED_SERVICES)

        # Separate global and regional services
        global_services = [s for s in services_to_scan if s in self.GLOBAL_SERVICES]
        regional_services = [s for s in services_to_scan if s not in self.GLOBAL_SERVICES]

        # One task per global service plus one per (regional service, region)
        total_tasks = len(global_services) + (len(regional_services) * len(regions))
        completed_tasks = 0

        # Scan global services first (only once, not per region)
        if global_services:
            logger.info("Scanning %d global services", len(global_services))
            try:
                global_results = self._scan_services_parallel(
                    account_id=account_id,
                    region='us-east-1',  # Global services use us-east-1
                    services=global_services,
                    is_global=True
                )
                self._merge_scan_results(result, global_results)
                message = "Completed global services scan"
            except Exception as e:
                # Mirror the per-region handling: record the failure and
                # continue with the regional scan instead of aborting.
                logger.error("Failed to scan global services: %s", e)
                result.add_error(
                    service='all',
                    region='global',
                    error=f"Global scan failed: {str(e)}",
                    details=None
                )
                message = "Failed global services scan"

            completed_tasks += len(global_services)
            self._report_progress(progress_callback, completed_tasks, total_tasks, message)

        # Scan regional services in parallel across regions
        if regional_services and regions:
            logger.info(
                "Scanning %d services across %d regions",
                len(regional_services), len(regions)
            )

            with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
                futures = {
                    executor.submit(
                        self._scan_services_parallel,
                        account_id=account_id,
                        region=region,
                        services=regional_services,
                        is_global=False
                    ): region
                    for region in regions
                }

                # Results are merged on this thread only, so ScanResult is
                # never touched concurrently.
                for future in as_completed(futures):
                    region = futures[future]
                    try:
                        self._merge_scan_results(result, future.result())
                        message = f"Completed scan for region: {region}"
                    except Exception as e:
                        logger.error("Failed to scan region %s: %s", region, e)
                        result.add_error(
                            service='all',
                            region=region,
                            error=f"Region scan failed: {str(e)}",
                            details=None
                        )
                        message = f"Scan failed for region: {region}"

                    # Counted exactly once per region, and reported outside
                    # the try block so a faulty callback cannot be mistaken
                    # for a region failure.
                    completed_tasks += len(regional_services)
                    self._report_progress(
                        progress_callback, completed_tasks, total_tasks, message
                    )

        # Set metadata
        result.metadata = {
            'account_id': account_id,
            'regions_scanned': regions,
            'services_scanned': services_to_scan,
            'total_resources': sum(len(r) for r in result.resources.values()),
            'total_errors': len(result.errors)
        }

        # Mark as failed if there were errors and nothing was discovered
        if result.errors and not result.resources:
            result.success = False

        return result

    @staticmethod
    def _merge_scan_results(result: ScanResult, partial: Dict[str, Any]) -> None:
        """Copy the 'resources' and 'errors' of a partial scan dict into result."""
        for service, resources in partial['resources'].items():
            for resource in resources:
                result.add_resource(service, resource)

        for error in partial['errors']:
            result.add_error(**error)

    @staticmethod
    def _report_progress(
        progress_callback: Optional[Callable[[int, int, str], None]],
        completed: int,
        total: int,
        message: str
    ) -> None:
        """Invoke the progress callback if one was given; log and ignore its failures."""
        if not progress_callback:
            return
        try:
            progress_callback(completed, total, message)
        except Exception:
            # Progress reporting is best effort and must not fail the scan
            logger.warning("Progress callback raised an exception", exc_info=True)

    def _scan_services_parallel(
        self,
        account_id: str,
        region: str,
        services: List[str],
        is_global: bool = False
    ) -> Dict[str, Any]:
        """
        Scan multiple services in parallel within a single region.

        Args:
            account_id: AWS account ID
            region: Region used to create the boto3 session
            services: List of services to scan; entries without a registered
                scanner method are skipped with a warning
            is_global: Whether these are global services; if True, scanners
                and error records receive the region label 'global'

        Returns:
            Dictionary with 'resources' (service -> list of ResourceData,
            only for services that returned something) and 'errors'
            (list of error dicts) keys
        """
        resources: Dict[str, List[ResourceData]] = {}
        errors: List[Dict[str, Any]] = []

        # Get session for this region
        try:
            session = self.credential_provider.get_session(region_name=region)
        except CredentialError as e:
            logger.error("Failed to get session for region %s: %s", region, e)
            return {
                'resources': {},
                'errors': [{
                    'service': 'all',
                    'region': region,
                    'error': str(e),
                    'details': None
                }]
            }

        # Label attached to resources and errors produced by this scan
        region_label = 'global' if is_global else region

        # Scan services in parallel.
        # NOTE(review): the single session above is shared by every worker
        # thread; boto3 documents Session objects as not thread-safe, so
        # confirm the service scanners tolerate a shared session.
        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
            futures = {}

            for service in services:
                scanner_method = self._get_scanner_method(service)
                if not scanner_method:
                    logger.warning("No scanner registered for service %r; skipping", service)
                    continue
                future = executor.submit(
                    self._scan_service_safe,
                    scanner_method=scanner_method,
                    session=session,
                    account_id=account_id,
                    region=region_label,
                    service=service
                )
                futures[future] = service

            for future in as_completed(futures):
                service = futures[future]
                try:
                    service_result = future.result()
                    if service_result['resources']:
                        resources[service] = service_result['resources']
                    if service_result['error']:
                        errors.append(service_result['error'])
                except Exception as e:
                    # _scan_service_safe already traps Exception, so this is
                    # only a last line of defence.
                    logger.error("Unexpected error scanning %s: %s", service, e)
                    errors.append({
                        'service': service,
                        'region': region_label,
                        'error': str(e),
                        'details': None
                    })

        return {'resources': resources, 'errors': errors}

    def _scan_service_safe(
        self,
        scanner_method: Callable,
        session: boto3.Session,
        account_id: str,
        region: str,
        service: str
    ) -> Dict[str, Any]:
        """
        Safely scan a single service, catching and logging errors.

        Requirements:
            - 8.2: Record error details in task record (via error dict)

        Args:
            scanner_method: Callable invoked as (session, account_id, region)
            session: boto3 session handed to the scanner
            account_id: AWS account ID handed to the scanner
            region: Region label handed to the scanner and stored in errors
            service: Service key, used for logging and error records

        Returns:
            Dictionary with 'resources' and 'error' keys. On success 'error'
            is None; on failure 'resources' is an empty list and 'error'
            holds service, region, message, error type and a stack trace.
        """
        try:
            return {
                'resources': scanner_method(session, account_id, region),
                'error': None
            }
        except ClientError as e:
            error_info = e.response.get('Error', {})
            error_code = error_info.get('Code', 'Unknown')
            error_message = error_info.get('Message', str(e))
            stack_trace = traceback.format_exc()
            logger.warning(
                "Error scanning %s in %s: %s - %s",
                service, region, error_code, error_message
            )
            return {
                'resources': [],
                'error': {
                    'service': service,
                    'region': region,
                    'error': f"{error_code}: {error_message}",
                    'error_type': 'ClientError',
                    'details': {
                        'error_code': error_code,
                        'stack_trace': stack_trace
                    }
                }
            }
        except Exception as e:
            stack_trace = traceback.format_exc()
            logger.error("Unexpected error scanning %s in %s: %s", service, region, e)
            return {
                'resources': [],
                'error': {
                    'service': service,
                    'region': region,
                    'error': str(e),
                    'error_type': type(e).__name__,
                    'details': {
                        'stack_trace': stack_trace
                    }
                }
            }

    def _get_scanner_method(self, service: str) -> Optional[Callable]:
        """Get the bound scanner method for a service, or None if it has none."""
        method_name = self._SCANNER_METHOD_NAMES.get(service)
        if method_name is None:
            return None
        return getattr(self, method_name)

    # Helper method to get resource name from tags
    def _get_name_from_tags(self, tags: List[Dict[str, str]], default: str = '') -> str:
        """
        Extract the value of the 'Name' tag from a list of Key/Value tag dicts.

        Returns default when tags is empty/None, when no 'Name' tag exists,
        or when the first 'Name' tag has no 'Value' entry.
        """
        for tag in tags or ():
            if tag.get('Key') == 'Name':
                return tag.get('Value', default)
        return default

    # The wrappers below import their service scanner inside the method body.
    # NOTE(review): presumably this defers import cost or avoids an import
    # cycle - confirm before hoisting these imports to module level.

    # ==================== VPC Related Scanners ====================

    def _scan_vpcs(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan VPCs (delegates to VPCServiceScanner.scan_vpcs)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_vpcs(session, account_id, region)

    def _scan_subnets(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Subnets (delegates to VPCServiceScanner.scan_subnets)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_subnets(session, account_id, region)

    def _scan_route_tables(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Route Tables (delegates to VPCServiceScanner.scan_route_tables)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_route_tables(session, account_id, region)

    def _scan_internet_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Internet Gateways (delegates to VPCServiceScanner.scan_internet_gateways)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_internet_gateways(session, account_id, region)

    def _scan_nat_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan NAT Gateways (delegates to VPCServiceScanner.scan_nat_gateways)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_nat_gateways(session, account_id, region)

    def _scan_security_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Security Groups (delegates to VPCServiceScanner.scan_security_groups)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_security_groups(session, account_id, region)

    def _scan_vpc_endpoints(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan VPC Endpoints (delegates to VPCServiceScanner.scan_vpc_endpoints)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_vpc_endpoints(session, account_id, region)

    def _scan_vpc_peering(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan VPC Peering Connections (delegates to VPCServiceScanner.scan_vpc_peering)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_vpc_peering(session, account_id, region)

    def _scan_customer_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Customer Gateways (delegates to VPCServiceScanner.scan_customer_gateways)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_customer_gateways(session, account_id, region)

    def _scan_virtual_private_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Virtual Private Gateways (delegates to VPCServiceScanner.scan_virtual_private_gateways)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_virtual_private_gateways(session, account_id, region)

    def _scan_vpn_connections(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan VPN Connections (delegates to VPCServiceScanner.scan_vpn_connections)"""
        from app.scanners.services.vpc import VPCServiceScanner
        return VPCServiceScanner.scan_vpn_connections(session, account_id, region)

    # ==================== EC2 Related Scanners ====================

    def _scan_ec2_instances(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan EC2 Instances (delegates to EC2ServiceScanner.scan_ec2_instances)"""
        from app.scanners.services.ec2 import EC2ServiceScanner
        return EC2ServiceScanner.scan_ec2_instances(session, account_id, region)

    def _scan_elastic_ips(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Elastic IPs (delegates to EC2ServiceScanner.scan_elastic_ips)"""
        from app.scanners.services.ec2 import EC2ServiceScanner
        return EC2ServiceScanner.scan_elastic_ips(session, account_id, region)

    # ==================== Auto Scaling and ELB Scanners ====================

    def _scan_autoscaling_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Auto Scaling Groups (delegates to ELBServiceScanner.scan_autoscaling_groups)"""
        from app.scanners.services.elb import ELBServiceScanner
        return ELBServiceScanner.scan_autoscaling_groups(session, account_id, region)

    def _scan_load_balancers(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Load Balancers (delegates to ELBServiceScanner.scan_load_balancers)"""
        from app.scanners.services.elb import ELBServiceScanner
        return ELBServiceScanner.scan_load_balancers(session, account_id, region)

    def _scan_target_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Target Groups (delegates to ELBServiceScanner.scan_target_groups)"""
        from app.scanners.services.elb import ELBServiceScanner
        return ELBServiceScanner.scan_target_groups(session, account_id, region)

    # ==================== Database Service Scanners ====================

    def _scan_rds_instances(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan RDS DB Instances (delegates to DatabaseServiceScanner.scan_rds_instances)"""
        from app.scanners.services.database import DatabaseServiceScanner
        return DatabaseServiceScanner.scan_rds_instances(session, account_id, region)

    def _scan_elasticache_clusters(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan ElastiCache Clusters (delegates to DatabaseServiceScanner.scan_elasticache_clusters)"""
        from app.scanners.services.database import DatabaseServiceScanner
        return DatabaseServiceScanner.scan_elasticache_clusters(session, account_id, region)

    # ==================== Compute and Storage Service Scanners ====================

    def _scan_eks_clusters(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan EKS Clusters (delegates to ComputeServiceScanner.scan_eks_clusters)"""
        from app.scanners.services.compute import ComputeServiceScanner
        return ComputeServiceScanner.scan_eks_clusters(session, account_id, region)

    def _scan_lambda_functions(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Lambda Functions (delegates to ComputeServiceScanner.scan_lambda_functions)"""
        from app.scanners.services.compute import ComputeServiceScanner
        return ComputeServiceScanner.scan_lambda_functions(session, account_id, region)

    def _scan_s3_buckets(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan S3 Buckets (delegates to ComputeServiceScanner.scan_s3_buckets)"""
        from app.scanners.services.compute import ComputeServiceScanner
        return ComputeServiceScanner.scan_s3_buckets(session, account_id, region)

    def _scan_s3_event_notifications(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan S3 Event Notifications (delegates to ComputeServiceScanner.scan_s3_event_notifications)"""
        from app.scanners.services.compute import ComputeServiceScanner
        return ComputeServiceScanner.scan_s3_event_notifications(session, account_id, region)

    # ==================== Global Service Scanners ====================

    def _scan_cloudfront_distributions(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan CloudFront Distributions (delegates to GlobalServiceScanner.scan_cloudfront_distributions)"""
        from app.scanners.services.global_services import GlobalServiceScanner
        return GlobalServiceScanner.scan_cloudfront_distributions(session, account_id, region)

    def _scan_route53_hosted_zones(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan Route 53 Hosted Zones (delegates to GlobalServiceScanner.scan_route53_hosted_zones)"""
        from app.scanners.services.global_services import GlobalServiceScanner
        return GlobalServiceScanner.scan_route53_hosted_zones(session, account_id, region)

    def _scan_acm_certificates(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan ACM Certificates (delegates to GlobalServiceScanner.scan_acm_certificates)"""
        from app.scanners.services.global_services import GlobalServiceScanner
        return GlobalServiceScanner.scan_acm_certificates(session, account_id, region)

    def _scan_waf_web_acls(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan WAF Web ACLs (delegates to GlobalServiceScanner.scan_waf_web_acls)"""
        from app.scanners.services.global_services import GlobalServiceScanner
        return GlobalServiceScanner.scan_waf_web_acls(session, account_id, region)

    # ==================== Monitoring and Management Service Scanners ====================

    def _scan_sns_topics(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan SNS Topics (delegates to MonitoringServiceScanner.scan_sns_topics)"""
        from app.scanners.services.monitoring import MonitoringServiceScanner
        return MonitoringServiceScanner.scan_sns_topics(session, account_id, region)

    def _scan_cloudwatch_log_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan CloudWatch Log Groups (delegates to MonitoringServiceScanner.scan_cloudwatch_log_groups)"""
        from app.scanners.services.monitoring import MonitoringServiceScanner
        return MonitoringServiceScanner.scan_cloudwatch_log_groups(session, account_id, region)

    def _scan_eventbridge_rules(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan EventBridge Rules (delegates to MonitoringServiceScanner.scan_eventbridge_rules)"""
        from app.scanners.services.monitoring import MonitoringServiceScanner
        return MonitoringServiceScanner.scan_eventbridge_rules(session, account_id, region)

    def _scan_cloudtrail_trails(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan CloudTrail Trails (delegates to MonitoringServiceScanner.scan_cloudtrail_trails)"""
        from app.scanners.services.monitoring import MonitoringServiceScanner
        return MonitoringServiceScanner.scan_cloudtrail_trails(session, account_id, region)

    def _scan_config_recorders(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """Scan AWS Config Recorders (delegates to MonitoringServiceScanner.scan_config_recorders)"""
        from app.scanners.services.monitoring import MonitoringServiceScanner
        return MonitoringServiceScanner.scan_config_recorders(session, account_id, region)
|