""" VPC Related Resource Scanners Scans VPC, Subnet, Route Table, Internet Gateway, NAT Gateway, Security Group, VPC Endpoint, VPC Peering, Customer Gateway, Virtual Private Gateway, and VPN Connection resources. Requirements: - 5.1: Scan VPC-related AWS services using boto3 """ import boto3 from typing import List, Dict, Any import logging from app.scanners.base import ResourceData from app.scanners.utils import retry_with_backoff logger = logging.getLogger(__name__) class VPCServiceScanner: """Scanner for VPC-related AWS resources""" @staticmethod def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str: """Extract Name tag value from tags list""" if not tags: return default for tag in tags: if tag.get('Key') == 'Name': return tag.get('Value', default) return default @staticmethod @retry_with_backoff() def scan_vpcs(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan VPCs in the specified region. Attributes: Region, Name, ID, CIDR """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_vpcs') for page in paginator.paginate(): for vpc in page.get('Vpcs', []): name = VPCServiceScanner._get_name_from_tags(vpc.get('Tags', []), vpc['VpcId']) resources.append(ResourceData( account_id=account_id, region=region, service='vpc', resource_type='VPC', resource_id=vpc['VpcId'], name=name, attributes={ 'Region': region, 'Name': name, 'ID': vpc['VpcId'], 'CIDR': vpc.get('CidrBlock', '') } )) return resources @staticmethod @retry_with_backoff() def scan_subnets(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Subnets in the specified region. Attributes: Name, ID, AZ, CIDR """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_subnets') for page in paginator.paginate(): for subnet in page.get('Subnets', []): name = VPCServiceScanner._get_name_from_tags(subnet.get('Tags', []), subnet['SubnetId']) resources.append(ResourceData( account_id=account_id, region=region, service='subnet', resource_type='Subnet', resource_id=subnet['SubnetId'], name=name, attributes={ 'Name': name, 'ID': subnet['SubnetId'], 'AZ': subnet.get('AvailabilityZone', ''), 'CIDR': subnet.get('CidrBlock', '') } )) return resources @staticmethod @retry_with_backoff() def scan_route_tables(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Route Tables in the specified region. Attributes: Name, ID, Subnet Associations """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_route_tables') for page in paginator.paginate(): for rt in page.get('RouteTables', []): name = VPCServiceScanner._get_name_from_tags(rt.get('Tags', []), rt['RouteTableId']) # Get subnet associations associations = [] for assoc in rt.get('Associations', []): if assoc.get('SubnetId'): associations.append(assoc['SubnetId']) resources.append(ResourceData( account_id=account_id, region=region, service='route_table', resource_type='Route Table', resource_id=rt['RouteTableId'], name=name, attributes={ 'Name': name, 'ID': rt['RouteTableId'], 'Subnet Associations': ', '.join(associations) if associations else 'None' } )) return resources @staticmethod @retry_with_backoff() def scan_internet_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Internet Gateways in the specified region. Attributes: Name, ID (vertical layout - one table per IGW) """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_internet_gateways') for page in paginator.paginate(): for igw in page.get('InternetGateways', []): igw_id = igw['InternetGatewayId'] name = VPCServiceScanner._get_name_from_tags(igw.get('Tags', []), igw_id) resources.append(ResourceData( account_id=account_id, region=region, service='internet_gateway', resource_type='Internet Gateway', resource_id=igw_id, name=name, attributes={ 'Name': name, 'ID': igw_id } )) return resources @staticmethod @retry_with_backoff() def scan_nat_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan NAT Gateways in the specified region. Attributes: Name, ID, Public IP, Private IP """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_nat_gateways') for page in paginator.paginate(): for nat in page.get('NatGateways', []): # Skip deleted NAT gateways if nat.get('State') == 'deleted': continue name = VPCServiceScanner._get_name_from_tags(nat.get('Tags', []), nat['NatGatewayId']) # Get IP addresses from addresses public_ip = '' private_ip = '' for addr in nat.get('NatGatewayAddresses', []): if addr.get('PublicIp'): public_ip = addr['PublicIp'] if addr.get('PrivateIp'): private_ip = addr['PrivateIp'] resources.append(ResourceData( account_id=account_id, region=region, service='nat_gateway', resource_type='NAT Gateway', resource_id=nat['NatGatewayId'], name=name, attributes={ 'Name': name, 'ID': nat['NatGatewayId'], 'Public IP': public_ip, 'Private IP': private_ip } )) return resources @staticmethod @retry_with_backoff() def scan_security_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Security Groups in the specified region. Attributes: Name, ID, Protocol, Port range, Source Note: Creates one entry per inbound rule """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_security_groups') for page in paginator.paginate(): for sg in page.get('SecurityGroups', []): sg_name = sg.get('GroupName', sg['GroupId']) # Process inbound rules for rule in sg.get('IpPermissions', []): protocol = rule.get('IpProtocol', '-1') if protocol == '-1': protocol = 'All' # Get port range from_port = rule.get('FromPort', 'All') to_port = rule.get('ToPort', 'All') if from_port == to_port: port_range = str(from_port) if from_port != 'All' else 'All' else: port_range = f"{from_port}-{to_port}" # Get sources sources = [] for ip_range in rule.get('IpRanges', []): sources.append(ip_range.get('CidrIp', '')) for ip_range in rule.get('Ipv6Ranges', []): sources.append(ip_range.get('CidrIpv6', '')) for group in rule.get('UserIdGroupPairs', []): sources.append(group.get('GroupId', '')) source = ', '.join(sources) if sources else 'N/A' resources.append(ResourceData( account_id=account_id, region=region, service='security_group', resource_type='Security Group', resource_id=sg['GroupId'], name=sg_name, attributes={ 'Name': sg_name, 'ID': sg['GroupId'], 'Protocol': protocol, 'Port range': port_range, 'Source': source } )) # If no inbound rules, still add the security group if not sg.get('IpPermissions'): resources.append(ResourceData( account_id=account_id, region=region, service='security_group', resource_type='Security Group', resource_id=sg['GroupId'], name=sg_name, attributes={ 'Name': sg_name, 'ID': sg['GroupId'], 'Protocol': 'N/A', 'Port range': 'N/A', 'Source': 'N/A' } )) return resources @staticmethod @retry_with_backoff() def scan_vpc_endpoints(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan VPC Endpoints in the specified region. Attributes: Name, ID, VPC, Service Name, Type """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_vpc_endpoints') for page in paginator.paginate(): for endpoint in page.get('VpcEndpoints', []): name = VPCServiceScanner._get_name_from_tags(endpoint.get('Tags', []), endpoint['VpcEndpointId']) resources.append(ResourceData( account_id=account_id, region=region, service='vpc_endpoint', resource_type='Endpoint', resource_id=endpoint['VpcEndpointId'], name=name, attributes={ 'Name': name, 'ID': endpoint['VpcEndpointId'], 'VPC': endpoint.get('VpcId', ''), 'Service Name': endpoint.get('ServiceName', ''), 'Type': endpoint.get('VpcEndpointType', '') } )) return resources @staticmethod @retry_with_backoff() def scan_vpc_peering(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan VPC Peering Connections in the specified region. Attributes: Name, Peering Connection ID, Requester VPC, Accepter VPC """ resources = [] ec2_client = session.client('ec2') paginator = ec2_client.get_paginator('describe_vpc_peering_connections') for page in paginator.paginate(): for peering in page.get('VpcPeeringConnections', []): # Skip deleted/rejected peerings status = peering.get('Status', {}).get('Code', '') if status in ['deleted', 'rejected', 'failed']: continue name = VPCServiceScanner._get_name_from_tags( peering.get('Tags', []), peering['VpcPeeringConnectionId'] ) requester_vpc = peering.get('RequesterVpcInfo', {}).get('VpcId', '') accepter_vpc = peering.get('AccepterVpcInfo', {}).get('VpcId', '') resources.append(ResourceData( account_id=account_id, region=region, service='vpc_peering', resource_type='VPC Peering', resource_id=peering['VpcPeeringConnectionId'], name=name, attributes={ 'Name': name, 'Peering Connection ID': peering['VpcPeeringConnectionId'], 'Requester VPC': requester_vpc, 'Accepter VPC': accepter_vpc } )) return resources @staticmethod @retry_with_backoff() def scan_customer_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Customer Gateways in the specified region. Attributes: Name, Customer Gateway ID, IP Address """ resources = [] ec2_client = session.client('ec2') response = ec2_client.describe_customer_gateways() for cgw in response.get('CustomerGateways', []): # Skip deleted gateways if cgw.get('State') == 'deleted': continue name = VPCServiceScanner._get_name_from_tags(cgw.get('Tags', []), cgw['CustomerGatewayId']) resources.append(ResourceData( account_id=account_id, region=region, service='customer_gateway', resource_type='Customer Gateway', resource_id=cgw['CustomerGatewayId'], name=name, attributes={ 'Name': name, 'Customer Gateway ID': cgw['CustomerGatewayId'], 'IP Address': cgw.get('IpAddress', '') } )) return resources @staticmethod @retry_with_backoff() def scan_virtual_private_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan Virtual Private Gateways in the specified region. Attributes: Name, Virtual Private Gateway ID, VPC """ resources = [] ec2_client = session.client('ec2') response = ec2_client.describe_vpn_gateways() for vgw in response.get('VpnGateways', []): # Skip deleted gateways if vgw.get('State') == 'deleted': continue name = VPCServiceScanner._get_name_from_tags(vgw.get('Tags', []), vgw['VpnGatewayId']) # Get attached VPC vpc_id = '' for attachment in vgw.get('VpcAttachments', []): if attachment.get('State') == 'attached': vpc_id = attachment.get('VpcId', '') break resources.append(ResourceData( account_id=account_id, region=region, service='virtual_private_gateway', resource_type='Virtual Private Gateway', resource_id=vgw['VpnGatewayId'], name=name, attributes={ 'Name': name, 'Virtual Private Gateway ID': vgw['VpnGatewayId'], 'VPC': vpc_id } )) return resources @staticmethod @retry_with_backoff() def scan_vpn_connections(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan VPN Connections in the specified region. Attributes: Name, VPN ID, Routes """ resources = [] ec2_client = session.client('ec2') response = ec2_client.describe_vpn_connections() for vpn in response.get('VpnConnections', []): # Skip deleted connections if vpn.get('State') == 'deleted': continue name = VPCServiceScanner._get_name_from_tags(vpn.get('Tags', []), vpn['VpnConnectionId']) # Get routes routes = [] for route in vpn.get('Routes', []): if route.get('DestinationCidrBlock'): routes.append(route['DestinationCidrBlock']) resources.append(ResourceData( account_id=account_id, region=region, service='vpn_connection', resource_type='VPN Connection', resource_id=vpn['VpnConnectionId'], name=name, attributes={ 'Name': name, 'VPN ID': vpn['VpnConnectionId'], 'Routes': ', '.join(routes) if routes else 'N/A' } )) return resources