| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- """
- Auto Scaling and ELB Resource Scanners
- Scans Auto Scaling Groups (with Launch Templates), Load Balancers (ALB, NLB, CLB),
- and Target Groups.
- Requirements:
- - 5.1: Scan Auto Scaling and ELB AWS services using boto3
- """
- import boto3
- from typing import List, Dict, Any
- import logging
- from app.scanners.base import ResourceData
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
class ELBServiceScanner:
    """Scanner for Auto Scaling and ELB AWS resources.

    Every public ``scan_*`` method takes a boto3 session plus the account ID
    and region used to label the results, and returns a list of
    ``ResourceData`` records. Clients are created with ``session.client(...)``
    without an explicit region, so the session's own region configuration
    decides which region is queried; ``region`` is only recorded on the
    results.

    Secondary lookups (launch template details, scaling policies, target
    health) are best-effort: a failure is logged as a warning and the
    affected attributes are left at their empty defaults.
    """

    @staticmethod
    def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
        """Return the value of the ``Name`` tag, or *default* when absent.

        Args:
            tags: List of ``{'Key': ..., 'Value': ...}`` dicts; may be empty
                or ``None``.
            default: Value returned when there is no ``Name`` tag (or the
                tag has no ``Value``).
        """
        if not tags:
            return default
        for tag in tags:
            if tag.get('Key') == 'Name':
                return tag.get('Value', default)
        return default

    @staticmethod
    def _name_from_arn(arn: str) -> str:
        """Return the second-to-last ``/``-separated segment of *arn*.

        Target group and elbv2 load balancer ARNs end in ``.../<name>/<id>``,
        so this segment is the resource name. An ARN without any ``/`` is
        returned unchanged.
        """
        return arn.split('/')[-2] if '/' in arn else arn

    @staticmethod
    def _launch_template_details(ec2_client: Any, spec: Dict[str, Any]) -> Dict[str, str]:
        """Look up AMI, instance type and key pair for a launch template spec.

        Args:
            ec2_client: boto3 EC2 client used for
                ``describe_launch_template_versions``.
            spec: Launch template specification dict from an ASG description
                (``LaunchTemplateId`` / ``LaunchTemplateName`` / ``Version``).

        Returns:
            Dict with keys ``'AMI'``, ``'Instance type'`` and ``'Key'``; each
            value is ``''`` when it cannot be determined.
        """
        details = {'AMI': '', 'Instance type': '', 'Key': ''}

        # Prefer the ID; fall back to the name rather than sending an empty
        # ID that the API would reject.
        if spec.get('LaunchTemplateId'):
            selector = {'LaunchTemplateId': spec['LaunchTemplateId']}
        elif spec.get('LaunchTemplateName'):
            selector = {'LaunchTemplateName': spec['LaunchTemplateName']}
        else:
            return details

        # Auto Scaling uses $Default (not $Latest) when the group does not
        # pin a version, so mirror that when 'Version' is missing.
        version = spec.get('Version') or '$Default'

        try:
            response = ec2_client.describe_launch_template_versions(
                Versions=[version],
                **selector
            )
        except Exception as e:
            logger.warning("Failed to get launch template details: %s", e)
            return details

        versions = response.get('LaunchTemplateVersions')
        if versions:
            lt_data = versions[0].get('LaunchTemplateData', {})
            details['AMI'] = lt_data.get('ImageId', '')
            details['Instance type'] = lt_data.get('InstanceType', '')
            details['Key'] = lt_data.get('KeyName', '')
        return details

    @staticmethod
    def _launch_configuration_details(asg_client: Any, lc_name: str) -> Dict[str, str]:
        """Look up AMI, instance type and key pair for a launch configuration.

        Returns a dict with keys ``'AMI'``, ``'Instance type'`` and ``'Key'``;
        values are ``''`` when the lookup fails or returns nothing.
        """
        details = {'AMI': '', 'Instance type': '', 'Key': ''}
        try:
            response = asg_client.describe_launch_configurations(
                LaunchConfigurationNames=[lc_name]
            )
        except Exception as e:
            logger.warning("Failed to get launch configuration details: %s", e)
            return details

        configs = response.get('LaunchConfigurations')
        if configs:
            lc = configs[0]
            details['AMI'] = lc.get('ImageId', '')
            details['Instance type'] = lc.get('InstanceType', '')
            details['Key'] = lc.get('KeyName', '')
        return details

    @staticmethod
    def _resolve_launch_source(asg: Dict[str, Any], asg_client: Any, ec2_client: Any) -> Dict[str, str]:
        """Work out what an ASG launches instances from.

        Checks, in order, the group's ``LaunchTemplate``, its
        ``MixedInstancesPolicy`` launch template, and finally its legacy
        ``LaunchConfigurationName`` (only used when no template was found).

        Returns:
            Dict with keys ``'Launch Template'``, ``'AMI'``,
            ``'Instance type'`` and ``'Key'``.
        """
        info = {'Launch Template': '', 'AMI': '', 'Instance type': '', 'Key': ''}

        # Plain launch template
        lt = asg.get('LaunchTemplate')
        if lt:
            info['Launch Template'] = lt.get('LaunchTemplateName', lt.get('LaunchTemplateId', ''))
            info.update(ELBServiceScanner._launch_template_details(ec2_client, lt))

        # Mixed instances policy: the template lives one level deeper
        mip = asg.get('MixedInstancesPolicy')
        if mip:
            mip_template = mip.get('LaunchTemplate', {})
            lt_spec = mip_template.get('LaunchTemplateSpecification', {})
            if lt_spec:
                info['Launch Template'] = lt_spec.get(
                    'LaunchTemplateName', lt_spec.get('LaunchTemplateId', '')
                )
                info.update(ELBServiceScanner._launch_template_details(ec2_client, lt_spec))

            # Instance types listed in Overrides take precedence over the
            # template's own instance type, so report those when present.
            override_types = []
            for override in mip_template.get('Overrides', []):
                override_type = override.get('InstanceType')
                if override_type and override_type not in override_types:
                    override_types.append(override_type)
            if override_types:
                info['Instance type'] = ', '.join(override_types)

        # Launch configuration (legacy)
        lc_name = asg.get('LaunchConfigurationName')
        if lc_name and not info['Launch Template']:
            info['Launch Template'] = f"LC: {lc_name}"
            info.update(ELBServiceScanner._launch_configuration_details(asg_client, lc_name))

        return info

    @staticmethod
    def _scaling_policy_names(asg_client: Any, asg_name: str) -> List[str]:
        """Return the names of all scaling policies attached to *asg_name*.

        Uses the ``describe_policies`` paginator so groups with more than one
        page of policies are reported completely. Returns ``[]`` (after
        logging a warning) if the lookup fails.
        """
        names = []
        try:
            paginator = asg_client.get_paginator('describe_policies')
            for page in paginator.paginate(AutoScalingGroupName=asg_name):
                names.extend(
                    policy.get('PolicyName', '')
                    for policy in page.get('ScalingPolicies', [])
                )
        except Exception as e:
            logger.warning("Failed to get scaling policies: %s", e)
            return []
        return names

    @staticmethod
    def _registered_target_ids(elbv2_client: Any, tg_arn: str) -> List[str]:
        """Return the IDs of the targets registered with a target group.

        Targets without an ``Id`` are skipped. Returns ``[]`` (after logging
        a warning) if ``describe_target_health`` fails.
        """
        try:
            response = elbv2_client.describe_target_health(TargetGroupArn=tg_arn)
        except Exception as e:
            logger.warning("Failed to get target health: %s", e)
            return []

        target_ids = []
        for description in response.get('TargetHealthDescriptions', []):
            target_id = description.get('Target', {}).get('Id', '')
            if target_id:
                target_ids.append(target_id)
        return target_ids

    @staticmethod
    @retry_with_backoff()
    def scan_autoscaling_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan Auto Scaling Groups in the specified region.

        Attributes (vertical layout - one table per ASG):
            Name, Launch Template, AMI, Instance type, Key, Target Groups,
            Desired, Min, Max, Scaling Policy

        Args:
            session: boto3 session used to create the Auto Scaling and EC2
                clients.
            account_id: Account ID recorded on every result.
            region: Region recorded on every result.

        Returns:
            One ``ResourceData`` per Auto Scaling Group. Errors from listing
            the groups themselves propagate to the caller (and the retry
            decorator); per-group detail lookups are best-effort.
        """
        resources = []
        asg_client = session.client('autoscaling')
        ec2_client = session.client('ec2')

        paginator = asg_client.get_paginator('describe_auto_scaling_groups')
        for page in paginator.paginate():
            for asg in page.get('AutoScalingGroups', []):
                name = asg.get('AutoScalingGroupName', '')

                launch = ELBServiceScanner._resolve_launch_source(asg, asg_client, ec2_client)

                # Target group names are derived from the attached ARNs
                target_groups = [
                    ELBServiceScanner._name_from_arn(tg_arn)
                    for tg_arn in asg.get('TargetGroupARNs', [])
                ]

                scaling_policies = ELBServiceScanner._scaling_policy_names(asg_client, name)

                resources.append(ResourceData(
                    account_id=account_id,
                    region=region,
                    service='autoscaling',
                    resource_type='Auto Scaling Group',
                    resource_id=asg.get('AutoScalingGroupARN', name),
                    name=name,
                    attributes={
                        'Name': name,
                        'Launch Template': launch['Launch Template'],
                        'AMI': launch['AMI'],
                        'Instance type': launch['Instance type'],
                        'Key': launch['Key'],
                        'Target Groups': ', '.join(target_groups) if target_groups else 'N/A',
                        'Desired': str(asg.get('DesiredCapacity', 0)),
                        'Min': str(asg.get('MinSize', 0)),
                        'Max': str(asg.get('MaxSize', 0)),
                        'Scaling Policy': ', '.join(scaling_policies) if scaling_policies else 'N/A'
                    }
                ))

        return resources

    @staticmethod
    @retry_with_backoff()
    def scan_load_balancers(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan Load Balancers (ALB, NLB, CLB) in the specified region.

        Attributes (vertical layout - one table per LB):
            Name, Type, DNS, Scheme, VPC, Availability Zones, Subnet, Security Groups

        Args:
            session: boto3 session used to create the ``elbv2`` and ``elb``
                clients.
            account_id: Account ID recorded on every result.
            region: Region recorded on every result.

        Returns:
            One ``ResourceData`` per load balancer. The elbv2 scan and the
            Classic scan fail independently: an error in either is logged as
            a warning and whatever was collected so far is still returned.
        """
        resources = []

        # Scan ALB/NLB using elbv2
        elbv2_client = session.client('elbv2')

        try:
            paginator = elbv2_client.get_paginator('describe_load_balancers')
            for page in paginator.paginate():
                for lb in page.get('LoadBalancers', []):
                    name = lb.get('LoadBalancerName', '')
                    lb_type = lb.get('Type', 'application')

                    # Each AvailabilityZones entry carries both the zone
                    # name and the subnet the LB uses in that zone.
                    az_entries = lb.get('AvailabilityZones', [])
                    azs = [az_info.get('ZoneName', '') for az_info in az_entries]
                    subnets = [
                        az_info['SubnetId']
                        for az_info in az_entries
                        if az_info.get('SubnetId')
                    ]

                    # May be absent or empty (e.g. load balancers without
                    # security groups); rendered as 'N/A' in that case.
                    security_groups = lb.get('SecurityGroups', [])

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='elb',
                        resource_type='Load Balancer',
                        resource_id=lb.get('LoadBalancerArn', name),
                        name=name,
                        attributes={
                            'Name': name,
                            'Type': lb_type.upper(),
                            'DNS': lb.get('DNSName', ''),
                            'Scheme': lb.get('Scheme', ''),
                            'VPC': lb.get('VpcId', ''),
                            'Availability Zones': ', '.join(azs),
                            'Subnet': ', '.join(subnets),
                            'Security Groups': ', '.join(security_groups) if security_groups else 'N/A'
                        }
                    ))
        except Exception as e:
            logger.warning("Failed to scan ALB/NLB: %s", e)

        # Scan Classic Load Balancers
        elb_client = session.client('elb')

        try:
            paginator = elb_client.get_paginator('describe_load_balancers')
            for page in paginator.paginate():
                for lb in page.get('LoadBalancerDescriptions', []):
                    name = lb.get('LoadBalancerName', '')
                    security_groups = lb.get('SecurityGroups', [])

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='elb',
                        resource_type='Load Balancer',
                        # The Classic description provides no ARN here, so
                        # the name doubles as the ID.
                        resource_id=name,
                        name=name,
                        attributes={
                            'Name': name,
                            'Type': 'CLASSIC',
                            'DNS': lb.get('DNSName', ''),
                            'Scheme': lb.get('Scheme', ''),
                            'VPC': lb.get('VPCId', ''),
                            'Availability Zones': ', '.join(lb.get('AvailabilityZones', [])),
                            'Subnet': ', '.join(lb.get('Subnets', [])),
                            # Same 'N/A' placeholder as the ALB/NLB rows
                            'Security Groups': ', '.join(security_groups) if security_groups else 'N/A'
                        }
                    ))
        except Exception as e:
            logger.warning("Failed to scan Classic ELB: %s", e)

        return resources

    @staticmethod
    @retry_with_backoff()
    def scan_target_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan Target Groups in the specified region.

        Attributes (vertical layout - one table per TG):
            Load Balancer, TG Name, Port, Protocol, Registered Instances, Health Check Path

        Args:
            session: boto3 session used to create the ``elbv2`` client.
            account_id: Account ID recorded on every result.
            region: Region recorded on every result.

        Returns:
            One ``ResourceData`` per target group. A failure while listing
            target groups is logged as a warning and the groups collected so
            far are returned.
        """
        resources = []
        elbv2_client = session.client('elbv2')

        try:
            paginator = elbv2_client.get_paginator('describe_target_groups')
            for page in paginator.paginate():
                for tg in page.get('TargetGroups', []):
                    name = tg.get('TargetGroupName', '')
                    tg_arn = tg.get('TargetGroupArn', '')

                    # Names of the load balancers routing to this group
                    lb_names = [
                        ELBServiceScanner._name_from_arn(lb_arn)
                        for lb_arn in tg.get('LoadBalancerArns', [])
                    ]

                    registered_instances = ELBServiceScanner._registered_target_ids(
                        elbv2_client, tg_arn
                    )

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='target_group',
                        resource_type='Target Group',
                        resource_id=tg_arn,
                        name=name,
                        attributes={
                            'Load Balancer': ', '.join(lb_names) if lb_names else 'N/A',
                            'TG Name': name,
                            'Port': str(tg.get('Port', '')),
                            'Protocol': tg.get('Protocol', ''),
                            'Registered Instances': ', '.join(registered_instances) if registered_instances else 'None',
                            'Health Check Path': tg.get('HealthCheckPath', 'N/A')
                        }
                    ))
        except Exception as e:
            logger.warning("Failed to scan target groups: %s", e)

        return resources
|