| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- """
- Compute and Storage Service Scanners
- Scans EKS Clusters, Lambda Functions, S3 Buckets, and S3 Event Notifications.
- Requirements:
- - 5.1: Scan compute and storage AWS services using boto3
- """
- import boto3
- from typing import List, Dict, Any
- import logging
- from app.scanners.base import ResourceData
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
class ComputeServiceScanner:
    """Scanner for compute and storage AWS resources (EKS, Lambda, S3).

    Every scanner is a static method with the signature
    ``(session, account_id, region) -> List[ResourceData]``.  Failures from
    individual API calls are logged and skipped so that one inaccessible
    resource (or a whole unavailable service) does not abort the scan; the
    methods always return a (possibly empty) list.
    """

    # (response key, destination-type label, ARN field) for each of the three
    # destination kinds an S3 bucket notification configuration can contain.
    # The label doubles as the default config Id when AWS returns none,
    # matching the per-kind defaults the scanner has always used.
    _S3_NOTIFICATION_KINDS = (
        ('LambdaFunctionConfigurations', 'Lambda', 'LambdaFunctionArn'),
        ('QueueConfigurations', 'SQS', 'QueueArn'),
        ('TopicConfigurations', 'SNS', 'TopicArn'),
    )

    @staticmethod
    @retry_with_backoff()
    def scan_eks_clusters(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan EKS Clusters in the specified region.

        Attributes (vertical layout - one table per cluster):
        Cluster Name, Version, Status, Endpoint, VPC ID

        Args:
            session: boto3 session already scoped to the target account.
            account_id: AWS account id recorded on each resource.
            region: Region name recorded on each resource.

        Returns:
            One ResourceData per cluster; empty list if listing fails.
        """
        resources: List[ResourceData] = []
        eks_client = session.client('eks')

        try:
            # Cluster listing is paginated; collect all names first.
            paginator = eks_client.get_paginator('list_clusters')
            cluster_names: List[str] = []
            for page in paginator.paginate():
                cluster_names.extend(page.get('clusters', []))

            # Describe each cluster individually; one broken cluster must not
            # stop the rest of the scan.
            for cluster_name in cluster_names:
                try:
                    response = eks_client.describe_cluster(name=cluster_name)
                    cluster = response.get('cluster', {})

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='eks',
                        resource_type='Cluster',
                        resource_id=cluster.get('arn', cluster_name),
                        name=cluster_name,
                        attributes={
                            'Cluster Name': cluster_name,
                            'Version': cluster.get('version', ''),
                            'Status': cluster.get('status', ''),
                            'Endpoint': cluster.get('endpoint', ''),
                            'VPC ID': cluster.get('resourcesVpcConfig', {}).get('vpcId', '')
                        }
                    ))
                except Exception as e:
                    # Lazy %-args: message is only formatted if emitted.
                    logger.warning("Failed to describe EKS cluster %s: %s", cluster_name, e)
        except Exception as e:
            logger.warning("Failed to list EKS clusters: %s", e)

        return resources

    @staticmethod
    @retry_with_backoff()
    def scan_lambda_functions(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan Lambda Functions in the specified region.

        Attributes (horizontal layout):
        Function Name, Runtime, Memory (MB), Timeout (s), Last Modified

        Args:
            session: boto3 session already scoped to the target account.
            account_id: AWS account id recorded on each resource.
            region: Region name recorded on each resource.

        Returns:
            One ResourceData per function; empty list if listing fails.
        """
        resources: List[ResourceData] = []
        lambda_client = session.client('lambda')

        try:
            paginator = lambda_client.get_paginator('list_functions')
            for page in paginator.paginate():
                for func in page.get('Functions', []):
                    func_name = func.get('FunctionName', '')

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='lambda',
                        resource_type='Function',
                        resource_id=func.get('FunctionArn', func_name),
                        name=func_name,
                        attributes={
                            'Function Name': func_name,
                            # Container-image functions have no Runtime field.
                            'Runtime': func.get('Runtime', 'N/A'),
                            'Memory (MB)': str(func.get('MemorySize', '')),
                            'Timeout (s)': str(func.get('Timeout', '')),
                            'Last Modified': func.get('LastModified', '')
                        }
                    ))
        except Exception as e:
            logger.warning("Failed to scan Lambda functions: %s", e)

        return resources

    @staticmethod
    @retry_with_backoff()
    def scan_s3_buckets(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan S3 Buckets (global service, scanned once).

        Attributes (horizontal layout): Region, Bucket Name

        Args:
            session: boto3 session already scoped to the target account.
            account_id: AWS account id recorded on each resource.
            region: Unused; buckets are recorded under the 'global' region.

        Returns:
            One ResourceData per bucket; empty list if listing fails.
        """
        resources: List[ResourceData] = []
        s3_client = session.client('s3')

        try:
            response = s3_client.list_buckets()
            for bucket in response.get('Buckets', []):
                bucket_name = bucket.get('Name', '')

                # Resolve the bucket's home region.  GetBucketLocation returns
                # None for us-east-1 and the legacy value 'EU' for eu-west-1
                # buckets created via the old console/API.
                try:
                    location_response = s3_client.get_bucket_location(Bucket=bucket_name)
                    constraint = location_response.get('LocationConstraint')
                    if not constraint:
                        bucket_region = 'us-east-1'
                    elif constraint == 'EU':
                        bucket_region = 'eu-west-1'
                    else:
                        bucket_region = constraint
                except Exception:
                    # Likely access denied; still report the bucket itself.
                    bucket_region = 'unknown'

                resources.append(ResourceData(
                    account_id=account_id,
                    region='global',
                    service='s3',
                    resource_type='Bucket',
                    resource_id=bucket_name,
                    name=bucket_name,
                    attributes={
                        'Region': bucket_region,
                        'Bucket Name': bucket_name
                    }
                ))
        except Exception as e:
            logger.warning("Failed to scan S3 buckets: %s", e)

        return resources

    @staticmethod
    @retry_with_backoff()
    def scan_s3_event_notifications(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan S3 Event Notifications (global service).

        Attributes (vertical layout):
        Bucket, Name, Event Type, Destination type, Destination

        All three destination kinds (Lambda, SQS, SNS) are handled by a single
        data-driven loop over _S3_NOTIFICATION_KINDS instead of three
        copy-pasted blocks.

        Args:
            session: boto3 session already scoped to the target account.
            account_id: AWS account id recorded on each resource.
            region: Unused; notifications are recorded under 'global'.

        Returns:
            One ResourceData per notification config; empty list on failure.
        """
        resources: List[ResourceData] = []
        s3_client = session.client('s3')

        try:
            # Notifications hang off buckets, so enumerate buckets first.
            buckets_response = s3_client.list_buckets()

            for bucket in buckets_response.get('Buckets', []):
                bucket_name = bucket.get('Name', '')

                try:
                    notif_response = s3_client.get_bucket_notification_configuration(
                        Bucket=bucket_name
                    )

                    for config_key, dest_type, arn_key in ComputeServiceScanner._S3_NOTIFICATION_KINDS:
                        for config in notif_response.get(config_key, []):
                            config_id = config.get('Id', dest_type)
                            events = config.get('Events', [])
                            dest_arn = config.get(arn_key, '')

                            resources.append(ResourceData(
                                account_id=account_id,
                                region='global',
                                service='s3_event_notification',
                                resource_type='S3 event notification',
                                resource_id=f"{bucket_name}/{config_id}",
                                name=config_id,
                                attributes={
                                    'Bucket': bucket_name,
                                    'Name': config_id,
                                    'Event Type': ', '.join(events),
                                    'Destination type': dest_type,
                                    # Last ARN segment is the function/queue/topic name.
                                    'Destination': dest_arn.split(':')[-1] if dest_arn else ''
                                }
                            ))

                except Exception as e:
                    # Skip buckets we can't access; debug-level to avoid noise.
                    logger.debug("Failed to get notifications for bucket %s: %s", bucket_name, e)

        except Exception as e:
            logger.warning("Failed to scan S3 event notifications: %s", e)

        return resources
|