| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- """
- Global Service Scanners
- Scans CloudFront Distributions, Route 53 Hosted Zones, ACM Certificates, and WAF Web ACLs.
- These are global services that are not region-specific.
- Requirements:
- - 5.1: Scan global AWS services using boto3
- - 5.2: Scan global resources regardless of selected regions
- """
- import boto3
- from typing import List, Dict, Any
- import logging
- from app.scanners.base import ResourceData
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
- class GlobalServiceScanner:
- """Scanner for global AWS resources"""
-
- @staticmethod
- @retry_with_backoff()
- def scan_cloudfront_distributions(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan CloudFront Distributions (global service).
-
- Attributes (vertical layout - one table per distribution):
- CloudFront ID, Domain Name, CNAME, Origin Domain Name,
- Origin Protocol Policy, Viewer Protocol Policy,
- Allowed HTTP Methods, Cached HTTP Methods
- """
- resources = []
- # CloudFront is a global service, always use us-east-1
- cf_client = session.client('cloudfront', region_name='us-east-1')
-
- try:
- paginator = cf_client.get_paginator('list_distributions')
- for page in paginator.paginate():
- distribution_list = page.get('DistributionList', {})
- for dist in distribution_list.get('Items', []):
- dist_id = dist.get('Id', '')
-
- # Get aliases (CNAMEs)
- aliases = dist.get('Aliases', {}).get('Items', [])
-
- # Get origin info
- origins = dist.get('Origins', {}).get('Items', [])
- origin_domain = ''
- origin_protocol = ''
- if origins:
- origin = origins[0]
- origin_domain = origin.get('DomainName', '')
- custom_origin = origin.get('CustomOriginConfig', {})
- if custom_origin:
- origin_protocol = custom_origin.get('OriginProtocolPolicy', '')
- else:
- origin_protocol = 'S3'
-
- # Get default cache behavior
- default_behavior = dist.get('DefaultCacheBehavior', {})
- viewer_protocol = default_behavior.get('ViewerProtocolPolicy', '')
- allowed_methods = default_behavior.get('AllowedMethods', {}).get('Items', [])
- cached_methods = default_behavior.get('AllowedMethods', {}).get('CachedMethods', {}).get('Items', [])
-
- resources.append(ResourceData(
- account_id=account_id,
- region='global',
- service='cloudfront',
- resource_type='Distribution',
- resource_id=dist.get('ARN', dist_id),
- name=dist_id,
- attributes={
- 'CloudFront ID': dist_id,
- 'Domain Name': dist.get('DomainName', ''),
- 'CNAME': ', '.join(aliases) if aliases else 'N/A',
- 'Origin Domain Name': origin_domain,
- 'Origin Protocol Policy': origin_protocol,
- 'Viewer Protocol Policy': viewer_protocol,
- 'Allowed HTTP Methods': ', '.join(allowed_methods),
- 'Cached HTTP Methods': ', '.join(cached_methods)
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan CloudFront distributions: {str(e)}")
-
- return resources
-
- @staticmethod
- @retry_with_backoff()
- def scan_route53_hosted_zones(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan Route 53 Hosted Zones (global service).
-
- Attributes (horizontal layout):
- Zone ID, Name, Type, Record Count
- """
- resources = []
- # Route 53 is a global service
- route53_client = session.client('route53', region_name='us-east-1')
-
- try:
- paginator = route53_client.get_paginator('list_hosted_zones')
- for page in paginator.paginate():
- for zone in page.get('HostedZones', []):
- zone_id = zone.get('Id', '').replace('/hostedzone/', '')
- zone_name = zone.get('Name', '')
-
- # Determine zone type
- zone_type = 'Private' if zone.get('Config', {}).get('PrivateZone') else 'Public'
-
- resources.append(ResourceData(
- account_id=account_id,
- region='global',
- service='route53',
- resource_type='Hosted Zone',
- resource_id=zone_id,
- name=zone_name,
- attributes={
- 'Zone ID': zone_id,
- 'Name': zone_name,
- 'Type': zone_type,
- 'Record Count': str(zone.get('ResourceRecordSetCount', 0))
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan Route 53 hosted zones: {str(e)}")
-
- return resources
-
- @staticmethod
- @retry_with_backoff()
- def scan_acm_certificates(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan ACM Certificates (regional service).
-
- Attributes (horizontal layout): Domain name, Additional names
- """
- resources = []
- # ACM is a regional service
- acm_client = session.client('acm', region_name=region)
-
- try:
- paginator = acm_client.get_paginator('list_certificates')
- for page in paginator.paginate():
- for cert in page.get('CertificateSummaryList', []):
- domain_name = cert.get('DomainName', '')
- cert_arn = cert.get('CertificateArn', '')
-
- # Get additional names (Subject Alternative Names)
- additional_names = ''
- try:
- cert_detail = acm_client.describe_certificate(CertificateArn=cert_arn)
- sans = cert_detail.get('Certificate', {}).get('SubjectAlternativeNames', [])
- # Filter out the main domain name from SANs
- additional = [san for san in sans if san != domain_name]
- additional_names = ', '.join(additional) if additional else ''
- except Exception:
- pass
-
- resources.append(ResourceData(
- account_id=account_id,
- region=region,
- service='acm',
- resource_type='Certificate',
- resource_id=cert_arn,
- name=domain_name,
- attributes={
- 'Domain name': domain_name,
- 'Additional names': additional_names
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan ACM certificates in {region}: {str(e)}")
-
- return resources
-
- @staticmethod
- @retry_with_backoff()
- def scan_waf_web_acls(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan WAF Web ACLs (global service for CloudFront).
-
- Attributes (horizontal layout):
- WebACL Name, Scope, Rules Count, Associated Resources
- """
- resources = []
-
- # Scan WAFv2 global (CloudFront) Web ACLs
- wafv2_client = session.client('wafv2', region_name='us-east-1')
-
- try:
- # List CloudFront Web ACLs (CLOUDFRONT scope)
- response = wafv2_client.list_web_acls(Scope='CLOUDFRONT')
-
- for acl in response.get('WebACLs', []):
- acl_name = acl.get('Name', '')
- acl_id = acl.get('Id', '')
- acl_arn = acl.get('ARN', '')
-
- # Get Web ACL details for rules count
- rules_count = 0
- associated_resources = []
-
- try:
- acl_response = wafv2_client.get_web_acl(
- Name=acl_name,
- Scope='CLOUDFRONT',
- Id=acl_id
- )
- web_acl = acl_response.get('WebACL', {})
- rules_count = len(web_acl.get('Rules', []))
-
- # Get associated resources
- resources_response = wafv2_client.list_resources_for_web_acl(
- WebACLArn=acl_arn
- )
- for resource_arn in resources_response.get('ResourceArns', []):
- # Extract resource name from ARN
- resource_name = resource_arn.split('/')[-1]
- associated_resources.append(resource_name)
- except Exception as e:
- logger.debug(f"Failed to get WAF ACL details: {str(e)}")
-
- resources.append(ResourceData(
- account_id=account_id,
- region='global',
- service='waf',
- resource_type='Web ACL',
- resource_id=acl_arn,
- name=acl_name,
- attributes={
- 'WebACL Name': acl_name,
- 'Scope': 'CLOUDFRONT',
- 'Rules Count': str(rules_count),
- 'Associated Resources': ', '.join(associated_resources) if associated_resources else 'None'
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan WAFv2 Web ACLs: {str(e)}")
-
- # Also scan regional WAF Web ACLs
- try:
- response = wafv2_client.list_web_acls(Scope='REGIONAL')
-
- for acl in response.get('WebACLs', []):
- acl_name = acl.get('Name', '')
- acl_id = acl.get('Id', '')
- acl_arn = acl.get('ARN', '')
-
- rules_count = 0
- associated_resources = []
-
- try:
- acl_response = wafv2_client.get_web_acl(
- Name=acl_name,
- Scope='REGIONAL',
- Id=acl_id
- )
- web_acl = acl_response.get('WebACL', {})
- rules_count = len(web_acl.get('Rules', []))
-
- resources_response = wafv2_client.list_resources_for_web_acl(
- WebACLArn=acl_arn
- )
- for resource_arn in resources_response.get('ResourceArns', []):
- resource_name = resource_arn.split('/')[-1]
- associated_resources.append(resource_name)
- except Exception as e:
- logger.debug(f"Failed to get WAF ACL details: {str(e)}")
-
- resources.append(ResourceData(
- account_id=account_id,
- region='global',
- service='waf',
- resource_type='Web ACL',
- resource_id=acl_arn,
- name=acl_name,
- attributes={
- 'WebACL Name': acl_name,
- 'Scope': 'REGIONAL',
- 'Rules Count': str(rules_count),
- 'Associated Resources': ', '.join(associated_resources) if associated_resources else 'None'
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan regional WAFv2 Web ACLs: {str(e)}")
-
- return resources
|