elb.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. """
  2. Auto Scaling and ELB Resource Scanners
  3. Scans Auto Scaling Groups (with Launch Templates), Load Balancers (ALB, NLB, CLB),
  4. and Target Groups.
  5. Requirements:
  6. - 5.1: Scan Auto Scaling and ELB AWS services using boto3
  7. """
  8. import boto3
  9. from typing import List, Dict, Any
  10. import logging
  11. from app.scanners.base import ResourceData
  12. from app.scanners.utils import retry_with_backoff
  13. logger = logging.getLogger(__name__)
  14. class ELBServiceScanner:
  15. """Scanner for Auto Scaling and ELB AWS resources"""
  16. @staticmethod
  17. def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
  18. """Extract Name tag value from tags list"""
  19. if not tags:
  20. return default
  21. for tag in tags:
  22. if tag.get('Key') == 'Name':
  23. return tag.get('Value', default)
  24. return default
  @staticmethod
  @retry_with_backoff()
  def scan_autoscaling_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """
      Scan Auto Scaling Groups reachable through the given session.

      Attributes (vertical layout - one table per ASG):
          Name, Launch Template, AMI, Instance type, Key, Target Groups,
          Desired, Min, Max, Scaling Policy

      Args:
          session: boto3 session used to create the 'autoscaling' and 'ec2' clients.
          account_id: Account identifier copied onto every returned resource.
          region: Region label copied onto every returned resource.

      Returns:
          One ResourceData per Auto Scaling Group.

      Launch template, launch configuration and scaling policy lookups are
      best-effort: a failure is logged as a warning and the affected
      attributes keep their defaults. Errors raised while listing the groups
      themselves are not caught here.
      """
      # NOTE(review): the clients use whatever region the session is configured
      # with; `region` only labels the results - confirm callers pass a
      # region-bound session.
      asg_client = session.client('autoscaling')
      ec2_client = session.client('ec2')
      resources = []

      paginator = asg_client.get_paginator('describe_auto_scaling_groups')
      for page in paginator.paginate():
          for asg in page.get('AutoScalingGroups', []):
              name = asg.get('AutoScalingGroupName', '')

              launch_template_name = ''
              ami = ''
              instance_type = ''
              key_name = ''

              # The template is referenced either inside the mixed instances
              # policy or directly on the group; resolve details for both so
              # mixed-policy groups also get AMI / instance type / key.
              mip = asg.get('MixedInstancesPolicy') or {}
              lt_spec = (
                  mip.get('LaunchTemplate', {}).get('LaunchTemplateSpecification')
                  or asg.get('LaunchTemplate')
              )
              if lt_spec:
                  launch_template_name = lt_spec.get(
                      'LaunchTemplateName', lt_spec.get('LaunchTemplateId', '')
                  )

                  # The API takes the template id OR its name; never send an
                  # empty id, which is guaranteed to fail.
                  lt_id = lt_spec.get('LaunchTemplateId')
                  lt_name = lt_spec.get('LaunchTemplateName')
                  if lt_id:
                      lt_selector = {'LaunchTemplateId': lt_id}
                  elif lt_name:
                      lt_selector = {'LaunchTemplateName': lt_name}
                  else:
                      lt_selector = None

                  if lt_selector:
                      try:
                          lt_response = ec2_client.describe_launch_template_versions(
                              Versions=[lt_spec.get('Version', '$Latest')],
                              **lt_selector
                          )
                          versions = lt_response.get('LaunchTemplateVersions')
                          if versions:
                              lt_data = versions[0].get('LaunchTemplateData', {})
                              ami = lt_data.get('ImageId', '')
                              instance_type = lt_data.get('InstanceType', '')
                              key_name = lt_data.get('KeyName', '')
                      except Exception as e:
                          logger.warning("Failed to get launch template details: %s", e)

              # Legacy launch configuration, used only when no template was found
              lc_name = asg.get('LaunchConfigurationName')
              if lc_name and not launch_template_name:
                  launch_template_name = f"LC: {lc_name}"
                  try:
                      lc_response = asg_client.describe_launch_configurations(
                          LaunchConfigurationNames=[lc_name]
                      )
                      if lc_response.get('LaunchConfigurations'):
                          lc = lc_response['LaunchConfigurations'][0]
                          ami = lc.get('ImageId', '')
                          instance_type = lc.get('InstanceType', '')
                          key_name = lc.get('KeyName', '')
                  except Exception as e:
                      logger.warning("Failed to get launch configuration details: %s", e)

              # Target group name is the second-to-last '/'-separated ARN segment;
              # values without '/' are kept unchanged.
              target_groups = [
                  tg_arn.split('/')[-2] if '/' in tg_arn else tg_arn
                  for tg_arn in asg.get('TargetGroupARNs', [])
              ]

              # Paginate so groups with more policies than fit on one page
              # are listed completely.
              scaling_policies = []
              try:
                  policy_pages = asg_client.get_paginator('describe_policies').paginate(
                      AutoScalingGroupName=name
                  )
                  for policy_page in policy_pages:
                      for policy in policy_page.get('ScalingPolicies', []):
                          scaling_policies.append(policy.get('PolicyName', ''))
              except Exception as e:
                  logger.warning("Failed to get scaling policies: %s", e)

              resources.append(ResourceData(
                  account_id=account_id,
                  region=region,
                  service='autoscaling',
                  resource_type='Auto Scaling Group',
                  resource_id=asg.get('AutoScalingGroupARN', name),
                  name=name,
                  attributes={
                      'Name': name,
                      'Launch Template': launch_template_name,
                      'AMI': ami,
                      'Instance type': instance_type,
                      'Key': key_name,
                      'Target Groups': ', '.join(target_groups) if target_groups else 'N/A',
                      'Desired': str(asg.get('DesiredCapacity', 0)),
                      'Min': str(asg.get('MinSize', 0)),
                      'Max': str(asg.get('MaxSize', 0)),
                      'Scaling Policy': ', '.join(scaling_policies) if scaling_policies else 'N/A'
                  }
              ))

      return resources
  @staticmethod
  @retry_with_backoff()
  def scan_load_balancers(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """
      Scan Load Balancers (ALB, NLB, CLB) reachable through the given session.

      Attributes (vertical layout - one table per LB):
          Name, Type, DNS, Scheme, VPC, Availability Zones, Subnet, Security Groups

      Args:
          session: boto3 session used to create the 'elbv2' and 'elb' clients.
          account_id: Account identifier copied onto every returned resource.
          region: Region label copied onto every returned resource.

      Returns:
          One ResourceData per load balancer: elbv2 results first, then
          Classic Load Balancers.

      The two scans are independent and best-effort: if one fails, a warning
      is logged and the results gathered so far are still returned.
      """
      resources = []

      # Scan ALB/NLB using elbv2
      elbv2_client = session.client('elbv2')
      try:
          paginator = elbv2_client.get_paginator('describe_load_balancers')
          for page in paginator.paginate():
              for lb in page.get('LoadBalancers', []):
                  name = lb.get('LoadBalancerName', '')
                  lb_type = lb.get('Type', 'application')

                  # Each AvailabilityZones entry carries the zone name and,
                  # when present, the subnet used in that zone
                  zones = lb.get('AvailabilityZones', [])
                  azs = [az_info.get('ZoneName', '') for az_info in zones]
                  subnets = [az_info['SubnetId'] for az_info in zones if az_info.get('SubnetId')]

                  # May be absent or empty (e.g. a load balancer with no
                  # security groups attached); rendered as 'N/A' in that case
                  security_groups = lb.get('SecurityGroups', [])

                  resources.append(ResourceData(
                      account_id=account_id,
                      region=region,
                      service='elb',
                      resource_type='Load Balancer',
                      resource_id=lb.get('LoadBalancerArn', name),
                      name=name,
                      attributes={
                          'Name': name,
                          'Type': lb_type.upper(),
                          'DNS': lb.get('DNSName', ''),
                          'Scheme': lb.get('Scheme', ''),
                          'VPC': lb.get('VpcId', ''),
                          'Availability Zones': ', '.join(azs),
                          'Subnet': ', '.join(subnets),
                          'Security Groups': ', '.join(security_groups) if security_groups else 'N/A'
                      }
                  ))
      except Exception as e:
          logger.warning("Failed to scan ALB/NLB: %s", e)

      # Scan Classic Load Balancers
      elb_client = session.client('elb')
      try:
          paginator = elb_client.get_paginator('describe_load_balancers')
          for page in paginator.paginate():
              for lb in page.get('LoadBalancerDescriptions', []):
                  # The load balancer name doubles as the resource id here
                  name = lb.get('LoadBalancerName', '')
                  security_groups = lb.get('SecurityGroups') or []

                  resources.append(ResourceData(
                      account_id=account_id,
                      region=region,
                      service='elb',
                      resource_type='Load Balancer',
                      resource_id=name,
                      name=name,
                      attributes={
                          'Name': name,
                          'Type': 'CLASSIC',
                          'DNS': lb.get('DNSName', ''),
                          'Scheme': lb.get('Scheme', ''),
                          'VPC': lb.get('VPCId', ''),
                          'Availability Zones': ', '.join(lb.get('AvailabilityZones', [])),
                          'Subnet': ', '.join(lb.get('Subnets', [])),
                          # Same 'N/A' placeholder as the elbv2 branch when empty
                          'Security Groups': ', '.join(security_groups) if security_groups else 'N/A'
                      }
                  ))
      except Exception as e:
          logger.warning("Failed to scan Classic ELB: %s", e)

      return resources
  @staticmethod
  @retry_with_backoff()
  def scan_target_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """
      Collect every Target Group visible through the session's elbv2 client.

      Attributes (vertical layout - one table per TG):
          Load Balancer, TG Name, Port, Protocol, Registered Instances, Health Check Path

      An error while listing target groups is logged as a warning and whatever
      was gathered up to that point is returned. A failed target-health lookup
      is also only logged; that group is reported with no registered targets.
      """
      client = session.client('elbv2')
      found = []

      def arn_to_name(arn):
          # Second-to-last '/'-separated segment; ARNs without '/' pass through unchanged
          return arn.split('/')[-2] if '/' in arn else arn

      try:
          pages = client.get_paginator('describe_target_groups').paginate()
          for page in pages:
              for group in page.get('TargetGroups', []):
                  group_name = group.get('TargetGroupName', '')
                  group_arn = group.get('TargetGroupArn', '')

                  # Names of the load balancers this group is attached to
                  balancers = [arn_to_name(arn) for arn in group.get('LoadBalancerArns', [])]

                  # Ids of the registered targets (empty ids are skipped)
                  target_ids = []
                  try:
                      health = client.describe_target_health(TargetGroupArn=group_arn)
                      descriptions = health.get('TargetHealthDescriptions', [])
                      target_ids.extend(filter(
                          None,
                          (item.get('Target', {}).get('Id', '') for item in descriptions)
                      ))
                  except Exception as e:
                      logger.warning(f"Failed to get target health: {str(e)}")

                  attributes = {
                      'Load Balancer': ', '.join(balancers) if balancers else 'N/A',
                      'TG Name': group_name,
                      'Port': str(group.get('Port', '')),
                      'Protocol': group.get('Protocol', ''),
                      'Registered Instances': ', '.join(target_ids) if target_ids else 'None',
                      'Health Check Path': group.get('HealthCheckPath', 'N/A')
                  }
                  found.append(ResourceData(
                      account_id=account_id,
                      region=region,
                      service='target_group',
                      resource_type='Target Group',
                      resource_id=group_arn,
                      name=group_name,
                      attributes=attributes
                  ))
      except Exception as e:
          logger.warning(f"Failed to scan target groups: {str(e)}")

      return found