| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489 |
- """
- VPC Related Resource Scanners
- Scans VPC, Subnet, Route Table, Internet Gateway, NAT Gateway,
- Security Group, VPC Endpoint, VPC Peering, Customer Gateway,
- Virtual Private Gateway, and VPN Connection resources.
- Requirements:
- - 5.1: Scan VPC-related AWS services using boto3
- """
- import boto3
- from typing import List, Dict, Any
- import logging
- from app.scanners.base import ResourceData
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
- class VPCServiceScanner:
- """Scanner for VPC-related AWS resources"""
-
- @staticmethod
- def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
- """Extract Name tag value from tags list"""
- if not tags:
- return default
- for tag in tags:
- if tag.get('Key') == 'Name':
- return tag.get('Value', default)
- return default
-
@staticmethod
@retry_with_backoff()
def scan_vpcs(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect every VPC returned by the paginated EC2 ``describe_vpcs`` call.

    Reported attributes: Region, Name, ID, CIDR

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per VPC.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')

    def _as_resource(entry):
        vpc_id = entry['VpcId']
        # The VPC ID doubles as the display name when there is no Name tag.
        label = VPCServiceScanner._get_name_from_tags(entry.get('Tags', []), vpc_id)
        details = {
            'Region': region,
            'Name': label,
            'ID': vpc_id,
            'CIDR': entry.get('CidrBlock', ''),
        }
        return ResourceData(
            account_id=account_id,
            region=region,
            service='vpc',
            resource_type='VPC',
            resource_id=vpc_id,
            name=label,
            attributes=details,
        )

    pages = ec2.get_paginator('describe_vpcs').paginate()
    return [_as_resource(entry) for page in pages for entry in page.get('Vpcs', [])]
-
@staticmethod
@retry_with_backoff()
def scan_subnets(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect every subnet returned by the paginated EC2 ``describe_subnets`` call.

    Reported attributes: Name, ID, AZ, CIDR

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per subnet.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')
    pages = ec2.get_paginator('describe_subnets').paginate()

    collected = []
    for page in pages:
        for item in page.get('Subnets', []):
            subnet_id = item['SubnetId']
            # The subnet ID doubles as the display name when there is no Name tag.
            label = VPCServiceScanner._get_name_from_tags(item.get('Tags', []), subnet_id)
            details = {
                'Name': label,
                'ID': subnet_id,
                'AZ': item.get('AvailabilityZone', ''),
                'CIDR': item.get('CidrBlock', ''),
            }
            collected.append(ResourceData(
                account_id=account_id,
                region=region,
                service='subnet',
                resource_type='Subnet',
                resource_id=subnet_id,
                name=label,
                attributes=details,
            ))

    return collected
-
@staticmethod
@retry_with_backoff()
def scan_route_tables(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect every route table returned by the paginated EC2
    ``describe_route_tables`` call.

    Reported attributes: Name, ID, Subnet Associations

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per route table. ``Subnet Associations`` is a
        comma-separated list of subnet IDs, or the string ``'None'`` when
        no association carries a subnet ID.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')

    def _as_resource(table):
        table_id = table['RouteTableId']
        label = VPCServiceScanner._get_name_from_tags(table.get('Tags', []), table_id)

        # Only associations that carry a SubnetId are listed; entries
        # without one are ignored.
        subnet_ids = [
            link['SubnetId']
            for link in table.get('Associations', [])
            if link.get('SubnetId')
        ]

        return ResourceData(
            account_id=account_id,
            region=region,
            service='route_table',
            resource_type='Route Table',
            resource_id=table_id,
            name=label,
            attributes={
                'Name': label,
                'ID': table_id,
                'Subnet Associations': ', '.join(subnet_ids) if subnet_ids else 'None',
            },
        )

    pages = ec2.get_paginator('describe_route_tables').paginate()
    return [_as_resource(table) for page in pages for table in page.get('RouteTables', [])]
-
@staticmethod
@retry_with_backoff()
def scan_internet_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect every internet gateway returned by the paginated EC2
    ``describe_internet_gateways`` call.

    Reported attributes: Name, ID (vertical layout - one table per IGW)

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per internet gateway.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')
    pages = ec2.get_paginator('describe_internet_gateways').paginate()

    gateways = []
    for page in pages:
        for gateway in page.get('InternetGateways', []):
            gateway_id = gateway['InternetGatewayId']
            # The gateway ID doubles as the display name when there is no Name tag.
            label = VPCServiceScanner._get_name_from_tags(gateway.get('Tags', []), gateway_id)
            details = {'Name': label, 'ID': gateway_id}
            gateways.append(ResourceData(
                account_id=account_id,
                region=region,
                service='internet_gateway',
                resource_type='Internet Gateway',
                resource_id=gateway_id,
                name=label,
                attributes=details,
            ))

    return gateways
-
@staticmethod
@retry_with_backoff()
def scan_nat_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Scan NAT Gateways in the specified region.

    Attributes: Name, ID, Public IP, Private IP

    Gateways whose ``State`` is ``'deleted'`` are skipped. A NAT gateway
    can carry several entries in ``NatGatewayAddresses``; all of their
    addresses are reported as a comma-separated string with the primary
    address first, so no address is silently dropped. For a gateway with
    a single address the output is just that address, and a gateway with
    no public address (or no addresses at all) reports an empty string.

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per non-deleted NAT gateway.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    resources = []
    ec2_client = session.client('ec2')

    paginator = ec2_client.get_paginator('describe_nat_gateways')
    for page in paginator.paginate():
        for nat in page.get('NatGateways', []):
            # Skip deleted NAT gateways
            if nat.get('State') == 'deleted':
                continue

            nat_id = nat['NatGatewayId']
            name = VPCServiceScanner._get_name_from_tags(nat.get('Tags', []), nat_id)

            # Stable sort puts the primary address first and keeps the
            # API's order for the rest (False sorts before True).
            addresses = sorted(
                nat.get('NatGatewayAddresses', []),
                key=lambda addr: not addr.get('IsPrimary', False),
            )
            public_ips = [addr['PublicIp'] for addr in addresses if addr.get('PublicIp')]
            private_ips = [addr['PrivateIp'] for addr in addresses if addr.get('PrivateIp')]

            resources.append(ResourceData(
                account_id=account_id,
                region=region,
                service='nat_gateway',
                resource_type='NAT Gateway',
                resource_id=nat_id,
                name=name,
                attributes={
                    'Name': name,
                    'ID': nat_id,
                    'Public IP': ', '.join(public_ips),
                    'Private IP': ', '.join(private_ips)
                }
            ))

    return resources
-
@staticmethod
@retry_with_backoff()
def scan_security_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Scan Security Groups in the specified region.

    Attributes: Name, ID, Protocol, Port range, Source
    Note: Creates one entry per inbound rule. A group without inbound
    rules still yields a single entry whose Protocol, Port range and
    Source are all ``'N/A'``.

    Rule rendering:
        - Protocol ``'-1'`` is shown as ``'All'``.
        - A missing port, or the ``-1`` wildcard, is shown as ``'All'``;
          equal start/end values collapse to a single value, otherwise
          ``start-end``.
        - Source lists IPv4 CIDRs, IPv6 CIDRs, referenced security group
          IDs and prefix list IDs, comma-separated; ``'N/A'`` when the
          rule has none of them.

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per inbound rule (or one placeholder per group
        that has no inbound rules).

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    resources = []
    ec2_client = session.client('ec2')

    def _port_label(value):
        # None means the key was absent (e.g. all-protocol rules); -1 is
        # the wildcard EC2 uses for ICMP types/codes.
        return 'All' if value is None or value == -1 else str(value)

    paginator = ec2_client.get_paginator('describe_security_groups')
    for page in paginator.paginate():
        for sg in page.get('SecurityGroups', []):
            sg_id = sg['GroupId']
            sg_name = sg.get('GroupName', sg_id)

            # One (protocol, port range, source) row per inbound rule.
            rows = []
            for rule in sg.get('IpPermissions', []):
                protocol = rule.get('IpProtocol', '-1')
                if protocol == '-1':
                    protocol = 'All'

                from_port = _port_label(rule.get('FromPort'))
                to_port = _port_label(rule.get('ToPort'))
                port_range = from_port if from_port == to_port else f"{from_port}-{to_port}"

                sources = [ip_range.get('CidrIp', '') for ip_range in rule.get('IpRanges', [])]
                sources += [ip_range.get('CidrIpv6', '') for ip_range in rule.get('Ipv6Ranges', [])]
                sources += [group.get('GroupId', '') for group in rule.get('UserIdGroupPairs', [])]
                # Rules can also reference managed prefix lists; without
                # this they would be reported as having no source.
                sources += [prefix.get('PrefixListId', '') for prefix in rule.get('PrefixListIds', [])]

                rows.append((protocol, port_range, ', '.join(sources) if sources else 'N/A'))

            # If no inbound rules, still add the security group
            if not rows:
                rows.append(('N/A', 'N/A', 'N/A'))

            for protocol, port_range, source in rows:
                resources.append(ResourceData(
                    account_id=account_id,
                    region=region,
                    service='security_group',
                    resource_type='Security Group',
                    resource_id=sg_id,
                    name=sg_name,
                    attributes={
                        'Name': sg_name,
                        'ID': sg_id,
                        'Protocol': protocol,
                        'Port range': port_range,
                        'Source': source
                    }
                ))

    return resources
-
@staticmethod
@retry_with_backoff()
def scan_vpc_endpoints(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect every VPC endpoint returned by the paginated EC2
    ``describe_vpc_endpoints`` call.

    Reported attributes: Name, ID, VPC, Service Name, Type

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per endpoint.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')

    def _as_resource(item):
        endpoint_id = item['VpcEndpointId']
        # The endpoint ID doubles as the display name when there is no Name tag.
        label = VPCServiceScanner._get_name_from_tags(item.get('Tags', []), endpoint_id)
        details = {
            'Name': label,
            'ID': endpoint_id,
            'VPC': item.get('VpcId', ''),
            'Service Name': item.get('ServiceName', ''),
            'Type': item.get('VpcEndpointType', ''),
        }
        return ResourceData(
            account_id=account_id,
            region=region,
            service='vpc_endpoint',
            resource_type='Endpoint',
            resource_id=endpoint_id,
            name=label,
            attributes=details,
        )

    pages = ec2.get_paginator('describe_vpc_endpoints').paginate()
    return [_as_resource(item) for page in pages for item in page.get('VpcEndpoints', [])]
-
@staticmethod
@retry_with_backoff()
def scan_vpc_peering(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect VPC peering connections returned by the paginated EC2
    ``describe_vpc_peering_connections`` call.

    Reported attributes: Name, Peering Connection ID, Requester VPC, Accepter VPC

    Connections whose status code is ``deleted``, ``rejected`` or
    ``failed`` are left out.

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per remaining peering connection.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    skipped_states = ('deleted', 'rejected', 'failed')
    ec2 = session.client('ec2')
    pages = ec2.get_paginator('describe_vpc_peering_connections').paginate()

    connections = []
    for page in pages:
        for link in page.get('VpcPeeringConnections', []):
            state_code = link.get('Status', {}).get('Code', '')
            if state_code in skipped_states:
                continue

            link_id = link['VpcPeeringConnectionId']
            label = VPCServiceScanner._get_name_from_tags(link.get('Tags', []), link_id)
            details = {
                'Name': label,
                'Peering Connection ID': link_id,
                'Requester VPC': link.get('RequesterVpcInfo', {}).get('VpcId', ''),
                'Accepter VPC': link.get('AccepterVpcInfo', {}).get('VpcId', ''),
            }
            connections.append(ResourceData(
                account_id=account_id,
                region=region,
                service='vpc_peering',
                resource_type='VPC Peering',
                resource_id=link_id,
                name=label,
                attributes=details,
            ))

    return connections
-
@staticmethod
@retry_with_backoff()
def scan_customer_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect customer gateways from a single EC2
    ``describe_customer_gateways`` call.

    Reported attributes: Name, Customer Gateway ID, IP Address

    Gateways whose ``State`` is ``'deleted'`` are left out.

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per remaining customer gateway.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')

    def _as_resource(gateway):
        gateway_id = gateway['CustomerGatewayId']
        # The gateway ID doubles as the display name when there is no Name tag.
        label = VPCServiceScanner._get_name_from_tags(gateway.get('Tags', []), gateway_id)
        return ResourceData(
            account_id=account_id,
            region=region,
            service='customer_gateway',
            resource_type='Customer Gateway',
            resource_id=gateway_id,
            name=label,
            attributes={
                'Name': label,
                'Customer Gateway ID': gateway_id,
                'IP Address': gateway.get('IpAddress', ''),
            },
        )

    listing = ec2.describe_customer_gateways()
    return [
        _as_resource(gateway)
        for gateway in listing.get('CustomerGateways', [])
        if gateway.get('State') != 'deleted'
    ]
-
@staticmethod
@retry_with_backoff()
def scan_virtual_private_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect virtual private gateways from a single EC2
    ``describe_vpn_gateways`` call.

    Reported attributes: Name, Virtual Private Gateway ID, VPC

    Gateways whose ``State`` is ``'deleted'`` are left out. ``VPC`` is the
    VPC ID of the first attachment in state ``'attached'``, or an empty
    string when there is none.

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per remaining virtual private gateway.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')
    listing = ec2.describe_vpn_gateways()

    gateways = []
    for gateway in listing.get('VpnGateways', []):
        if gateway.get('State') == 'deleted':
            continue

        gateway_id = gateway['VpnGatewayId']
        label = VPCServiceScanner._get_name_from_tags(gateway.get('Tags', []), gateway_id)

        # First attachment that is actually attached wins; '' when none is.
        attached_vpc = next(
            (
                link.get('VpcId', '')
                for link in gateway.get('VpcAttachments', [])
                if link.get('State') == 'attached'
            ),
            '',
        )

        gateways.append(ResourceData(
            account_id=account_id,
            region=region,
            service='virtual_private_gateway',
            resource_type='Virtual Private Gateway',
            resource_id=gateway_id,
            name=label,
            attributes={
                'Name': label,
                'Virtual Private Gateway ID': gateway_id,
                'VPC': attached_vpc,
            },
        ))

    return gateways
-
@staticmethod
@retry_with_backoff()
def scan_vpn_connections(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
    """
    Collect VPN connections from a single EC2 ``describe_vpn_connections`` call.

    Reported attributes: Name, VPN ID, Routes

    Connections whose ``State`` is ``'deleted'`` are left out. ``Routes``
    is a comma-separated list of the destination CIDR blocks found in the
    connection's ``Routes`` entries, or ``'N/A'`` when there are none.

    Args:
        session: boto3 session used to create the EC2 client.
        account_id: Account identifier recorded on each result.
        region: Region label recorded on each result.

    Returns:
        One ResourceData per remaining VPN connection.

    NOTE(review): the EC2 client is created without an explicit region, so
    ``region`` only labels the results - presumably the session is already
    bound to that region; confirm against the caller.
    """
    ec2 = session.client('ec2')

    def _as_resource(connection):
        connection_id = connection['VpnConnectionId']
        label = VPCServiceScanner._get_name_from_tags(connection.get('Tags', []), connection_id)

        # Route entries without a destination CIDR are ignored.
        cidrs = [
            entry['DestinationCidrBlock']
            for entry in connection.get('Routes', [])
            if entry.get('DestinationCidrBlock')
        ]

        return ResourceData(
            account_id=account_id,
            region=region,
            service='vpn_connection',
            resource_type='VPN Connection',
            resource_id=connection_id,
            name=label,
            attributes={
                'Name': label,
                'VPN ID': connection_id,
                'Routes': ', '.join(cidrs) if cidrs else 'N/A',
            },
        )

    listing = ec2.describe_vpn_connections()
    return [
        _as_resource(connection)
        for connection in listing.get('VpnConnections', [])
        if connection.get('State') != 'deleted'
    ]
|