""" Auto Scaling and ELB Resource Scanners Scans Auto Scaling Groups (with Launch Templates), Load Balancers (ALB, NLB, CLB), and Target Groups. Requirements: - 5.1: Scan Auto Scaling and ELB AWS services using boto3 """ import boto3 from typing import List, Dict, Any import logging from app.scanners.base import ResourceData from app.scanners.utils import retry_with_backoff logger = logging.getLogger(__name__) class ELBServiceScanner: """Scanner for Auto Scaling and ELB AWS resources""" @staticmethod def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str: """Extract Name tag value from tags list""" if not tags: return default for tag in tags: if tag.get('Key') == 'Name': return tag.get('Value', default) return default @staticmethod @retry_with_backoff() def scan_autoscaling_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Auto Scaling Groups in the specified region. Attributes (vertical layout - one table per ASG): Name, Launch Template, AMI, Instance type, Key, Target Groups, Desired, Min, Max, Scaling Policy """ resources = [] asg_client = session.client('autoscaling') ec2_client = session.client('ec2') paginator = asg_client.get_paginator('describe_auto_scaling_groups') for page in paginator.paginate(): for asg in page.get('AutoScalingGroups', []): name = asg.get('AutoScalingGroupName', '') # Get Launch Template info launch_template_name = '' ami = '' instance_type = '' key_name = '' # Check for Launch Template lt = asg.get('LaunchTemplate') if lt: launch_template_name = lt.get('LaunchTemplateName', lt.get('LaunchTemplateId', '')) # Get Launch Template details try: lt_response = ec2_client.describe_launch_template_versions( LaunchTemplateId=lt.get('LaunchTemplateId', ''), Versions=[lt.get('Version', '$Latest')] ) if lt_response.get('LaunchTemplateVersions'): lt_data = lt_response['LaunchTemplateVersions'][0].get('LaunchTemplateData', {}) ami = lt_data.get('ImageId', '') instance_type = lt_data.get('InstanceType', '') key_name = lt_data.get('KeyName', '') except Exception as e: logger.warning(f"Failed to get launch template details: {str(e)}") # Check for Mixed Instances Policy mip = asg.get('MixedInstancesPolicy') if mip: lt_spec = mip.get('LaunchTemplate', {}).get('LaunchTemplateSpecification', {}) if lt_spec: launch_template_name = lt_spec.get('LaunchTemplateName', lt_spec.get('LaunchTemplateId', '')) # Check for Launch Configuration (legacy) lc_name = asg.get('LaunchConfigurationName') if lc_name and not launch_template_name: launch_template_name = f"LC: {lc_name}" try: lc_response = asg_client.describe_launch_configurations( LaunchConfigurationNames=[lc_name] ) if lc_response.get('LaunchConfigurations'): lc = lc_response['LaunchConfigurations'][0] ami = lc.get('ImageId', '') instance_type = lc.get('InstanceType', '') key_name = lc.get('KeyName', '') except Exception as e: logger.warning(f"Failed to get launch configuration details: {str(e)}") # Get Target Groups target_groups = [] for tg_arn in asg.get('TargetGroupARNs', []): # Extract target group name from ARN tg_name = tg_arn.split('/')[-2] if '/' in tg_arn else tg_arn target_groups.append(tg_name) # Get Scaling Policies scaling_policies = [] try: policy_response = asg_client.describe_policies( AutoScalingGroupName=name ) for policy in policy_response.get('ScalingPolicies', []): scaling_policies.append(policy.get('PolicyName', '')) except Exception as e: logger.warning(f"Failed to get scaling policies: {str(e)}") resources.append(ResourceData( account_id=account_id, region=region, service='autoscaling', resource_type='Auto Scaling Group', resource_id=asg.get('AutoScalingGroupARN', name), name=name, attributes={ 'Name': name, 'Launch Template': launch_template_name, 'AMI': ami, 'Instance type': instance_type, 'Key': key_name, 'Target Groups': ', '.join(target_groups) if target_groups else 'N/A', 'Desired': str(asg.get('DesiredCapacity', 0)), 'Min': str(asg.get('MinSize', 0)), 'Max': str(asg.get('MaxSize', 0)), 'Scaling Policy': ', '.join(scaling_policies) if scaling_policies else 'N/A' } )) return resources @staticmethod @retry_with_backoff() def scan_load_balancers(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Load Balancers (ALB, NLB, CLB) in the specified region. Attributes (vertical layout - one table per LB): Name, Type, DNS, Scheme, VPC, Availability Zones, Subnet, Security Groups """ resources = [] # Scan ALB/NLB using elbv2 elbv2_client = session.client('elbv2') try: paginator = elbv2_client.get_paginator('describe_load_balancers') for page in paginator.paginate(): for lb in page.get('LoadBalancers', []): name = lb.get('LoadBalancerName', '') lb_type = lb.get('Type', 'application') # Get availability zones and subnets azs = [] subnets = [] for az_info in lb.get('AvailabilityZones', []): azs.append(az_info.get('ZoneName', '')) if az_info.get('SubnetId'): subnets.append(az_info['SubnetId']) # Get security groups (only for ALB) security_groups = lb.get('SecurityGroups', []) resources.append(ResourceData( account_id=account_id, region=region, service='elb', resource_type='Load Balancer', resource_id=lb.get('LoadBalancerArn', name), name=name, attributes={ 'Name': name, 'Type': lb_type.upper(), 'DNS': lb.get('DNSName', ''), 'Scheme': lb.get('Scheme', ''), 'VPC': lb.get('VpcId', ''), 'Availability Zones': ', '.join(azs), 'Subnet': ', '.join(subnets), 'Security Groups': ', '.join(security_groups) if security_groups else 'N/A' } )) except Exception as e: logger.warning(f"Failed to scan ALB/NLB: {str(e)}") # Scan Classic Load Balancers elb_client = session.client('elb') try: paginator = elb_client.get_paginator('describe_load_balancers') for page in paginator.paginate(): for lb in page.get('LoadBalancerDescriptions', []): name = lb.get('LoadBalancerName', '') resources.append(ResourceData( account_id=account_id, region=region, service='elb', resource_type='Load Balancer', resource_id=name, name=name, attributes={ 'Name': name, 'Type': 'CLASSIC', 'DNS': lb.get('DNSName', ''), 'Scheme': lb.get('Scheme', ''), 'VPC': lb.get('VPCId', ''), 'Availability Zones': ', '.join(lb.get('AvailabilityZones', [])), 'Subnet': ', '.join(lb.get('Subnets', [])), 'Security Groups': ', '.join(lb.get('SecurityGroups', [])) } )) except Exception as e: logger.warning(f"Failed to scan Classic ELB: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_target_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Target Groups in the specified region. Attributes (vertical layout - one table per TG): Load Balancer, TG Name, Port, Protocol, Registered Instances, Health Check Path """ resources = [] elbv2_client = session.client('elbv2') try: paginator = elbv2_client.get_paginator('describe_target_groups') for page in paginator.paginate(): for tg in page.get('TargetGroups', []): name = tg.get('TargetGroupName', '') tg_arn = tg.get('TargetGroupArn', '') # Get associated load balancers lb_arns = tg.get('LoadBalancerArns', []) lb_names = [] for lb_arn in lb_arns: # Extract LB name from ARN lb_name = lb_arn.split('/')[-2] if '/' in lb_arn else lb_arn lb_names.append(lb_name) # Get registered targets registered_instances = [] try: targets_response = elbv2_client.describe_target_health( TargetGroupArn=tg_arn ) for target in targets_response.get('TargetHealthDescriptions', []): target_id = target.get('Target', {}).get('Id', '') if target_id: registered_instances.append(target_id) except Exception as e: logger.warning(f"Failed to get target health: {str(e)}") resources.append(ResourceData( account_id=account_id, region=region, service='target_group', resource_type='Target Group', resource_id=tg_arn, name=name, attributes={ 'Load Balancer': ', '.join(lb_names) if lb_names else 'N/A', 'TG Name': name, 'Port': str(tg.get('Port', '')), 'Protocol': tg.get('Protocol', ''), 'Registered Instances': ', '.join(registered_instances) if registered_instances else 'None', 'Health Check Path': tg.get('HealthCheckPath', 'N/A') } )) except Exception as e: logger.warning(f"Failed to scan target groups: {str(e)}") return resources