| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- """
- Monitoring and Management Service Scanners
- Scans SNS Topics, CloudWatch Log Groups, EventBridge Rules,
- CloudTrail Trails, and Config Recorders.
- Requirements:
- - 5.1: Scan monitoring and management AWS services using boto3
- """
- import boto3
- from typing import List, Dict, Any
- import logging
- from app.scanners.base import ResourceData
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
- class MonitoringServiceScanner:
- """Scanner for monitoring and management AWS resources"""
-
- @staticmethod
- @retry_with_backoff()
- def scan_sns_topics(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan SNS Topics in the specified region.
-
- Attributes (horizontal layout):
- Topic Name, Topic Display Name, Subscription Protocol, Subscription Endpoint
- """
- resources = []
- sns_client = session.client('sns')
-
- try:
- paginator = sns_client.get_paginator('list_topics')
- for page in paginator.paginate():
- for topic in page.get('Topics', []):
- topic_arn = topic.get('TopicArn', '')
- topic_name = topic_arn.split(':')[-1] if topic_arn else ''
-
- # Get topic attributes
- display_name = ''
- try:
- attrs_response = sns_client.get_topic_attributes(TopicArn=topic_arn)
- attrs = attrs_response.get('Attributes', {})
- display_name = attrs.get('DisplayName', '')
- except Exception as e:
- logger.debug(f"Failed to get topic attributes: {str(e)}")
-
- # Get subscriptions
- subscriptions = []
- try:
- sub_paginator = sns_client.get_paginator('list_subscriptions_by_topic')
- for sub_page in sub_paginator.paginate(TopicArn=topic_arn):
- for sub in sub_page.get('Subscriptions', []):
- protocol = sub.get('Protocol', '')
- endpoint = sub.get('Endpoint', '')
- subscriptions.append({
- 'protocol': protocol,
- 'endpoint': endpoint
- })
- except Exception as e:
- logger.debug(f"Failed to get subscriptions: {str(e)}")
-
- # Create one entry per subscription, or one entry if no subscriptions
- if subscriptions:
- for sub in subscriptions:
- resources.append(ResourceData(
- account_id=account_id,
- region=region,
- service='sns',
- resource_type='Topic',
- resource_id=topic_arn,
- name=topic_name,
- attributes={
- 'Topic Name': topic_name,
- 'Topic Display Name': display_name,
- 'Subscription Protocol': sub['protocol'],
- 'Subscription Endpoint': sub['endpoint']
- }
- ))
- else:
- resources.append(ResourceData(
- account_id=account_id,
- region=region,
- service='sns',
- resource_type='Topic',
- resource_id=topic_arn,
- name=topic_name,
- attributes={
- 'Topic Name': topic_name,
- 'Topic Display Name': display_name,
- 'Subscription Protocol': 'N/A',
- 'Subscription Endpoint': 'N/A'
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan SNS topics: {str(e)}")
-
- return resources
-
- @staticmethod
- @retry_with_backoff()
- def scan_cloudwatch_log_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan CloudWatch Log Groups in the specified region.
-
- Attributes (horizontal layout):
- Log Group Name, Retention Days, Stored Bytes, KMS Encryption
- """
- resources = []
- logs_client = session.client('logs')
-
- try:
- paginator = logs_client.get_paginator('describe_log_groups')
- for page in paginator.paginate():
- for log_group in page.get('logGroups', []):
- log_group_name = log_group.get('logGroupName', '')
-
- # Get retention in days
- retention = log_group.get('retentionInDays')
- retention_str = str(retention) if retention else 'Never Expire'
-
- # Get stored bytes
- stored_bytes = log_group.get('storedBytes', 0)
- stored_str = f"{stored_bytes / (1024*1024):.2f} MB" if stored_bytes else '0 MB'
-
- # Check KMS encryption
- kms_key = log_group.get('kmsKeyId', '')
- kms_encrypted = 'Yes' if kms_key else 'No'
-
- resources.append(ResourceData(
- account_id=account_id,
- region=region,
- service='cloudwatch',
- resource_type='Log Group',
- resource_id=log_group.get('arn', log_group_name),
- name=log_group_name,
- attributes={
- 'Log Group Name': log_group_name,
- 'Retention Days': retention_str,
- 'Stored Bytes': stored_str,
- 'KMS Encryption': kms_encrypted
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan CloudWatch log groups: {str(e)}")
-
- return resources
-
- @staticmethod
- @retry_with_backoff()
- def scan_eventbridge_rules(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan EventBridge Rules in the specified region.
-
- Attributes (horizontal layout):
- Name, Description, Event Bus, State
- """
- resources = []
- events_client = session.client('events')
-
- try:
- # List event buses first
- buses_response = events_client.list_event_buses()
- event_buses = [bus.get('Name', 'default') for bus in buses_response.get('EventBuses', [])]
-
- # If no buses found, use default
- if not event_buses:
- event_buses = ['default']
-
- for bus_name in event_buses:
- try:
- paginator = events_client.get_paginator('list_rules')
- for page in paginator.paginate(EventBusName=bus_name):
- for rule in page.get('Rules', []):
- rule_name = rule.get('Name', '')
-
- resources.append(ResourceData(
- account_id=account_id,
- region=region,
- service='eventbridge',
- resource_type='Rule',
- resource_id=rule.get('Arn', rule_name),
- name=rule_name,
- attributes={
- 'Name': rule_name,
- 'Description': rule.get('Description', ''),
- 'Event Bus': bus_name,
- 'State': rule.get('State', '')
- }
- ))
- except Exception as e:
- logger.debug(f"Failed to list rules for bus {bus_name}: {str(e)}")
- except Exception as e:
- logger.warning(f"Failed to scan EventBridge rules: {str(e)}")
-
- return resources
-
- @staticmethod
- @retry_with_backoff()
- def scan_cloudtrail_trails(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan CloudTrail Trails in the specified region.
-
- Attributes (horizontal layout):
- Name, Multi-Region Trail, Log File Validation, KMS Encryption
- """
- resources = []
- cloudtrail_client = session.client('cloudtrail')
-
- try:
- response = cloudtrail_client.describe_trails()
- for trail in response.get('trailList', []):
- trail_name = trail.get('Name', '')
-
- # Only include trails that are in this region or are multi-region
- trail_region = trail.get('HomeRegion', '')
- is_multi_region = trail.get('IsMultiRegionTrail', False)
-
- if trail_region != region and not is_multi_region:
- continue
-
- resources.append(ResourceData(
- account_id=account_id,
- region=region,
- service='cloudtrail',
- resource_type='Trail',
- resource_id=trail.get('TrailARN', trail_name),
- name=trail_name,
- attributes={
- 'Name': trail_name,
- 'Multi-Region Trail': 'Yes' if is_multi_region else 'No',
- 'Log File Validation': 'Yes' if trail.get('LogFileValidationEnabled') else 'No',
- 'KMS Encryption': 'Yes' if trail.get('KmsKeyId') else 'No'
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan CloudTrail trails: {str(e)}")
-
- return resources
-
- @staticmethod
- @retry_with_backoff()
- def scan_config_recorders(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
- """
- Scan AWS Config Recorders in the specified region.
-
- Attributes (horizontal layout):
- Name, Regional Resources, Global Resources, Retention period
- """
- resources = []
- config_client = session.client('config')
-
- try:
- response = config_client.describe_configuration_recorders()
- for recorder in response.get('ConfigurationRecorders', []):
- recorder_name = recorder.get('name', '')
-
- # Get recording group settings
- recording_group = recorder.get('recordingGroup', {})
- all_supported = recording_group.get('allSupported', False)
- include_global = recording_group.get('includeGlobalResourceTypes', False)
-
- # Get retention period
- retention_period = 'N/A'
- try:
- retention_response = config_client.describe_retention_configurations()
- for retention in retention_response.get('RetentionConfigurations', []):
- retention_period = f"{retention.get('RetentionPeriodInDays', 'N/A')} days"
- break
- except Exception:
- pass
-
- resources.append(ResourceData(
- account_id=account_id,
- region=region,
- service='config',
- resource_type='Config',
- resource_id=recorder_name,
- name=recorder_name,
- attributes={
- 'Name': recorder_name,
- 'Regional Resources': 'Yes' if all_supported else 'No',
- 'Global Resources': 'Yes' if include_global else 'No',
- 'Retention period': retention_period
- }
- ))
- except Exception as e:
- logger.warning(f"Failed to scan Config recorders: {str(e)}")
-
- return resources
|