| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- """
- EC2 Related Resource Scanners
- Scans EC2 Instances (with EBS volumes, AMI info) and Elastic IPs.
- Requirements:
- - 5.1: Scan EC2-related AWS services using boto3
- """
- import boto3
- from typing import List, Dict, Any
- import logging
- from app.scanners.base import ResourceData
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
class EC2ServiceScanner:
    """Scanner for EC2-related AWS resources (EC2 instances and Elastic IPs).

    Every method is static; the class is used purely as a namespace.
    """

    @staticmethod
    def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
        """Return the value of the ``Name`` tag, falling back to ``default``.

        Args:
            tags: AWS-style tag list (``[{'Key': ..., 'Value': ...}, ...]``).
                May be empty or ``None``.
            default: Value returned when no usable ``Name`` tag exists.

        Returns:
            The ``Name`` tag value, or ``default`` when the tag is missing or
            its value is empty.
        """
        for tag in tags or []:
            if tag.get('Key') == 'Name':
                # An empty (or None) Name value is treated like a missing tag
                # so the resource still gets a non-blank display name.
                return tag.get('Value') or default
        return default

    @staticmethod
    def _get_first_volume_details(ec2_client: Any, instance: Dict[str, Any]) -> Dict[str, str]:
        """Look up type, size and encryption of an instance's first EBS volume.

        Only the first block device mapping that carries an EBS volume ID is
        described; further volumes are intentionally ignored.

        Args:
            ec2_client: EC2 client used for the ``describe_volumes`` call.
            instance: Instance dict as returned by ``describe_instances``.

        Returns:
            Dict with the keys ``'EBS Type'``, ``'EBS Size'`` and
            ``'Encryption'``. All values are empty strings when the instance
            has no EBS volume or the lookup fails.
        """
        details = {'EBS Type': '', 'EBS Size': '', 'Encryption': ''}

        volume_id = None
        for block_device in instance.get('BlockDeviceMappings', []):
            volume_id = block_device.get('Ebs', {}).get('VolumeId')
            if volume_id:
                break
        if not volume_id:
            return details

        try:
            response = ec2_client.describe_volumes(VolumeIds=[volume_id])
        except Exception as e:
            # Best effort: a failed volume lookup must not abort the whole
            # instance scan, so log it and report empty EBS attributes.
            logger.warning("Failed to get volume details: %s", e)
            return details

        volumes = response.get('Volumes')
        if volumes:
            volume = volumes[0]
            size = volume.get('Size')
            details['EBS Type'] = volume.get('VolumeType', '')
            # Avoid rendering a dangling " GB" when the size is unknown.
            details['EBS Size'] = f"{size} GB" if size is not None else ''
            details['Encryption'] = 'Yes' if volume.get('Encrypted') else 'No'
        return details

    @staticmethod
    def _build_instance_resource(
        ec2_client: Any,
        instance: Dict[str, Any],
        account_id: str,
        region: str,
    ) -> ResourceData:
        """Convert one ``describe_instances`` instance dict into a ResourceData."""
        instance_id = instance['InstanceId']
        # Unnamed instances are displayed under their instance ID.
        name = EC2ServiceScanner._get_name_from_tags(instance.get('Tags', []), instance_id)

        # Prefer the group name; fall back to the group ID when it is absent.
        security_groups = [
            sg.get('GroupName', sg.get('GroupId', ''))
            for sg in instance.get('SecurityGroups', [])
        ]

        ebs = EC2ServiceScanner._get_first_volume_details(ec2_client, instance)

        return ResourceData(
            account_id=account_id,
            region=region,
            service='ec2',
            resource_type='Instance',
            resource_id=instance_id,
            name=name,
            # Key order is kept exactly as listed in the scan_ec2_instances
            # docstring.
            attributes={
                'Name': name,
                'Instance ID': instance_id,
                'Instance Type': instance.get('InstanceType', ''),
                'AZ': instance.get('Placement', {}).get('AvailabilityZone', ''),
                'AMI': instance.get('ImageId', ''),
                'Public IP': instance.get('PublicIpAddress', ''),
                'Public DNS': instance.get('PublicDnsName', ''),
                'Private IP': instance.get('PrivateIpAddress', ''),
                'VPC ID': instance.get('VpcId', ''),
                'Subnet ID': instance.get('SubnetId', ''),
                'Key': instance.get('KeyName', ''),
                'Security Groups': ', '.join(security_groups),
                'EBS Type': ebs['EBS Type'],
                'EBS Size': ebs['EBS Size'],
                'Encryption': ebs['Encryption'],
                'Other Requirement': ''
            }
        )

    @staticmethod
    @retry_with_backoff()
    def scan_ec2_instances(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan EC2 Instances in the specified region.

        Terminated instances are skipped. For each remaining instance the
        first attached EBS volume is described to fill the EBS attributes.

        Attributes (vertical layout - one table per instance):
            Name, Instance ID, Instance Type, AZ, AMI,
            Public IP, Public DNS, Private IP, VPC ID, Subnet ID,
            Key, Security Groups, EBS Type, EBS Size, Encryption, Other Requirement

        Args:
            session: boto3 session providing the credentials.
            account_id: AWS account ID recorded on every result.
            region: Region to scan; also recorded on every result.

        Returns:
            One ResourceData per non-terminated instance.
        """
        resources = []
        # Bind the client to `region` explicitly so the scan targets the same
        # region the results are labelled with, even if the session default
        # differs. `or None` keeps the session default for an empty region.
        ec2_client = session.client('ec2', region_name=region or None)

        paginator = ec2_client.get_paginator('describe_instances')
        for page in paginator.paginate():
            for reservation in page.get('Reservations', []):
                for instance in reservation.get('Instances', []):
                    # Skip terminated instances
                    if instance.get('State', {}).get('Name', '') == 'terminated':
                        continue
                    resources.append(
                        EC2ServiceScanner._build_instance_resource(
                            ec2_client, instance, account_id, region
                        )
                    )

        return resources

    @staticmethod
    @retry_with_backoff()
    def scan_elastic_ips(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan Elastic IPs in the specified region.

        Attributes (horizontal layout): Name, Elastic IP

        Args:
            session: boto3 session providing the credentials.
            account_id: AWS account ID recorded on every result.
            region: Region to scan; also recorded on every result.

        Returns:
            One ResourceData per address returned by ``describe_addresses``.
        """
        resources = []
        # Bind the client to `region` explicitly so the scan targets the same
        # region the results are labelled with, even if the session default
        # differs. `or None` keeps the session default for an empty region.
        ec2_client = session.client('ec2', region_name=region or None)

        response = ec2_client.describe_addresses()
        for eip in response.get('Addresses', []):
            public_ip = eip.get('PublicIp', '')
            # Unnamed addresses are displayed under their public IP, or the
            # allocation ID when no public IP is reported.
            name = EC2ServiceScanner._get_name_from_tags(
                eip.get('Tags', []),
                public_ip or eip.get('AllocationId', '')
            )

            resources.append(ResourceData(
                account_id=account_id,
                region=region,
                service='elastic_ip',
                resource_type='Elastic IP',
                # Use the public IP as the ID when no AllocationId is reported.
                resource_id=eip.get('AllocationId', public_ip),
                name=name,
                attributes={
                    'Name': name,
                    'Elastic IP': public_ip
                }
            ))

        return resources
|