""" Monitoring and Management Service Scanners Scans SNS Topics, CloudWatch Log Groups, EventBridge Rules, CloudTrail Trails, and Config Recorders. Requirements: - 5.1: Scan monitoring and management AWS services using boto3 """ import boto3 from typing import List, Dict, Any import logging from app.scanners.base import ResourceData from app.scanners.utils import retry_with_backoff logger = logging.getLogger(__name__) class MonitoringServiceScanner: """Scanner for monitoring and management AWS resources""" @staticmethod @retry_with_backoff() def scan_sns_topics(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan SNS Topics in the specified region. Attributes (horizontal layout): Topic Name, Topic Display Name, Subscription Protocol, Subscription Endpoint """ resources = [] sns_client = session.client('sns') try: paginator = sns_client.get_paginator('list_topics') for page in paginator.paginate(): for topic in page.get('Topics', []): topic_arn = topic.get('TopicArn', '') topic_name = topic_arn.split(':')[-1] if topic_arn else '' # Get topic attributes display_name = '' try: attrs_response = sns_client.get_topic_attributes(TopicArn=topic_arn) attrs = attrs_response.get('Attributes', {}) display_name = attrs.get('DisplayName', '') except Exception as e: logger.debug(f"Failed to get topic attributes: {str(e)}") # Get subscriptions subscriptions = [] try: sub_paginator = sns_client.get_paginator('list_subscriptions_by_topic') for sub_page in sub_paginator.paginate(TopicArn=topic_arn): for sub in sub_page.get('Subscriptions', []): protocol = sub.get('Protocol', '') endpoint = sub.get('Endpoint', '') subscriptions.append({ 'protocol': protocol, 'endpoint': endpoint }) except Exception as e: logger.debug(f"Failed to get subscriptions: {str(e)}") # Create one entry per subscription, or one entry if no subscriptions if subscriptions: for sub in subscriptions: resources.append(ResourceData( account_id=account_id, region=region, service='sns', resource_type='Topic', resource_id=topic_arn, name=topic_name, attributes={ 'Topic Name': topic_name, 'Topic Display Name': display_name, 'Subscription Protocol': sub['protocol'], 'Subscription Endpoint': sub['endpoint'] } )) else: resources.append(ResourceData( account_id=account_id, region=region, service='sns', resource_type='Topic', resource_id=topic_arn, name=topic_name, attributes={ 'Topic Name': topic_name, 'Topic Display Name': display_name, 'Subscription Protocol': 'N/A', 'Subscription Endpoint': 'N/A' } )) except Exception as e: logger.warning(f"Failed to scan SNS topics: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_cloudwatch_log_groups(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan CloudWatch Log Groups in the specified region. Attributes (horizontal layout): Log Group Name, Retention Days, Stored Bytes, KMS Encryption """ resources = [] logs_client = session.client('logs') try: paginator = logs_client.get_paginator('describe_log_groups') for page in paginator.paginate(): for log_group in page.get('logGroups', []): log_group_name = log_group.get('logGroupName', '') # Get retention in days retention = log_group.get('retentionInDays') retention_str = str(retention) if retention else 'Never Expire' # Get stored bytes stored_bytes = log_group.get('storedBytes', 0) stored_str = f"{stored_bytes / (1024*1024):.2f} MB" if stored_bytes else '0 MB' # Check KMS encryption kms_key = log_group.get('kmsKeyId', '') kms_encrypted = 'Yes' if kms_key else 'No' resources.append(ResourceData( account_id=account_id, region=region, service='cloudwatch', resource_type='Log Group', resource_id=log_group.get('arn', log_group_name), name=log_group_name, attributes={ 'Log Group Name': log_group_name, 'Retention Days': retention_str, 'Stored Bytes': stored_str, 'KMS Encryption': kms_encrypted } )) except Exception as e: logger.warning(f"Failed to scan CloudWatch log groups: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_eventbridge_rules(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan EventBridge Rules in the specified region. Attributes (horizontal layout): Name, Description, Event Bus, State """ resources = [] events_client = session.client('events') try: # List event buses first buses_response = events_client.list_event_buses() event_buses = [bus.get('Name', 'default') for bus in buses_response.get('EventBuses', [])] # If no buses found, use default if not event_buses: event_buses = ['default'] for bus_name in event_buses: try: paginator = events_client.get_paginator('list_rules') for page in paginator.paginate(EventBusName=bus_name): for rule in page.get('Rules', []): rule_name = rule.get('Name', '') resources.append(ResourceData( account_id=account_id, region=region, service='eventbridge', resource_type='Rule', resource_id=rule.get('Arn', rule_name), name=rule_name, attributes={ 'Name': rule_name, 'Description': rule.get('Description', ''), 'Event Bus': bus_name, 'State': rule.get('State', '') } )) except Exception as e: logger.debug(f"Failed to list rules for bus {bus_name}: {str(e)}") except Exception as e: logger.warning(f"Failed to scan EventBridge rules: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_cloudtrail_trails(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan CloudTrail Trails in the specified region. Attributes (horizontal layout): Name, Multi-Region Trail, Log File Validation, KMS Encryption """ resources = [] cloudtrail_client = session.client('cloudtrail') try: response = cloudtrail_client.describe_trails() for trail in response.get('trailList', []): trail_name = trail.get('Name', '') # Only include trails that are in this region or are multi-region trail_region = trail.get('HomeRegion', '') is_multi_region = trail.get('IsMultiRegionTrail', False) if trail_region != region and not is_multi_region: continue resources.append(ResourceData( account_id=account_id, region=region, service='cloudtrail', resource_type='Trail', resource_id=trail.get('TrailARN', trail_name), name=trail_name, attributes={ 'Name': trail_name, 'Multi-Region Trail': 'Yes' if is_multi_region else 'No', 'Log File Validation': 'Yes' if trail.get('LogFileValidationEnabled') else 'No', 'KMS Encryption': 'Yes' if trail.get('KmsKeyId') else 'No' } )) except Exception as e: logger.warning(f"Failed to scan CloudTrail trails: {str(e)}") return resources @staticmethod @retry_with_backoff() def scan_config_recorders(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan AWS Config Recorders in the specified region. Attributes (horizontal layout): Name, Regional Resources, Global Resources, Retention period """ resources = [] config_client = session.client('config') try: response = config_client.describe_configuration_recorders() for recorder in response.get('ConfigurationRecorders', []): recorder_name = recorder.get('name', '') # Get recording group settings recording_group = recorder.get('recordingGroup', {}) all_supported = recording_group.get('allSupported', False) include_global = recording_group.get('includeGlobalResourceTypes', False) # Get retention period retention_period = 'N/A' try: retention_response = config_client.describe_retention_configurations() for retention in retention_response.get('RetentionConfigurations', []): retention_period = f"{retention.get('RetentionPeriodInDays', 'N/A')} days" break except Exception: pass resources.append(ResourceData( account_id=account_id, region=region, service='config', resource_type='Config', resource_id=recorder_name, name=recorder_name, attributes={ 'Name': recorder_name, 'Regional Resources': 'Yes' if all_supported else 'No', 'Global Resources': 'Yes' if include_global else 'No', 'Retention period': retention_period } )) except Exception as e: logger.warning(f"Failed to scan Config recorders: {str(e)}") return resources