ec2.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. """
  2. EC2 Related Resource Scanners
  3. Scans EC2 Instances (with EBS volumes, AMI info) and Elastic IPs.
  4. Requirements:
  5. - 5.1: Scan EC2-related AWS services using boto3
  6. """
  7. import boto3
  8. from typing import List, Dict, Any
  9. import logging
  10. from app.scanners.base import ResourceData
  11. from app.scanners.utils import retry_with_backoff
  12. logger = logging.getLogger(__name__)
  13. class EC2ServiceScanner:
  14. """Scanner for EC2-related AWS resources"""
  15. @staticmethod
  16. def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
  17. """Extract Name tag value from tags list"""
  18. if not tags:
  19. return default
  20. for tag in tags:
  21. if tag.get('Key') == 'Name':
  22. return tag.get('Value', default)
  23. return default
  24. @staticmethod
  25. @retry_with_backoff()
  26. def scan_ec2_instances(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  27. """
  28. Scan EC2 Instances in the specified region.
  29. Attributes (vertical layout - one table per instance):
  30. Name, Instance ID, Instance Type, AZ, AMI,
  31. Public IP, Public DNS, Private IP, VPC ID, Subnet ID,
  32. Key, Security Groups, EBS Type, EBS Size, Encryption, Other Requirement
  33. """
  34. resources = []
  35. ec2_client = session.client('ec2')
  36. paginator = ec2_client.get_paginator('describe_instances')
  37. for page in paginator.paginate():
  38. for reservation in page.get('Reservations', []):
  39. for instance in reservation.get('Instances', []):
  40. # Skip terminated instances
  41. state = instance.get('State', {}).get('Name', '')
  42. if state == 'terminated':
  43. continue
  44. name = EC2ServiceScanner._get_name_from_tags(
  45. instance.get('Tags', []),
  46. instance['InstanceId']
  47. )
  48. # Get security groups
  49. security_groups = []
  50. for sg in instance.get('SecurityGroups', []):
  51. security_groups.append(sg.get('GroupName', sg.get('GroupId', '')))
  52. # Get EBS volume info
  53. ebs_type = ''
  54. ebs_size = ''
  55. ebs_encrypted = ''
  56. for block_device in instance.get('BlockDeviceMappings', []):
  57. ebs = block_device.get('Ebs', {})
  58. if ebs.get('VolumeId'):
  59. # Get volume details
  60. try:
  61. vol_response = ec2_client.describe_volumes(
  62. VolumeIds=[ebs['VolumeId']]
  63. )
  64. if vol_response.get('Volumes'):
  65. volume = vol_response['Volumes'][0]
  66. ebs_type = volume.get('VolumeType', '')
  67. ebs_size = f"{volume.get('Size', '')} GB"
  68. ebs_encrypted = 'Yes' if volume.get('Encrypted') else 'No'
  69. except Exception as e:
  70. logger.warning(f"Failed to get volume details: {str(e)}")
  71. break # Only get first volume for simplicity
  72. resources.append(ResourceData(
  73. account_id=account_id,
  74. region=region,
  75. service='ec2',
  76. resource_type='Instance',
  77. resource_id=instance['InstanceId'],
  78. name=name,
  79. attributes={
  80. 'Name': name,
  81. 'Instance ID': instance['InstanceId'],
  82. 'Instance Type': instance.get('InstanceType', ''),
  83. 'AZ': instance.get('Placement', {}).get('AvailabilityZone', ''),
  84. 'AMI': instance.get('ImageId', ''),
  85. 'Public IP': instance.get('PublicIpAddress', ''),
  86. 'Public DNS': instance.get('PublicDnsName', ''),
  87. 'Private IP': instance.get('PrivateIpAddress', ''),
  88. 'VPC ID': instance.get('VpcId', ''),
  89. 'Subnet ID': instance.get('SubnetId', ''),
  90. 'Key': instance.get('KeyName', ''),
  91. 'Security Groups': ', '.join(security_groups),
  92. 'EBS Type': ebs_type,
  93. 'EBS Size': ebs_size,
  94. 'Encryption': ebs_encrypted,
  95. 'Other Requirement': ''
  96. }
  97. ))
  98. return resources
  99. @staticmethod
  100. @retry_with_backoff()
  101. def scan_elastic_ips(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  102. """
  103. Scan Elastic IPs in the specified region.
  104. Attributes (horizontal layout): Name, Elastic IP
  105. """
  106. resources = []
  107. ec2_client = session.client('ec2')
  108. response = ec2_client.describe_addresses()
  109. for eip in response.get('Addresses', []):
  110. public_ip = eip.get('PublicIp', '')
  111. name = EC2ServiceScanner._get_name_from_tags(
  112. eip.get('Tags', []),
  113. public_ip or eip.get('AllocationId', '')
  114. )
  115. resources.append(ResourceData(
  116. account_id=account_id,
  117. region=region,
  118. service='elastic_ip',
  119. resource_type='Elastic IP',
  120. resource_id=eip.get('AllocationId', public_ip),
  121. name=name,
  122. attributes={
  123. 'Name': name,
  124. 'Elastic IP': public_ip
  125. }
  126. ))
  127. return resources