vpc.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. """
  2. VPC Related Resource Scanners
  3. Scans VPC, Subnet, Route Table, Internet Gateway, NAT Gateway,
  4. Security Group, VPC Endpoint, VPC Peering, Customer Gateway,
  5. Virtual Private Gateway, and VPN Connection resources.
  6. Requirements:
  7. - 5.1: Scan VPC-related AWS services using boto3
  8. """
  9. import boto3
  10. from typing import List, Dict, Any
  11. import logging
  12. from app.scanners.base import ResourceData
  13. from app.scanners.utils import retry_with_backoff
  14. logger = logging.getLogger(__name__)
  15. class VPCServiceScanner:
  16. """Scanner for VPC-related AWS resources"""
  17. @staticmethod
  18. def _get_name_from_tags(tags: List[Dict[str, str]], default: str = '') -> str:
  19. """Extract Name tag value from tags list"""
  20. if not tags:
  21. return default
  22. for tag in tags:
  23. if tag.get('Key') == 'Name':
  24. return tag.get('Value', default)
  25. return default
  26. @staticmethod
  27. @retry_with_backoff()
  28. def scan_vpcs(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  29. """
  30. Scan VPCs in the specified region.
  31. Attributes: Region, Name, ID, CIDR
  32. """
  33. resources = []
  34. ec2_client = session.client('ec2')
  35. paginator = ec2_client.get_paginator('describe_vpcs')
  36. for page in paginator.paginate():
  37. for vpc in page.get('Vpcs', []):
  38. name = VPCServiceScanner._get_name_from_tags(vpc.get('Tags', []), vpc['VpcId'])
  39. resources.append(ResourceData(
  40. account_id=account_id,
  41. region=region,
  42. service='vpc',
  43. resource_type='VPC',
  44. resource_id=vpc['VpcId'],
  45. name=name,
  46. attributes={
  47. 'Region': region,
  48. 'Name': name,
  49. 'ID': vpc['VpcId'],
  50. 'CIDR': vpc.get('CidrBlock', '')
  51. }
  52. ))
  53. return resources
  54. @staticmethod
  55. @retry_with_backoff()
  56. def scan_subnets(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  57. """
  58. Scan Subnets in the specified region.
  59. Attributes: Name, ID, AZ, CIDR
  60. """
  61. resources = []
  62. ec2_client = session.client('ec2')
  63. paginator = ec2_client.get_paginator('describe_subnets')
  64. for page in paginator.paginate():
  65. for subnet in page.get('Subnets', []):
  66. name = VPCServiceScanner._get_name_from_tags(subnet.get('Tags', []), subnet['SubnetId'])
  67. resources.append(ResourceData(
  68. account_id=account_id,
  69. region=region,
  70. service='subnet',
  71. resource_type='Subnet',
  72. resource_id=subnet['SubnetId'],
  73. name=name,
  74. attributes={
  75. 'Name': name,
  76. 'ID': subnet['SubnetId'],
  77. 'AZ': subnet.get('AvailabilityZone', ''),
  78. 'CIDR': subnet.get('CidrBlock', '')
  79. }
  80. ))
  81. return resources
  82. @staticmethod
  83. @retry_with_backoff()
  84. def scan_route_tables(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  85. """
  86. Scan Route Tables in the specified region.
  87. Attributes: Name, ID, Subnet Associations
  88. """
  89. resources = []
  90. ec2_client = session.client('ec2')
  91. paginator = ec2_client.get_paginator('describe_route_tables')
  92. for page in paginator.paginate():
  93. for rt in page.get('RouteTables', []):
  94. name = VPCServiceScanner._get_name_from_tags(rt.get('Tags', []), rt['RouteTableId'])
  95. # Get subnet associations
  96. associations = []
  97. for assoc in rt.get('Associations', []):
  98. if assoc.get('SubnetId'):
  99. associations.append(assoc['SubnetId'])
  100. resources.append(ResourceData(
  101. account_id=account_id,
  102. region=region,
  103. service='route_table',
  104. resource_type='Route Table',
  105. resource_id=rt['RouteTableId'],
  106. name=name,
  107. attributes={
  108. 'Name': name,
  109. 'ID': rt['RouteTableId'],
  110. 'Subnet Associations': ', '.join(associations) if associations else 'None'
  111. }
  112. ))
  113. return resources
  114. @staticmethod
  115. @retry_with_backoff()
  116. def scan_internet_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  117. """
  118. Scan Internet Gateways in the specified region.
  119. Attributes: Name, ID (vertical layout - one table per IGW)
  120. """
  121. resources = []
  122. ec2_client = session.client('ec2')
  123. paginator = ec2_client.get_paginator('describe_internet_gateways')
  124. for page in paginator.paginate():
  125. for igw in page.get('InternetGateways', []):
  126. igw_id = igw['InternetGatewayId']
  127. name = VPCServiceScanner._get_name_from_tags(igw.get('Tags', []), igw_id)
  128. resources.append(ResourceData(
  129. account_id=account_id,
  130. region=region,
  131. service='internet_gateway',
  132. resource_type='Internet Gateway',
  133. resource_id=igw_id,
  134. name=name,
  135. attributes={
  136. 'Name': name,
  137. 'ID': igw_id
  138. }
  139. ))
  140. return resources
  141. @staticmethod
  142. @retry_with_backoff()
  143. def scan_nat_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  144. """
  145. Scan NAT Gateways in the specified region.
  146. Attributes: Name, ID, Public IP, Private IP
  147. """
  148. resources = []
  149. ec2_client = session.client('ec2')
  150. paginator = ec2_client.get_paginator('describe_nat_gateways')
  151. for page in paginator.paginate():
  152. for nat in page.get('NatGateways', []):
  153. # Skip deleted NAT gateways
  154. if nat.get('State') == 'deleted':
  155. continue
  156. name = VPCServiceScanner._get_name_from_tags(nat.get('Tags', []), nat['NatGatewayId'])
  157. # Get IP addresses from addresses
  158. public_ip = ''
  159. private_ip = ''
  160. for addr in nat.get('NatGatewayAddresses', []):
  161. if addr.get('PublicIp'):
  162. public_ip = addr['PublicIp']
  163. if addr.get('PrivateIp'):
  164. private_ip = addr['PrivateIp']
  165. resources.append(ResourceData(
  166. account_id=account_id,
  167. region=region,
  168. service='nat_gateway',
  169. resource_type='NAT Gateway',
  170. resource_id=nat['NatGatewayId'],
  171. name=name,
  172. attributes={
  173. 'Name': name,
  174. 'ID': nat['NatGatewayId'],
  175. 'Public IP': public_ip,
  176. 'Private IP': private_ip
  177. }
  178. ))
  179. return resources
  180. @staticmethod
  181. @retry_with_backoff()
  182. def scan_security_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  183. """
  184. Scan Security Groups in the specified region.
  185. Attributes: Name, ID, Protocol, Port range, Source
  186. Note: Creates one entry per inbound rule
  187. """
  188. resources = []
  189. ec2_client = session.client('ec2')
  190. paginator = ec2_client.get_paginator('describe_security_groups')
  191. for page in paginator.paginate():
  192. for sg in page.get('SecurityGroups', []):
  193. sg_name = sg.get('GroupName', sg['GroupId'])
  194. # Process inbound rules
  195. for rule in sg.get('IpPermissions', []):
  196. protocol = rule.get('IpProtocol', '-1')
  197. if protocol == '-1':
  198. protocol = 'All'
  199. # Get port range
  200. from_port = rule.get('FromPort', 'All')
  201. to_port = rule.get('ToPort', 'All')
  202. if from_port == to_port:
  203. port_range = str(from_port) if from_port != 'All' else 'All'
  204. else:
  205. port_range = f"{from_port}-{to_port}"
  206. # Get sources
  207. sources = []
  208. for ip_range in rule.get('IpRanges', []):
  209. sources.append(ip_range.get('CidrIp', ''))
  210. for ip_range in rule.get('Ipv6Ranges', []):
  211. sources.append(ip_range.get('CidrIpv6', ''))
  212. for group in rule.get('UserIdGroupPairs', []):
  213. sources.append(group.get('GroupId', ''))
  214. source = ', '.join(sources) if sources else 'N/A'
  215. resources.append(ResourceData(
  216. account_id=account_id,
  217. region=region,
  218. service='security_group',
  219. resource_type='Security Group',
  220. resource_id=sg['GroupId'],
  221. name=sg_name,
  222. attributes={
  223. 'Name': sg_name,
  224. 'ID': sg['GroupId'],
  225. 'Protocol': protocol,
  226. 'Port range': port_range,
  227. 'Source': source
  228. }
  229. ))
  230. # If no inbound rules, still add the security group
  231. if not sg.get('IpPermissions'):
  232. resources.append(ResourceData(
  233. account_id=account_id,
  234. region=region,
  235. service='security_group',
  236. resource_type='Security Group',
  237. resource_id=sg['GroupId'],
  238. name=sg_name,
  239. attributes={
  240. 'Name': sg_name,
  241. 'ID': sg['GroupId'],
  242. 'Protocol': 'N/A',
  243. 'Port range': 'N/A',
  244. 'Source': 'N/A'
  245. }
  246. ))
  247. return resources
  248. @staticmethod
  249. @retry_with_backoff()
  250. def scan_vpc_endpoints(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  251. """
  252. Scan VPC Endpoints in the specified region.
  253. Attributes: Name, ID, VPC, Service Name, Type
  254. """
  255. resources = []
  256. ec2_client = session.client('ec2')
  257. paginator = ec2_client.get_paginator('describe_vpc_endpoints')
  258. for page in paginator.paginate():
  259. for endpoint in page.get('VpcEndpoints', []):
  260. name = VPCServiceScanner._get_name_from_tags(endpoint.get('Tags', []), endpoint['VpcEndpointId'])
  261. resources.append(ResourceData(
  262. account_id=account_id,
  263. region=region,
  264. service='vpc_endpoint',
  265. resource_type='Endpoint',
  266. resource_id=endpoint['VpcEndpointId'],
  267. name=name,
  268. attributes={
  269. 'Name': name,
  270. 'ID': endpoint['VpcEndpointId'],
  271. 'VPC': endpoint.get('VpcId', ''),
  272. 'Service Name': endpoint.get('ServiceName', ''),
  273. 'Type': endpoint.get('VpcEndpointType', '')
  274. }
  275. ))
  276. return resources
  277. @staticmethod
  278. @retry_with_backoff()
  279. def scan_vpc_peering(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  280. """
  281. Scan VPC Peering Connections in the specified region.
  282. Attributes: Name, Peering Connection ID, Requester VPC, Accepter VPC
  283. """
  284. resources = []
  285. ec2_client = session.client('ec2')
  286. paginator = ec2_client.get_paginator('describe_vpc_peering_connections')
  287. for page in paginator.paginate():
  288. for peering in page.get('VpcPeeringConnections', []):
  289. # Skip deleted/rejected peerings
  290. status = peering.get('Status', {}).get('Code', '')
  291. if status in ['deleted', 'rejected', 'failed']:
  292. continue
  293. name = VPCServiceScanner._get_name_from_tags(
  294. peering.get('Tags', []),
  295. peering['VpcPeeringConnectionId']
  296. )
  297. requester_vpc = peering.get('RequesterVpcInfo', {}).get('VpcId', '')
  298. accepter_vpc = peering.get('AccepterVpcInfo', {}).get('VpcId', '')
  299. resources.append(ResourceData(
  300. account_id=account_id,
  301. region=region,
  302. service='vpc_peering',
  303. resource_type='VPC Peering',
  304. resource_id=peering['VpcPeeringConnectionId'],
  305. name=name,
  306. attributes={
  307. 'Name': name,
  308. 'Peering Connection ID': peering['VpcPeeringConnectionId'],
  309. 'Requester VPC': requester_vpc,
  310. 'Accepter VPC': accepter_vpc
  311. }
  312. ))
  313. return resources
  314. @staticmethod
  315. @retry_with_backoff()
  316. def scan_customer_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  317. """
  318. Scan Customer Gateways in the specified region.
  319. Attributes: Name, Customer Gateway ID, IP Address
  320. """
  321. resources = []
  322. ec2_client = session.client('ec2')
  323. response = ec2_client.describe_customer_gateways()
  324. for cgw in response.get('CustomerGateways', []):
  325. # Skip deleted gateways
  326. if cgw.get('State') == 'deleted':
  327. continue
  328. name = VPCServiceScanner._get_name_from_tags(cgw.get('Tags', []), cgw['CustomerGatewayId'])
  329. resources.append(ResourceData(
  330. account_id=account_id,
  331. region=region,
  332. service='customer_gateway',
  333. resource_type='Customer Gateway',
  334. resource_id=cgw['CustomerGatewayId'],
  335. name=name,
  336. attributes={
  337. 'Name': name,
  338. 'Customer Gateway ID': cgw['CustomerGatewayId'],
  339. 'IP Address': cgw.get('IpAddress', '')
  340. }
  341. ))
  342. return resources
  343. @staticmethod
  344. @retry_with_backoff()
  345. def scan_virtual_private_gateways(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  346. """
  347. Scan Virtual Private Gateways in the specified region.
  348. Attributes: Name, Virtual Private Gateway ID, VPC
  349. """
  350. resources = []
  351. ec2_client = session.client('ec2')
  352. response = ec2_client.describe_vpn_gateways()
  353. for vgw in response.get('VpnGateways', []):
  354. # Skip deleted gateways
  355. if vgw.get('State') == 'deleted':
  356. continue
  357. name = VPCServiceScanner._get_name_from_tags(vgw.get('Tags', []), vgw['VpnGatewayId'])
  358. # Get attached VPC
  359. vpc_id = ''
  360. for attachment in vgw.get('VpcAttachments', []):
  361. if attachment.get('State') == 'attached':
  362. vpc_id = attachment.get('VpcId', '')
  363. break
  364. resources.append(ResourceData(
  365. account_id=account_id,
  366. region=region,
  367. service='virtual_private_gateway',
  368. resource_type='Virtual Private Gateway',
  369. resource_id=vgw['VpnGatewayId'],
  370. name=name,
  371. attributes={
  372. 'Name': name,
  373. 'Virtual Private Gateway ID': vgw['VpnGatewayId'],
  374. 'VPC': vpc_id
  375. }
  376. ))
  377. return resources
  378. @staticmethod
  379. @retry_with_backoff()
  380. def scan_vpn_connections(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
  381. """
  382. Scan VPN Connections in the specified region.
  383. Attributes: Name, VPN ID, Routes
  384. """
  385. resources = []
  386. ec2_client = session.client('ec2')
  387. response = ec2_client.describe_vpn_connections()
  388. for vpn in response.get('VpnConnections', []):
  389. # Skip deleted connections
  390. if vpn.get('State') == 'deleted':
  391. continue
  392. name = VPCServiceScanner._get_name_from_tags(vpn.get('Tags', []), vpn['VpnConnectionId'])
  393. # Get routes
  394. routes = []
  395. for route in vpn.get('Routes', []):
  396. if route.get('DestinationCidrBlock'):
  397. routes.append(route['DestinationCidrBlock'])
  398. resources.append(ResourceData(
  399. account_id=account_id,
  400. region=region,
  401. service='vpn_connection',
  402. resource_type='VPN Connection',
  403. resource_id=vpn['VpnConnectionId'],
  404. name=name,
  405. attributes={
  406. 'Name': name,
  407. 'VPN ID': vpn['VpnConnectionId'],
  408. 'Routes': ', '.join(routes) if routes else 'N/A'
  409. }
  410. ))
  411. return resources