| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- """
- Database Service Scanners
- Scans RDS DB Instances and ElastiCache Clusters.
- Requirements:
- - 5.1: Scan database AWS services using boto3
- """
- import boto3
- from typing import List, Dict, Any
- import logging
- from app.scanners.base import ResourceData
- from app.scanners.utils import retry_with_backoff
- logger = logging.getLogger(__name__)
class DatabaseServiceScanner:
    """Scanner for database AWS resources (RDS DB instances and ElastiCache)."""

    @staticmethod
    @retry_with_backoff()
    def scan_rds_instances(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan RDS DB Instances visible through ``session``.

        Attributes (vertical layout - one table per instance):
        Region, Endpoint, DB instance ID, DB name, Master Username, Port,
        DB Engine, DB Version, Instance Type, Storage type, Storage, Multi-AZ,
        Security Group, Deletion Protection, Performance Insights Enabled, CloudWatch Logs

        Args:
            session: boto3 session used to create the RDS client.
            account_id: Account identifier copied onto every result.
            region: Region name copied onto every result.
                NOTE(review): the client is created without ``region_name``,
                so the session's own region decides what is scanned; confirm
                callers pass a session already bound to ``region``.

        Returns:
            One ``ResourceData`` per DB instance, across all result pages.
        """
        resources: List[ResourceData] = []
        rds_client = session.client('rds')

        paginator = rds_client.get_paginator('describe_db_instances')
        for page in paginator.paginate():
            for db in page.get('DBInstances', []):
                db_id = db.get('DBInstanceIdentifier', '')

                security_groups = [
                    sg.get('VpcSecurityGroupId', '')
                    for sg in db.get('VpcSecurityGroups', [])
                ]
                cw_logs = db.get('EnabledCloudwatchLogsExports', [])

                # 'Endpoint' may be missing from the response; fall back to
                # empty values rather than failing.
                endpoint = db.get('Endpoint', {})
                endpoint_address = endpoint.get('Address', '')
                port = endpoint.get('Port', '')

                resources.append(ResourceData(
                    account_id=account_id,
                    region=region,
                    service='rds',
                    resource_type='DB Instance',
                    # Prefer the ARN as the unique id; fall back to the instance id.
                    resource_id=db.get('DBInstanceArn', db_id),
                    name=db_id,
                    attributes={
                        'Region': region,
                        'Endpoint': endpoint_address,
                        'DB instance ID': db_id,
                        'DB name': db.get('DBName', ''),
                        'Master Username': db.get('MasterUsername', ''),
                        'Port': str(port),
                        'DB Engine': db.get('Engine', ''),
                        'DB Version': db.get('EngineVersion', ''),
                        'Instance Type': db.get('DBInstanceClass', ''),
                        'Storage type': db.get('StorageType', ''),
                        'Storage': f"{db.get('AllocatedStorage', '')} GB",
                        'Multi-AZ': 'Yes' if db.get('MultiAZ') else 'No',
                        'Security Group': ', '.join(security_groups),
                        'Deletion Protection': 'Yes' if db.get('DeletionProtection') else 'No',
                        'Performance Insights Enabled': 'Yes' if db.get('PerformanceInsightsEnabled') else 'No',
                        'CloudWatch Logs': ', '.join(cw_logs) if cw_logs else 'N/A',
                    },
                ))

        return resources

    @staticmethod
    def _describe_cache_cluster(elasticache_client: Any, cluster_id: str) -> Dict[str, Any]:
        """Return the description of one cache cluster, or ``{}`` if the lookup fails."""
        try:
            response = elasticache_client.describe_cache_clusters(CacheClusterId=cluster_id)
        except Exception as exc:
            # Best-effort enrichment only: record the failure instead of
            # hiding it, but never abort the surrounding scan.
            logger.debug("Could not describe ElastiCache cluster %s: %s", cluster_id, exc)
            return {}
        clusters = response.get('CacheClusters') or []
        return clusters[0] if clusters else {}

    @staticmethod
    @retry_with_backoff()
    def scan_elasticache_clusters(session: boto3.Session, account_id: str, region: str) -> List[ResourceData]:
        """
        Scan ElastiCache cache clusters and replication groups.

        Attributes (vertical layout - one table per cluster):
        Cluster ID, Engine, Engine Version, Node Type, Num Nodes, Status

        Both cache clusters and replication groups are reported with
        ``resource_type='Cache Cluster'``. Each of the two listing passes is
        best-effort: a failure is logged as a warning and the results gathered
        so far are still returned.
        NOTE(review): member clusters of a replication group are presumably
        also returned by the cache-cluster pass, so they may appear both
        individually and through their group; confirm this is intended.

        Args:
            session: boto3 session used to create the ElastiCache client.
            account_id: Account identifier copied onto every result.
            region: Region name copied onto every result (the client itself is
                created without ``region_name``).

        Returns:
            One ``ResourceData`` per cache cluster and per replication group.
        """
        resources: List[ResourceData] = []
        elasticache_client = session.client('elasticache')

        # Clusters seen in the first pass, keyed by id, so the replication
        # group pass can reuse them instead of one extra API call per group.
        clusters_by_id: Dict[str, Dict[str, Any]] = {}

        # Scan cache clusters (Redis/Memcached)
        try:
            paginator = elasticache_client.get_paginator('describe_cache_clusters')
            for page in paginator.paginate(ShowCacheNodeInfo=True):
                for cluster in page.get('CacheClusters', []):
                    cluster_id = cluster.get('CacheClusterId', '')
                    clusters_by_id[cluster_id] = cluster

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='elasticache',
                        resource_type='Cache Cluster',
                        resource_id=cluster.get('ARN', cluster_id),
                        name=cluster_id,
                        attributes={
                            'Cluster ID': cluster_id,
                            'Engine': cluster.get('Engine', ''),
                            'Engine Version': cluster.get('EngineVersion', ''),
                            'Node Type': cluster.get('CacheNodeType', ''),
                            'Num Nodes': str(cluster.get('NumCacheNodes', 0)),
                            'Status': cluster.get('CacheClusterStatus', ''),
                        },
                    ))
        except Exception as e:
            logger.warning(f"Failed to scan ElastiCache clusters: {str(e)}")

        # Also scan replication groups (Redis cluster mode)
        try:
            paginator = elasticache_client.get_paginator('describe_replication_groups')
            for page in paginator.paginate():
                for rg in page.get('ReplicationGroups', []):
                    rg_id = rg.get('ReplicationGroupId', '')

                    # Total members across all node groups (shards).
                    num_nodes = sum(
                        len(node_group.get('NodeGroupMembers', []))
                        for node_group in rg.get('NodeGroups', [])
                    )

                    # Use the first member cluster to fill in details the
                    # replication group record may not carry. Reuse first-pass
                    # data when available; otherwise fall back to a lookup.
                    member: Dict[str, Any] = {}
                    member_clusters = rg.get('MemberClusters', [])
                    if member_clusters:
                        member_id = member_clusters[0]
                        if member_id in clusters_by_id:
                            member = clusters_by_id[member_id]
                        else:
                            member = DatabaseServiceScanner._describe_cache_cluster(
                                elasticache_client, member_id
                            )

                    node_type = rg.get('CacheNodeType') or member.get('CacheNodeType', '')
                    # Keep 'redis' as the last-resort default for responses
                    # that carry no engine information at all.
                    engine = rg.get('Engine') or member.get('Engine') or 'redis'
                    engine_version = member.get('EngineVersion', '')

                    resources.append(ResourceData(
                        account_id=account_id,
                        region=region,
                        service='elasticache',
                        resource_type='Cache Cluster',
                        resource_id=rg.get('ARN', rg_id),
                        name=rg_id,
                        attributes={
                            'Cluster ID': rg_id,
                            'Engine': engine,
                            'Engine Version': engine_version,
                            'Node Type': node_type,
                            'Num Nodes': str(num_nodes),
                            'Status': rg.get('Status', ''),
                        },
                    ))
        except Exception as e:
            logger.warning(f"Failed to scan ElastiCache replication groups: {str(e)}")

        return resources
|