""" Compute and Storage Service Scanners Scans EKS Clusters, Lambda Functions, S3 Buckets, and S3 Event Notifications. Requirements: - 5.1: Scan compute and storage AWS services using boto3 """ import boto3 from typing import List, Dict, Any import logging from app.scanners.base import ResourceData from app.scanners.utils import retry_with_backoff logger = logging.getLogger(__name__) class ComputeServiceScanner: """Scanner for compute and storage AWS resources""" @staticmethod @retry_with_backoff() def scan_eks_clusters(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan EKS Clusters in the specified region. Attributes (vertical layout - one table per cluster): Cluster Name, Version, Status, Endpoint, VPC ID """ resources = [] eks_client = session.client('eks') try: # List clusters paginator = eks_client.get_paginator('list_clusters') cluster_names = [] for page in paginator.paginate(): cluster_names.extend(page.get('clusters', [])) # Get details for each cluster for cluster_name in cluster_names: try: response = eks_client.describe_cluster(name=cluster_name) cluster = response.get('cluster', {}) resources.append(ResourceData( account_id=account_id, region=region, service='eks', resource_type='Cluster', resource_id=cluster.get('arn', cluster_name), name=cluster_name, attributes={ 'Cluster Name': cluster_name, 'Version': cluster.get('version', ''), 'Status': cluster.get('status', ''), 'Endpoint': cluster.get('endpoint', ''), 'VPC ID': cluster.get('resourcesVpcConfig', {}).get('vpcId', '') } )) except Exception as e: logger.warning(f"Failed to describe EKS cluster {cluster_name}: {str(e)}") except Exception as e: logger.warning(f"Failed to list EKS clusters: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_lambda_functions(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Lambda Functions in the specified region. Attributes (horizontal layout): Function Name, Runtime, Memory (MB), Timeout (s), Last Modified """ resources = [] lambda_client = session.client('lambda') try: paginator = lambda_client.get_paginator('list_functions') for page in paginator.paginate(): for func in page.get('Functions', []): func_name = func.get('FunctionName', '') resources.append(ResourceData( account_id=account_id, region=region, service='lambda', resource_type='Function', resource_id=func.get('FunctionArn', func_name), name=func_name, attributes={ 'Function Name': func_name, 'Runtime': func.get('Runtime', 'N/A'), 'Memory (MB)': str(func.get('MemorySize', '')), 'Timeout (s)': str(func.get('Timeout', '')), 'Last Modified': func.get('LastModified', '') } )) except Exception as e: logger.warning(f"Failed to scan Lambda functions: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_s3_buckets(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan S3 Buckets (global service, scanned once). Attributes (horizontal layout): Region, Bucket Name """ resources = [] s3_client = session.client('s3') try: response = s3_client.list_buckets() for bucket in response.get('Buckets', []): bucket_name = bucket.get('Name', '') # Get bucket location try: location_response = s3_client.get_bucket_location(Bucket=bucket_name) bucket_region = location_response.get('LocationConstraint') or 'us-east-1' except Exception: bucket_region = 'unknown' resources.append(ResourceData( account_id=account_id, region='global', service='s3', resource_type='Bucket', resource_id=bucket_name, name=bucket_name, attributes={ 'Region': bucket_region, 'Bucket Name': bucket_name } )) except Exception as e: logger.warning(f"Failed to scan S3 buckets: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_s3_event_notifications(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan S3 Event Notifications (global service). Attributes (vertical layout): Bucket, Name, Event Type, Destination type, Destination """ resources = [] s3_client = session.client('s3') try: # First get all buckets buckets_response = s3_client.list_buckets() for bucket in buckets_response.get('Buckets', []): bucket_name = bucket.get('Name', '') try: # Get notification configuration notif_response = s3_client.get_bucket_notification_configuration( Bucket=bucket_name ) # Process Lambda function configurations for config in notif_response.get('LambdaFunctionConfigurations', []): config_id = config.get('Id', 'Lambda') events = config.get('Events', []) lambda_arn = config.get('LambdaFunctionArn', '') resources.append(ResourceData( account_id=account_id, region='global', service='s3_event_notification', resource_type='S3 event notification', resource_id=f"{bucket_name}/{config_id}", name=config_id, attributes={ 'Bucket': bucket_name, 'Name': config_id, 'Event Type': ', '.join(events), 'Destination type': 'Lambda', 'Destination': lambda_arn.split(':')[-1] if lambda_arn else '' } )) # Process SQS queue configurations for config in notif_response.get('QueueConfigurations', []): config_id = config.get('Id', 'SQS') events = config.get('Events', []) queue_arn = config.get('QueueArn', '') resources.append(ResourceData( account_id=account_id, region='global', service='s3_event_notification', resource_type='S3 event notification', resource_id=f"{bucket_name}/{config_id}", name=config_id, attributes={ 'Bucket': bucket_name, 'Name': config_id, 'Event Type': ', '.join(events), 'Destination type': 'SQS', 'Destination': queue_arn.split(':')[-1] if queue_arn else '' } )) # Process SNS topic configurations for config in notif_response.get('TopicConfigurations', []): config_id = config.get('Id', 'SNS') events = config.get('Events', []) topic_arn = config.get('TopicArn', '') resources.append(ResourceData( account_id=account_id, region='global', service='s3_event_notification', resource_type='S3 event notification', resource_id=f"{bucket_name}/{config_id}", name=config_id, attributes={ 'Bucket': bucket_name, 'Name': config_id, 'Event Type': ', '.join(events), 'Destination type': 'SNS', 'Destination': topic_arn.split(':')[-1] if topic_arn else '' } )) except Exception as e: # Skip buckets we can't access logger.debug(f"Failed to get notifications for bucket {bucket_name}: {str(e)}") except Exception as e: logger.warning(f"Failed to scan S3 event notifications: {str(e)}") return resources