"""
EC2 Related Resource Scanners

Scans EC2 Instances (with EBS volumes, AMI info) and Elastic IPs.

Requirements:
- 5.1: Scan EC2-related AWS services using boto3
"""

import logging
from typing import List, Dict, Any

import boto3

from app.scanners.base import ResourceData
from app.scanners.utils import retry_with_backoff

logger = logging.getLogger(__name__)


class EC2ServiceScanner:
    """Scanner for EC2-related AWS resources"""

    @staticmethod
    def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
        """
        Extract the Name tag value from a tags list.

        Args:
            tags: Tag dicts carrying 'Key' and 'Value' entries; may be empty
                or None.
            default: Value returned when no usable Name tag is present.

        Returns:
            The value of the first tag whose Key is 'Name', or ``default``
            when the list is empty, has no Name tag, or that tag's value is
            empty.
        """
        for tag in tags or []:
            if tag.get('Key') == 'Name':
                # An empty Name value is no more useful than a missing tag,
                # so fall back to the caller-supplied default.
                return tag.get('Value') or default
        return default

    @staticmethod
    def _get_security_group_names(instance: Dict[str, Any]) -> List[str]:
        """Return each security group's GroupName, else its GroupId, else ''."""
        return [
            sg.get('GroupName', sg.get('GroupId', ''))
            for sg in instance.get('SecurityGroups', [])
        ]

    @staticmethod
    def _get_first_volume_details(ec2_client: Any, instance: Dict[str, Any]) -> Dict[str, str]:
        """
        Look up type, size and encryption of the instance's first EBS volume.

        Only the first block device mapping that carries an EBS VolumeId is
        inspected. The lookup is best-effort: any failure is logged as a
        warning and the affected fields stay empty.

        Args:
            ec2_client: EC2 client used to call ``describe_volumes``.
            instance: One instance dict from a ``describe_instances`` page.

        Returns:
            Dict with the keys 'EBS Type', 'EBS Size' and 'Encryption'; each
            value is '' when the information could not be obtained.
        """
        details = {'EBS Type': '', 'EBS Size': '', 'Encryption': ''}

        for block_device in instance.get('BlockDeviceMappings', []):
            volume_id = block_device.get('Ebs', {}).get('VolumeId')
            if not volume_id:
                continue

            # Deliberately broad: a volume lookup problem must not abort the
            # scan of the instance itself.
            try:
                vol_response = ec2_client.describe_volumes(VolumeIds=[volume_id])
                volumes = vol_response.get('Volumes')
                if volumes:
                    volume = volumes[0]
                    size = volume.get('Size')
                    details['EBS Type'] = volume.get('VolumeType', '')
                    # Avoid emitting a bare " GB" when the size is absent.
                    details['EBS Size'] = f"{size} GB" if size is not None else ''
                    details['Encryption'] = 'Yes' if volume.get('Encrypted') else 'No'
            except Exception as e:
                logger.warning("Failed to get volume details: %s", e)

            break  # Only get first volume for simplicity

        return details

    @staticmethod
    @retry_with_backoff()
    def scan_ec2_instances(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan EC2 Instances in the specified region.

        Terminated instances are skipped. The instance ID is used as the name
        when the instance has no usable Name tag.

        Attributes (vertical layout - one table per instance):
            Name, Instance ID, Instance Type, AZ, AMI, Public IP, Public DNS,
            Private IP, VPC ID, Subnet ID, Key, Security Groups, EBS Type,
            EBS Size, Encryption, Other Requirement

        Args:
            session: boto3 session used to create the EC2 client.
            account_id: Account ID recorded on every returned resource.
            region: Region name recorded on every returned resource.

        Returns:
            One ResourceData per non-terminated instance.
        """
        resources = []
        ec2_client = session.client('ec2')

        paginator = ec2_client.get_paginator('describe_instances')
        for page in paginator.paginate():
            for reservation in page.get('Reservations', []):
                for instance in reservation.get('Instances', []):
                    # Skip terminated instances
                    if instance.get('State', {}).get('Name', '') == 'terminated':
                        continue

                    instance_id = instance['InstanceId']
                    name = EC2ServiceScanner._get_name_from_tags(
                        instance.get('Tags', []),
                        instance_id
                    )
                    security_groups = EC2ServiceScanner._get_security_group_names(instance)
                    volume = EC2ServiceScanner._get_first_volume_details(ec2_client, instance)

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='ec2',
                        resource_type='Instance',
                        resource_id=instance_id,
                        name=name,
                        # Key order is kept as in the attribute list above.
                        attributes={
                            'Name': name,
                            'Instance ID': instance_id,
                            'Instance Type': instance.get('InstanceType', ''),
                            'AZ': instance.get('Placement', {}).get('AvailabilityZone', ''),
                            'AMI': instance.get('ImageId', ''),
                            'Public IP': instance.get('PublicIpAddress', ''),
                            'Public DNS': instance.get('PublicDnsName', ''),
                            'Private IP': instance.get('PrivateIpAddress', ''),
                            'VPC ID': instance.get('VpcId', ''),
                            'Subnet ID': instance.get('SubnetId', ''),
                            'Key': instance.get('KeyName', ''),
                            'Security Groups': ', '.join(security_groups),
                            'EBS Type': volume['EBS Type'],
                            'EBS Size': volume['EBS Size'],
                            'Encryption': volume['Encryption'],
                            'Other Requirement': ''
                        }
                    ))

        return resources

    @staticmethod
    @retry_with_backoff()
    def scan_elastic_ips(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan Elastic IPs in the specified region.

        The name comes from the Name tag; without a usable one it falls back
        to the public IP, then to the allocation ID. The resource ID is the
        allocation ID, or the public IP when no allocation ID is present.

        Attributes (horizontal layout):
            Name, Elastic IP

        Args:
            session: boto3 session used to create the EC2 client.
            account_id: Account ID recorded on every returned resource.
            region: Region name recorded on every returned resource.

        Returns:
            One ResourceData per address returned by ``describe_addresses``.
        """
        resources = []
        ec2_client = session.client('ec2')

        response = ec2_client.describe_addresses()
        for eip in response.get('Addresses', []):
            public_ip = eip.get('PublicIp', '')
            name = EC2ServiceScanner._get_name_from_tags(
                eip.get('Tags', []),
                public_ip or eip.get('AllocationId', '')
            )

            resources.append(ResourceData(
                account_id=account_id,
                region=region,
                service='elastic_ip',
                resource_type='Elastic IP',
                resource_id=eip.get('AllocationId', public_ip),
                name=name,
                attributes={
                    'Name': name,
                    'Elastic IP': public_ip
                }
            ))

        return resources