aws_scanner.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. """
  2. AWS Resource Scanner Implementation
  3. This module implements the CloudProviderScanner interface for AWS,
  4. providing comprehensive resource scanning capabilities across all supported services.
  5. Requirements:
  6. - 4.3: Process multiple accounts/regions in parallel
  7. - 4.4: Scan all supported AWS services concurrently
  8. - 5.5: Retry with exponential backoff up to 3 times
  9. - 8.2: Record error details in task record
  10. """
  11. import boto3
  12. from botocore.exceptions import ClientError, BotoCoreError
  13. from concurrent.futures import ThreadPoolExecutor, as_completed
  14. from typing import List, Dict, Any, Optional, Callable
  15. import logging
  16. import traceback
  17. from app.scanners.base import CloudProviderScanner, ResourceData, ScanResult
  18. from app.scanners.credentials import AWSCredentialProvider, CredentialError
  19. from app.scanners.utils import retry_with_backoff
  20. logger = logging.getLogger(__name__)
  21. class AWSScanner(CloudProviderScanner):
  22. """
  23. AWS Resource Scanner implementation.
  24. Scans AWS resources across multiple regions and services using boto3,
  25. with support for parallel scanning and automatic retry with exponential backoff.
  26. """
  27. # All supported AWS services
  28. SUPPORTED_SERVICES = [
  29. 'vpc', 'subnet', 'route_table', 'internet_gateway', 'nat_gateway',
  30. 'security_group', 'vpc_endpoint', 'vpc_peering',
  31. 'customer_gateway', 'virtual_private_gateway', 'vpn_connection',
  32. 'ec2', 'elastic_ip',
  33. 'autoscaling', 'elb', 'target_group',
  34. 'rds', 'elasticache',
  35. 'eks', 'lambda', 's3', 's3_event_notification',
  36. 'cloudfront', 'route53', 'acm', 'waf',
  37. 'sns', 'cloudwatch', 'eventbridge', 'cloudtrail', 'config'
  38. ]
  39. # Global services (not region-specific)
  40. GLOBAL_SERVICES = ['cloudfront', 'route53', 'waf', 's3', 's3_event_notification', 'cloudtrail']
  41. # Maximum workers for parallel scanning
  42. MAX_WORKERS = 10
  def __init__(self, credential_provider: AWSCredentialProvider):
      """
      Create a scanner bound to a single credential provider.

      Args:
          credential_provider: AWSCredentialProvider used by this scanner to
              obtain boto3 sessions, validate credentials and look up the
              account id.
      """
      # Filled in by get_account_id() on first use and reused afterwards.
      self._account_id: Optional[str] = None
      self.credential_provider = credential_provider
  @property
  def provider_name(self) -> str:
      """Name of the cloud provider handled by this scanner."""
      name = 'AWS'
      return name
  @property
  def supported_services(self) -> List[str]:
      """Fresh list of every service key in SUPPORTED_SERVICES.

      A new list is returned on each access, so callers may mutate it
      without touching the class-level constant.
      """
      return list(self.SUPPORTED_SERVICES)
  @property
  def global_services(self) -> List[str]:
      """Fresh list of every service key in GLOBAL_SERVICES.

      A new list is returned on each access, so callers may mutate it
      without touching the class-level constant.
      """
      return list(self.GLOBAL_SERVICES)
  def get_credentials(self, credential_config: Dict[str, Any]) -> boto3.Session:
      """
      Return a boto3 Session obtained from the credential provider.

      Args:
          credential_config: Not read by this implementation; the session
              always comes from ``self.credential_provider``.
      """
      session = self.credential_provider.get_session()
      return session
  def list_regions(self) -> List[str]:
      """
      Return the region names reported by EC2 ``describe_regions``.

      The lookup runs through a us-east-1 session. If anything in the lookup
      raises, the failure is logged and a hard-coded default list of regions
      is returned instead.
      """
      try:
          ec2 = self.credential_provider.get_session(region_name='us-east-1').client('ec2')
          names = []
          for entry in ec2.describe_regions()['Regions']:
              names.append(entry['RegionName'])
          return names
      except Exception as e:
          logger.error(f"Failed to list regions: {str(e)}")
          # Best-effort fallback so callers still get a usable region list.
          fallback = [
              'us-east-1', 'us-east-2', 'us-west-1', 'us-west-2',
              'eu-west-1', 'eu-west-2', 'eu-west-3', 'eu-central-1',
              'ap-northeast-1', 'ap-northeast-2', 'ap-southeast-1', 'ap-southeast-2',
              'ap-south-1', 'sa-east-1', 'ca-central-1',
          ]
          return fallback
  def validate_credentials(self) -> bool:
      """Return the result of the credential provider's ``validate()`` check."""
      is_valid = self.credential_provider.validate()
      return is_valid
  def get_account_id(self) -> str:
      """
      Return the AWS account id, asking the credential provider only when
      no (truthy) value has been cached on this instance yet.
      """
      if not self._account_id:
          self._account_id = self.credential_provider.get_account_id()
      return self._account_id
  def scan_resources(
      self,
      regions: List[str],
      services: Optional[List[str]] = None,
      progress_callback: Optional[Callable[[int, int, str], None]] = None
  ) -> ScanResult:
      """
      Scan AWS resources across the given regions and services.

      Services listed in GLOBAL_SERVICES are scanned once through a us-east-1
      session. Every other service is scanned once per region, with regions
      processed concurrently on a thread pool of MAX_WORKERS workers.

      Failures do not abort the scan. A failing global phase or region is
      logged and recorded on the result via ``add_error``. Progress is
      reported after the global phase and after every region, whether it
      succeeded or failed. A progress callback that raises is logged and
      otherwise ignored.

      Args:
          regions: Regions to scan for regional services. ``None`` or an
              empty list means no regional scan is performed.
          services: Service keys to scan; ``None`` or empty means every
              entry of SUPPORTED_SERVICES.
          progress_callback: Optional callable invoked as
              ``callback(completed_tasks, total_tasks, message)``.

      Returns:
          ScanResult holding the discovered resources, the recorded errors
          and a ``metadata`` dict. ``success`` is set to False only when at
          least one error was recorded and no resources were found.
      """
      result = ScanResult(success=True)
      account_id = self.get_account_id()

      # Work on private copies so the metadata stored on the result never
      # aliases the class-level SUPPORTED_SERVICES list or the caller's lists.
      regions = list(regions) if regions else []
      services_to_scan = list(services) if services else list(self.SUPPORTED_SERVICES)

      # Separate global and regional services
      global_services = [s for s in services_to_scan if s in self.GLOBAL_SERVICES]
      regional_services = [s for s in services_to_scan if s not in self.GLOBAL_SERVICES]

      # One task per global service plus one per (regional service, region).
      total_tasks = len(global_services) + (len(regional_services) * len(regions))
      completed_tasks = 0

      def _merge(scan_output: Dict[str, Any]) -> None:
          """Fold one _scan_services_parallel() result into ``result``."""
          for service, resources in scan_output['resources'].items():
              for resource in resources:
                  result.add_resource(service, resource)
          for error in scan_output['errors']:
              result.add_error(**error)

      def _notify(done: int, message: str) -> None:
          """Report progress; a broken callback must not corrupt the scan."""
          if not progress_callback:
              return
          try:
              progress_callback(done, total_tasks, message)
          except Exception as e:
              logger.warning(f"Progress callback failed: {str(e)}")

      # Scan global services first (only once, not per region)
      if global_services:
          logger.info(f"Scanning {len(global_services)} global services")
          try:
              _merge(self._scan_services_parallel(
                  account_id=account_id,
                  region='us-east-1',  # Global services use us-east-1
                  services=global_services,
                  is_global=True
              ))
              message = "Completed global services scan"
          except Exception as e:
              # Same treatment as a failing region: record it and carry on.
              logger.error(f"Failed to scan global services: {str(e)}")
              result.add_error(
                  service='all',
                  region='global',
                  error=f"Global scan failed: {str(e)}",
                  details=None
              )
              message = "Failed global services scan"
          completed_tasks += len(global_services)
          _notify(completed_tasks, message)

      # Scan regional services in parallel across regions
      if regional_services and regions:
          logger.info(f"Scanning {len(regional_services)} services across {len(regions)} regions")
          with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
              futures = {
                  executor.submit(
                      self._scan_services_parallel,
                      account_id=account_id,
                      region=region,
                      services=regional_services,
                      is_global=False
                  ): region
                  for region in regions
              }
              for future in as_completed(futures):
                  region = futures[future]
                  try:
                      _merge(future.result())
                      message = f"Completed scan for region: {region}"
                  except Exception as e:
                      logger.error(f"Failed to scan region {region}: {str(e)}")
                      result.add_error(
                          service='all',
                          region=region,
                          error=f"Region scan failed: {str(e)}",
                          details=None
                      )
                      message = f"Failed scan for region: {region}"
                  # Counted exactly once per region, outside the try, so a
                  # callback problem can neither double-count the region nor
                  # be mistaken for a scan failure.
                  completed_tasks += len(regional_services)
                  _notify(completed_tasks, message)

      # Set metadata
      result.metadata = {
          'account_id': account_id,
          'regions_scanned': regions,
          'services_scanned': services_to_scan,
          'total_resources': sum(len(r) for r in result.resources.values()),
          'total_errors': len(result.errors)
      }

      # Mark as failed if there were critical errors
      if result.errors and not result.resources:
          result.success = False
      return result
  180. def _scan_services_parallel(
  181. self,
  182. account_id: str,
  183. region: str,
  184. services: List[str],
  185. is_global: bool = False
  186. ) -> Dict[str, Any]:
  187. """
  188. Scan multiple services in parallel within a single region.
  189. Args:
  190. account_id: AWS account ID
  191. region: Region to scan
  192. services: List of services to scan
  193. is_global: Whether these are global services
  194. Returns:
  195. Dictionary with 'resources' and 'errors' keys
  196. """
  197. resources: Dict[str, List[ResourceData]] = {}
  198. errors: List[Dict[str, Any]] = []
  199. # Get session for this region
  200. try:
  201. session = self.credential_provider.get_session(region_name=region)
  202. except CredentialError as e:
  203. logger.error(f"Failed to get session for region {region}: {str(e)}")
  204. return {
  205. 'resources': {},
  206. 'errors': [{
  207. 'service': 'all',
  208. 'region': region,
  209. 'error': str(e),
  210. 'details': None
  211. }]
  212. }
  213. # Scan services in parallel
  214. with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
  215. futures = {}
  216. for service in services:
  217. scanner_method = self._get_scanner_method(service)
  218. if scanner_method:
  219. future = executor.submit(
  220. self._scan_service_safe,
  221. scanner_method=scanner_method,
  222. session=session,
  223. account_id=account_id,
  224. region='global' if is_global else region,
  225. service=service
  226. )
  227. futures[future] = service
  228. for future in as_completed(futures):
  229. service = futures[future]
  230. try:
  231. service_result = future.result()
  232. if service_result['resources']:
  233. resources[service] = service_result['resources']
  234. if service_result['error']:
  235. errors.append(service_result['error'])
  236. except Exception as e:
  237. logger.error(f"Unexpected error scanning {service}: {str(e)}")
  238. errors.append({
  239. 'service': service,
  240. 'region': region,
  241. 'error': str(e),
  242. 'details': None
  243. })
  244. return {'resources': resources, 'errors': errors}
  def _scan_service_safe(
      self,
      scanner_method: Callable,
      session: boto3.Session,
      account_id: str,
      region: str,
      service: str
  ) -> Dict[str, Any]:
      """
      Run one service scanner and convert any exception into an error record.

      Requirements:
      - 8.2: Record error details in task record (via error dict)

      Args:
          scanner_method: Callable invoked as
              ``scanner_method(session, account_id, region)``.
          session: boto3 session handed to the scanner.
          account_id: AWS account ID handed to the scanner.
          region: Region label handed to the scanner and stored in the error.
          service: Service key, used for logging and in the error record.

      Returns:
          ``{'resources': <scanner result>, 'error': None}`` on success, or
          ``{'resources': [], 'error': {...}}`` where the error dict carries
          ``service``, ``region``, ``error``, ``error_type`` and ``details``
          (including the formatted stack trace).
      """
      try:
          found = scanner_method(session, account_id, region)
      except ClientError as e:
          # AWS API errors: pull code/message out of the botocore response.
          aws_error = e.response.get('Error', {})
          error_code = aws_error.get('Code', 'Unknown')
          error_message = aws_error.get('Message', str(e))
          stack_trace = traceback.format_exc()
          logger.warning(f"Error scanning {service} in {region}: {error_code} - {error_message}")
          failure = {
              'service': service,
              'region': region,
              'error': f"{error_code}: {error_message}",
              'error_type': 'ClientError',
              'details': {
                  'error_code': error_code,
                  'stack_trace': stack_trace
              }
          }
          return {'resources': [], 'error': failure}
      except Exception as e:
          # Anything else is unexpected; record the exception class name.
          stack_trace = traceback.format_exc()
          logger.error(f"Unexpected error scanning {service} in {region}: {str(e)}")
          failure = {
              'service': service,
              'region': region,
              'error': str(e),
              'error_type': type(e).__name__,
              'details': {
                  'stack_trace': stack_trace
              }
          }
          return {'resources': [], 'error': failure}
      else:
          return {'resources': found, 'error': None}
  296. def _get_scanner_method(self, service: str) -> Optional[Callable]:
  297. """Get the scanner method for a specific service"""
  298. scanner_methods = {
  299. 'vpc': self._scan_vpcs,
  300. 'subnet': self._scan_subnets,
  301. 'route_table': self._scan_route_tables,
  302. 'internet_gateway': self._scan_internet_gateways,
  303. 'nat_gateway': self._scan_nat_gateways,
  304. 'security_group': self._scan_security_groups,
  305. 'vpc_endpoint': self._scan_vpc_endpoints,
  306. 'vpc_peering': self._scan_vpc_peering,
  307. 'customer_gateway': self._scan_customer_gateways,
  308. 'virtual_private_gateway': self._scan_virtual_private_gateways,
  309. 'vpn_connection': self._scan_vpn_connections,
  310. 'ec2': self._scan_ec2_instances,
  311. 'elastic_ip': self._scan_elastic_ips,
  312. 'autoscaling': self._scan_autoscaling_groups,
  313. 'elb': self._scan_load_balancers,
  314. 'target_group': self._scan_target_groups,
  315. 'rds': self._scan_rds_instances,
  316. 'elasticache': self._scan_elasticache_clusters,
  317. 'eks': self._scan_eks_clusters,
  318. 'lambda': self._scan_lambda_functions,
  319. 's3': self._scan_s3_buckets,
  320. 's3_event_notification': self._scan_s3_event_notifications,
  321. 'cloudfront': self._scan_cloudfront_distributions,
  322. 'route53': self._scan_route53_hosted_zones,
  323. 'acm': self._scan_acm_certificates,
  324. 'waf': self._scan_waf_web_acls,
  325. 'sns': self._scan_sns_topics,
  326. 'cloudwatch': self._scan_cloudwatch_log_groups,
  327. 'eventbridge': self._scan_eventbridge_rules,
  328. 'cloudtrail': self._scan_cloudtrail_trails,
  329. 'config': self._scan_config_recorders,
  330. }
  331. return scanner_methods.get(service)
  332. # Helper method to get resource name from tags
  333. def _get_name_from_tags(self, tags: List[Dict[str, str]], default: str = '') -> str:
  334. """Extract Name tag value from tags list"""
  335. if not tags:
  336. return default
  337. for tag in tags:
  338. if tag.get('Key') == 'Name':
  339. return tag.get('Value', default)
  340. return default
  341. # ==================== VPC Related Scanners ====================
  def _scan_vpcs(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the VPC scan via ``VPCServiceScanner.scan_vpcs`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_vpcs
      return scan(session, account_id, region)
  def _scan_subnets(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the subnet scan via ``VPCServiceScanner.scan_subnets`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_subnets
      return scan(session, account_id, region)
  def _scan_route_tables(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the route table scan via ``VPCServiceScanner.scan_route_tables`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_route_tables
      return scan(session, account_id, region)
  def _scan_internet_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the internet gateway scan via ``VPCServiceScanner.scan_internet_gateways`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_internet_gateways
      return scan(session, account_id, region)
  def _scan_nat_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the NAT gateway scan via ``VPCServiceScanner.scan_nat_gateways`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_nat_gateways
      return scan(session, account_id, region)
  def _scan_security_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the security group scan via ``VPCServiceScanner.scan_security_groups`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_security_groups
      return scan(session, account_id, region)
  def _scan_vpc_endpoints(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the VPC endpoint scan via ``VPCServiceScanner.scan_vpc_endpoints`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_vpc_endpoints
      return scan(session, account_id, region)
  def _scan_vpc_peering(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the VPC peering connection scan via ``VPCServiceScanner.scan_vpc_peering`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_vpc_peering
      return scan(session, account_id, region)
  def _scan_customer_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the customer gateway scan via ``VPCServiceScanner.scan_customer_gateways`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_customer_gateways
      return scan(session, account_id, region)
  def _scan_virtual_private_gateways(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the virtual private gateway scan via ``VPCServiceScanner.scan_virtual_private_gateways`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_virtual_private_gateways
      return scan(session, account_id, region)
  def _scan_vpn_connections(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the VPN connection scan via ``VPCServiceScanner.scan_vpn_connections`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.vpc import VPCServiceScanner

      scan = VPCServiceScanner.scan_vpn_connections
      return scan(session, account_id, region)
  386. # ==================== EC2 Related Scanners ====================
  def _scan_ec2_instances(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the EC2 instance scan via ``EC2ServiceScanner.scan_ec2_instances`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.ec2 import EC2ServiceScanner

      scan = EC2ServiceScanner.scan_ec2_instances
      return scan(session, account_id, region)
  def _scan_elastic_ips(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the Elastic IP scan via ``EC2ServiceScanner.scan_elastic_ips`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.ec2 import EC2ServiceScanner

      scan = EC2ServiceScanner.scan_elastic_ips
      return scan(session, account_id, region)
  395. # ==================== Auto Scaling and ELB Scanners ====================
  def _scan_autoscaling_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the Auto Scaling group scan via ``ELBServiceScanner.scan_autoscaling_groups`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.elb import ELBServiceScanner

      scan = ELBServiceScanner.scan_autoscaling_groups
      return scan(session, account_id, region)
  def _scan_load_balancers(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the load balancer scan via ``ELBServiceScanner.scan_load_balancers`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.elb import ELBServiceScanner

      scan = ELBServiceScanner.scan_load_balancers
      return scan(session, account_id, region)
  def _scan_target_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the target group scan via ``ELBServiceScanner.scan_target_groups`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.elb import ELBServiceScanner

      scan = ELBServiceScanner.scan_target_groups
      return scan(session, account_id, region)
  408. # ==================== Database Service Scanners ====================
  def _scan_rds_instances(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the RDS DB instance scan via ``DatabaseServiceScanner.scan_rds_instances`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.database import DatabaseServiceScanner

      scan = DatabaseServiceScanner.scan_rds_instances
      return scan(session, account_id, region)
  def _scan_elasticache_clusters(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the ElastiCache cluster scan via ``DatabaseServiceScanner.scan_elasticache_clusters`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.database import DatabaseServiceScanner

      scan = DatabaseServiceScanner.scan_elasticache_clusters
      return scan(session, account_id, region)
  417. # ==================== Compute and Storage Service Scanners ====================
  def _scan_eks_clusters(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the EKS cluster scan via ``ComputeServiceScanner.scan_eks_clusters`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.compute import ComputeServiceScanner

      scan = ComputeServiceScanner.scan_eks_clusters
      return scan(session, account_id, region)
  def _scan_lambda_functions(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the Lambda function scan via ``ComputeServiceScanner.scan_lambda_functions`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.compute import ComputeServiceScanner

      scan = ComputeServiceScanner.scan_lambda_functions
      return scan(session, account_id, region)
  def _scan_s3_buckets(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the S3 bucket scan via ``ComputeServiceScanner.scan_s3_buckets`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.compute import ComputeServiceScanner

      scan = ComputeServiceScanner.scan_s3_buckets
      return scan(session, account_id, region)
  def _scan_s3_event_notifications(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the S3 event notification scan via ``ComputeServiceScanner.scan_s3_event_notifications`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.compute import ComputeServiceScanner

      scan = ComputeServiceScanner.scan_s3_event_notifications
      return scan(session, account_id, region)
  434. # ==================== Global Service Scanners ====================
  def _scan_cloudfront_distributions(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the CloudFront distribution scan via ``GlobalServiceScanner.scan_cloudfront_distributions`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.global_services import GlobalServiceScanner

      scan = GlobalServiceScanner.scan_cloudfront_distributions
      return scan(session, account_id, region)
  def _scan_route53_hosted_zones(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the Route 53 hosted zone scan via ``GlobalServiceScanner.scan_route53_hosted_zones`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.global_services import GlobalServiceScanner

      scan = GlobalServiceScanner.scan_route53_hosted_zones
      return scan(session, account_id, region)
  def _scan_acm_certificates(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the ACM certificate scan via ``GlobalServiceScanner.scan_acm_certificates`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.global_services import GlobalServiceScanner

      scan = GlobalServiceScanner.scan_acm_certificates
      return scan(session, account_id, region)
  def _scan_waf_web_acls(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the WAF Web ACL scan via ``GlobalServiceScanner.scan_waf_web_acls`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.global_services import GlobalServiceScanner

      scan = GlobalServiceScanner.scan_waf_web_acls
      return scan(session, account_id, region)
  451. # ==================== Monitoring and Management Service Scanners ====================
  def _scan_sns_topics(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the SNS topic scan via ``MonitoringServiceScanner.scan_sns_topics`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.monitoring import MonitoringServiceScanner

      scan = MonitoringServiceScanner.scan_sns_topics
      return scan(session, account_id, region)
  def _scan_cloudwatch_log_groups(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the CloudWatch log group scan via ``MonitoringServiceScanner.scan_cloudwatch_log_groups`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.monitoring import MonitoringServiceScanner

      scan = MonitoringServiceScanner.scan_cloudwatch_log_groups
      return scan(session, account_id, region)
  def _scan_eventbridge_rules(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the EventBridge rule scan via ``MonitoringServiceScanner.scan_eventbridge_rules`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.monitoring import MonitoringServiceScanner

      scan = MonitoringServiceScanner.scan_eventbridge_rules
      return scan(session, account_id, region)
  def _scan_cloudtrail_trails(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the CloudTrail trail scan via ``MonitoringServiceScanner.scan_cloudtrail_trails`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.monitoring import MonitoringServiceScanner

      scan = MonitoringServiceScanner.scan_cloudtrail_trails
      return scan(session, account_id, region)
  def _scan_config_recorders(self, session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
      """Run the AWS Config recorder scan via ``MonitoringServiceScanner.scan_config_recorders`` and return its result."""
      # Scanner class is imported lazily, only when this service is scanned.
      from app.scanners.services.monitoring import MonitoringServiceScanner

      scan = MonitoringServiceScanner.scan_config_recorders
      return scan(session, account_id, region)