""" Global Service Scanners Scans CloudFront Distributions, Route 53 Hosted Zones, ACM Certificates, and WAF Web ACLs. These are global services that are not region-specific. Requirements: - 5.1: Scan global AWS services using boto3 - 5.2: Scan global resources regardless of selected regions """ import boto3 from typing import List, Dict, Any import logging from app.scanners.base import ResourceData from app.scanners.utils import retry_with_backoff logger = logging.getLogger(__name__) class GlobalServiceScanner: """Scanner for global AWS resources""" @staticmethod @retry_with_backoff() def scan_cloudfront_distributions(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan CloudFront Distributions (global service). Attributes (vertical layout - one table per distribution): CloudFront ID, Domain Name, CNAME, Origin Domain Name, Origin Protocol Policy, Viewer Protocol Policy, Allowed HTTP Methods, Cached HTTP Methods """ resources = [] # CloudFront is a global service, always use us-east-1 cf_client = session.client('cloudfront', region_name='us-east-1') try: paginator = cf_client.get_paginator('list_distributions') for page in paginator.paginate(): distribution_list = page.get('DistributionList', {}) for dist in distribution_list.get('Items', []): dist_id = dist.get('Id', '') # Get aliases (CNAMEs) aliases = dist.get('Aliases', {}).get('Items', []) # Get origin info origins = dist.get('Origins', {}).get('Items', []) origin_domain = '' origin_protocol = '' if origins: origin = origins[0] origin_domain = origin.get('DomainName', '') custom_origin = origin.get('CustomOriginConfig', {}) if custom_origin: origin_protocol = custom_origin.get('OriginProtocolPolicy', '') else: origin_protocol = 'S3' # Get default cache behavior default_behavior = dist.get('DefaultCacheBehavior', {}) viewer_protocol = default_behavior.get('ViewerProtocolPolicy', '') allowed_methods = default_behavior.get('AllowedMethods', {}).get('Items', []) cached_methods = default_behavior.get('AllowedMethods', {}).get('CachedMethods', {}).get('Items', []) resources.append(ResourceData( account_id=account_id, region='global', service='cloudfront', resource_type='Distribution', resource_id=dist.get('ARN', dist_id), name=dist_id, attributes={ 'CloudFront ID': dist_id, 'Domain Name': dist.get('DomainName', ''), 'CNAME': ', '.join(aliases) if aliases else 'N/A', 'Origin Domain Name': origin_domain, 'Origin Protocol Policy': origin_protocol, 'Viewer Protocol Policy': viewer_protocol, 'Allowed HTTP Methods': ', '.join(allowed_methods), 'Cached HTTP Methods': ', '.join(cached_methods) } )) except Exception as e: logger.warning(f"Failed to scan CloudFront distributions: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_route53_hosted_zones(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Route 53 Hosted Zones (global service). Attributes (horizontal layout): Zone ID, Name, Type, Record Count """ resources = [] # Route 53 is a global service route53_client = session.client('route53', region_name='us-east-1') try: paginator = route53_client.get_paginator('list_hosted_zones') for page in paginator.paginate(): for zone in page.get('HostedZones', []): zone_id = zone.get('Id', '').replace('/hostedzone/', '') zone_name = zone.get('Name', '') # Determine zone type zone_type = 'Private' if zone.get('Config', {}).get('PrivateZone') else 'Public' resources.append(ResourceData( account_id=account_id, region='global', service='route53', resource_type='Hosted Zone', resource_id=zone_id, name=zone_name, attributes={ 'Zone ID': zone_id, 'Name': zone_name, 'Type': zone_type, 'Record Count': str(zone.get('ResourceRecordSetCount', 0)) } )) except Exception as e: logger.warning(f"Failed to scan Route 53 hosted zones: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_acm_certificates(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan ACM Certificates (regional service). Attributes (horizontal layout): Domain name, Additional names """ resources = [] # ACM is a regional service acm_client = session.client('acm', region_name=region) try: paginator = acm_client.get_paginator('list_certificates') for page in paginator.paginate(): for cert in page.get('CertificateSummaryList', []): domain_name = cert.get('DomainName', '') cert_arn = cert.get('CertificateArn', '') # Get additional names (Subject Alternative Names) additional_names = '' try: cert_detail = acm_client.describe_certificate(CertificateArn=cert_arn) sans = cert_detail.get('Certificate', {}).get('SubjectAlternativeNames', []) # Filter out the main domain name from SANs additional = [san for san in sans if san != domain_name] additional_names = ', '.join(additional) if additional else '' except Exception: pass resources.append(ResourceData( account_id=account_id, region=region, service='acm', resource_type='Certificate', resource_id=cert_arn, name=domain_name, attributes={ 'Domain name': domain_name, 'Additional names': additional_names } )) except Exception as e: logger.warning(f"Failed to scan ACM certificates in {region}: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_waf_web_acls(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan WAF Web ACLs (global service for CloudFront). Attributes (horizontal layout): WebACL Name, Scope, Rules Count, Associated Resources """ resources = [] # Scan WAFv2 global (CloudFront) Web ACLs wafv2_client = session.client('wafv2', region_name='us-east-1') try: # List CloudFront Web ACLs (CLOUDFRONT scope) response = wafv2_client.list_web_acls(Scope='CLOUDFRONT') for acl in response.get('WebACLs', []): acl_name = acl.get('Name', '') acl_id = acl.get('Id', '') acl_arn = acl.get('ARN', '') # Get Web ACL details for rules count rules_count = 0 associated_resources = [] try: acl_response = wafv2_client.get_web_acl( Name=acl_name, Scope='CLOUDFRONT', Id=acl_id ) web_acl = acl_response.get('WebACL', {}) rules_count = len(web_acl.get('Rules', [])) # Get associated resources resources_response = wafv2_client.list_resources_for_web_acl( WebACLArn=acl_arn ) for resource_arn in resources_response.get('ResourceArns', []): # Extract resource name from ARN resource_name = resource_arn.split('/')[-1] associated_resources.append(resource_name) except Exception as e: logger.debug(f"Failed to get WAF ACL details: {str(e)}") resources.append(ResourceData( account_id=account_id, region='global', service='waf', resource_type='Web ACL', resource_id=acl_arn, name=acl_name, attributes={ 'WebACL Name': acl_name, 'Scope': 'CLOUDFRONT', 'Rules Count': str(rules_count), 'Associated Resources': ', '.join(associated_resources) if associated_resources else 'None' } )) except Exception as e: logger.warning(f"Failed to scan WAFv2 Web ACLs: {str(e)}") # Also scan regional WAF Web ACLs try: response = wafv2_client.list_web_acls(Scope='REGIONAL') for acl in response.get('WebACLs', []): acl_name = acl.get('Name', '') acl_id = acl.get('Id', '') acl_arn = acl.get('ARN', '') rules_count = 0 associated_resources = [] try: acl_response = wafv2_client.get_web_acl( Name=acl_name, Scope='REGIONAL', Id=acl_id ) web_acl = acl_response.get('WebACL', {}) rules_count = len(web_acl.get('Rules', [])) resources_response = wafv2_client.list_resources_for_web_acl( WebACLArn=acl_arn ) for resource_arn in resources_response.get('ResourceArns', []): resource_name = resource_arn.split('/')[-1] associated_resources.append(resource_name) except Exception as e: logger.debug(f"Failed to get WAF ACL details: {str(e)}") resources.append(ResourceData( account_id=account_id, region='global', service='waf', resource_type='Web ACL', resource_id=acl_arn, name=acl_name, attributes={ 'WebACL Name': acl_name, 'Scope': 'REGIONAL', 'Rules Count': str(rules_count), 'Associated Resources': ', '.join(associated_resources) if associated_resources else 'None' } )) except Exception as e: logger.warning(f"Failed to scan regional WAFv2 Web ACLs: {str(e)}") return resources