#!/usr/bin/env python3
"""
CloudShell Scanner - AWS Resource Scanner for CloudShell Environment

A standalone Python script that scans AWS resources using CloudShell's IAM
credentials. This script is designed to run in AWS CloudShell without
requiring Access Keys.

Requirements:
- 1.1: Single-file Python script, only depends on boto3 and Python standard library
- 1.2: Automatically uses CloudShell environment's IAM credentials
- 1.7: Displays progress information during scanning

Usage:
    # Scan all regions
    python cloudshell_scanner.py

    # Scan specific regions
    python cloudshell_scanner.py --regions us-east-1,ap-northeast-1

    # Specify output file
    python cloudshell_scanner.py --output my_scan.json

    # Scan specific services
    python cloudshell_scanner.py --services ec2,vpc,rds
"""

import argparse
import json
import logging
import sys
import time
from datetime import datetime, timezone
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, TypeVar

import boto3
from botocore.exceptions import BotoCoreError, ClientError

# Type variable for generic retry decorator
T = TypeVar("T")

# Scanner version
__version__ = "1.0.0"

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Exception classes that MAY represent transient failures worth retrying.
RETRYABLE_EXCEPTIONS = (
    ClientError,
    BotoCoreError,
    ConnectionError,
    TimeoutError,
)

# AWS error codes that indicate a transient, retryable condition.
RETRYABLE_ERROR_CODES = {
    "Throttling",
    "ThrottlingException",
    "RequestThrottled",
    "RequestLimitExceeded",
    "ProvisionedThroughputExceededException",
    "ServiceUnavailable",
    "InternalError",
    "RequestTimeout",
    "RequestTimeoutException",
}


def retry_with_exponential_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    exponential_base: float = 2.0,
) -> Callable:
    """
    Decorator that retries a call with exponential backoff.

    The wrapped function is re-invoked whenever it raises a retryable
    exception; the wait between attempts grows exponentially and is capped
    at ``max_delay``. A ClientError whose code is not in
    RETRYABLE_ERROR_CODES is re-raised immediately without further attempts.

    Args:
        max_retries: Maximum number of retry attempts (default: 3)
        base_delay: Initial delay in seconds (default: 1.0)
        max_delay: Maximum delay in seconds (default: 30.0)
        exponential_base: Base for exponential calculation (default: 2.0)

    Returns:
        Decorated function with retry logic

    Requirements:
    - 1.8: Record errors and continue scanning other resources
    - Design: Network timeout - retry 3 times with exponential backoff
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except RETRYABLE_EXCEPTIONS as e:
                    last_exception = e
                    # A ClientError is retryable only for specific codes.
                    if isinstance(e, ClientError):
                        error_code = e.response.get("Error", {}).get("Code", "")
                        if error_code not in RETRYABLE_ERROR_CODES:
                            raise
                    if attempt >= max_retries:
                        logger.error(
                            f"All {max_retries + 1} attempts failed for "
                            f"{func.__name__}: {str(e)}"
                        )
                        continue
                    delay = min(
                        base_delay * (exponential_base ** attempt), max_delay
                    )
                    logger.warning(
                        f"Attempt {attempt + 1}/{max_retries + 1} failed for "
                        f"{func.__name__}: {str(e)}. Retrying in {delay:.1f}s..."
                    )
                    time.sleep(delay)
            # Every attempt raised: surface the most recent failure.
            if last_exception:
                raise last_exception

        return wrapper

    return decorator


def is_retryable_error(exception: Exception) -> bool:
    """
    Report whether an exception should be retried.

    Args:
        exception: The exception to check

    Returns:
        True if the exception is retryable, False otherwise
    """
    if isinstance(exception, ClientError):
        code = exception.response.get("Error", {}).get("Code", "")
        return code in RETRYABLE_ERROR_CODES
    return isinstance(exception, RETRYABLE_EXCEPTIONS)
class ProgressDisplay:
    """
    Console progress reporting for the scan.

    Requirements:
    - 1.7: Displays progress information during scanning
    """

    def __init__(self, total_tasks: int = 0):
        """
        Initialize progress display.

        Args:
            total_tasks: Total number of tasks to track
        """
        self.total_tasks = total_tasks
        self.completed_tasks = 0
        self.current_service = ""
        self.current_region = ""

    def set_total(self, total: int) -> None:
        """Set the task total and reset the completed counter."""
        self.total_tasks = total
        self.completed_tasks = 0

    def update(self, service: str, region: str, status: str = "scanning") -> None:
        """
        Redraw the progress line for the current (service, region) pair.

        Args:
            service: Current service being scanned
            region: Current region being scanned
            status: Status message
        """
        self.current_service = service
        self.current_region = region
        if self.total_tasks <= 0:
            # No known total: fall back to a plain status line.
            print(f"\r{status}: {service} in {region}", end="", flush=True)
            return
        percentage = (self.completed_tasks / self.total_tasks) * 100
        progress_bar = self._create_progress_bar(percentage)
        print(
            f"\r{progress_bar} {percentage:5.1f}% | {status}: {service} in {region}",
            end="",
            flush=True,
        )

    def increment(self) -> None:
        """Count one more task as finished."""
        self.completed_tasks += 1

    def complete(self, message: str = "Scan completed") -> None:
        """
        Print the final (100%) progress line.

        Args:
            message: Completion message
        """
        if self.total_tasks > 0:
            progress_bar = self._create_progress_bar(100)
            print(f"\r{progress_bar} 100.0% | {message}")
        else:
            print(f"\r{message}")

    def _create_progress_bar(self, percentage: float, width: int = 30) -> str:
        """
        Build a text progress bar such as ``[███░░░]``.

        Args:
            percentage: Completion percentage (0-100)
            width: Width of the progress bar

        Returns:
            Progress bar string
        """
        filled = int(width * percentage / 100)
        return "[" + "█" * filled + "░" * (width - filled) + "]"

    def log_error(self, service: str, region: str, error: str) -> None:
        """
        Report a scan error without clobbering the progress line.

        Args:
            service: Service that encountered the error
            region: Region where the error occurred
            error: Error message
        """
        # Emit a newline first so the warning does not overwrite the bar.
        print()
        logger.warning(f"Error scanning {service} in {region}: {error}")


class CloudShellScanner:
    """
    CloudShell environment AWS resource scanner.

    Scans AWS resources using the IAM credentials automatically available
    in the CloudShell environment (boto3's default credential chain).

    Requirements:
    - 1.1: Single-file Python script, only depends on boto3 and Python standard library
    - 1.2: Automatically uses CloudShell environment's IAM credentials
    - 1.7: Displays progress information during scanning

    Attributes:
        SUPPORTED_SERVICES: List of all supported AWS services
        GLOBAL_SERVICES: List of global services (not region-specific)
    """

    # All supported AWS services (must match AWSScanner.SUPPORTED_SERVICES)
    SUPPORTED_SERVICES: List[str] = [
        "vpc",
        "subnet",
        "route_table",
        "internet_gateway",
        "nat_gateway",
        "security_group",
        "vpc_endpoint",
        "vpc_peering",
        "customer_gateway",
        "virtual_private_gateway",
        "vpn_connection",
        "ec2",
        "elastic_ip",
        "autoscaling",
        "elb",
        "target_group",
        "rds",
        "elasticache",
        "eks",
        "lambda",
        "s3",
        "s3_event_notification",
        "cloudfront",
        "route53",
        "acm",
        "waf",
        "sns",
        "cloudwatch",
        "eventbridge",
        "cloudtrail",
        "config",
    ]

    # Global services (not region-specific)
    GLOBAL_SERVICES: List[str] = [
        "cloudfront",
        "route53",
        "waf",
        "s3",
        "s3_event_notification",
        "cloudtrail",
    ]

    def __init__(self):
        """
        Initialize the CloudShell scanner.

        Automatically uses CloudShell environment's IAM credentials via
        boto3's default credential chain.

        Requirements:
        - 1.2: Automatically uses CloudShell environment's IAM credentials
        """
        self._account_id: Optional[str] = None
        self._session: Optional[boto3.Session] = None
        self.progress = ProgressDisplay()

        try:
            self._session = boto3.Session()
            logger.info("Initialized CloudShell scanner with default credentials")
        except Exception as e:
            logger.error(f"Failed to initialize boto3 session: {e}")
            raise
Requirements: - 1.2: Automatically uses CloudShell environment's IAM credentials """ self._account_id: Optional[str] = None self._session: Optional[boto3.Session] = None self.progress = ProgressDisplay() # Initialize session using default credentials (CloudShell IAM) try: self._session = boto3.Session() logger.info("Initialized CloudShell scanner with default credentials") except Exception as e: logger.error(f"Failed to initialize boto3 session: {e}") raise def get_account_id(self) -> str: """ Get the current AWS account ID. Returns: AWS account ID string Raises: Exception: If unable to retrieve account ID """ if self._account_id: return self._account_id try: sts_client = self._session.client("sts") response = sts_client.get_caller_identity() self._account_id = response["Account"] logger.info(f"Retrieved account ID: {self._account_id}") return self._account_id except Exception as e: logger.error(f"Failed to get account ID: {e}") raise def list_regions(self) -> List[str]: """ List all available AWS regions. Returns: List of region names Requirements: - 1.4: Scan all available regions when not specified """ try: ec2_client = self._session.client("ec2", region_name="us-east-1") response = ec2_client.describe_regions() regions = [region["RegionName"] for region in response["Regions"]] logger.info(f"Found {len(regions)} available regions") return regions except Exception as e: logger.warning(f"Failed to list regions, using defaults: {e}") # Return default regions if API call fails return self._get_default_regions() def _get_default_regions(self) -> List[str]: """ Get default AWS regions as fallback. 
Returns: List of default region names """ return [ "us-east-1", "us-east-2", "us-west-1", "us-west-2", "eu-west-1", "eu-west-2", "eu-west-3", "eu-central-1", "ap-northeast-1", "ap-northeast-2", "ap-southeast-1", "ap-southeast-2", "ap-south-1", "sa-east-1", "ca-central-1", ] def filter_regions( self, requested_regions: Optional[List[str]] = None, ) -> List[str]: """ Filter and validate requested regions against available regions. This method implements region filtering logic: - If no regions specified, returns all available regions - If regions specified, validates them against available regions - Invalid regions are logged and filtered out Args: requested_regions: List of regions requested by user (None = all regions) Returns: List of valid region names to scan Requirements: - 1.3: Scan only specified regions when provided - 1.4: Scan all available regions when not specified """ # Get all available regions available_regions = self.list_regions() available_set = set(available_regions) # If no regions specified, return all available regions if requested_regions is None: logger.info(f"No regions specified, will scan all {len(available_regions)} available regions") return available_regions # Validate requested regions valid_regions = [] invalid_regions = [] for region in requested_regions: # Normalize region name (strip whitespace, lowercase) normalized_region = region.strip().lower() if normalized_region in available_set: valid_regions.append(normalized_region) else: invalid_regions.append(region) # Log invalid regions if invalid_regions: logger.warning( f"Ignoring invalid/unavailable regions: {invalid_regions}. 
" f"Available regions: {sorted(available_regions)}" ) # If no valid regions remain, fall back to all available regions if not valid_regions: logger.warning( "No valid regions specified, falling back to all available regions" ) return available_regions logger.info(f"Will scan {len(valid_regions)} specified regions: {valid_regions}") return valid_regions def validate_region(self, region: str) -> bool: """ Validate if a region is available. Args: region: Region name to validate Returns: True if region is valid, False otherwise """ try: available_regions = self.list_regions() return region.strip().lower() in set(available_regions) except Exception: # If we can't validate, assume it's valid and let the API call fail return True def scan_resources( self, regions: Optional[List[str]] = None, services: Optional[List[str]] = None, ) -> Dict[str, Any]: """ Scan AWS resources across specified regions and services. Args: regions: List of regions to scan (None = all available regions) services: List of services to scan (None = all supported services) Returns: Dictionary containing scan results with metadata, resources, and errors Requirements: - 1.3: Scan only specified regions when provided - 1.4: Scan all available regions when not specified - 1.5: Scan all supported service types - 1.7: Display progress information during scanning - 1.8: Record errors and continue scanning other resources """ # Get account ID account_id = self.get_account_id() # Filter and validate regions regions_to_scan = self.filter_regions(regions) logger.info(f"Scanning {len(regions_to_scan)} regions") # Determine services to scan services_to_scan = services if services else self.SUPPORTED_SERVICES.copy() logger.info(f"Scanning {len(services_to_scan)} services") # Validate services invalid_services = [s for s in services_to_scan if s not in self.SUPPORTED_SERVICES] if invalid_services: logger.warning(f"Ignoring unsupported services: {invalid_services}") services_to_scan = [s for s in services_to_scan if 
s in self.SUPPORTED_SERVICES] # Separate global and regional services global_services = [s for s in services_to_scan if s in self.GLOBAL_SERVICES] regional_services = [s for s in services_to_scan if s not in self.GLOBAL_SERVICES] # Calculate total tasks for progress tracking total_tasks = len(global_services) + (len(regional_services) * len(regions_to_scan)) self.progress.set_total(total_tasks) # Initialize result structure result: Dict[str, Any] = { "metadata": { "account_id": account_id, "scan_timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"), "regions_scanned": regions_to_scan, "services_scanned": services_to_scan, "scanner_version": __version__, "total_resources": 0, "total_errors": 0, }, "resources": {}, "errors": [], } # Scan global services first (only once, not per region) if global_services: logger.info(f"Scanning {len(global_services)} global services") self._scan_global_services( account_id=account_id, services=global_services, result=result, ) # Scan regional services if regional_services and regions_to_scan: logger.info(f"Scanning {len(regional_services)} regional services across {len(regions_to_scan)} regions") self._scan_regional_services( account_id=account_id, regions=regions_to_scan, services=regional_services, result=result, ) # Update metadata totals result["metadata"]["total_resources"] = sum( len(resources) for resources in result["resources"].values() ) result["metadata"]["total_errors"] = len(result["errors"]) self.progress.complete( f"Scan completed: {result['metadata']['total_resources']} resources, " f"{result['metadata']['total_errors']} errors" ) return result def _call_with_retry( self, func: Callable[..., T], *args, max_retries: int = 3, base_delay: float = 1.0, **kwargs, ) -> T: """ Call a function with retry logic and exponential backoff. This method wraps API calls with retry logic for transient failures. 
Args: func: Function to call *args: Positional arguments for the function max_retries: Maximum number of retry attempts base_delay: Initial delay in seconds **kwargs: Keyword arguments for the function Returns: Result of the function call Raises: Exception: If all retries are exhausted Requirements: - 1.8: Record errors and continue scanning other resources - Design: Network timeout - retry 3 times with exponential backoff """ last_exception = None for attempt in range(max_retries + 1): try: return func(*args, **kwargs) except RETRYABLE_EXCEPTIONS as e: last_exception = e # Check if it's a retryable error code for ClientError if isinstance(e, ClientError): error_code = e.response.get("Error", {}).get("Code", "") if error_code not in RETRYABLE_ERROR_CODES: # Not a retryable error, raise immediately raise if attempt < max_retries: # Calculate delay with exponential backoff delay = min(base_delay * (2 ** attempt), 30.0) logger.warning( f"Attempt {attempt + 1}/{max_retries + 1} failed: {str(e)}. " f"Retrying in {delay:.1f}s..." ) time.sleep(delay) else: logger.error(f"All {max_retries + 1} attempts failed: {str(e)}") # All retries exhausted, raise the last exception if last_exception: raise last_exception def _scan_global_services( self, account_id: str, services: List[str], result: Dict[str, Any], ) -> None: """ Scan global AWS services. 
Args: account_id: AWS account ID services: List of global services to scan result: Result dictionary to update Requirements: - 1.8: Record errors and continue scanning other resources """ for service in services: self.progress.update(service, "global", "Scanning") try: resources = self._scan_service( account_id=account_id, region="global", service=service, ) if resources: if service not in result["resources"]: result["resources"][service] = [] result["resources"][service].extend(resources) except Exception as e: # Capture detailed error information error_info = self._create_error_info( service=service, region="global", exception=e, ) result["errors"].append(error_info) self.progress.log_error(service, "global", str(e)) self.progress.increment() def _scan_regional_services( self, account_id: str, regions: List[str], services: List[str], result: Dict[str, Any], ) -> None: """ Scan regional AWS services. Args: account_id: AWS account ID regions: List of regions to scan services: List of regional services to scan result: Result dictionary to update Requirements: - 1.8: Record errors and continue scanning other resources """ for region in regions: for service in services: self.progress.update(service, region, "Scanning") try: resources = self._scan_service( account_id=account_id, region=region, service=service, ) if resources: if service not in result["resources"]: result["resources"][service] = [] result["resources"][service].extend(resources) except Exception as e: # Capture detailed error information error_info = self._create_error_info( service=service, region=region, exception=e, ) result["errors"].append(error_info) self.progress.log_error(service, region, str(e)) self.progress.increment() def _create_error_info( self, service: str, region: str, exception: Exception, ) -> Dict[str, Any]: """ Create a detailed error information dictionary. This method extracts detailed information from exceptions to provide useful error context for debugging and reporting. 
Args: service: Service that encountered the error region: Region where the error occurred exception: The exception that was raised Returns: Dictionary containing error details Requirements: - 1.8: Record errors and continue scanning other resources - 6.1: Display missing permission information when encountering permission errors """ error_info: Dict[str, Any] = { "service": service, "region": region, "error": str(exception), "error_type": type(exception).__name__, "details": None, } # Extract additional details from ClientError if isinstance(exception, ClientError): error_response = exception.response.get("Error", {}) error_code = error_response.get("Code", "") error_message = error_response.get("Message", "") error_info["details"] = { "error_code": error_code, "error_message": error_message, } # Check for permission errors and provide helpful information if error_code in ("AccessDenied", "AccessDeniedException", "UnauthorizedAccess"): error_info["details"]["permission_hint"] = ( f"Missing IAM permission for {service} in {region}. " f"Please ensure your IAM role has the necessary permissions." ) logger.warning( f"Permission denied for {service} in {region}: {error_message}" ) # Extract details from BotoCoreError elif isinstance(exception, BotoCoreError): error_info["details"] = { "botocore_error": str(exception), } return error_info def _scan_service( self, account_id: str, region: str, service: str, ) -> List[Dict[str, Any]]: """ Scan a single service in a specific region. Args: account_id: AWS account ID region: Region to scan (or 'global' for global services) service: Service to scan Returns: List of resource dictionaries Note: This is a placeholder method. Actual service scanning methods will be implemented in subsequent tasks (1.2-1.5). 
""" # Get the scanner method for this service scanner_method = self._get_scanner_method(service) if scanner_method is None: logger.warning(f"No scanner method found for service: {service}") return [] # Use us-east-1 for global services actual_region = "us-east-1" if region == "global" else region return scanner_method(account_id, actual_region) def _get_scanner_method(self, service: str) -> Optional[Callable]: """ Get the scanner method for a specific service. Args: service: Service name Returns: Scanner method callable or None if not found """ scanner_methods: Dict[str, Callable] = { # VPC related services (Task 1.2) "vpc": self._scan_vpcs, "subnet": self._scan_subnets, "route_table": self._scan_route_tables, "internet_gateway": self._scan_internet_gateways, "nat_gateway": self._scan_nat_gateways, "security_group": self._scan_security_groups, "vpc_endpoint": self._scan_vpc_endpoints, "vpc_peering": self._scan_vpc_peering, "customer_gateway": self._scan_customer_gateways, "virtual_private_gateway": self._scan_virtual_private_gateways, "vpn_connection": self._scan_vpn_connections, # EC2 and compute services (Task 1.3) "ec2": self._scan_ec2_instances, "elastic_ip": self._scan_elastic_ips, "autoscaling": self._scan_autoscaling_groups, "elb": self._scan_load_balancers, "target_group": self._scan_target_groups, "lambda": self._scan_lambda_functions, "eks": self._scan_eks_clusters, # Database and storage services (Task 1.4) "rds": self._scan_rds_instances, "elasticache": self._scan_elasticache_clusters, "s3": self._scan_s3_buckets, "s3_event_notification": self._scan_s3_event_notifications, # Global and monitoring services (Task 1.5) "cloudfront": self._scan_cloudfront_distributions, "route53": self._scan_route53_hosted_zones, "acm": self._scan_acm_certificates, "waf": self._scan_waf_web_acls, "sns": self._scan_sns_topics, "cloudwatch": self._scan_cloudwatch_log_groups, "eventbridge": self._scan_eventbridge_rules, "cloudtrail": self._scan_cloudtrail_trails, "config": 
self._scan_config_recorders, } return scanner_methods.get(service) def export_json(self, result: Dict[str, Any], output_path: str) -> None: """ Export scan results to a JSON file. This method serializes the scan result to a JSON file with proper handling of non-serializable types (datetime, bytes, sets, etc.). Args: result: Scan result dictionary containing metadata, resources, and errors output_path: Path to output JSON file Requirements: - 1.6: Export results as JSON file when scan completes - 2.1: Include metadata fields (account_id, scan_timestamp, regions_scanned, services_scanned) - 2.2: Include resources field organized by service type - 2.3: Include errors field with scan error information - 2.4: Use JSON format encoding for serialization Raises: IOError: If unable to write to the output file TypeError: If result contains non-serializable types that cannot be converted """ try: # Validate the result structure before export self._validate_scan_data_structure(result) # Serialize with custom encoder for non-standard types json_str = json.dumps( result, indent=2, ensure_ascii=False, default=self._json_serializer, sort_keys=False, ) # Write to file with open(output_path, "w", encoding="utf-8") as f: f.write(json_str) logger.info(f"Scan results exported to: {output_path}") logger.info( f"Export summary: {result['metadata']['total_resources']} resources, " f"{result['metadata']['total_errors']} errors" ) except (IOError, OSError) as e: logger.error(f"Failed to write to {output_path}: {e}") raise except (TypeError, ValueError) as e: logger.error(f"Failed to serialize scan results: {e}") raise def _json_serializer(self, obj: Any) -> Any: """ Custom JSON serializer for non-standard types. Handles datetime, date, bytes, sets, and other non-JSON-serializable types. 
Args: obj: Object to serialize Returns: JSON-serializable representation of the object Requirements: - 2.4: Use JSON format encoding (handle non-serializable types gracefully) """ # Handle datetime objects - convert to ISO 8601 format if isinstance(obj, datetime): # Ensure UTC timezone and proper ISO 8601 format if obj.tzinfo is None: obj = obj.replace(tzinfo=timezone.utc) return obj.isoformat().replace("+00:00", "Z") # Handle date objects if hasattr(obj, 'isoformat'): return obj.isoformat() # Handle bytes if isinstance(obj, bytes): return obj.decode('utf-8', errors='replace') # Handle sets if isinstance(obj, set): return list(obj) # Handle frozensets if isinstance(obj, frozenset): return list(obj) # Handle objects with __dict__ if hasattr(obj, '__dict__'): return obj.__dict__ # Fallback to string representation return str(obj) def _validate_scan_data_structure(self, data: Dict[str, Any]) -> None: """ Validate that the scan data structure matches the expected format. This method ensures the data structure conforms to the ScanData interface defined in the design document. 
Args: data: Scan data dictionary to validate Raises: ValueError: If required fields are missing or have incorrect types Requirements: - 2.1: Metadata fields (account_id, scan_timestamp, regions_scanned, services_scanned) - 2.2: Resources field organized by service type - 2.3: Errors field with error information """ # Check top-level structure required_top_level = ["metadata", "resources", "errors"] for field in required_top_level: if field not in data: raise ValueError(f"Missing required top-level field: {field}") # Check metadata fields metadata = data.get("metadata", {}) required_metadata = [ "account_id", "scan_timestamp", "regions_scanned", "services_scanned", "scanner_version", "total_resources", "total_errors", ] missing_metadata = [f for f in required_metadata if f not in metadata] if missing_metadata: raise ValueError(f"Missing required metadata fields: {missing_metadata}") # Validate metadata field types if not isinstance(metadata.get("account_id"), str): raise ValueError("metadata.account_id must be a string") if not isinstance(metadata.get("scan_timestamp"), str): raise ValueError("metadata.scan_timestamp must be a string") if not isinstance(metadata.get("regions_scanned"), list): raise ValueError("metadata.regions_scanned must be a list") if not isinstance(metadata.get("services_scanned"), list): raise ValueError("metadata.services_scanned must be a list") if not isinstance(metadata.get("scanner_version"), str): raise ValueError("metadata.scanner_version must be a string") if not isinstance(metadata.get("total_resources"), int): raise ValueError("metadata.total_resources must be an integer") if not isinstance(metadata.get("total_errors"), int): raise ValueError("metadata.total_errors must be an integer") # Validate resources structure resources = data.get("resources", {}) if not isinstance(resources, dict): raise ValueError("resources must be a dictionary") # Validate errors structure errors = data.get("errors", []) if not isinstance(errors, list): 
raise ValueError("errors must be a list") @staticmethod def create_scan_data( account_id: str, regions_scanned: List[str], services_scanned: List[str], resources: Dict[str, List[Dict[str, Any]]], errors: List[Dict[str, Any]], scan_timestamp: Optional[str] = None, ) -> Dict[str, Any]: """ Create a properly structured ScanData dictionary. This is a factory method to create scan data with the correct structure as defined in the design document. Args: account_id: AWS account ID regions_scanned: List of regions that were scanned services_scanned: List of services that were scanned resources: Dictionary of resources organized by service type errors: List of error dictionaries scan_timestamp: Optional ISO 8601 timestamp (defaults to current time) Returns: Properly structured ScanData dictionary Requirements: - 2.1: Include metadata fields - 2.2: Include resources field organized by service type - 2.3: Include errors field """ if scan_timestamp is None: scan_timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") # Calculate totals total_resources = sum(len(res_list) for res_list in resources.values()) total_errors = len(errors) return { "metadata": { "account_id": account_id, "scan_timestamp": scan_timestamp, "regions_scanned": regions_scanned, "services_scanned": services_scanned, "scanner_version": __version__, "total_resources": total_resources, "total_errors": total_errors, }, "resources": resources, "errors": errors, } @staticmethod def load_scan_data(file_path: str) -> Dict[str, Any]: """ Load scan data from a JSON file. This method reads and parses a JSON file containing scan data, validating its structure. 
Args: file_path: Path to the JSON file to load Returns: Parsed scan data dictionary Raises: FileNotFoundError: If the file does not exist json.JSONDecodeError: If the file contains invalid JSON ValueError: If the JSON structure is invalid Requirements: - 2.5: Round-trip consistency (load what was exported) """ try: with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) # Create a temporary scanner instance to validate # We use a class method approach to avoid needing AWS credentials CloudShellScanner._validate_scan_data_structure_static(data) logger.info(f"Loaded scan data from: {file_path}") return data except FileNotFoundError: logger.error(f"File not found: {file_path}") raise except json.JSONDecodeError as e: logger.error(f"Invalid JSON in {file_path}: {e}") raise @staticmethod def _validate_scan_data_structure_static(data: Dict[str, Any]) -> None: """ Static version of _validate_scan_data_structure for use without instance. Args: data: Scan data dictionary to validate Raises: ValueError: If required fields are missing or have incorrect types """ # Check top-level structure required_top_level = ["metadata", "resources", "errors"] for field in required_top_level: if field not in data: raise ValueError(f"Missing required top-level field: {field}") # Check metadata fields metadata = data.get("metadata", {}) required_metadata = [ "account_id", "scan_timestamp", "regions_scanned", "services_scanned", "scanner_version", "total_resources", "total_errors", ] missing_metadata = [f for f in required_metadata if f not in metadata] if missing_metadata: raise ValueError(f"Missing required metadata fields: {missing_metadata}") # Helper method to get resource name from tags def _get_name_from_tags( self, tags: Optional[List[Dict[str, str]]], default: str = "" ) -> str: """ Extract Name tag value from tags list. 
Args: tags: List of tag dictionaries with 'Key' and 'Value' default: Default value if Name tag not found Returns: Name tag value or default """ if not tags: return default for tag in tags: if tag.get("Key") == "Name": return tag.get("Value", default) return default # ========================================================================= # VPC Related Service Scanners (Task 1.2) # ========================================================================= def _scan_vpcs(self, account_id: str, region: str) -> List[Dict[str, Any]]: """ Scan VPCs in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of VPC resource dictionaries Attributes: Region, Name, ID, CIDR """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_vpcs") for page in paginator.paginate(): for vpc in page.get("Vpcs", []): name = self._get_name_from_tags(vpc.get("Tags", []), vpc["VpcId"]) resources.append({ "account_id": account_id, "region": region, "service": "vpc", "resource_type": "VPC", "resource_id": vpc["VpcId"], "name": name, "attributes": { "Region": region, "Name": name, "ID": vpc["VpcId"], "CIDR": vpc.get("CidrBlock", ""), }, }) return resources def _scan_subnets(self, account_id: str, region: str) -> List[Dict[str, Any]]: """ Scan Subnets in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of Subnet resource dictionaries Attributes: Name, ID, AZ, CIDR """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_subnets") for page in paginator.paginate(): for subnet in page.get("Subnets", []): name = self._get_name_from_tags( subnet.get("Tags", []), subnet["SubnetId"] ) resources.append({ "account_id": account_id, "region": region, "service": "subnet", "resource_type": "Subnet", "resource_id": subnet["SubnetId"], "name": name, "attributes": { "Name": name, "ID": subnet["SubnetId"], "AZ": subnet.get("AvailabilityZone", ""), "CIDR": subnet.get("CidrBlock", ""), }, }) return resources def _scan_route_tables(self, account_id: str, region: str) -> List[Dict[str, Any]]: """ Scan Route Tables in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of Route Table resource dictionaries Attributes: Name, ID, Subnet Associations """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_route_tables") for page in paginator.paginate(): for rt in page.get("RouteTables", []): name = self._get_name_from_tags( rt.get("Tags", []), rt["RouteTableId"] ) # Get subnet associations associations = [] for assoc in rt.get("Associations", []): if assoc.get("SubnetId"): associations.append(assoc["SubnetId"]) resources.append({ "account_id": account_id, "region": region, "service": "route_table", "resource_type": "Route Table", "resource_id": rt["RouteTableId"], "name": name, "attributes": { "Name": name, "ID": rt["RouteTableId"], "Subnet Associations": ", ".join(associations) if associations else "None", }, }) return resources def _scan_internet_gateways( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Internet Gateways in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of Internet Gateway resource dictionaries Attributes: Name, ID """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_internet_gateways") for page in paginator.paginate(): for igw in page.get("InternetGateways", []): igw_id = igw["InternetGatewayId"] name = self._get_name_from_tags(igw.get("Tags", []), igw_id) resources.append({ "account_id": account_id, "region": region, "service": "internet_gateway", "resource_type": "Internet Gateway", "resource_id": igw_id, "name": name, "attributes": { "Name": name, "ID": igw_id, }, }) return resources def _scan_nat_gateways(self, account_id: str, region: str) -> List[Dict[str, Any]]: """ Scan NAT Gateways in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of NAT Gateway resource dictionaries Attributes: Name, ID, Public IP, Private IP """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_nat_gateways") for page in paginator.paginate(): for nat in page.get("NatGateways", []): # Skip deleted NAT gateways if nat.get("State") == "deleted": continue name = self._get_name_from_tags( nat.get("Tags", []), nat["NatGatewayId"] ) # Get IP addresses from addresses public_ip = "" private_ip = "" for addr in nat.get("NatGatewayAddresses", []): if addr.get("PublicIp"): public_ip = addr["PublicIp"] if addr.get("PrivateIp"): private_ip = addr["PrivateIp"] resources.append({ "account_id": account_id, "region": region, "service": "nat_gateway", "resource_type": "NAT Gateway", "resource_id": nat["NatGatewayId"], "name": name, "attributes": { "Name": name, "ID": nat["NatGatewayId"], "Public IP": public_ip, "Private IP": private_ip, }, }) return resources def _scan_security_groups( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Security Groups in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of Security Group resource dictionaries Attributes: Name, ID, Protocol, Port range, Source Note: Creates one entry per inbound rule """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_security_groups") for page in paginator.paginate(): for sg in page.get("SecurityGroups", []): sg_name = sg.get("GroupName", sg["GroupId"]) # Process inbound rules for rule in sg.get("IpPermissions", []): protocol = rule.get("IpProtocol", "-1") if protocol == "-1": protocol = "All" # Get port range from_port = rule.get("FromPort", "All") to_port = rule.get("ToPort", "All") if from_port == to_port: port_range = str(from_port) if from_port != "All" else "All" else: port_range = f"{from_port}-{to_port}" # Get sources sources = [] for ip_range in rule.get("IpRanges", []): sources.append(ip_range.get("CidrIp", "")) for ip_range in rule.get("Ipv6Ranges", []): sources.append(ip_range.get("CidrIpv6", "")) for group in rule.get("UserIdGroupPairs", []): sources.append(group.get("GroupId", "")) source = ", ".join(sources) if sources else "N/A" resources.append({ "account_id": account_id, "region": region, "service": "security_group", "resource_type": "Security Group", "resource_id": sg["GroupId"], "name": sg_name, "attributes": { "Name": sg_name, "ID": sg["GroupId"], "Protocol": protocol, "Port range": port_range, "Source": source, }, }) # If no inbound rules, still add the security group if not sg.get("IpPermissions"): resources.append({ "account_id": account_id, "region": region, "service": "security_group", "resource_type": "Security Group", "resource_id": sg["GroupId"], "name": sg_name, "attributes": { "Name": sg_name, "ID": sg["GroupId"], "Protocol": "N/A", "Port range": "N/A", "Source": "N/A", }, }) return resources def _scan_vpc_endpoints( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan VPC Endpoints in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of VPC Endpoint resource dictionaries Attributes: Name, ID, VPC, Service Name, Type """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_vpc_endpoints") for page in paginator.paginate(): for endpoint in page.get("VpcEndpoints", []): name = self._get_name_from_tags( endpoint.get("Tags", []), endpoint["VpcEndpointId"] ) resources.append({ "account_id": account_id, "region": region, "service": "vpc_endpoint", "resource_type": "Endpoint", "resource_id": endpoint["VpcEndpointId"], "name": name, "attributes": { "Name": name, "ID": endpoint["VpcEndpointId"], "VPC": endpoint.get("VpcId", ""), "Service Name": endpoint.get("ServiceName", ""), "Type": endpoint.get("VpcEndpointType", ""), }, }) return resources def _scan_vpc_peering(self, account_id: str, region: str) -> List[Dict[str, Any]]: """ Scan VPC Peering Connections in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of VPC Peering resource dictionaries Attributes: Name, Peering Connection ID, Requester VPC, Accepter VPC """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_vpc_peering_connections") for page in paginator.paginate(): for peering in page.get("VpcPeeringConnections", []): # Skip deleted/rejected peerings status = peering.get("Status", {}).get("Code", "") if status in ["deleted", "rejected", "failed"]: continue name = self._get_name_from_tags( peering.get("Tags", []), peering["VpcPeeringConnectionId"] ) requester_vpc = peering.get("RequesterVpcInfo", {}).get("VpcId", "") accepter_vpc = peering.get("AccepterVpcInfo", {}).get("VpcId", "") resources.append({ "account_id": account_id, "region": region, "service": "vpc_peering", "resource_type": "VPC Peering", "resource_id": peering["VpcPeeringConnectionId"], "name": name, "attributes": { "Name": name, 
"Peering Connection ID": peering["VpcPeeringConnectionId"], "Requester VPC": requester_vpc, "Accepter VPC": accepter_vpc, }, }) return resources def _scan_customer_gateways( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Customer Gateways in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of Customer Gateway resource dictionaries Attributes: Name, Customer Gateway ID, IP Address """ resources = [] ec2_client = self._session.client("ec2", region_name=region) response = ec2_client.describe_customer_gateways() for cgw in response.get("CustomerGateways", []): # Skip deleted gateways if cgw.get("State") == "deleted": continue name = self._get_name_from_tags( cgw.get("Tags", []), cgw["CustomerGatewayId"] ) resources.append({ "account_id": account_id, "region": region, "service": "customer_gateway", "resource_type": "Customer Gateway", "resource_id": cgw["CustomerGatewayId"], "name": name, "attributes": { "Name": name, "Customer Gateway ID": cgw["CustomerGatewayId"], "IP Address": cgw.get("IpAddress", ""), }, }) return resources def _scan_virtual_private_gateways( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Virtual Private Gateways in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of Virtual Private Gateway resource dictionaries Attributes: Name, Virtual Private Gateway ID, VPC """ resources = [] ec2_client = self._session.client("ec2", region_name=region) response = ec2_client.describe_vpn_gateways() for vgw in response.get("VpnGateways", []): # Skip deleted gateways if vgw.get("State") == "deleted": continue name = self._get_name_from_tags( vgw.get("Tags", []), vgw["VpnGatewayId"] ) # Get attached VPC vpc_id = "" for attachment in vgw.get("VpcAttachments", []): if attachment.get("State") == "attached": vpc_id = attachment.get("VpcId", "") break resources.append({ "account_id": account_id, "region": region, "service": "virtual_private_gateway", "resource_type": "Virtual Private Gateway", "resource_id": vgw["VpnGatewayId"], "name": name, "attributes": { "Name": name, "Virtual Private Gateway ID": vgw["VpnGatewayId"], "VPC": vpc_id, }, }) return resources def _scan_vpn_connections( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan VPN Connections in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of VPN Connection resource dictionaries Attributes: Name, VPN ID, Routes """ resources = [] ec2_client = self._session.client("ec2", region_name=region) response = ec2_client.describe_vpn_connections() for vpn in response.get("VpnConnections", []): # Skip deleted connections if vpn.get("State") == "deleted": continue name = self._get_name_from_tags( vpn.get("Tags", []), vpn["VpnConnectionId"] ) # Get routes routes = [] for route in vpn.get("Routes", []): if route.get("DestinationCidrBlock"): routes.append(route["DestinationCidrBlock"]) resources.append({ "account_id": account_id, "region": region, "service": "vpn_connection", "resource_type": "VPN Connection", "resource_id": vpn["VpnConnectionId"], "name": name, "attributes": { "Name": name, "VPN ID": vpn["VpnConnectionId"], "Routes": ", ".join(routes) if routes else "N/A", }, }) return resources # ========================================================================= # EC2 and Compute Service Scanners (Task 1.3) # ========================================================================= def _scan_ec2_instances( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan EC2 Instances in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of EC2 Instance resource dictionaries Attributes: Name, Instance ID, Instance Type, AZ, AMI, Public IP, Public DNS, Private IP, VPC ID, Subnet ID, Key, Security Groups, EBS Type, EBS Size, Encryption """ resources = [] ec2_client = self._session.client("ec2", region_name=region) paginator = ec2_client.get_paginator("describe_instances") for page in paginator.paginate(): for reservation in page.get("Reservations", []): for instance in reservation.get("Instances", []): # Skip terminated instances state = instance.get("State", {}).get("Name", "") if state == "terminated": continue name = self._get_name_from_tags( instance.get("Tags", []), instance["InstanceId"] ) # Get security groups security_groups = [] for sg in instance.get("SecurityGroups", []): security_groups.append( sg.get("GroupName", sg.get("GroupId", "")) ) # Get EBS volume info ebs_type = "" ebs_size = "" ebs_encrypted = "" for block_device in instance.get("BlockDeviceMappings", []): ebs = block_device.get("Ebs", {}) if ebs.get("VolumeId"): # Get volume details try: vol_response = ec2_client.describe_volumes( VolumeIds=[ebs["VolumeId"]] ) if vol_response.get("Volumes"): volume = vol_response["Volumes"][0] ebs_type = volume.get("VolumeType", "") ebs_size = f"{volume.get('Size', '')} GB" ebs_encrypted = ( "Yes" if volume.get("Encrypted") else "No" ) except Exception as e: logger.warning( f"Failed to get volume details: {str(e)}" ) break # Only get first volume for simplicity resources.append({ "account_id": account_id, "region": region, "service": "ec2", "resource_type": "Instance", "resource_id": instance["InstanceId"], "name": name, "attributes": { "Name": name, "Instance ID": instance["InstanceId"], "Instance Type": instance.get("InstanceType", ""), "AZ": instance.get("Placement", {}).get( "AvailabilityZone", "" ), "AMI": instance.get("ImageId", ""), "Public IP": instance.get("PublicIpAddress", ""), "Public DNS": 
instance.get("PublicDnsName", ""), "Private IP": instance.get("PrivateIpAddress", ""), "VPC ID": instance.get("VpcId", ""), "Subnet ID": instance.get("SubnetId", ""), "Key": instance.get("KeyName", ""), "Security Groups": ", ".join(security_groups), "EBS Type": ebs_type, "EBS Size": ebs_size, "Encryption": ebs_encrypted, "Other Requirement": "", }, }) return resources def _scan_elastic_ips( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Elastic IPs in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of Elastic IP resource dictionaries Attributes: Name, Elastic IP """ resources = [] ec2_client = self._session.client("ec2", region_name=region) response = ec2_client.describe_addresses() for eip in response.get("Addresses", []): public_ip = eip.get("PublicIp", "") name = self._get_name_from_tags( eip.get("Tags", []), public_ip or eip.get("AllocationId", ""), ) resources.append({ "account_id": account_id, "region": region, "service": "elastic_ip", "resource_type": "Elastic IP", "resource_id": eip.get("AllocationId", public_ip), "name": name, "attributes": { "Name": name, "Elastic IP": public_ip, }, }) return resources def _scan_autoscaling_groups( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Auto Scaling Groups in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of Auto Scaling Group resource dictionaries Attributes: Name, Launch Template, AMI, Instance type, Key, Target Groups, Desired, Min, Max, Scaling Policy """ resources = [] asg_client = self._session.client("autoscaling", region_name=region) ec2_client = self._session.client("ec2", region_name=region) paginator = asg_client.get_paginator("describe_auto_scaling_groups") for page in paginator.paginate(): for asg in page.get("AutoScalingGroups", []): name = asg.get("AutoScalingGroupName", "") # Get Launch Template info launch_template_name = "" ami = "" instance_type = "" key_name = "" # Check for Launch Template lt = asg.get("LaunchTemplate") if lt: launch_template_name = lt.get( "LaunchTemplateName", lt.get("LaunchTemplateId", "") ) # Get Launch Template details try: lt_response = ec2_client.describe_launch_template_versions( LaunchTemplateId=lt.get("LaunchTemplateId", ""), Versions=[lt.get("Version", "$Latest")], ) if lt_response.get("LaunchTemplateVersions"): lt_data = lt_response["LaunchTemplateVersions"][0].get( "LaunchTemplateData", {} ) ami = lt_data.get("ImageId", "") instance_type = lt_data.get("InstanceType", "") key_name = lt_data.get("KeyName", "") except Exception as e: logger.warning( f"Failed to get launch template details: {str(e)}" ) # Check for Mixed Instances Policy mip = asg.get("MixedInstancesPolicy") if mip: lt_spec = mip.get("LaunchTemplate", {}).get( "LaunchTemplateSpecification", {} ) if lt_spec: launch_template_name = lt_spec.get( "LaunchTemplateName", lt_spec.get("LaunchTemplateId", "") ) # Check for Launch Configuration (legacy) lc_name = asg.get("LaunchConfigurationName") if lc_name and not launch_template_name: launch_template_name = f"LC: {lc_name}" try: lc_response = asg_client.describe_launch_configurations( LaunchConfigurationNames=[lc_name] ) if lc_response.get("LaunchConfigurations"): lc = lc_response["LaunchConfigurations"][0] ami = lc.get("ImageId", "") 
instance_type = lc.get("InstanceType", "") key_name = lc.get("KeyName", "") except Exception as e: logger.warning( f"Failed to get launch configuration details: {str(e)}" ) # Get Target Groups target_groups = [] for tg_arn in asg.get("TargetGroupARNs", []): # Extract target group name from ARN tg_name = tg_arn.split("/")[-2] if "/" in tg_arn else tg_arn target_groups.append(tg_name) # Get Scaling Policies scaling_policies = [] try: policy_response = asg_client.describe_policies( AutoScalingGroupName=name ) for policy in policy_response.get("ScalingPolicies", []): scaling_policies.append(policy.get("PolicyName", "")) except Exception as e: logger.warning(f"Failed to get scaling policies: {str(e)}") resources.append({ "account_id": account_id, "region": region, "service": "autoscaling", "resource_type": "Auto Scaling Group", "resource_id": asg.get("AutoScalingGroupARN", name), "name": name, "attributes": { "Name": name, "Launch Template": launch_template_name, "AMI": ami, "Instance type": instance_type, "Key": key_name, "Target Groups": ( ", ".join(target_groups) if target_groups else "N/A" ), "Desired": str(asg.get("DesiredCapacity", 0)), "Min": str(asg.get("MinSize", 0)), "Max": str(asg.get("MaxSize", 0)), "Scaling Policy": ( ", ".join(scaling_policies) if scaling_policies else "N/A" ), }, }) return resources def _scan_load_balancers( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Load Balancers (ALB, NLB, CLB) in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of Load Balancer resource dictionaries Attributes: Name, Type, DNS, Scheme, VPC, Availability Zones, Subnet, Security Groups """ resources = [] # Scan ALB/NLB using elbv2 elbv2_client = self._session.client("elbv2", region_name=region) try: paginator = elbv2_client.get_paginator("describe_load_balancers") for page in paginator.paginate(): for lb in page.get("LoadBalancers", []): name = lb.get("LoadBalancerName", "") lb_type = lb.get("Type", "application") # Get availability zones and subnets azs = [] subnets = [] for az_info in lb.get("AvailabilityZones", []): azs.append(az_info.get("ZoneName", "")) if az_info.get("SubnetId"): subnets.append(az_info["SubnetId"]) # Get security groups (only for ALB) security_groups = lb.get("SecurityGroups", []) resources.append({ "account_id": account_id, "region": region, "service": "elb", "resource_type": "Load Balancer", "resource_id": lb.get("LoadBalancerArn", name), "name": name, "attributes": { "Name": name, "Type": lb_type.upper(), "DNS": lb.get("DNSName", ""), "Scheme": lb.get("Scheme", ""), "VPC": lb.get("VpcId", ""), "Availability Zones": ", ".join(azs), "Subnet": ", ".join(subnets), "Security Groups": ( ", ".join(security_groups) if security_groups else "N/A" ), }, }) except Exception as e: logger.warning(f"Failed to scan ALB/NLB: {str(e)}") # Scan Classic Load Balancers elb_client = self._session.client("elb", region_name=region) try: paginator = elb_client.get_paginator("describe_load_balancers") for page in paginator.paginate(): for lb in page.get("LoadBalancerDescriptions", []): name = lb.get("LoadBalancerName", "") resources.append({ "account_id": account_id, "region": region, "service": "elb", "resource_type": "Load Balancer", "resource_id": name, "name": name, "attributes": { "Name": name, "Type": "CLASSIC", "DNS": lb.get("DNSName", ""), "Scheme": lb.get("Scheme", ""), "VPC": lb.get("VPCId", ""), "Availability Zones": ", ".join( 
lb.get("AvailabilityZones", []) ), "Subnet": ", ".join(lb.get("Subnets", [])), "Security Groups": ", ".join( lb.get("SecurityGroups", []) ), }, }) except Exception as e: logger.warning(f"Failed to scan Classic ELB: {str(e)}") return resources def _scan_target_groups( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Target Groups in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of Target Group resource dictionaries Attributes: Load Balancer, TG Name, Port, Protocol, Registered Instances, Health Check Path """ resources = [] elbv2_client = self._session.client("elbv2", region_name=region) try: paginator = elbv2_client.get_paginator("describe_target_groups") for page in paginator.paginate(): for tg in page.get("TargetGroups", []): name = tg.get("TargetGroupName", "") tg_arn = tg.get("TargetGroupArn", "") # Get associated load balancers lb_arns = tg.get("LoadBalancerArns", []) lb_names = [] for lb_arn in lb_arns: # Extract LB name from ARN lb_name = lb_arn.split("/")[-2] if "/" in lb_arn else lb_arn lb_names.append(lb_name) # Get registered targets registered_instances = [] try: targets_response = elbv2_client.describe_target_health( TargetGroupArn=tg_arn ) for target in targets_response.get( "TargetHealthDescriptions", [] ): target_id = target.get("Target", {}).get("Id", "") if target_id: registered_instances.append(target_id) except Exception as e: logger.warning(f"Failed to get target health: {str(e)}") resources.append({ "account_id": account_id, "region": region, "service": "target_group", "resource_type": "Target Group", "resource_id": tg_arn, "name": name, "attributes": { "Load Balancer": ( ", ".join(lb_names) if lb_names else "N/A" ), "TG Name": name, "Port": str(tg.get("Port", "")), "Protocol": tg.get("Protocol", ""), "Registered Instances": ( ", ".join(registered_instances) if registered_instances else "None" ), "Health Check Path": tg.get("HealthCheckPath", "N/A"), }, }) except Exception as 
e: logger.warning(f"Failed to scan target groups: {str(e)}") return resources def _scan_lambda_functions( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Lambda Functions in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of Lambda Function resource dictionaries Attributes: Function Name, Runtime, Memory (MB), Timeout (s), Last Modified """ resources = [] lambda_client = self._session.client("lambda", region_name=region) try: paginator = lambda_client.get_paginator("list_functions") for page in paginator.paginate(): for func in page.get("Functions", []): func_name = func.get("FunctionName", "") resources.append({ "account_id": account_id, "region": region, "service": "lambda", "resource_type": "Function", "resource_id": func.get("FunctionArn", func_name), "name": func_name, "attributes": { "Function Name": func_name, "Runtime": func.get("Runtime", "N/A"), "Memory (MB)": str(func.get("MemorySize", "")), "Timeout (s)": str(func.get("Timeout", "")), "Last Modified": func.get("LastModified", ""), }, }) except Exception as e: logger.warning(f"Failed to scan Lambda functions: {str(e)}") return resources def _scan_eks_clusters( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan EKS Clusters in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of EKS Cluster resource dictionaries Attributes: Cluster Name, Version, Status, Endpoint, VPC ID """ resources = [] eks_client = self._session.client("eks", region_name=region) try: # List clusters paginator = eks_client.get_paginator("list_clusters") cluster_names = [] for page in paginator.paginate(): cluster_names.extend(page.get("clusters", [])) # Get details for each cluster for cluster_name in cluster_names: try: response = eks_client.describe_cluster(name=cluster_name) cluster = response.get("cluster", {}) resources.append({ "account_id": account_id, "region": region, "service": "eks", "resource_type": "Cluster", "resource_id": cluster.get("arn", cluster_name), "name": cluster_name, "attributes": { "Cluster Name": cluster_name, "Version": cluster.get("version", ""), "Status": cluster.get("status", ""), "Endpoint": cluster.get("endpoint", ""), "VPC ID": cluster.get("resourcesVpcConfig", {}).get( "vpcId", "" ), }, }) except Exception as e: logger.warning( f"Failed to describe EKS cluster {cluster_name}: {str(e)}" ) except Exception as e: logger.warning(f"Failed to list EKS clusters: {str(e)}") return resources # ========================================================================= # Database and Storage Service Scanners (Task 1.4) # ========================================================================= def _scan_rds_instances( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan RDS DB Instances in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of RDS DB Instance resource dictionaries Attributes (vertical layout - one table per instance): Region, Endpoint, DB instance ID, DB name, Master Username, Port, DB Engine, DB Version, Instance Type, Storage type, Storage, Multi-AZ, Security Group, Deletion Protection, Performance Insights Enabled, CloudWatch Logs """ resources = [] rds_client = self._session.client("rds", region_name=region) try: paginator = rds_client.get_paginator("describe_db_instances") for page in paginator.paginate(): for db in page.get("DBInstances", []): db_id = db.get("DBInstanceIdentifier", "") # Get security groups security_groups = [] for sg in db.get("VpcSecurityGroups", []): security_groups.append(sg.get("VpcSecurityGroupId", "")) # Get CloudWatch logs exports cw_logs = db.get("EnabledCloudwatchLogsExports", []) # Get endpoint endpoint = db.get("Endpoint", {}) endpoint_address = endpoint.get("Address", "") port = endpoint.get("Port", "") resources.append({ "account_id": account_id, "region": region, "service": "rds", "resource_type": "DB Instance", "resource_id": db.get("DBInstanceArn", db_id), "name": db_id, "attributes": { "Region": region, "Endpoint": endpoint_address, "DB instance ID": db_id, "DB name": db.get("DBName", ""), "Master Username": db.get("MasterUsername", ""), "Port": str(port), "DB Engine": db.get("Engine", ""), "DB Version": db.get("EngineVersion", ""), "Instance Type": db.get("DBInstanceClass", ""), "Storage type": db.get("StorageType", ""), "Storage": f"{db.get('AllocatedStorage', '')} GB", "Multi-AZ": "Yes" if db.get("MultiAZ") else "No", "Security Group": ", ".join(security_groups), "Deletion Protection": ( "Yes" if db.get("DeletionProtection") else "No" ), "Performance Insights Enabled": ( "Yes" if db.get("PerformanceInsightsEnabled") else "No" ), "CloudWatch Logs": ( ", ".join(cw_logs) if cw_logs else "N/A" ), }, }) except Exception as e: logger.warning(f"Failed to scan RDS instances: 
{str(e)}") return resources def _scan_elasticache_clusters( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan ElastiCache Clusters in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of ElastiCache Cluster resource dictionaries Attributes (vertical layout - one table per cluster): Cluster ID, Engine, Engine Version, Node Type, Num Nodes, Status """ resources = [] elasticache_client = self._session.client("elasticache", region_name=region) # Scan cache clusters (Redis/Memcached) try: paginator = elasticache_client.get_paginator("describe_cache_clusters") for page in paginator.paginate(ShowCacheNodeInfo=True): for cluster in page.get("CacheClusters", []): cluster_id = cluster.get("CacheClusterId", "") resources.append({ "account_id": account_id, "region": region, "service": "elasticache", "resource_type": "Cache Cluster", "resource_id": cluster.get("ARN", cluster_id), "name": cluster_id, "attributes": { "Cluster ID": cluster_id, "Engine": cluster.get("Engine", ""), "Engine Version": cluster.get("EngineVersion", ""), "Node Type": cluster.get("CacheNodeType", ""), "Num Nodes": str(cluster.get("NumCacheNodes", 0)), "Status": cluster.get("CacheClusterStatus", ""), }, }) except Exception as e: logger.warning(f"Failed to scan ElastiCache clusters: {str(e)}") # Also scan replication groups (Redis cluster mode) try: paginator = elasticache_client.get_paginator("describe_replication_groups") for page in paginator.paginate(): for rg in page.get("ReplicationGroups", []): rg_id = rg.get("ReplicationGroupId", "") # Count nodes num_nodes = 0 for node_group in rg.get("NodeGroups", []): num_nodes += len(node_group.get("NodeGroupMembers", [])) # Get node type from member clusters node_type = "" member_clusters = rg.get("MemberClusters", []) if member_clusters: try: cluster_response = elasticache_client.describe_cache_clusters( CacheClusterId=member_clusters[0] ) if cluster_response.get("CacheClusters"): node_type = 
cluster_response["CacheClusters"][0].get( "CacheNodeType", "" ) except Exception: pass resources.append({ "account_id": account_id, "region": region, "service": "elasticache", "resource_type": "Cache Cluster", "resource_id": rg.get("ARN", rg_id), "name": rg_id, "attributes": { "Cluster ID": rg_id, "Engine": "redis", "Engine Version": "", "Node Type": node_type, "Num Nodes": str(num_nodes), "Status": rg.get("Status", ""), }, }) except Exception as e: logger.warning(f"Failed to scan ElastiCache replication groups: {str(e)}") return resources def _scan_s3_buckets( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan S3 Buckets (global service, scanned once from us-east-1). Args: account_id: AWS account ID region: Region to scan (should be us-east-1 for global service) Returns: List of S3 Bucket resource dictionaries Attributes (horizontal layout): Region, Bucket Name """ resources = [] s3_client = self._session.client("s3", region_name=region) try: response = s3_client.list_buckets() for bucket in response.get("Buckets", []): bucket_name = bucket.get("Name", "") # Get bucket location try: location_response = s3_client.get_bucket_location( Bucket=bucket_name ) bucket_region = ( location_response.get("LocationConstraint") or "us-east-1" ) except Exception: bucket_region = "unknown" resources.append({ "account_id": account_id, "region": "global", "service": "s3", "resource_type": "Bucket", "resource_id": bucket_name, "name": bucket_name, "attributes": { "Region": bucket_region, "Bucket Name": bucket_name, }, }) except Exception as e: logger.warning(f"Failed to scan S3 buckets: {str(e)}") return resources def _scan_s3_event_notifications( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan S3 Event Notifications (global service, scanned once from us-east-1). 
Args: account_id: AWS account ID region: Region to scan (should be us-east-1 for global service) Returns: List of S3 Event Notification resource dictionaries Attributes (vertical layout): Bucket, Name, Event Type, Destination type, Destination """ resources = [] s3_client = self._session.client("s3", region_name=region) try: # First get all buckets buckets_response = s3_client.list_buckets() for bucket in buckets_response.get("Buckets", []): bucket_name = bucket.get("Name", "") try: # Get notification configuration notif_response = s3_client.get_bucket_notification_configuration( Bucket=bucket_name ) # Process Lambda function configurations for config in notif_response.get( "LambdaFunctionConfigurations", [] ): config_id = config.get("Id", "Lambda") events = config.get("Events", []) lambda_arn = config.get("LambdaFunctionArn", "") resources.append({ "account_id": account_id, "region": "global", "service": "s3_event_notification", "resource_type": "S3 event notification", "resource_id": f"{bucket_name}/{config_id}", "name": config_id, "attributes": { "Bucket": bucket_name, "Name": config_id, "Event Type": ", ".join(events), "Destination type": "Lambda", "Destination": ( lambda_arn.split(":")[-1] if lambda_arn else "" ), }, }) # Process SQS queue configurations for config in notif_response.get("QueueConfigurations", []): config_id = config.get("Id", "SQS") events = config.get("Events", []) queue_arn = config.get("QueueArn", "") resources.append({ "account_id": account_id, "region": "global", "service": "s3_event_notification", "resource_type": "S3 event notification", "resource_id": f"{bucket_name}/{config_id}", "name": config_id, "attributes": { "Bucket": bucket_name, "Name": config_id, "Event Type": ", ".join(events), "Destination type": "SQS", "Destination": ( queue_arn.split(":")[-1] if queue_arn else "" ), }, }) # Process SNS topic configurations for config in notif_response.get("TopicConfigurations", []): config_id = config.get("Id", "SNS") events = 
config.get("Events", []) topic_arn = config.get("TopicArn", "") resources.append({ "account_id": account_id, "region": "global", "service": "s3_event_notification", "resource_type": "S3 event notification", "resource_id": f"{bucket_name}/{config_id}", "name": config_id, "attributes": { "Bucket": bucket_name, "Name": config_id, "Event Type": ", ".join(events), "Destination type": "SNS", "Destination": ( topic_arn.split(":")[-1] if topic_arn else "" ), }, }) except Exception as e: # Skip buckets we can't access logger.debug( f"Failed to get notifications for bucket {bucket_name}: " f"{str(e)}" ) except Exception as e: logger.warning(f"Failed to scan S3 event notifications: {str(e)}") return resources # ========================================================================= # Global and Monitoring Service Scanners (Task 1.5) # ========================================================================= def _scan_cloudfront_distributions( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan CloudFront Distributions (global service). 
Args: account_id: AWS account ID region: Region to scan (should be us-east-1 for global service) Returns: List of CloudFront Distribution resource dictionaries Attributes (vertical layout - one table per distribution): CloudFront ID, Domain Name, CNAME, Origin Domain Name, Origin Protocol Policy, Viewer Protocol Policy, Allowed HTTP Methods, Cached HTTP Methods """ resources = [] # CloudFront is a global service, always use us-east-1 cf_client = self._session.client("cloudfront", region_name="us-east-1") try: paginator = cf_client.get_paginator("list_distributions") for page in paginator.paginate(): distribution_list = page.get("DistributionList", {}) for dist in distribution_list.get("Items", []): dist_id = dist.get("Id", "") # Get aliases (CNAMEs) aliases = dist.get("Aliases", {}).get("Items", []) # Get origin info origins = dist.get("Origins", {}).get("Items", []) origin_domain = "" origin_protocol = "" if origins: origin = origins[0] origin_domain = origin.get("DomainName", "") custom_origin = origin.get("CustomOriginConfig", {}) if custom_origin: origin_protocol = custom_origin.get( "OriginProtocolPolicy", "" ) else: origin_protocol = "S3" # Get default cache behavior default_behavior = dist.get("DefaultCacheBehavior", {}) viewer_protocol = default_behavior.get( "ViewerProtocolPolicy", "" ) allowed_methods = default_behavior.get( "AllowedMethods", {} ).get("Items", []) cached_methods = default_behavior.get( "AllowedMethods", {} ).get("CachedMethods", {}).get("Items", []) resources.append({ "account_id": account_id, "region": "global", "service": "cloudfront", "resource_type": "Distribution", "resource_id": dist.get("ARN", dist_id), "name": dist_id, "attributes": { "CloudFront ID": dist_id, "Domain Name": dist.get("DomainName", ""), "CNAME": ", ".join(aliases) if aliases else "N/A", "Origin Domain Name": origin_domain, "Origin Protocol Policy": origin_protocol, "Viewer Protocol Policy": viewer_protocol, "Allowed HTTP Methods": ", ".join(allowed_methods), 
"Cached HTTP Methods": ", ".join(cached_methods), }, }) except Exception as e: logger.warning(f"Failed to scan CloudFront distributions: {str(e)}") return resources def _scan_route53_hosted_zones( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan Route 53 Hosted Zones (global service). Args: account_id: AWS account ID region: Region to scan (should be us-east-1 for global service) Returns: List of Route 53 Hosted Zone resource dictionaries Attributes (horizontal layout): Zone ID, Name, Type, Record Count """ resources = [] # Route 53 is a global service route53_client = self._session.client("route53", region_name="us-east-1") try: paginator = route53_client.get_paginator("list_hosted_zones") for page in paginator.paginate(): for zone in page.get("HostedZones", []): zone_id = zone.get("Id", "").replace("/hostedzone/", "") zone_name = zone.get("Name", "") # Determine zone type zone_type = ( "Private" if zone.get("Config", {}).get("PrivateZone") else "Public" ) resources.append({ "account_id": account_id, "region": "global", "service": "route53", "resource_type": "Hosted Zone", "resource_id": zone_id, "name": zone_name, "attributes": { "Zone ID": zone_id, "Name": zone_name, "Type": zone_type, "Record Count": str( zone.get("ResourceRecordSetCount", 0) ), }, }) except Exception as e: logger.warning(f"Failed to scan Route 53 hosted zones: {str(e)}") return resources def _scan_acm_certificates( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan ACM Certificates (regional service). 
Args: account_id: AWS account ID region: Region to scan Returns: List of ACM Certificate resource dictionaries Attributes (horizontal layout): Domain name, Additional names """ resources = [] # ACM is a regional service acm_client = self._session.client("acm", region_name=region) try: paginator = acm_client.get_paginator("list_certificates") for page in paginator.paginate(): for cert in page.get("CertificateSummaryList", []): domain_name = cert.get("DomainName", "") cert_arn = cert.get("CertificateArn", "") # Get additional names (Subject Alternative Names) additional_names = "" try: cert_detail = acm_client.describe_certificate( CertificateArn=cert_arn ) sans = cert_detail.get("Certificate", {}).get( "SubjectAlternativeNames", [] ) # Filter out the main domain name from SANs additional = [san for san in sans if san != domain_name] additional_names = ", ".join(additional) if additional else "" except Exception: pass resources.append({ "account_id": account_id, "region": region, "service": "acm", "resource_type": "Certificate", "resource_id": cert_arn, "name": domain_name, "attributes": { "Domain name": domain_name, "Additional names": additional_names, }, }) except Exception as e: logger.warning(f"Failed to scan ACM certificates in {region}: {str(e)}") return resources def _scan_waf_web_acls( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan WAF Web ACLs (global service for CloudFront). 
Args: account_id: AWS account ID region: Region to scan (should be us-east-1 for global service) Returns: List of WAF Web ACL resource dictionaries Attributes (horizontal layout): WebACL Name, Scope, Rules Count, Associated Resources """ resources = [] # Scan WAFv2 global (CloudFront) Web ACLs wafv2_client = self._session.client("wafv2", region_name="us-east-1") try: # List CloudFront Web ACLs (CLOUDFRONT scope) response = wafv2_client.list_web_acls(Scope="CLOUDFRONT") for acl in response.get("WebACLs", []): acl_name = acl.get("Name", "") acl_id = acl.get("Id", "") acl_arn = acl.get("ARN", "") # Get Web ACL details for rules count rules_count = 0 associated_resources = [] try: acl_response = wafv2_client.get_web_acl( Name=acl_name, Scope="CLOUDFRONT", Id=acl_id, ) web_acl = acl_response.get("WebACL", {}) rules_count = len(web_acl.get("Rules", [])) # Get associated resources resources_response = wafv2_client.list_resources_for_web_acl( WebACLArn=acl_arn ) for resource_arn in resources_response.get("ResourceArns", []): # Extract resource name from ARN resource_name = resource_arn.split("/")[-1] associated_resources.append(resource_name) except Exception as e: logger.debug(f"Failed to get WAF ACL details: {str(e)}") resources.append({ "account_id": account_id, "region": "global", "service": "waf", "resource_type": "Web ACL", "resource_id": acl_arn, "name": acl_name, "attributes": { "WebACL Name": acl_name, "Scope": "CLOUDFRONT", "Rules Count": str(rules_count), "Associated Resources": ( ", ".join(associated_resources) if associated_resources else "None" ), }, }) except Exception as e: logger.warning(f"Failed to scan WAFv2 Web ACLs: {str(e)}") # Also scan regional WAF Web ACLs try: response = wafv2_client.list_web_acls(Scope="REGIONAL") for acl in response.get("WebACLs", []): acl_name = acl.get("Name", "") acl_id = acl.get("Id", "") acl_arn = acl.get("ARN", "") rules_count = 0 associated_resources = [] try: acl_response = wafv2_client.get_web_acl( Name=acl_name, 
Scope="REGIONAL", Id=acl_id, ) web_acl = acl_response.get("WebACL", {}) rules_count = len(web_acl.get("Rules", [])) resources_response = wafv2_client.list_resources_for_web_acl( WebACLArn=acl_arn ) for resource_arn in resources_response.get("ResourceArns", []): resource_name = resource_arn.split("/")[-1] associated_resources.append(resource_name) except Exception as e: logger.debug(f"Failed to get WAF ACL details: {str(e)}") resources.append({ "account_id": account_id, "region": "global", "service": "waf", "resource_type": "Web ACL", "resource_id": acl_arn, "name": acl_name, "attributes": { "WebACL Name": acl_name, "Scope": "REGIONAL", "Rules Count": str(rules_count), "Associated Resources": ( ", ".join(associated_resources) if associated_resources else "None" ), }, }) except Exception as e: logger.warning(f"Failed to scan regional WAFv2 Web ACLs: {str(e)}") return resources def _scan_sns_topics( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan SNS Topics in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of SNS Topic resource dictionaries Attributes (horizontal layout): Topic Name, Topic Display Name, Subscription Protocol, Subscription Endpoint """ resources = [] sns_client = self._session.client("sns", region_name=region) try: paginator = sns_client.get_paginator("list_topics") for page in paginator.paginate(): for topic in page.get("Topics", []): topic_arn = topic.get("TopicArn", "") topic_name = topic_arn.split(":")[-1] if topic_arn else "" # Get topic attributes display_name = "" try: attrs_response = sns_client.get_topic_attributes( TopicArn=topic_arn ) attrs = attrs_response.get("Attributes", {}) display_name = attrs.get("DisplayName", "") except Exception as e: logger.debug(f"Failed to get topic attributes: {str(e)}") # Get subscriptions subscriptions = [] try: sub_paginator = sns_client.get_paginator( "list_subscriptions_by_topic" ) for sub_page in sub_paginator.paginate(TopicArn=topic_arn): for sub in sub_page.get("Subscriptions", []): protocol = sub.get("Protocol", "") endpoint = sub.get("Endpoint", "") subscriptions.append({ "protocol": protocol, "endpoint": endpoint, }) except Exception as e: logger.debug(f"Failed to get subscriptions: {str(e)}") # Create one entry per subscription, or one entry if no subscriptions if subscriptions: for sub in subscriptions: resources.append({ "account_id": account_id, "region": region, "service": "sns", "resource_type": "Topic", "resource_id": topic_arn, "name": topic_name, "attributes": { "Topic Name": topic_name, "Topic Display Name": display_name, "Subscription Protocol": sub["protocol"], "Subscription Endpoint": sub["endpoint"], }, }) else: resources.append({ "account_id": account_id, "region": region, "service": "sns", "resource_type": "Topic", "resource_id": topic_arn, "name": topic_name, "attributes": { "Topic Name": topic_name, "Topic Display Name": display_name, "Subscription Protocol": "N/A", "Subscription Endpoint": "N/A", }, }) except 
Exception as e: logger.warning(f"Failed to scan SNS topics: {str(e)}") return resources def _scan_cloudwatch_log_groups( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan CloudWatch Log Groups in the specified region. Args: account_id: AWS account ID region: Region to scan Returns: List of CloudWatch Log Group resource dictionaries Attributes (horizontal layout): Log Group Name, Retention Days, Stored Bytes, KMS Encryption """ resources = [] logs_client = self._session.client("logs", region_name=region) try: paginator = logs_client.get_paginator("describe_log_groups") for page in paginator.paginate(): for log_group in page.get("logGroups", []): log_group_name = log_group.get("logGroupName", "") # Get retention in days retention = log_group.get("retentionInDays") retention_str = str(retention) if retention else "Never Expire" # Get stored bytes stored_bytes = log_group.get("storedBytes", 0) stored_str = ( f"{stored_bytes / (1024*1024):.2f} MB" if stored_bytes else "0 MB" ) # Check KMS encryption kms_key = log_group.get("kmsKeyId", "") kms_encrypted = "Yes" if kms_key else "No" resources.append({ "account_id": account_id, "region": region, "service": "cloudwatch", "resource_type": "Log Group", "resource_id": log_group.get("arn", log_group_name), "name": log_group_name, "attributes": { "Log Group Name": log_group_name, "Retention Days": retention_str, "Stored Bytes": stored_str, "KMS Encryption": kms_encrypted, }, }) except Exception as e: logger.warning(f"Failed to scan CloudWatch log groups: {str(e)}") return resources def _scan_eventbridge_rules( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan EventBridge Rules in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of EventBridge Rule resource dictionaries Attributes (horizontal layout): Name, Description, Event Bus, State """ resources = [] events_client = self._session.client("events", region_name=region) try: # List event buses first buses_response = events_client.list_event_buses() event_buses = [ bus.get("Name", "default") for bus in buses_response.get("EventBuses", []) ] # If no buses found, use default if not event_buses: event_buses = ["default"] for bus_name in event_buses: try: paginator = events_client.get_paginator("list_rules") for page in paginator.paginate(EventBusName=bus_name): for rule in page.get("Rules", []): rule_name = rule.get("Name", "") resources.append({ "account_id": account_id, "region": region, "service": "eventbridge", "resource_type": "Rule", "resource_id": rule.get("Arn", rule_name), "name": rule_name, "attributes": { "Name": rule_name, "Description": rule.get("Description", ""), "Event Bus": bus_name, "State": rule.get("State", ""), }, }) except Exception as e: logger.debug( f"Failed to list rules for bus {bus_name}: {str(e)}" ) except Exception as e: logger.warning(f"Failed to scan EventBridge rules: {str(e)}") return resources def _scan_cloudtrail_trails( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan CloudTrail Trails (global service). 
Args: account_id: AWS account ID region: Region to scan (should be us-east-1 for global service) Returns: List of CloudTrail Trail resource dictionaries Attributes (horizontal layout): Name, Multi-Region Trail, Log File Validation, KMS Encryption """ resources = [] cloudtrail_client = self._session.client( "cloudtrail", region_name="us-east-1" ) try: response = cloudtrail_client.describe_trails() for trail in response.get("trailList", []): trail_name = trail.get("Name", "") # Get multi-region status is_multi_region = trail.get("IsMultiRegionTrail", False) resources.append({ "account_id": account_id, "region": "global", "service": "cloudtrail", "resource_type": "Trail", "resource_id": trail.get("TrailARN", trail_name), "name": trail_name, "attributes": { "Name": trail_name, "Multi-Region Trail": "Yes" if is_multi_region else "No", "Log File Validation": ( "Yes" if trail.get("LogFileValidationEnabled") else "No" ), "KMS Encryption": ( "Yes" if trail.get("KmsKeyId") else "No" ), }, }) except Exception as e: logger.warning(f"Failed to scan CloudTrail trails: {str(e)}") return resources def _scan_config_recorders( self, account_id: str, region: str ) -> List[Dict[str, Any]]: """ Scan AWS Config Recorders in the specified region. 
Args: account_id: AWS account ID region: Region to scan Returns: List of AWS Config Recorder resource dictionaries Attributes (horizontal layout): Name, Regional Resources, Global Resources, Retention period """ resources = [] config_client = self._session.client("config", region_name=region) try: response = config_client.describe_configuration_recorders() for recorder in response.get("ConfigurationRecorders", []): recorder_name = recorder.get("name", "") # Get recording group settings recording_group = recorder.get("recordingGroup", {}) all_supported = recording_group.get("allSupported", False) include_global = recording_group.get( "includeGlobalResourceTypes", False ) # Get retention period retention_period = "N/A" try: retention_response = ( config_client.describe_retention_configurations() ) for retention in retention_response.get( "RetentionConfigurations", [] ): retention_period = ( f"{retention.get('RetentionPeriodInDays', 'N/A')} days" ) break except Exception: pass resources.append({ "account_id": account_id, "region": region, "service": "config", "resource_type": "Config", "resource_id": recorder_name, "name": recorder_name, "attributes": { "Name": recorder_name, "Regional Resources": "Yes" if all_supported else "No", "Global Resources": "Yes" if include_global else "No", "Retention period": retention_period, }, }) except Exception as e: logger.warning(f"Failed to scan Config recorders: {str(e)}") return resources def parse_arguments() -> argparse.Namespace: """ Parse command-line arguments. 
Returns: Parsed arguments namespace """ parser = argparse.ArgumentParser( description="CloudShell Scanner - AWS Resource Scanner for CloudShell Environment", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Scan all regions and services python cloudshell_scanner.py # Scan specific regions python cloudshell_scanner.py --regions us-east-1,ap-northeast-1 # Specify output file python cloudshell_scanner.py --output my_scan.json # Scan specific services python cloudshell_scanner.py --services ec2,vpc,rds # Combine options python cloudshell_scanner.py --regions us-east-1 --services ec2,vpc --output scan.json """, ) parser.add_argument( "--regions", type=str, default=None, help="Comma-separated list of AWS regions to scan (default: all regions)", ) parser.add_argument( "--output", type=str, default="scan_result.json", help="Output JSON file path (default: scan_result.json)", ) parser.add_argument( "--services", type=str, default=None, help="Comma-separated list of services to scan (default: all services)", ) parser.add_argument( "--version", action="version", version=f"CloudShell Scanner v{__version__}", ) parser.add_argument( "--verbose", "-v", action="store_true", help="Enable verbose logging", ) parser.add_argument( "--list-services", action="store_true", help="List all supported services and exit", ) return parser.parse_args() def main() -> int: """ Main entry point for the CloudShell Scanner. 
Returns: Exit code (0 for success, non-zero for failure) """ args = parse_arguments() # Set logging level if args.verbose: logging.getLogger().setLevel(logging.DEBUG) logger.debug("Verbose logging enabled") # List services and exit if requested if args.list_services: print("Supported services:") for service in CloudShellScanner.SUPPORTED_SERVICES: global_marker = " (global)" if service in CloudShellScanner.GLOBAL_SERVICES else "" print(f" - {service}{global_marker}") return 0 # Parse regions regions: Optional[List[str]] = None if args.regions: regions = [r.strip() for r in args.regions.split(",")] logger.info(f"Regions specified: {regions}") # Parse services services: Optional[List[str]] = None if args.services: services = [s.strip() for s in args.services.split(",")] logger.info(f"Services specified: {services}") try: # Initialize scanner print(f"CloudShell Scanner v{__version__}") print("=" * 50) scanner = CloudShellScanner() # Get account info account_id = scanner.get_account_id() print(f"AWS Account: {account_id}") print("=" * 50) # Run scan result = scanner.scan_resources(regions=regions, services=services) # Export results scanner.export_json(result, args.output) # Print summary print("\n" + "=" * 50) print("Scan Summary:") print(f" Account ID: {result['metadata']['account_id']}") print(f" Regions scanned: {len(result['metadata']['regions_scanned'])}") print(f" Services scanned: {len(result['metadata']['services_scanned'])}") print(f" Total resources: {result['metadata']['total_resources']}") print(f" Total errors: {result['metadata']['total_errors']}") print(f" Output file: {args.output}") print("=" * 50) return 0 except KeyboardInterrupt: print("\n\nScan interrupted by user") return 130 except Exception as e: logger.error(f"Scan failed: {e}") if args.verbose: import traceback traceback.print_exc() return 1 if __name__ == "__main__": sys.exit(main())