""" Database Service Scanners Scans RDS DB Instances and ElastiCache Clusters. Requirements: - 5.1: Scan database AWS services using boto3 """ import boto3 from typing import List, Dict, Any import logging from app.scanners.base import ResourceData from app.scanners.utils import retry_with_backoff logger = logging.getLogger(__name__) class DatabaseServiceScanner: """Scanner for database AWS resources""" @staticmethod @retry_with_backoff() def scan_rds_instances(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan RDS DB Instances in the specified region. Attributes (vertical layout - one table per instance): Region, Endpoint, DB instance ID, DB name, Master Username, Port, DB Engine, DB Version, Instance Type, Storage type, Storage, Multi-AZ, Security Group, Deletion Protection, Performance Insights Enabled, CloudWatch Logs """ resources = [] rds_client = session.client('rds') paginator = rds_client.get_paginator('describe_db_instances') for page in paginator.paginate(): for db in page.get('DBInstances', []): db_id = db.get('DBInstanceIdentifier', '') # Get security groups security_groups = [] for sg in db.get('VpcSecurityGroups', []): security_groups.append(sg.get('VpcSecurityGroupId', '')) # Get CloudWatch logs exports cw_logs = db.get('EnabledCloudwatchLogsExports', []) # Get endpoint endpoint = db.get('Endpoint', {}) endpoint_address = endpoint.get('Address', '') port = endpoint.get('Port', '') resources.append(ResourceData( account_id=account_id, region=region, service='rds', resource_type='DB Instance', resource_id=db.get('DBInstanceArn', db_id), name=db_id, attributes={ 'Region': region, 'Endpoint': endpoint_address, 'DB instance ID': db_id, 'DB name': db.get('DBName', ''), 'Master Username': db.get('MasterUsername', ''), 'Port': str(port), 'DB Engine': db.get('Engine', ''), 'DB Version': db.get('EngineVersion', ''), 'Instance Type': db.get('DBInstanceClass', ''), 'Storage type': db.get('StorageType', ''), 'Storage': f"{db.get('AllocatedStorage', '')} GB", 'Multi-AZ': 'Yes' if db.get('MultiAZ') else 'No', 'Security Group': ', '.join(security_groups), 'Deletion Protection': 'Yes' if db.get('DeletionProtection') else 'No', 'Performance Insights Enabled': 'Yes' if db.get('PerformanceInsightsEnabled') else 'No', 'CloudWatch Logs': ', '.join(cw_logs) if cw_logs else 'N/A' } )) return resources @staticmethod @retry_with_backoff() def scan_elasticache_clusters(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]: """ Scan ElastiCache Clusters in the specified region. Attributes (vertical layout - one table per cluster): Cluster ID, Engine, Engine Version, Node Type, Num Nodes, Status """ resources = [] elasticache_client = session.client('elasticache') # Scan cache clusters (Redis/Memcached) try: paginator = elasticache_client.get_paginator('describe_cache_clusters') for page in paginator.paginate(ShowCacheNodeInfo=True): for cluster in page.get('CacheClusters', []): cluster_id = cluster.get('CacheClusterId', '') resources.append(ResourceData( account_id=account_id, region=region, service='elasticache', resource_type='Cache Cluster', resource_id=cluster.get('ARN', cluster_id), name=cluster_id, attributes={ 'Cluster ID': cluster_id, 'Engine': cluster.get('Engine', ''), 'Engine Version': cluster.get('EngineVersion', ''), 'Node Type': cluster.get('CacheNodeType', ''), 'Num Nodes': str(cluster.get('NumCacheNodes', 0)), 'Status': cluster.get('CacheClusterStatus', '') } )) except Exception as e: logger.warning(f"Failed to scan ElastiCache clusters: {str(e)}") # Also scan replication groups (Redis cluster mode) try: paginator = elasticache_client.get_paginator('describe_replication_groups') for page in paginator.paginate(): for rg in page.get('ReplicationGroups', []): rg_id = rg.get('ReplicationGroupId', '') # Count nodes num_nodes = 0 for node_group in rg.get('NodeGroups', []): num_nodes += len(node_group.get('NodeGroupMembers', [])) # Get node type from member clusters node_type = '' member_clusters = rg.get('MemberClusters', []) if member_clusters: try: cluster_response = elasticache_client.describe_cache_clusters( CacheClusterId=member_clusters[0] ) if cluster_response.get('CacheClusters'): node_type = cluster_response['CacheClusters'][0].get('CacheNodeType', '') except Exception: pass resources.append(ResourceData( account_id=account_id, region=region, service='elasticache', resource_type='Cache Cluster', resource_id=rg.get('ARN', rg_id), name=rg_id, attributes={ 'Cluster ID': rg_id, 'Engine': 'redis', 'Engine Version': '', 'Node Type': node_type, 'Num Nodes': str(num_nodes), 'Status': rg.get('Status', '') } )) except Exception as e: logger.warning(f"Failed to scan ElastiCache replication groups: {str(e)}") return resources