| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384 |
- #!/usr/bin/env python3
- """
- CloudShell Scanner - AWS Resource Scanner for CloudShell Environment
- A standalone Python script that scans AWS resources using CloudShell's IAM credentials.
- This script is designed to run in AWS CloudShell without requiring Access Keys.
- Requirements:
- - 1.1: Single-file Python script, only depends on boto3 and Python standard library
- - 1.2: Automatically uses CloudShell environment's IAM credentials
- - 1.7: Displays progress information during scanning
- Usage:
- # Scan all regions
- python cloudshell_scanner.py
- # Scan specific regions
- python cloudshell_scanner.py --regions us-east-1,ap-northeast-1
- # Specify output file
- python cloudshell_scanner.py --output my_scan.json
- # Scan specific services
- python cloudshell_scanner.py --services ec2,vpc,rds
- """
- import argparse
- import json
- import logging
- import sys
- import time
- from datetime import datetime, timezone
- from functools import wraps
- from typing import Any, Callable, Dict, List, Optional, TypeVar
- import boto3
- from botocore.exceptions import BotoCoreError, ClientError
# Type variable used to carry the wrapped callable's return type through the
# retry helpers.
T = TypeVar("T")

# Scanner version (written into the scan result metadata)
__version__ = "1.0.0"

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Exception types caught by the retry helpers. A ClientError is only retried
# when its error code is listed in RETRYABLE_ERROR_CODES; the other types are
# always retried.
RETRYABLE_EXCEPTIONS = (
    ClientError,
    BotoCoreError,
    ConnectionError,
    TimeoutError,
)

# AWS error codes that indicate a transient condition (throttling, timeouts,
# temporary service failures) and are therefore worth retrying.
RETRYABLE_ERROR_CODES = {
    # Throttling
    "Throttling",
    "ThrottlingException",
    "ThrottledException",
    "RequestThrottled",
    "RequestThrottledException",
    "RequestLimitExceeded",
    "TooManyRequestsException",
    "ProvisionedThroughputExceededException",
    "EC2ThrottledException",
    "SlowDown",
    # Transient service-side failures
    "ServiceUnavailable",
    "InternalError",
    "PriorRequestNotComplete",
    # Timeouts
    "RequestTimeout",
    "RequestTimeoutException",
}
def retry_with_exponential_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    exponential_base: float = 2.0,
) -> Callable:
    """
    Decorator that implements retry logic with exponential backoff.

    The decorated function is always called at least once. If it raises one
    of RETRYABLE_EXCEPTIONS it is called again after a delay of
    ``base_delay * exponential_base ** attempt`` seconds (capped at
    ``max_delay``). A ClientError whose error code is not in
    RETRYABLE_ERROR_CODES is re-raised immediately without retrying.

    Args:
        max_retries: Maximum number of retry attempts (default: 3).
            Negative values are treated as 0 (a single attempt, no retries).
        base_delay: Initial delay in seconds (default: 1.0)
        max_delay: Maximum delay in seconds (default: 30.0)
        exponential_base: Base for exponential calculation (default: 2.0)

    Returns:
        Decorated function with retry logic

    Raises:
        The last exception raised by the decorated function once all
        attempts are exhausted, or immediately for non-retryable errors.

    Requirements:
        - 1.8: Record errors and continue scanning other resources
        - Design: Network timeout - retry 3 times with exponential backoff
    """
    # A negative budget would otherwise produce an empty loop: the function
    # would never be called and the wrapper would silently return None.
    retries = max(0, max_retries)
    total_attempts = retries + 1

    # Annotations are quoted so the module-level TypeVar is not evaluated each
    # time the decorator factory is invoked.
    def decorator(func: "Callable[..., T]") -> "Callable[..., T]":
        # Callables such as functools.partial objects have no __name__.
        func_name = getattr(func, "__name__", repr(func))

        @wraps(func)
        def wrapper(*args, **kwargs) -> "T":
            for attempt in range(total_attempts):
                try:
                    return func(*args, **kwargs)
                except RETRYABLE_EXCEPTIONS as e:
                    # ClientError covers every AWS API error; only the
                    # transient error codes are worth retrying.
                    if isinstance(e, ClientError):
                        error_code = e.response.get("Error", {}).get("Code", "")
                        if error_code not in RETRYABLE_ERROR_CODES:
                            raise

                    if attempt >= retries:
                        logger.error(
                            "All %d attempts failed for %s: %s",
                            total_attempts,
                            func_name,
                            e,
                        )
                        # Bare raise keeps the original traceback intact.
                        raise

                    # Exponential backoff, capped at max_delay and never
                    # negative (time.sleep rejects negative values).
                    delay = max(
                        0.0,
                        min(base_delay * (exponential_base ** attempt), max_delay),
                    )
                    logger.warning(
                        "Attempt %d/%d failed for %s: %s. Retrying in %.1fs...",
                        attempt + 1,
                        total_attempts,
                        func_name,
                        e,
                        delay,
                    )
                    time.sleep(delay)

            # Every loop iteration either returns or raises.
            raise RuntimeError("retry loop exited without a result")

        return wrapper

    return decorator
def is_retryable_error(exception: Exception) -> bool:
    """
    Tell whether an exception should be retried.

    A ClientError is judged by its AWS error code (it must appear in
    RETRYABLE_ERROR_CODES); any other exception is judged by its type
    (it must be an instance of one of RETRYABLE_EXCEPTIONS).

    Args:
        exception: The exception to check

    Returns:
        True if the exception is retryable, False otherwise
    """
    if not isinstance(exception, ClientError):
        return isinstance(exception, RETRYABLE_EXCEPTIONS)

    aws_error = exception.response.get("Error", {})
    return aws_error.get("Code", "") in RETRYABLE_ERROR_CODES
class ProgressDisplay:
    """
    Progress display utility for showing scan progress.

    Progress is written to stdout on a single line that is redrawn with a
    carriage return; errors are reported through the module logger.

    Requirements:
        - 1.7: Displays progress information during scanning
    """

    def __init__(self, total_tasks: int = 0):
        """
        Initialize progress display.

        Args:
            total_tasks: Total number of tasks to track
        """
        self.total_tasks = total_tasks
        self.completed_tasks = 0
        self.current_service = ""
        self.current_region = ""

    def set_total(self, total: int) -> None:
        """Set total number of tasks and reset the completed counter."""
        self.total_tasks = total
        self.completed_tasks = 0

    def update(self, service: str, region: str, status: str = "scanning") -> None:
        """
        Update progress display.

        Args:
            service: Current service being scanned
            region: Current region being scanned
            status: Status message
        """
        self.current_service = service
        self.current_region = region

        if self.total_tasks > 0:
            # Clamp so extra increment() calls can never show more than 100%
            # (or less than 0%).
            percentage = (self.completed_tasks / self.total_tasks) * 100
            percentage = min(max(percentage, 0.0), 100.0)
            progress_bar = self._create_progress_bar(percentage)
            print(
                f"\r{progress_bar} {percentage:5.1f}% | {status}: {service} in {region}",
                end="",
                flush=True,
            )
        else:
            # Without a known total only the current activity can be shown.
            print(f"\r{status}: {service} in {region}", end="", flush=True)

    def increment(self) -> None:
        """Increment completed tasks counter."""
        self.completed_tasks += 1

    def complete(self, message: str = "Scan completed") -> None:
        """
        Mark progress as complete.

        Args:
            message: Completion message
        """
        if self.total_tasks > 0:
            progress_bar = self._create_progress_bar(100)
            print(f"\r{progress_bar} 100.0% | {message}")
        else:
            print(f"\r{message}")

    def _create_progress_bar(self, percentage: float, width: int = 30) -> str:
        """
        Create a text-based progress bar.

        Args:
            percentage: Completion percentage; values outside 0-100 are
                clamped so the bar is always exactly ``width`` cells wide
            width: Width of the progress bar (negative values count as 0)

        Returns:
            Progress bar string
        """
        width = max(width, 0)
        clamped = min(max(percentage, 0.0), 100.0)
        filled = int(width * clamped / 100)
        bar = "█" * filled + "░" * (width - filled)
        return f"[{bar}]"

    def log_error(self, service: str, region: str, error: str) -> None:
        """
        Log an error during scanning.

        Args:
            service: Service that encountered the error
            region: Region where the error occurred
            error: Error message
        """
        # Print newline to avoid overwriting progress bar
        print()
        logger.warning("Error scanning %s in %s: %s", service, region, error)
- class CloudShellScanner:
- """
- CloudShell environment AWS resource scanner.
-
- This class provides functionality to scan AWS resources using the IAM credentials
- automatically available in the CloudShell environment.
-
- Requirements:
- - 1.1: Single-file Python script, only depends on boto3 and Python standard library
- - 1.2: Automatically uses CloudShell environment's IAM credentials
- - 1.7: Displays progress information during scanning
-
- Attributes:
- SUPPORTED_SERVICES: List of all supported AWS services
- GLOBAL_SERVICES: List of global services (not region-specific)
- """
-
- # All supported AWS services (must match AWSScanner.SUPPORTED_SERVICES)
- SUPPORTED_SERVICES: List[str] = [
- "vpc", "subnet", "route_table", "internet_gateway", "nat_gateway",
- "security_group", "vpc_endpoint", "vpc_peering",
- "customer_gateway", "virtual_private_gateway", "vpn_connection",
- "ec2", "elastic_ip",
- "autoscaling", "elb", "target_group",
- "rds", "elasticache",
- "eks", "lambda", "s3", "s3_event_notification",
- "cloudfront", "route53", "acm", "waf",
- "sns", "cloudwatch", "eventbridge", "cloudtrail", "config",
- ]
-
- # Global services (not region-specific)
- GLOBAL_SERVICES: List[str] = [
- "cloudfront", "route53", "waf", "s3", "s3_event_notification", "cloudtrail"
- ]
-
def __init__(self):
    """
    Set up the scanner state and open a boto3 session.

    The session is created with no explicit credentials, so boto3's
    default credential chain (the CloudShell IAM identity) is used.

    Raises:
        Exception: Re-raised unchanged if the boto3 session cannot be created

    Requirements:
        - 1.2: Automatically uses CloudShell environment's IAM credentials
    """
    # Account ID is looked up lazily by get_account_id() and cached here.
    self._account_id: Optional[str] = None
    self._session: Optional[boto3.Session] = None
    self.progress = ProgressDisplay()

    try:
        # No arguments: rely on the default credential chain.
        self._session = boto3.Session()
        logger.info("Initialized CloudShell scanner with default credentials")
    except Exception as exc:
        logger.error(f"Failed to initialize boto3 session: {exc}")
        raise
-
def get_account_id(self) -> str:
    """
    Return the AWS account ID of the current credentials.

    The ID is fetched once through STS ``get_caller_identity`` and cached
    on the instance; later calls return the cached value.

    Returns:
        AWS account ID string

    Raises:
        Exception: If unable to retrieve account ID (logged, then re-raised)
    """
    cached = self._account_id
    if cached:
        return cached

    try:
        identity = self._session.client("sts").get_caller_identity()
        self._account_id = identity["Account"]
        logger.info(f"Retrieved account ID: {self._account_id}")
        return self._account_id
    except Exception as exc:
        logger.error(f"Failed to get account ID: {exc}")
        raise
-
def list_regions(self) -> List[str]:
    """
    Return the names of the AWS regions reported by EC2.

    The lookup calls ``describe_regions`` on an EC2 client bound to
    us-east-1. Any failure is logged as a warning and the hard-coded
    list from ``_get_default_regions`` is returned instead, so this
    method does not raise.

    Returns:
        List of region names

    Requirements:
        - 1.4: Scan all available regions when not specified
    """
    try:
        ec2 = self._session.client("ec2", region_name="us-east-1")
        described = ec2.describe_regions()["Regions"]
        names = [entry["RegionName"] for entry in described]
        logger.info(f"Found {len(names)} available regions")
        return names
    except Exception as exc:
        logger.warning(f"Failed to list regions, using defaults: {exc}")
        # Fall back to the static list when the API call fails
        return self._get_default_regions()
-
- def _get_default_regions(self) -> List[str]:
- """
- Get default AWS regions as fallback.
-
- Returns:
- List of default region names
- """
- return [
- "us-east-1", "us-east-2", "us-west-1", "us-west-2",
- "eu-west-1", "eu-west-2", "eu-west-3", "eu-central-1",
- "ap-northeast-1", "ap-northeast-2", "ap-southeast-1", "ap-southeast-2",
- "ap-south-1", "sa-east-1", "ca-central-1",
- ]
-
def filter_regions(
    self,
    requested_regions: Optional[List[str]] = None,
) -> List[str]:
    """
    Filter and validate requested regions against available regions.

    This method implements region filtering logic:
    - If no regions specified (None), returns all available regions
    - If regions specified, each name is normalized (whitespace stripped,
      lowercased) and validated against the available regions
    - Duplicate requests are collapsed so a region is never scanned twice
    - Invalid regions are logged and filtered out
    - If nothing valid remains, all available regions are returned

    Args:
        requested_regions: List of regions requested by user (None = all regions)

    Returns:
        List of valid region names to scan, in first-requested order and
        without duplicates

    Requirements:
        - 1.3: Scan only specified regions when provided
        - 1.4: Scan all available regions when not specified
    """
    available_regions = self.list_regions()

    # If no regions specified, return all available regions
    if requested_regions is None:
        logger.info(
            "No regions specified, will scan all %d available regions",
            len(available_regions),
        )
        return available_regions

    available_set = set(available_regions)
    valid_regions: List[str] = []
    invalid_regions: List[str] = []
    seen = set()

    for region in requested_regions:
        # Normalize region name (strip whitespace, lowercase)
        normalized_region = region.strip().lower()

        # Skip repeats (e.g. "us-east-1, US-EAST-1"): scanning a region
        # twice would duplicate every resource found in it.
        if normalized_region in seen:
            continue
        seen.add(normalized_region)

        if normalized_region in available_set:
            valid_regions.append(normalized_region)
        else:
            # Report the name as the user typed it
            invalid_regions.append(region)

    if invalid_regions:
        logger.warning(
            "Ignoring invalid/unavailable regions: %s. Available regions: %s",
            invalid_regions,
            sorted(available_regions),
        )

    # If no valid regions remain, fall back to all available regions
    if not valid_regions:
        logger.warning(
            "No valid regions specified, falling back to all available regions"
        )
        return available_regions

    logger.info(
        "Will scan %d specified regions: %s", len(valid_regions), valid_regions
    )
    return valid_regions
-
def validate_region(self, region: str) -> bool:
    """
    Check a single region name against the available regions.

    The name is compared after stripping whitespace and lowercasing.

    Args:
        region: Region name to validate

    Returns:
        True if the region is available, False if it is not. True is also
        returned when the check itself raises, leaving the eventual API
        call to reject a bad region.
    """
    try:
        known = set(self.list_regions())
        candidate = region.strip().lower()
    except Exception:
        # If we can't validate, assume it's valid and let the API call fail
        return True
    return candidate in known
-
def scan_resources(
    self,
    regions: Optional[List[str]] = None,
    services: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    Scan AWS resources across specified regions and services.

    Requested service names are normalized (whitespace stripped,
    lowercased) and de-duplicated before validation, mirroring how
    ``filter_regions`` treats region names. Unsupported names are logged
    and dropped. Global services are scanned once; every other service is
    scanned once per region.

    Args:
        regions: List of regions to scan (None = all available regions)
        services: List of services to scan (None or empty = all supported
            services)

    Returns:
        Dictionary with three keys: ``metadata`` (account, timestamp,
        regions/services scanned, version, totals), ``resources``
        (service name -> list of resources) and ``errors`` (list of error
        records)

    Raises:
        Exception: Propagated from ``get_account_id`` if the account ID
            cannot be retrieved

    Requirements:
        - 1.3: Scan only specified regions when provided
        - 1.4: Scan all available regions when not specified
        - 1.5: Scan all supported service types
        - 1.7: Display progress information during scanning
        - 1.8: Record errors and continue scanning other resources
    """
    account_id = self.get_account_id()

    # Filter and validate regions
    regions_to_scan = self.filter_regions(regions)
    logger.info("Scanning %d regions", len(regions_to_scan))

    # Determine services to scan. dict.fromkeys de-duplicates while keeping
    # the order the user asked for; scanning a service twice would
    # duplicate its resources in the result.
    if services:
        requested = list(dict.fromkeys(name.strip().lower() for name in services))
    else:
        requested = list(self.SUPPORTED_SERVICES)

    # Validate services
    supported = set(self.SUPPORTED_SERVICES)
    invalid_services = [name for name in requested if name not in supported]
    if invalid_services:
        logger.warning("Ignoring unsupported services: %s", invalid_services)
    services_to_scan = [name for name in requested if name in supported]
    logger.info("Scanning %d services", len(services_to_scan))

    # Separate global and regional services
    global_names = set(self.GLOBAL_SERVICES)
    global_services = [s for s in services_to_scan if s in global_names]
    regional_services = [s for s in services_to_scan if s not in global_names]

    # One task per global service plus one per (regional service, region)
    total_tasks = len(global_services) + len(regional_services) * len(regions_to_scan)
    self.progress.set_total(total_tasks)

    result: Dict[str, Any] = {
        "metadata": {
            "account_id": account_id,
            # ISO-8601 UTC timestamp with a trailing "Z" instead of "+00:00"
            "scan_timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "regions_scanned": regions_to_scan,
            "services_scanned": services_to_scan,
            "scanner_version": __version__,
            "total_resources": 0,
            "total_errors": 0,
        },
        "resources": {},
        "errors": [],
    }

    # Scan global services first (only once, not per region)
    if global_services:
        logger.info("Scanning %d global services", len(global_services))
        self._scan_global_services(
            account_id=account_id,
            services=global_services,
            result=result,
        )

    # Scan regional services
    if regional_services and regions_to_scan:
        logger.info(
            "Scanning %d regional services across %d regions",
            len(regional_services),
            len(regions_to_scan),
        )
        self._scan_regional_services(
            account_id=account_id,
            regions=regions_to_scan,
            services=regional_services,
            result=result,
        )

    # Update metadata totals from what the scan helpers collected
    metadata = result["metadata"]
    metadata["total_resources"] = sum(
        len(resources) for resources in result["resources"].values()
    )
    metadata["total_errors"] = len(result["errors"])

    self.progress.complete(
        f"Scan completed: {metadata['total_resources']} resources, "
        f"{metadata['total_errors']} errors"
    )

    return result
-
- def _call_with_retry(
- self,
- func: Callable[..., T],
- *args,
- max_retries: int = 3,
- base_delay: float = 1.0,
- **kwargs,
- ) -> T:
- """
- Call a function with retry logic and exponential backoff.
-
- This method wraps API calls with retry logic for transient failures.
-
- Args:
- func: Function to call
- *args: Positional arguments for the function
- max_retries: Maximum number of retry attempts
- base_delay: Initial delay in seconds
- **kwargs: Keyword arguments for the function
-
- Returns:
- Result of the function call
-
- Raises:
- Exception: If all retries are exhausted
-
- Requirements:
- - 1.8: Record errors and continue scanning other resources
- - Design: Network timeout - retry 3 times with exponential backoff
- """
- last_exception = None
-
- for attempt in range(max_retries + 1):
- try:
- return func(*args, **kwargs)
- except RETRYABLE_EXCEPTIONS as e:
- last_exception = e
-
- # Check if it's a retryable error code for ClientError
- if isinstance(e, ClientError):
- error_code = e.response.get("Error", {}).get("Code", "")
- if error_code not in RETRYABLE_ERROR_CODES:
- # Not a retryable error, raise immediately
- raise
-
- if attempt < max_retries:
- # Calculate delay with exponential backoff
- delay = min(base_delay * (2 ** attempt), 30.0)
- logger.warning(
- f"Attempt {attempt + 1}/{max_retries + 1} failed: {str(e)}. "
- f"Retrying in {delay:.1f}s..."
- )
- time.sleep(delay)
- else:
- logger.error(f"All {max_retries + 1} attempts failed: {str(e)}")
-
- # All retries exhausted, raise the last exception
- if last_exception:
- raise last_exception
-
- def _scan_global_services(
- self,
- account_id: str,
- services: List[str],
- result: Dict[str, Any],
- ) -> None:
- """
- Scan global AWS services.
-
- Args:
- account_id: AWS account ID
- services: List of global services to scan
- result: Result dictionary to update
-
- Requirements:
- - 1.8: Record errors and continue scanning other resources
- """
- for service in services:
- self.progress.update(service, "global", "Scanning")
-
- try:
- resources = self._scan_service(
- account_id=account_id,
- region="global",
- service=service,
- )
-
- if resources:
- if service not in result["resources"]:
- result["resources"][service] = []
- result["resources"][service].extend(resources)
-
- except Exception as e:
- # Capture detailed error information
- error_info = self._create_error_info(
- service=service,
- region="global",
- exception=e,
- )
- result["errors"].append(error_info)
- self.progress.log_error(service, "global", str(e))
-
- self.progress.increment()
-
- def _scan_regional_services(
- self,
- account_id: str,
- regions: List[str],
- services: List[str],
- result: Dict[str, Any],
- ) -> None:
- """
- Scan regional AWS services.
-
- Args:
- account_id: AWS account ID
- regions: List of regions to scan
- services: List of regional services to scan
- result: Result dictionary to update
-
- Requirements:
- - 1.8: Record errors and continue scanning other resources
- """
- for region in regions:
- for service in services:
- self.progress.update(service, region, "Scanning")
-
- try:
- resources = self._scan_service(
- account_id=account_id,
- region=region,
- service=service,
- )
-
- if resources:
- if service not in result["resources"]:
- result["resources"][service] = []
- result["resources"][service].extend(resources)
-
- except Exception as e:
- # Capture detailed error information
- error_info = self._create_error_info(
- service=service,
- region=region,
- exception=e,
- )
- result["errors"].append(error_info)
- self.progress.log_error(service, region, str(e))
-
- self.progress.increment()
-
def _create_error_info(
    self,
    service: str,
    region: str,
    exception: Exception,
) -> Dict[str, Any]:
    """
    Create a detailed error information dictionary.

    Every record carries the service, region, stringified exception and
    exception class name. For a ClientError the AWS error code and message
    are added under ``details``; when the code is a known permission
    error, a ``permission_hint`` is added as well and a warning is logged.
    For a BotoCoreError ``details`` holds the stringified error. For any
    other exception ``details`` is None.

    Args:
        service: Service that encountered the error
        region: Region where the error occurred
        exception: The exception that was raised

    Returns:
        Dictionary containing error details

    Requirements:
        - 1.8: Record errors and continue scanning other resources
        - 6.1: Display missing permission information when encountering permission errors
    """
    # Error codes AWS services use to signal missing permissions. EC2-family
    # APIs report "UnauthorizedOperation" rather than "AccessDenied".
    permission_error_codes = (
        "AccessDenied",
        "AccessDeniedException",
        "UnauthorizedAccess",
        "UnauthorizedOperation",
        "AuthorizationError",
    )

    error_info: Dict[str, Any] = {
        "service": service,
        "region": region,
        "error": str(exception),
        "error_type": type(exception).__name__,
        "details": None,
    }

    # Extract additional details from ClientError
    if isinstance(exception, ClientError):
        error_response = exception.response.get("Error", {})
        error_code = error_response.get("Code", "")
        error_message = error_response.get("Message", "")

        error_info["details"] = {
            "error_code": error_code,
            "error_message": error_message,
        }

        # Check for permission errors and provide helpful information
        if error_code in permission_error_codes:
            error_info["details"]["permission_hint"] = (
                f"Missing IAM permission for {service} in {region}. "
                f"Please ensure your IAM role has the necessary permissions."
            )
            logger.warning(
                "Permission denied for %s in %s: %s",
                service,
                region,
                error_message,
            )

    # Extract details from BotoCoreError
    elif isinstance(exception, BotoCoreError):
        error_info["details"] = {
            "botocore_error": str(exception),
        }

    return error_info
-
- def _scan_service(
- self,
- account_id: str,
- region: str,
- service: str,
- ) -> List[Dict[str, Any]]:
- """
- Scan a single service in a specific region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan (or 'global' for global services)
- service: Service to scan
-
- Returns:
- List of resource dictionaries
-
- Note:
- This is a placeholder method. Actual service scanning methods
- will be implemented in subsequent tasks (1.2-1.5).
- """
- # Get the scanner method for this service
- scanner_method = self._get_scanner_method(service)
-
- if scanner_method is None:
- logger.warning(f"No scanner method found for service: {service}")
- return []
-
- # Use us-east-1 for global services
- actual_region = "us-east-1" if region == "global" else region
-
- return scanner_method(account_id, actual_region)
-
- def _get_scanner_method(self, service: str) -> Optional[Callable]:
- """
- Get the scanner method for a specific service.
-
- Args:
- service: Service name
-
- Returns:
- Scanner method callable or None if not found
- """
- scanner_methods: Dict[str, Callable] = {
- # VPC related services (Task 1.2)
- "vpc": self._scan_vpcs,
- "subnet": self._scan_subnets,
- "route_table": self._scan_route_tables,
- "internet_gateway": self._scan_internet_gateways,
- "nat_gateway": self._scan_nat_gateways,
- "security_group": self._scan_security_groups,
- "vpc_endpoint": self._scan_vpc_endpoints,
- "vpc_peering": self._scan_vpc_peering,
- "customer_gateway": self._scan_customer_gateways,
- "virtual_private_gateway": self._scan_virtual_private_gateways,
- "vpn_connection": self._scan_vpn_connections,
-
- # EC2 and compute services (Task 1.3)
- "ec2": self._scan_ec2_instances,
- "elastic_ip": self._scan_elastic_ips,
- "autoscaling": self._scan_autoscaling_groups,
- "elb": self._scan_load_balancers,
- "target_group": self._scan_target_groups,
- "lambda": self._scan_lambda_functions,
- "eks": self._scan_eks_clusters,
-
- # Database and storage services (Task 1.4)
- "rds": self._scan_rds_instances,
- "elasticache": self._scan_elasticache_clusters,
- "s3": self._scan_s3_buckets,
- "s3_event_notification": self._scan_s3_event_notifications,
-
- # Global and monitoring services (Task 1.5)
- "cloudfront": self._scan_cloudfront_distributions,
- "route53": self._scan_route53_hosted_zones,
- "acm": self._scan_acm_certificates,
- "waf": self._scan_waf_web_acls,
- "sns": self._scan_sns_topics,
- "cloudwatch": self._scan_cloudwatch_log_groups,
- "eventbridge": self._scan_eventbridge_rules,
- "cloudtrail": self._scan_cloudtrail_trails,
- "config": self._scan_config_recorders,
- }
-
- return scanner_methods.get(service)
-
def export_json(self, result: Dict[str, Any], output_path: str) -> None:
    """
    Write a scan result to ``output_path`` as UTF-8 JSON.

    The result is validated first, then serialized in full (2-space
    indent, non-ASCII characters kept, key order preserved) with
    ``_json_serializer`` handling values ``json`` cannot encode natively.
    Only after serialization succeeds is the file opened for writing.

    Args:
        result: Scan result dictionary containing metadata, resources, and errors
        output_path: Path to output JSON file

    Requirements:
        - 1.6: Export results as JSON file when scan completes
        - 2.1: Include metadata fields (account_id, scan_timestamp, regions_scanned, services_scanned)
        - 2.2: Include resources field organized by service type
        - 2.3: Include errors field with scan error information
        - 2.4: Use JSON format encoding for serialization

    Raises:
        OSError: If unable to write to the output file
        TypeError: If result contains values that cannot be serialized
        ValueError: If the result structure fails validation
    """
    try:
        self._validate_scan_data_structure(result)

        payload = json.dumps(
            result,
            default=self._json_serializer,
            ensure_ascii=False,
            indent=2,
            sort_keys=False,
        )

        with open(output_path, "w", encoding="utf-8") as handle:
            handle.write(payload)

        logger.info(f"Scan results exported to: {output_path}")
        summary = result["metadata"]
        logger.info(
            f"Export summary: {summary['total_resources']} resources, "
            f"{summary['total_errors']} errors"
        )
    except OSError as write_error:
        # IOError is an alias of OSError, so this covers both.
        logger.error(f"Failed to write to {output_path}: {write_error}")
        raise
    except (TypeError, ValueError) as data_error:
        logger.error(f"Failed to serialize scan results: {data_error}")
        raise
-
- def _json_serializer(self, obj: Any) -> Any:
- """
- Custom JSON serializer for non-standard types.
-
- Handles datetime, date, bytes, sets, and other non-JSON-serializable types.
-
- Args:
- obj: Object to serialize
-
- Returns:
- JSON-serializable representation of the object
-
- Requirements:
- - 2.4: Use JSON format encoding (handle non-serializable types gracefully)
- """
- # Handle datetime objects - convert to ISO 8601 format
- if isinstance(obj, datetime):
- # Ensure UTC timezone and proper ISO 8601 format
- if obj.tzinfo is None:
- obj = obj.replace(tzinfo=timezone.utc)
- return obj.isoformat().replace("+00:00", "Z")
-
- # Handle date objects
- if hasattr(obj, 'isoformat'):
- return obj.isoformat()
-
- # Handle bytes
- if isinstance(obj, bytes):
- return obj.decode('utf-8', errors='replace')
-
- # Handle sets
- if isinstance(obj, set):
- return list(obj)
-
- # Handle frozensets
- if isinstance(obj, frozenset):
- return list(obj)
-
- # Handle objects with __dict__
- if hasattr(obj, '__dict__'):
- return obj.__dict__
-
- # Fallback to string representation
- return str(obj)
-
- def _validate_scan_data_structure(self, data: Dict[str, Any]) -> None:
- """
- Validate that the scan data structure matches the expected format.
-
- This method ensures the data structure conforms to the ScanData interface
- defined in the design document.
-
- Args:
- data: Scan data dictionary to validate
-
- Raises:
- ValueError: If required fields are missing or have incorrect types
-
- Requirements:
- - 2.1: Metadata fields (account_id, scan_timestamp, regions_scanned, services_scanned)
- - 2.2: Resources field organized by service type
- - 2.3: Errors field with error information
- """
- # Check top-level structure
- required_top_level = ["metadata", "resources", "errors"]
- for field in required_top_level:
- if field not in data:
- raise ValueError(f"Missing required top-level field: {field}")
-
- # Check metadata fields
- metadata = data.get("metadata", {})
- required_metadata = [
- "account_id",
- "scan_timestamp",
- "regions_scanned",
- "services_scanned",
- "scanner_version",
- "total_resources",
- "total_errors",
- ]
-
- missing_metadata = [f for f in required_metadata if f not in metadata]
- if missing_metadata:
- raise ValueError(f"Missing required metadata fields: {missing_metadata}")
-
- # Validate metadata field types
- if not isinstance(metadata.get("account_id"), str):
- raise ValueError("metadata.account_id must be a string")
- if not isinstance(metadata.get("scan_timestamp"), str):
- raise ValueError("metadata.scan_timestamp must be a string")
- if not isinstance(metadata.get("regions_scanned"), list):
- raise ValueError("metadata.regions_scanned must be a list")
- if not isinstance(metadata.get("services_scanned"), list):
- raise ValueError("metadata.services_scanned must be a list")
- if not isinstance(metadata.get("scanner_version"), str):
- raise ValueError("metadata.scanner_version must be a string")
- if not isinstance(metadata.get("total_resources"), int):
- raise ValueError("metadata.total_resources must be an integer")
- if not isinstance(metadata.get("total_errors"), int):
- raise ValueError("metadata.total_errors must be an integer")
-
- # Validate resources structure
- resources = data.get("resources", {})
- if not isinstance(resources, dict):
- raise ValueError("resources must be a dictionary")
-
- # Validate errors structure
- errors = data.get("errors", [])
- if not isinstance(errors, list):
- raise ValueError("errors must be a list")
-
@staticmethod
def create_scan_data(
    account_id: str,
    regions_scanned: List[str],
    services_scanned: List[str],
    resources: Dict[str, List[Dict[str, Any]]],
    errors: List[Dict[str, Any]],
    scan_timestamp: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Assemble a scan data dictionary from its parts.

    The metadata block is derived from the arguments: ``total_resources``
    is the combined length of all per-service resource lists and
    ``total_errors`` is the length of ``errors``. ``resources`` and
    ``errors`` are placed in the result as passed in, not copied.

    Args:
        account_id: AWS account ID
        regions_scanned: List of regions that were scanned
        services_scanned: List of services that were scanned
        resources: Dictionary of resources organized by service type
        errors: List of error dictionaries
        scan_timestamp: Optional ISO 8601 timestamp; when None the current
            UTC time is used, written with a trailing ``Z``

    Returns:
        Dictionary with ``metadata``, ``resources`` and ``errors`` keys

    Requirements:
        - 2.1: Include metadata fields
        - 2.2: Include resources field organized by service type
        - 2.3: Include errors field
    """
    timestamp = scan_timestamp
    if timestamp is None:
        now_utc = datetime.now(timezone.utc)
        timestamp = now_utc.isoformat().replace("+00:00", "Z")

    resource_count = 0
    for entries in resources.values():
        resource_count += len(entries)

    metadata = {
        "account_id": account_id,
        "scan_timestamp": timestamp,
        "regions_scanned": regions_scanned,
        "services_scanned": services_scanned,
        "scanner_version": __version__,
        "total_resources": resource_count,
        "total_errors": len(errors),
    }

    return {"metadata": metadata, "resources": resources, "errors": errors}
-
@staticmethod
def load_scan_data(file_path: str) -> Dict[str, Any]:
    """
    Read a scan data JSON file and check its structure.

    The file is parsed as UTF-8 JSON and then passed to the static
    validator, so no scanner instance is needed to load a file.

    Args:
        file_path: Path to the JSON file to load

    Returns:
        Parsed scan data dictionary

    Raises:
        FileNotFoundError: If the file does not exist
        json.JSONDecodeError: If the file contains invalid JSON
        ValueError: If the JSON structure is invalid

    Requirements:
        - 2.5: Round-trip consistency (load what was exported)
    """
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
        raise
    except json.JSONDecodeError as decode_error:
        logger.error(f"Invalid JSON in {file_path}: {decode_error}")
        raise

    # Validation failures propagate to the caller without being logged here.
    CloudShellScanner._validate_scan_data_structure_static(parsed)

    logger.info(f"Loaded scan data from: {file_path}")
    return parsed
-
- @staticmethod
- def _validate_scan_data_structure_static(data: Dict[str, Any]) -> None:
- """
- Static version of _validate_scan_data_structure for use without instance.
-
- Args:
- data: Scan data dictionary to validate
-
- Raises:
- ValueError: If required fields are missing or have incorrect types
- """
- # Check top-level structure
- required_top_level = ["metadata", "resources", "errors"]
- for field in required_top_level:
- if field not in data:
- raise ValueError(f"Missing required top-level field: {field}")
-
- # Check metadata fields
- metadata = data.get("metadata", {})
- required_metadata = [
- "account_id",
- "scan_timestamp",
- "regions_scanned",
- "services_scanned",
- "scanner_version",
- "total_resources",
- "total_errors",
- ]
-
- missing_metadata = [f for f in required_metadata if f not in metadata]
- if missing_metadata:
- raise ValueError(f"Missing required metadata fields: {missing_metadata}")
-
- # Helper method to get resource name from tags
- def _get_name_from_tags(
- self, tags: Optional[List[Dict[str, str]]], default: str = ""
- ) -> str:
- """
- Extract Name tag value from tags list.
-
- Args:
- tags: List of tag dictionaries with 'Key' and 'Value'
- default: Default value if Name tag not found
-
- Returns:
- Name tag value or default
- """
- if not tags:
- return default
- for tag in tags:
- if tag.get("Key") == "Name":
- return tag.get("Value", default)
- return default
- # =========================================================================
- # VPC Related Service Scanners (Task 1.2)
- # =========================================================================
- def _scan_vpcs(self, account_id: str, region: str) -> List[Dict[str, Any]]:
- """
- Scan VPCs in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of VPC resource dictionaries
-
- Attributes: Region, Name, ID, CIDR
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_vpcs")
- for page in paginator.paginate():
- for vpc in page.get("Vpcs", []):
- name = self._get_name_from_tags(vpc.get("Tags", []), vpc["VpcId"])
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "vpc",
- "resource_type": "VPC",
- "resource_id": vpc["VpcId"],
- "name": name,
- "attributes": {
- "Region": region,
- "Name": name,
- "ID": vpc["VpcId"],
- "CIDR": vpc.get("CidrBlock", ""),
- },
- })
-
- return resources
- def _scan_subnets(self, account_id: str, region: str) -> List[Dict[str, Any]]:
- """
- Scan Subnets in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of Subnet resource dictionaries
-
- Attributes: Name, ID, AZ, CIDR
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_subnets")
- for page in paginator.paginate():
- for subnet in page.get("Subnets", []):
- name = self._get_name_from_tags(
- subnet.get("Tags", []), subnet["SubnetId"]
- )
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "subnet",
- "resource_type": "Subnet",
- "resource_id": subnet["SubnetId"],
- "name": name,
- "attributes": {
- "Name": name,
- "ID": subnet["SubnetId"],
- "AZ": subnet.get("AvailabilityZone", ""),
- "CIDR": subnet.get("CidrBlock", ""),
- },
- })
-
- return resources
- def _scan_route_tables(self, account_id: str, region: str) -> List[Dict[str, Any]]:
- """
- Scan Route Tables in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of Route Table resource dictionaries
-
- Attributes: Name, ID, Subnet Associations
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_route_tables")
- for page in paginator.paginate():
- for rt in page.get("RouteTables", []):
- name = self._get_name_from_tags(
- rt.get("Tags", []), rt["RouteTableId"]
- )
-
- # Get subnet associations
- associations = []
- for assoc in rt.get("Associations", []):
- if assoc.get("SubnetId"):
- associations.append(assoc["SubnetId"])
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "route_table",
- "resource_type": "Route Table",
- "resource_id": rt["RouteTableId"],
- "name": name,
- "attributes": {
- "Name": name,
- "ID": rt["RouteTableId"],
- "Subnet Associations": ", ".join(associations) if associations else "None",
- },
- })
-
- return resources
- def _scan_internet_gateways(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan Internet Gateways in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of Internet Gateway resource dictionaries
-
- Attributes: Name, ID
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_internet_gateways")
- for page in paginator.paginate():
- for igw in page.get("InternetGateways", []):
- igw_id = igw["InternetGatewayId"]
- name = self._get_name_from_tags(igw.get("Tags", []), igw_id)
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "internet_gateway",
- "resource_type": "Internet Gateway",
- "resource_id": igw_id,
- "name": name,
- "attributes": {
- "Name": name,
- "ID": igw_id,
- },
- })
-
- return resources
- def _scan_nat_gateways(self, account_id: str, region: str) -> List[Dict[str, Any]]:
- """
- Scan NAT Gateways in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of NAT Gateway resource dictionaries
-
- Attributes: Name, ID, Public IP, Private IP
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_nat_gateways")
- for page in paginator.paginate():
- for nat in page.get("NatGateways", []):
- # Skip deleted NAT gateways
- if nat.get("State") == "deleted":
- continue
-
- name = self._get_name_from_tags(
- nat.get("Tags", []), nat["NatGatewayId"]
- )
-
- # Get IP addresses from addresses
- public_ip = ""
- private_ip = ""
- for addr in nat.get("NatGatewayAddresses", []):
- if addr.get("PublicIp"):
- public_ip = addr["PublicIp"]
- if addr.get("PrivateIp"):
- private_ip = addr["PrivateIp"]
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "nat_gateway",
- "resource_type": "NAT Gateway",
- "resource_id": nat["NatGatewayId"],
- "name": name,
- "attributes": {
- "Name": name,
- "ID": nat["NatGatewayId"],
- "Public IP": public_ip,
- "Private IP": private_ip,
- },
- })
-
- return resources
- def _scan_security_groups(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan Security Groups in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of Security Group resource dictionaries
-
- Attributes: Name, ID, Protocol, Port range, Source
- Note: Creates one entry per inbound rule
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_security_groups")
- for page in paginator.paginate():
- for sg in page.get("SecurityGroups", []):
- sg_name = sg.get("GroupName", sg["GroupId"])
-
- # Process inbound rules
- for rule in sg.get("IpPermissions", []):
- protocol = rule.get("IpProtocol", "-1")
- if protocol == "-1":
- protocol = "All"
-
- # Get port range
- from_port = rule.get("FromPort", "All")
- to_port = rule.get("ToPort", "All")
- if from_port == to_port:
- port_range = str(from_port) if from_port != "All" else "All"
- else:
- port_range = f"{from_port}-{to_port}"
-
- # Get sources
- sources = []
- for ip_range in rule.get("IpRanges", []):
- sources.append(ip_range.get("CidrIp", ""))
- for ip_range in rule.get("Ipv6Ranges", []):
- sources.append(ip_range.get("CidrIpv6", ""))
- for group in rule.get("UserIdGroupPairs", []):
- sources.append(group.get("GroupId", ""))
-
- source = ", ".join(sources) if sources else "N/A"
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "security_group",
- "resource_type": "Security Group",
- "resource_id": sg["GroupId"],
- "name": sg_name,
- "attributes": {
- "Name": sg_name,
- "ID": sg["GroupId"],
- "Protocol": protocol,
- "Port range": port_range,
- "Source": source,
- },
- })
-
- # If no inbound rules, still add the security group
- if not sg.get("IpPermissions"):
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "security_group",
- "resource_type": "Security Group",
- "resource_id": sg["GroupId"],
- "name": sg_name,
- "attributes": {
- "Name": sg_name,
- "ID": sg["GroupId"],
- "Protocol": "N/A",
- "Port range": "N/A",
- "Source": "N/A",
- },
- })
-
- return resources
- def _scan_vpc_endpoints(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan VPC Endpoints in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of VPC Endpoint resource dictionaries
-
- Attributes: Name, ID, VPC, Service Name, Type
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_vpc_endpoints")
- for page in paginator.paginate():
- for endpoint in page.get("VpcEndpoints", []):
- name = self._get_name_from_tags(
- endpoint.get("Tags", []), endpoint["VpcEndpointId"]
- )
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "vpc_endpoint",
- "resource_type": "Endpoint",
- "resource_id": endpoint["VpcEndpointId"],
- "name": name,
- "attributes": {
- "Name": name,
- "ID": endpoint["VpcEndpointId"],
- "VPC": endpoint.get("VpcId", ""),
- "Service Name": endpoint.get("ServiceName", ""),
- "Type": endpoint.get("VpcEndpointType", ""),
- },
- })
-
- return resources
- def _scan_vpc_peering(self, account_id: str, region: str) -> List[Dict[str, Any]]:
- """
- Scan VPC Peering Connections in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of VPC Peering resource dictionaries
-
- Attributes: Name, Peering Connection ID, Requester VPC, Accepter VPC
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_vpc_peering_connections")
- for page in paginator.paginate():
- for peering in page.get("VpcPeeringConnections", []):
- # Skip deleted/rejected peerings
- status = peering.get("Status", {}).get("Code", "")
- if status in ["deleted", "rejected", "failed"]:
- continue
-
- name = self._get_name_from_tags(
- peering.get("Tags", []), peering["VpcPeeringConnectionId"]
- )
-
- requester_vpc = peering.get("RequesterVpcInfo", {}).get("VpcId", "")
- accepter_vpc = peering.get("AccepterVpcInfo", {}).get("VpcId", "")
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "vpc_peering",
- "resource_type": "VPC Peering",
- "resource_id": peering["VpcPeeringConnectionId"],
- "name": name,
- "attributes": {
- "Name": name,
- "Peering Connection ID": peering["VpcPeeringConnectionId"],
- "Requester VPC": requester_vpc,
- "Accepter VPC": accepter_vpc,
- },
- })
-
- return resources
- def _scan_customer_gateways(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan Customer Gateways in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of Customer Gateway resource dictionaries
-
- Attributes: Name, Customer Gateway ID, IP Address
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- response = ec2_client.describe_customer_gateways()
- for cgw in response.get("CustomerGateways", []):
- # Skip deleted gateways
- if cgw.get("State") == "deleted":
- continue
-
- name = self._get_name_from_tags(
- cgw.get("Tags", []), cgw["CustomerGatewayId"]
- )
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "customer_gateway",
- "resource_type": "Customer Gateway",
- "resource_id": cgw["CustomerGatewayId"],
- "name": name,
- "attributes": {
- "Name": name,
- "Customer Gateway ID": cgw["CustomerGatewayId"],
- "IP Address": cgw.get("IpAddress", ""),
- },
- })
-
- return resources
- def _scan_virtual_private_gateways(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan Virtual Private Gateways in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of Virtual Private Gateway resource dictionaries
-
- Attributes: Name, Virtual Private Gateway ID, VPC
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- response = ec2_client.describe_vpn_gateways()
- for vgw in response.get("VpnGateways", []):
- # Skip deleted gateways
- if vgw.get("State") == "deleted":
- continue
-
- name = self._get_name_from_tags(
- vgw.get("Tags", []), vgw["VpnGatewayId"]
- )
-
- # Get attached VPC
- vpc_id = ""
- for attachment in vgw.get("VpcAttachments", []):
- if attachment.get("State") == "attached":
- vpc_id = attachment.get("VpcId", "")
- break
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "virtual_private_gateway",
- "resource_type": "Virtual Private Gateway",
- "resource_id": vgw["VpnGatewayId"],
- "name": name,
- "attributes": {
- "Name": name,
- "Virtual Private Gateway ID": vgw["VpnGatewayId"],
- "VPC": vpc_id,
- },
- })
-
- return resources
- def _scan_vpn_connections(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan VPN Connections in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of VPN Connection resource dictionaries
-
- Attributes: Name, VPN ID, Routes
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- response = ec2_client.describe_vpn_connections()
- for vpn in response.get("VpnConnections", []):
- # Skip deleted connections
- if vpn.get("State") == "deleted":
- continue
-
- name = self._get_name_from_tags(
- vpn.get("Tags", []), vpn["VpnConnectionId"]
- )
-
- # Get routes
- routes = []
- for route in vpn.get("Routes", []):
- if route.get("DestinationCidrBlock"):
- routes.append(route["DestinationCidrBlock"])
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "vpn_connection",
- "resource_type": "VPN Connection",
- "resource_id": vpn["VpnConnectionId"],
- "name": name,
- "attributes": {
- "Name": name,
- "VPN ID": vpn["VpnConnectionId"],
- "Routes": ", ".join(routes) if routes else "N/A",
- },
- })
-
- return resources
- # =========================================================================
- # EC2 and Compute Service Scanners (Task 1.3)
- # =========================================================================
- def _scan_ec2_instances(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan EC2 Instances in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of EC2 Instance resource dictionaries
-
- Attributes: Name, Instance ID, Instance Type, AZ, AMI,
- Public IP, Public DNS, Private IP, VPC ID, Subnet ID,
- Key, Security Groups, EBS Type, EBS Size, Encryption
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- paginator = ec2_client.get_paginator("describe_instances")
- for page in paginator.paginate():
- for reservation in page.get("Reservations", []):
- for instance in reservation.get("Instances", []):
- # Skip terminated instances
- state = instance.get("State", {}).get("Name", "")
- if state == "terminated":
- continue
-
- name = self._get_name_from_tags(
- instance.get("Tags", []), instance["InstanceId"]
- )
-
- # Get security groups
- security_groups = []
- for sg in instance.get("SecurityGroups", []):
- security_groups.append(
- sg.get("GroupName", sg.get("GroupId", ""))
- )
-
- # Get EBS volume info
- ebs_type = ""
- ebs_size = ""
- ebs_encrypted = ""
-
- for block_device in instance.get("BlockDeviceMappings", []):
- ebs = block_device.get("Ebs", {})
- if ebs.get("VolumeId"):
- # Get volume details
- try:
- vol_response = ec2_client.describe_volumes(
- VolumeIds=[ebs["VolumeId"]]
- )
- if vol_response.get("Volumes"):
- volume = vol_response["Volumes"][0]
- ebs_type = volume.get("VolumeType", "")
- ebs_size = f"{volume.get('Size', '')} GB"
- ebs_encrypted = (
- "Yes" if volume.get("Encrypted") else "No"
- )
- except Exception as e:
- logger.warning(
- f"Failed to get volume details: {str(e)}"
- )
- break # Only get first volume for simplicity
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "ec2",
- "resource_type": "Instance",
- "resource_id": instance["InstanceId"],
- "name": name,
- "attributes": {
- "Name": name,
- "Instance ID": instance["InstanceId"],
- "Instance Type": instance.get("InstanceType", ""),
- "AZ": instance.get("Placement", {}).get(
- "AvailabilityZone", ""
- ),
- "AMI": instance.get("ImageId", ""),
- "Public IP": instance.get("PublicIpAddress", ""),
- "Public DNS": instance.get("PublicDnsName", ""),
- "Private IP": instance.get("PrivateIpAddress", ""),
- "VPC ID": instance.get("VpcId", ""),
- "Subnet ID": instance.get("SubnetId", ""),
- "Key": instance.get("KeyName", ""),
- "Security Groups": ", ".join(security_groups),
- "EBS Type": ebs_type,
- "EBS Size": ebs_size,
- "Encryption": ebs_encrypted,
- "Other Requirement": "",
- },
- })
-
- return resources
- def _scan_elastic_ips(
- self, account_id: str, region: str
- ) -> List[Dict[str, Any]]:
- """
- Scan Elastic IPs in the specified region.
-
- Args:
- account_id: AWS account ID
- region: Region to scan
-
- Returns:
- List of Elastic IP resource dictionaries
-
- Attributes: Name, Elastic IP
- """
- resources = []
- ec2_client = self._session.client("ec2", region_name=region)
-
- response = ec2_client.describe_addresses()
- for eip in response.get("Addresses", []):
- public_ip = eip.get("PublicIp", "")
- name = self._get_name_from_tags(
- eip.get("Tags", []),
- public_ip or eip.get("AllocationId", ""),
- )
-
- resources.append({
- "account_id": account_id,
- "region": region,
- "service": "elastic_ip",
- "resource_type": "Elastic IP",
- "resource_id": eip.get("AllocationId", public_ip),
- "name": name,
- "attributes": {
- "Name": name,
- "Elastic IP": public_ip,
- },
- })
-
- return resources
def _scan_autoscaling_groups(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Scan Auto Scaling Groups in the specified region.

    The launch source of each group is taken from its launch template,
    from the launch template of its mixed instances policy, or from its
    legacy launch configuration. AMI, instance type and key pair are looked
    up from whichever source is present.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of Auto Scaling Group resource dictionaries

    Attributes: Name, Launch Template, AMI, Instance type, Key, Target Groups,
        Desired, Min, Max, Scaling Policy
    """
    resources = []
    asg_client = self._session.client("autoscaling", region_name=region)
    ec2_client = self._session.client("ec2", region_name=region)

    def describe_template(spec):
        """Return (ami, instance_type, key_name) for a launch template spec."""
        try:
            lt_response = ec2_client.describe_launch_template_versions(
                LaunchTemplateId=spec.get("LaunchTemplateId", ""),
                # Auto Scaling falls back to the default version when the
                # specification does not pin one.
                Versions=[spec.get("Version", "$Default")],
            )
        except Exception as e:
            logger.warning(f"Failed to get launch template details: {str(e)}")
            return "", "", ""

        versions = lt_response.get("LaunchTemplateVersions")
        if not versions:
            return "", "", ""
        lt_data = versions[0].get("LaunchTemplateData", {})
        return (
            lt_data.get("ImageId", ""),
            lt_data.get("InstanceType", ""),
            lt_data.get("KeyName", ""),
        )

    paginator = asg_client.get_paginator("describe_auto_scaling_groups")
    for page in paginator.paginate():
        for asg in page.get("AutoScalingGroups", []):
            name = asg.get("AutoScalingGroupName", "")

            launch_template_name = ""
            ami = instance_type = key_name = ""

            # The template may be attached directly or through a mixed
            # instances policy; details are resolved the same way for both.
            mip_spec = (
                (asg.get("MixedInstancesPolicy") or {})
                .get("LaunchTemplate", {})
                .get("LaunchTemplateSpecification")
            )
            lt_spec = mip_spec or asg.get("LaunchTemplate")
            if lt_spec:
                launch_template_name = lt_spec.get(
                    "LaunchTemplateName", lt_spec.get("LaunchTemplateId", "")
                )
                ami, instance_type, key_name = describe_template(lt_spec)

            # Legacy launch configuration, only when no template was found
            lc_name = asg.get("LaunchConfigurationName")
            if lc_name and not launch_template_name:
                launch_template_name = f"LC: {lc_name}"
                try:
                    lc_response = asg_client.describe_launch_configurations(
                        LaunchConfigurationNames=[lc_name]
                    )
                    if lc_response.get("LaunchConfigurations"):
                        lc = lc_response["LaunchConfigurations"][0]
                        ami = lc.get("ImageId", "")
                        instance_type = lc.get("InstanceType", "")
                        key_name = lc.get("KeyName", "")
                except Exception as e:
                    logger.warning(
                        f"Failed to get launch configuration details: {str(e)}"
                    )

            # The target group name is the second-to-last ARN path segment
            target_groups = [
                tg_arn.split("/")[-2] if "/" in tg_arn else tg_arn
                for tg_arn in asg.get("TargetGroupARNs", [])
            ]

            # Scaling policies are paginated so none are dropped
            scaling_policies = []
            try:
                policy_paginator = asg_client.get_paginator("describe_policies")
                for policy_page in policy_paginator.paginate(
                    AutoScalingGroupName=name
                ):
                    for policy in policy_page.get("ScalingPolicies", []):
                        scaling_policies.append(policy.get("PolicyName", ""))
            except Exception as e:
                logger.warning(f"Failed to get scaling policies: {str(e)}")

            resources.append({
                "account_id": account_id,
                "region": region,
                "service": "autoscaling",
                "resource_type": "Auto Scaling Group",
                "resource_id": asg.get("AutoScalingGroupARN", name),
                "name": name,
                "attributes": {
                    "Name": name,
                    "Launch Template": launch_template_name,
                    "AMI": ami,
                    "Instance type": instance_type,
                    "Key": key_name,
                    "Target Groups": (
                        ", ".join(target_groups) if target_groups else "N/A"
                    ),
                    "Desired": str(asg.get("DesiredCapacity", 0)),
                    "Min": str(asg.get("MinSize", 0)),
                    "Max": str(asg.get("MaxSize", 0)),
                    "Scaling Policy": (
                        ", ".join(scaling_policies) if scaling_policies else "N/A"
                    ),
                },
            })

    return resources
def _scan_load_balancers(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the load balancers of a region: ALB/NLB through the ``elbv2``
    API and Classic Load Balancers through the ``elb`` API.

    A failure while scanning one API is logged as a warning; whatever was
    collected up to that point is kept and the other API is still scanned.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of Load Balancer resource dictionaries

    Attributes: Name, Type, DNS, Scheme, VPC, Availability Zones, Subnet,
        Security Groups
    """
    found = []

    # ALB / NLB (elbv2)
    v2_client = self._session.client("elbv2", region_name=region)
    try:
        v2_pages = v2_client.get_paginator("describe_load_balancers").paginate()
        for v2_page in v2_pages:
            for balancer in v2_page.get("LoadBalancers", []):
                lb_name = balancer.get("LoadBalancerName", "")
                kind = balancer.get("Type", "application")

                # Each zone entry carries the zone name and, optionally,
                # the subnet the load balancer uses in that zone.
                zones = balancer.get("AvailabilityZones", [])
                zone_names = [zone.get("ZoneName", "") for zone in zones]
                subnet_ids = [
                    zone["SubnetId"] for zone in zones if zone.get("SubnetId")
                ]

                sg_ids = balancer.get("SecurityGroups", [])
                if sg_ids:
                    sg_text = ", ".join(sg_ids)
                else:
                    sg_text = "N/A"

                found.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "elb",
                    "resource_type": "Load Balancer",
                    "resource_id": balancer.get("LoadBalancerArn", lb_name),
                    "name": lb_name,
                    "attributes": {
                        "Name": lb_name,
                        "Type": kind.upper(),
                        "DNS": balancer.get("DNSName", ""),
                        "Scheme": balancer.get("Scheme", ""),
                        "VPC": balancer.get("VpcId", ""),
                        "Availability Zones": ", ".join(zone_names),
                        "Subnet": ", ".join(subnet_ids),
                        "Security Groups": sg_text,
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan ALB/NLB: {str(e)}")

    # Classic Load Balancers (elb)
    classic_client = self._session.client("elb", region_name=region)
    try:
        classic_pages = classic_client.get_paginator(
            "describe_load_balancers"
        ).paginate()
        for classic_page in classic_pages:
            for balancer in classic_page.get("LoadBalancerDescriptions", []):
                lb_name = balancer.get("LoadBalancerName", "")
                classic_attributes = {
                    "Name": lb_name,
                    "Type": "CLASSIC",
                    "DNS": balancer.get("DNSName", ""),
                    "Scheme": balancer.get("Scheme", ""),
                    "VPC": balancer.get("VPCId", ""),
                    "Availability Zones": ", ".join(
                        balancer.get("AvailabilityZones", [])
                    ),
                    "Subnet": ", ".join(balancer.get("Subnets", [])),
                    "Security Groups": ", ".join(
                        balancer.get("SecurityGroups", [])
                    ),
                }
                found.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "elb",
                    "resource_type": "Load Balancer",
                    "resource_id": lb_name,
                    "name": lb_name,
                    "attributes": classic_attributes,
                })
    except Exception as e:
        logger.warning(f"Failed to scan Classic ELB: {str(e)}")

    return found
def _scan_target_groups(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the ELBv2 target groups of a region together with the IDs of
    their registered targets.

    A failure to read target health for one group is logged and that group
    is still reported; a failure of the listing itself is logged and ends
    the scan with whatever was collected so far.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of Target Group resource dictionaries

    Attributes: Load Balancer, TG Name, Port, Protocol, Registered Instances,
        Health Check Path
    """
    found = []
    client = self._session.client("elbv2", region_name=region)

    try:
        for page in client.get_paginator("describe_target_groups").paginate():
            for group in page.get("TargetGroups", []):
                group_name = group.get("TargetGroupName", "")
                group_arn = group.get("TargetGroupArn", "")

                # The load balancer name is the second-to-last path
                # segment of its ARN.
                balancer_names = [
                    arn.split("/")[-2] if "/" in arn else arn
                    for arn in group.get("LoadBalancerArns", [])
                ]

                member_ids = []
                try:
                    health = client.describe_target_health(
                        TargetGroupArn=group_arn
                    )
                    for entry in health.get("TargetHealthDescriptions", []):
                        member_id = entry.get("Target", {}).get("Id", "")
                        if member_id:
                            member_ids.append(member_id)
                except Exception as e:
                    logger.warning(f"Failed to get target health: {str(e)}")

                if balancer_names:
                    balancer_text = ", ".join(balancer_names)
                else:
                    balancer_text = "N/A"

                if member_ids:
                    member_text = ", ".join(member_ids)
                else:
                    member_text = "None"

                found.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "target_group",
                    "resource_type": "Target Group",
                    "resource_id": group_arn,
                    "name": group_name,
                    "attributes": {
                        "Load Balancer": balancer_text,
                        "TG Name": group_name,
                        "Port": str(group.get("Port", "")),
                        "Protocol": group.get("Protocol", ""),
                        "Registered Instances": member_text,
                        "Health Check Path": group.get("HealthCheckPath", "N/A"),
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan target groups: {str(e)}")

    return found
def _scan_lambda_functions(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the Lambda functions of a region.

    Any error raised while listing is logged as a warning and the functions
    gathered before the error are returned.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of Lambda Function resource dictionaries

    Attributes: Function Name, Runtime, Memory (MB), Timeout (s), Last Modified
    """
    found = []
    client = self._session.client("lambda", region_name=region)

    try:
        for page in client.get_paginator("list_functions").paginate():
            for fn in page.get("Functions", []):
                fn_name = fn.get("FunctionName", "")
                details = {
                    "Function Name": fn_name,
                    "Runtime": fn.get("Runtime", "N/A"),
                    "Memory (MB)": str(fn.get("MemorySize", "")),
                    "Timeout (s)": str(fn.get("Timeout", "")),
                    "Last Modified": fn.get("LastModified", ""),
                }
                found.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "lambda",
                    "resource_type": "Function",
                    "resource_id": fn.get("FunctionArn", fn_name),
                    "name": fn_name,
                    "attributes": details,
                })
    except Exception as e:
        logger.warning(f"Failed to scan Lambda functions: {str(e)}")

    return found
def _scan_eks_clusters(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the EKS clusters of a region.

    Cluster names are listed first and each cluster is then described
    individually. A cluster that cannot be described is logged and skipped;
    a failure of the listing is logged and yields an empty result.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of EKS Cluster resource dictionaries

    Attributes: Cluster Name, Version, Status, Endpoint, VPC ID
    """
    found = []
    client = self._session.client("eks", region_name=region)

    try:
        # list_clusters only yields names; gather all of them up front.
        names = [
            listed_name
            for page in client.get_paginator("list_clusters").paginate()
            for listed_name in page.get("clusters", [])
        ]

        for cluster_name in names:
            try:
                detail = client.describe_cluster(name=cluster_name).get(
                    "cluster", {}
                )
                vpc_id = detail.get("resourcesVpcConfig", {}).get("vpcId", "")
                found.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "eks",
                    "resource_type": "Cluster",
                    "resource_id": detail.get("arn", cluster_name),
                    "name": cluster_name,
                    "attributes": {
                        "Cluster Name": cluster_name,
                        "Version": detail.get("version", ""),
                        "Status": detail.get("status", ""),
                        "Endpoint": detail.get("endpoint", ""),
                        "VPC ID": vpc_id,
                    },
                })
            except Exception as e:
                logger.warning(
                    f"Failed to describe EKS cluster {cluster_name}: {str(e)}"
                )
    except Exception as e:
        logger.warning(f"Failed to list EKS clusters: {str(e)}")

    return found
- # =========================================================================
- # Database and Storage Service Scanners (Task 1.4)
- # =========================================================================
def _scan_rds_instances(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the RDS DB instances of a region.

    Any error raised while listing is logged as a warning and the instances
    gathered before the error are returned.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of RDS DB Instance resource dictionaries

    Attributes (vertical layout - one table per instance):
        Region, Endpoint, DB instance ID, DB name, Master Username, Port,
        DB Engine, DB Version, Instance Type, Storage type, Storage, Multi-AZ,
        Security Group, Deletion Protection, Performance Insights Enabled, CloudWatch Logs
    """

    def yes_no(flag):
        """Render a truthy/falsy API value as "Yes"/"No"."""
        return "Yes" if flag else "No"

    found = []
    client = self._session.client("rds", region_name=region)

    try:
        for page in client.get_paginator("describe_db_instances").paginate():
            for instance in page.get("DBInstances", []):
                instance_id = instance.get("DBInstanceIdentifier", "")

                sg_ids = [
                    membership.get("VpcSecurityGroupId", "")
                    for membership in instance.get("VpcSecurityGroups", [])
                ]
                log_exports = instance.get("EnabledCloudwatchLogsExports", [])

                # Address and port both live under the "Endpoint" entry
                endpoint_info = instance.get("Endpoint", {})
                address = endpoint_info.get("Address", "")
                port_value = endpoint_info.get("Port", "")

                if log_exports:
                    log_text = ", ".join(log_exports)
                else:
                    log_text = "N/A"

                found.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "rds",
                    "resource_type": "DB Instance",
                    "resource_id": instance.get("DBInstanceArn", instance_id),
                    "name": instance_id,
                    "attributes": {
                        "Region": region,
                        "Endpoint": address,
                        "DB instance ID": instance_id,
                        "DB name": instance.get("DBName", ""),
                        "Master Username": instance.get("MasterUsername", ""),
                        "Port": str(port_value),
                        "DB Engine": instance.get("Engine", ""),
                        "DB Version": instance.get("EngineVersion", ""),
                        "Instance Type": instance.get("DBInstanceClass", ""),
                        "Storage type": instance.get("StorageType", ""),
                        "Storage": f"{instance.get('AllocatedStorage', '')} GB",
                        "Multi-AZ": yes_no(instance.get("MultiAZ")),
                        "Security Group": ", ".join(sg_ids),
                        "Deletion Protection": yes_no(
                            instance.get("DeletionProtection")
                        ),
                        "Performance Insights Enabled": yes_no(
                            instance.get("PerformanceInsightsEnabled")
                        ),
                        "CloudWatch Logs": log_text,
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan RDS instances: {str(e)}")

    return found
def _scan_elasticache_clusters(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Scan ElastiCache Clusters in the specified region.

    Both cache clusters and replication groups are reported, each as a
    "Cache Cluster" resource. For a replication group the engine and node
    type are read from the group itself when present and otherwise from
    its first member cluster, which also supplies the engine version.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of ElastiCache Cluster resource dictionaries

    Attributes (vertical layout - one table per cluster):
        Cluster ID, Engine, Engine Version, Node Type, Num Nodes, Status
    """
    resources = []
    elasticache_client = self._session.client("elasticache", region_name=region)

    # Scan cache clusters (Redis/Memcached)
    try:
        paginator = elasticache_client.get_paginator("describe_cache_clusters")
        for page in paginator.paginate(ShowCacheNodeInfo=True):
            for cluster in page.get("CacheClusters", []):
                cluster_id = cluster.get("CacheClusterId", "")

                resources.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "elasticache",
                    "resource_type": "Cache Cluster",
                    "resource_id": cluster.get("ARN", cluster_id),
                    "name": cluster_id,
                    "attributes": {
                        "Cluster ID": cluster_id,
                        "Engine": cluster.get("Engine", ""),
                        "Engine Version": cluster.get("EngineVersion", ""),
                        "Node Type": cluster.get("CacheNodeType", ""),
                        "Num Nodes": str(cluster.get("NumCacheNodes", 0)),
                        "Status": cluster.get("CacheClusterStatus", ""),
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan ElastiCache clusters: {str(e)}")

    # Also scan replication groups
    try:
        paginator = elasticache_client.get_paginator("describe_replication_groups")
        for page in paginator.paginate():
            for rg in page.get("ReplicationGroups", []):
                rg_id = rg.get("ReplicationGroupId", "")

                num_nodes = sum(
                    len(node_group.get("NodeGroupMembers", []))
                    for node_group in rg.get("NodeGroups", [])
                )

                # Prefer what the group reports about itself
                engine = rg.get("Engine", "")
                node_type = rg.get("CacheNodeType", "")
                engine_version = ""

                # The engine version is only available on member clusters
                member_clusters = rg.get("MemberClusters", [])
                if member_clusters:
                    try:
                        cluster_response = (
                            elasticache_client.describe_cache_clusters(
                                CacheClusterId=member_clusters[0]
                            )
                        )
                        members = cluster_response.get("CacheClusters")
                        if members:
                            member = members[0]
                            engine = engine or member.get("Engine", "")
                            node_type = node_type or member.get(
                                "CacheNodeType", ""
                            )
                            engine_version = member.get("EngineVersion", "")
                    except Exception as e:
                        # Best effort: the group is still reported without
                        # the member-derived details.
                        logger.debug(
                            f"Failed to describe member cluster "
                            f"{member_clusters[0]}: {str(e)}"
                        )

                resources.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "elasticache",
                    "resource_type": "Cache Cluster",
                    "resource_id": rg.get("ARN", rg_id),
                    "name": rg_id,
                    "attributes": {
                        "Cluster ID": rg_id,
                        # "redis" stays the fallback when nothing reports
                        # an engine.
                        "Engine": engine or "redis",
                        "Engine Version": engine_version,
                        "Node Type": node_type,
                        "Num Nodes": str(num_nodes),
                        "Status": rg.get("Status", ""),
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan ElastiCache replication groups: {str(e)}")

    return resources
def _scan_s3_buckets(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Scan S3 Buckets (global service, scanned once from us-east-1).

    The bucket region is derived from its location constraint: an empty
    constraint is reported as "us-east-1" and the legacy value "EU" as
    "eu-west-1". When the location cannot be read the region is reported
    as "unknown" and the bucket is still listed.

    Args:
        account_id: AWS account ID
        region: Region to scan (should be us-east-1 for global service)

    Returns:
        List of S3 Bucket resource dictionaries

    Attributes (horizontal layout): Region, Bucket Name
    """
    resources = []
    s3_client = self._session.client("s3", region_name=region)

    # Location constraints that are not themselves region names
    legacy_locations = {"EU": "eu-west-1"}

    try:
        response = s3_client.list_buckets()
        for bucket in response.get("Buckets", []):
            bucket_name = bucket.get("Name", "")

            # Get bucket location
            try:
                location_response = s3_client.get_bucket_location(
                    Bucket=bucket_name
                )
                constraint = (
                    location_response.get("LocationConstraint") or "us-east-1"
                )
                bucket_region = legacy_locations.get(constraint, constraint)
            except Exception as e:
                # Best effort: keep the bucket, mark its region as unknown
                logger.debug(
                    f"Failed to get location for bucket {bucket_name}: {str(e)}"
                )
                bucket_region = "unknown"

            resources.append({
                "account_id": account_id,
                "region": "global",
                "service": "s3",
                "resource_type": "Bucket",
                "resource_id": bucket_name,
                "name": bucket_name,
                "attributes": {
                    "Region": bucket_region,
                    "Bucket Name": bucket_name,
                },
            })
    except Exception as e:
        logger.warning(f"Failed to scan S3 buckets: {str(e)}")

    return resources
def _scan_s3_event_notifications(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the event notification configurations of every S3 bucket
    (global service, scanned once from us-east-1).

    Lambda, SQS and SNS destinations are reported, in that order, for each
    bucket. A bucket whose configuration cannot be read is logged at debug
    level and skipped.

    Args:
        account_id: AWS account ID
        region: Region to scan (should be us-east-1 for global service)

    Returns:
        List of S3 Event Notification resource dictionaries

    Attributes (vertical layout):
        Bucket, Name, Event Type, Destination type, Destination
    """
    # (response section, key holding the destination ARN, destination label).
    # The label doubles as the fallback name for a configuration without Id.
    destination_kinds = (
        ("LambdaFunctionConfigurations", "LambdaFunctionArn", "Lambda"),
        ("QueueConfigurations", "QueueArn", "SQS"),
        ("TopicConfigurations", "TopicArn", "SNS"),
    )

    found = []
    s3 = self._session.client("s3", region_name=region)

    try:
        listing = s3.list_buckets()

        for bucket in listing.get("Buckets", []):
            bucket_name = bucket.get("Name", "")

            try:
                notifications = s3.get_bucket_notification_configuration(
                    Bucket=bucket_name
                )

                for section, arn_key, label in destination_kinds:
                    for entry in notifications.get(section, []):
                        entry_id = entry.get("Id", label)
                        event_names = entry.get("Events", [])
                        target_arn = entry.get(arn_key, "")

                        # Only the last ARN component is shown
                        if target_arn:
                            target_name = target_arn.split(":")[-1]
                        else:
                            target_name = ""

                        found.append({
                            "account_id": account_id,
                            "region": "global",
                            "service": "s3_event_notification",
                            "resource_type": "S3 event notification",
                            "resource_id": f"{bucket_name}/{entry_id}",
                            "name": entry_id,
                            "attributes": {
                                "Bucket": bucket_name,
                                "Name": entry_id,
                                "Event Type": ", ".join(event_names),
                                "Destination type": label,
                                "Destination": target_name,
                            },
                        })

            except Exception as e:
                # Skip buckets we can't access
                logger.debug(
                    f"Failed to get notifications for bucket {bucket_name}: "
                    f"{str(e)}"
                )

    except Exception as e:
        logger.warning(f"Failed to scan S3 event notifications: {str(e)}")

    return found
- # =========================================================================
- # Global and Monitoring Service Scanners (Task 1.5)
- # =========================================================================
def _scan_cloudfront_distributions(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the CloudFront distributions of the account (global service).

    The client is always created for us-east-1; the ``region`` argument is
    not used for the API call. Origin details are taken from the first
    origin only, and an origin without a custom origin config is labelled
    "S3".

    Args:
        account_id: AWS account ID
        region: Region to scan (should be us-east-1 for global service)

    Returns:
        List of CloudFront Distribution resource dictionaries

    Attributes (vertical layout - one table per distribution):
        CloudFront ID, Domain Name, CNAME, Origin Domain Name,
        Origin Protocol Policy, Viewer Protocol Policy,
        Allowed HTTP Methods, Cached HTTP Methods
    """
    found = []
    client = self._session.client("cloudfront", region_name="us-east-1")

    try:
        for page in client.get_paginator("list_distributions").paginate():
            listing = page.get("DistributionList", {})
            for dist in listing.get("Items", []):
                dist_id = dist.get("Id", "")

                cnames = dist.get("Aliases", {}).get("Items", [])
                if cnames:
                    cname_text = ", ".join(cnames)
                else:
                    cname_text = "N/A"

                # Only the first origin is reported
                origin_domain = ""
                origin_protocol = ""
                origin_items = dist.get("Origins", {}).get("Items", [])
                if origin_items:
                    first_origin = origin_items[0]
                    origin_domain = first_origin.get("DomainName", "")
                    custom_config = first_origin.get("CustomOriginConfig", {})
                    origin_protocol = (
                        custom_config.get("OriginProtocolPolicy", "")
                        if custom_config
                        else "S3"
                    )

                # Cached methods are nested inside the allowed methods
                behavior = dist.get("DefaultCacheBehavior", {})
                viewer_protocol = behavior.get("ViewerProtocolPolicy", "")
                methods = behavior.get("AllowedMethods", {})
                allowed = methods.get("Items", [])
                cached = methods.get("CachedMethods", {}).get("Items", [])

                found.append({
                    "account_id": account_id,
                    "region": "global",
                    "service": "cloudfront",
                    "resource_type": "Distribution",
                    "resource_id": dist.get("ARN", dist_id),
                    "name": dist_id,
                    "attributes": {
                        "CloudFront ID": dist_id,
                        "Domain Name": dist.get("DomainName", ""),
                        "CNAME": cname_text,
                        "Origin Domain Name": origin_domain,
                        "Origin Protocol Policy": origin_protocol,
                        "Viewer Protocol Policy": viewer_protocol,
                        "Allowed HTTP Methods": ", ".join(allowed),
                        "Cached HTTP Methods": ", ".join(cached),
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan CloudFront distributions: {str(e)}")

    return found
def _scan_route53_hosted_zones(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the Route 53 hosted zones of the account (global service).

    The client is always created for us-east-1; the ``region`` argument is
    not used for the API call. The "/hostedzone/" prefix is removed from
    the zone ID.

    Args:
        account_id: AWS account ID
        region: Region to scan (should be us-east-1 for global service)

    Returns:
        List of Route 53 Hosted Zone resource dictionaries

    Attributes (horizontal layout):
        Zone ID, Name, Type, Record Count
    """
    found = []
    client = self._session.client("route53", region_name="us-east-1")

    try:
        for page in client.get_paginator("list_hosted_zones").paginate():
            for zone in page.get("HostedZones", []):
                short_id = zone.get("Id", "").replace("/hostedzone/", "")
                dns_name = zone.get("Name", "")

                if zone.get("Config", {}).get("PrivateZone"):
                    visibility = "Private"
                else:
                    visibility = "Public"

                record_total = str(zone.get("ResourceRecordSetCount", 0))

                found.append({
                    "account_id": account_id,
                    "region": "global",
                    "service": "route53",
                    "resource_type": "Hosted Zone",
                    "resource_id": short_id,
                    "name": dns_name,
                    "attributes": {
                        "Zone ID": short_id,
                        "Name": dns_name,
                        "Type": visibility,
                        "Record Count": record_total,
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan Route 53 hosted zones: {str(e)}")

    return found
def _scan_acm_certificates(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Scan ACM Certificates (regional service).

    All key types are requested explicitly so that certificates other than
    the default RSA_1024/RSA_2048 ones are included. When the details of a
    certificate cannot be read, the certificate is still reported with
    empty additional names.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of ACM Certificate resource dictionaries

    Attributes (horizontal layout): Domain name, Additional names
    """
    resources = []
    # ACM is a regional service
    acm_client = self._session.client("acm", region_name=region)

    # Without this filter list_certificates returns only RSA_1024 and
    # RSA_2048 certificates.
    key_types = [
        "RSA_1024",
        "RSA_2048",
        "RSA_3072",
        "RSA_4096",
        "EC_prime256v1",
        "EC_secp384r1",
        "EC_secp521r1",
    ]

    try:
        paginator = acm_client.get_paginator("list_certificates")
        for page in paginator.paginate(Includes={"keyTypes": key_types}):
            for cert in page.get("CertificateSummaryList", []):
                domain_name = cert.get("DomainName", "")
                cert_arn = cert.get("CertificateArn", "")

                # Get additional names (Subject Alternative Names)
                additional_names = ""
                try:
                    cert_detail = acm_client.describe_certificate(
                        CertificateArn=cert_arn
                    )
                    sans = cert_detail.get("Certificate", {}).get(
                        "SubjectAlternativeNames", []
                    )
                    # Filter out the main domain name from SANs
                    additional_names = ", ".join(
                        san for san in sans if san != domain_name
                    )
                except Exception as e:
                    # Best effort: keep the certificate without its SANs
                    logger.debug(
                        f"Failed to describe certificate {cert_arn}: {str(e)}"
                    )

                resources.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "acm",
                    "resource_type": "Certificate",
                    "resource_id": cert_arn,
                    "name": domain_name,
                    "attributes": {
                        "Domain name": domain_name,
                        "Additional names": additional_names,
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan ACM certificates in {region}: {str(e)}")

    return resources
def _scan_waf_web_acls(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Scan WAF Web ACLs (global service for CloudFront).

    CLOUDFRONT-scope and REGIONAL-scope Web ACLs are both listed through a
    WAFv2 client created for us-east-1; the ``region`` argument is not used
    for the API calls and every entry is reported with region "global".
    Listings follow ``NextMarker`` so all pages are read. Associated
    resources of CLOUDFRONT ACLs are resolved through the CloudFront API;
    those of REGIONAL ACLs through ``list_resources_for_web_acl``.

    Args:
        account_id: AWS account ID
        region: Region to scan (should be us-east-1 for global service)

    Returns:
        List of WAF Web ACL resource dictionaries

    Attributes (horizontal layout):
        WebACL Name, Scope, Rules Count, Associated Resources
    """
    resources = []
    wafv2_client = self._session.client("wafv2", region_name="us-east-1")

    def iter_web_acls(scope):
        """Yield every Web ACL summary of *scope*, following NextMarker."""
        request = {"Scope": scope}
        while True:
            response = wafv2_client.list_web_acls(**request)
            acls = response.get("WebACLs", [])
            yield from acls
            marker = response.get("NextMarker")
            # Also stop on an empty page so that a marker returned with
            # the last results cannot keep the loop going.
            if not marker or not acls:
                return
            request["NextMarker"] = marker

    def associated_names(scope, acl_arn):
        """Return the short names of the resources attached to a Web ACL."""
        if scope == "CLOUDFRONT":
            # ListResourcesForWebACL does not cover CloudFront, so the
            # distributions are listed through the CloudFront API.
            cf_client = self._session.client(
                "cloudfront", region_name="us-east-1"
            )
            names = []
            request = {"WebACLId": acl_arn}
            while True:
                dist_list = cf_client.list_distributions_by_web_acl_id(
                    **request
                ).get("DistributionList", {})
                names.extend(
                    dist.get("Id", "") for dist in dist_list.get("Items", [])
                )
                next_marker = dist_list.get("NextMarker")
                if not dist_list.get("IsTruncated") or not next_marker:
                    return names
                request["Marker"] = next_marker

        response = wafv2_client.list_resources_for_web_acl(WebACLArn=acl_arn)
        # Extract resource name from ARN
        return [arn.split("/")[-1] for arn in response.get("ResourceArns", [])]

    scopes = (
        ("CLOUDFRONT", "Failed to scan WAFv2 Web ACLs"),
        ("REGIONAL", "Failed to scan regional WAFv2 Web ACLs"),
    )

    for scope, failure_message in scopes:
        try:
            for acl in iter_web_acls(scope):
                acl_name = acl.get("Name", "")
                acl_id = acl.get("Id", "")
                acl_arn = acl.get("ARN", "")

                rules_count = 0
                associated_resources = []

                try:
                    acl_response = wafv2_client.get_web_acl(
                        Name=acl_name,
                        Scope=scope,
                        Id=acl_id,
                    )
                    web_acl = acl_response.get("WebACL", {})
                    rules_count = len(web_acl.get("Rules", []))

                    associated_resources = associated_names(scope, acl_arn)
                except Exception as e:
                    logger.debug(f"Failed to get WAF ACL details: {str(e)}")

                resources.append({
                    "account_id": account_id,
                    "region": "global",
                    "service": "waf",
                    "resource_type": "Web ACL",
                    "resource_id": acl_arn,
                    "name": acl_name,
                    "attributes": {
                        "WebACL Name": acl_name,
                        "Scope": scope,
                        "Rules Count": str(rules_count),
                        "Associated Resources": (
                            ", ".join(associated_resources)
                            if associated_resources
                            else "None"
                        ),
                    },
                })
        except Exception as e:
            logger.warning(f"{failure_message}: {str(e)}")

    return resources
def _scan_sns_topics(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the SNS topics of a region, one result row per subscription.

    A topic without subscriptions (or whose subscriptions could not be
    read) yields a single row with "N/A" as protocol and endpoint.
    Failures to read topic attributes or subscriptions are logged at debug
    level and do not drop the topic.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of SNS Topic resource dictionaries

    Attributes (horizontal layout):
        Topic Name, Topic Display Name, Subscription Protocol, Subscription Endpoint
    """
    found = []
    client = self._session.client("sns", region_name=region)

    try:
        for page in client.get_paginator("list_topics").paginate():
            for topic in page.get("Topics", []):
                arn = topic.get("TopicArn", "")
                # The topic name is the last component of the ARN
                short_name = arn.split(":")[-1] if arn else ""

                shown_name = ""
                try:
                    topic_attrs = client.get_topic_attributes(TopicArn=arn).get(
                        "Attributes", {}
                    )
                    shown_name = topic_attrs.get("DisplayName", "")
                except Exception as e:
                    logger.debug(f"Failed to get topic attributes: {str(e)}")

                # (protocol, endpoint) pairs, in listing order
                pairs = []
                try:
                    sub_pages = client.get_paginator(
                        "list_subscriptions_by_topic"
                    ).paginate(TopicArn=arn)
                    for sub_page in sub_pages:
                        for sub in sub_page.get("Subscriptions", []):
                            pairs.append(
                                (sub.get("Protocol", ""), sub.get("Endpoint", ""))
                            )
                except Exception as e:
                    logger.debug(f"Failed to get subscriptions: {str(e)}")

                # A topic with nothing to show still gets one placeholder row
                if not pairs:
                    pairs = [("N/A", "N/A")]

                for protocol, endpoint in pairs:
                    found.append({
                        "account_id": account_id,
                        "region": region,
                        "service": "sns",
                        "resource_type": "Topic",
                        "resource_id": arn,
                        "name": short_name,
                        "attributes": {
                            "Topic Name": short_name,
                            "Topic Display Name": shown_name,
                            "Subscription Protocol": protocol,
                            "Subscription Endpoint": endpoint,
                        },
                    })
    except Exception as e:
        logger.warning(f"Failed to scan SNS topics: {str(e)}")

    return found
def _scan_cloudwatch_log_groups(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the CloudWatch log groups of a region.

    A missing retention setting is shown as "Never Expire", the stored
    size is converted from bytes to MB with two decimals, and KMS
    encryption is "Yes" when a key ID is present.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of CloudWatch Log Group resource dictionaries

    Attributes (horizontal layout):
        Log Group Name, Retention Days, Stored Bytes, KMS Encryption
    """
    found = []
    client = self._session.client("logs", region_name=region)

    try:
        for page in client.get_paginator("describe_log_groups").paginate():
            for group in page.get("logGroups", []):
                group_name = group.get("logGroupName", "")

                days = group.get("retentionInDays")
                if days:
                    retention_text = str(days)
                else:
                    retention_text = "Never Expire"

                size_bytes = group.get("storedBytes", 0)
                if size_bytes:
                    size_text = f"{size_bytes / (1024*1024):.2f} MB"
                else:
                    size_text = "0 MB"

                if group.get("kmsKeyId", ""):
                    encrypted_text = "Yes"
                else:
                    encrypted_text = "No"

                found.append({
                    "account_id": account_id,
                    "region": region,
                    "service": "cloudwatch",
                    "resource_type": "Log Group",
                    "resource_id": group.get("arn", group_name),
                    "name": group_name,
                    "attributes": {
                        "Log Group Name": group_name,
                        "Retention Days": retention_text,
                        "Stored Bytes": size_text,
                        "KMS Encryption": encrypted_text,
                    },
                })
    except Exception as e:
        logger.warning(f"Failed to scan CloudWatch log groups: {str(e)}")

    return found
def _scan_eventbridge_rules(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect the EventBridge rules of a region, bus by bus.

    Event buses are listed first; when none are returned the "default"
    bus is used. A bus whose rules cannot be listed is logged at debug
    level and skipped.

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of EventBridge Rule resource dictionaries

    Attributes (horizontal layout):
        Name, Description, Event Bus, State
    """
    found = []
    client = self._session.client("events", region_name=region)

    try:
        listed_buses = client.list_event_buses().get("EventBuses", [])
        # Fall back to the default bus when the listing is empty
        bus_names = [bus.get("Name", "default") for bus in listed_buses] or [
            "default"
        ]

        for bus_name in bus_names:
            try:
                rule_pages = client.get_paginator("list_rules").paginate(
                    EventBusName=bus_name
                )
                for rule_page in rule_pages:
                    for rule in rule_page.get("Rules", []):
                        rule_name = rule.get("Name", "")
                        rule_attributes = {
                            "Name": rule_name,
                            "Description": rule.get("Description", ""),
                            "Event Bus": bus_name,
                            "State": rule.get("State", ""),
                        }
                        found.append({
                            "account_id": account_id,
                            "region": region,
                            "service": "eventbridge",
                            "resource_type": "Rule",
                            "resource_id": rule.get("Arn", rule_name),
                            "name": rule_name,
                            "attributes": rule_attributes,
                        })
            except Exception as e:
                logger.debug(
                    f"Failed to list rules for bus {bus_name}: {str(e)}"
                )
    except Exception as e:
        logger.warning(f"Failed to scan EventBridge rules: {str(e)}")

    return found
def _scan_cloudtrail_trails(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Collect CloudTrail Trails and report them as a global service.

    The CloudTrail client is always created for us-east-1; the ``region``
    argument is not read by this method, and every returned entry carries
    the region label "global". Any failure of the describe call is logged
    as a warning and the entries gathered so far are returned.

    Args:
        account_id: AWS account ID
        region: Region to scan (should be us-east-1 for global service)

    Returns:
        List of CloudTrail Trail resource dictionaries

    Attributes (horizontal layout):
        Name, Multi-Region Trail, Log File Validation, KMS Encryption
    """
    def as_yes_no(value: Any) -> str:
        # Render any truthy/falsy trail field as the report's Yes/No text.
        return "Yes" if value else "No"

    trail_client = self._session.client("cloudtrail", region_name="us-east-1")
    found: List[Dict[str, Any]] = []

    try:
        trail_list = trail_client.describe_trails().get("trailList", [])
        for entry in trail_list:
            name = entry.get("Name", "")
            attributes = {
                "Name": name,
                "Multi-Region Trail": as_yes_no(
                    entry.get("IsMultiRegionTrail", False)
                ),
                "Log File Validation": as_yes_no(
                    entry.get("LogFileValidationEnabled")
                ),
                "KMS Encryption": as_yes_no(entry.get("KmsKeyId")),
            }
            found.append({
                "account_id": account_id,
                "region": "global",
                "service": "cloudtrail",
                "resource_type": "Trail",
                # Fall back to the trail name when no ARN is present
                "resource_id": entry.get("TrailARN", name),
                "name": name,
                "attributes": attributes,
            })
    except Exception as e:
        logger.warning(f"Failed to scan CloudTrail trails: {str(e)}")

    return found
def _scan_config_recorders(
    self, account_id: str, region: str
) -> List[Dict[str, Any]]:
    """
    Scan AWS Config Recorders in the specified region.

    The retention period is read from the first entry returned by
    ``describe_retention_configurations``. That lookup is issued once per
    scan (and only when at least one recorder exists) rather than once per
    recorder, and the same value is reported on every recorder. When the
    lookup fails, returns nothing, or the entry lacks
    ``RetentionPeriodInDays``, the retention is reported as "N/A".

    Args:
        account_id: AWS account ID
        region: Region to scan

    Returns:
        List of AWS Config Recorder resource dictionaries

    Attributes (horizontal layout):
        Name, Regional Resources, Global Resources, Retention period
    """
    resources: List[Dict[str, Any]] = []
    config_client = self._session.client("config", region_name=region)

    try:
        response = config_client.describe_configuration_recorders()
        recorders = response.get("ConfigurationRecorders", [])

        # Get retention period (loop-invariant, so resolved before the loop)
        retention_period = "N/A"
        if recorders:
            try:
                retention_response = (
                    config_client.describe_retention_configurations()
                )
                retention_configs = retention_response.get(
                    "RetentionConfigurations", []
                )
                if retention_configs:
                    retention_days = retention_configs[0].get(
                        "RetentionPeriodInDays"
                    )
                    # Avoid rendering the meaningless string "N/A days"
                    if retention_days is not None:
                        retention_period = f"{retention_days} days"
            except Exception as e:
                # Best-effort: recorders are still reported without retention.
                logger.debug(
                    f"Failed to get Config retention configuration: {str(e)}"
                )

        for recorder in recorders:
            recorder_name = recorder.get("name", "")

            # Get recording group settings
            recording_group = recorder.get("recordingGroup", {})
            all_supported = recording_group.get("allSupported", False)
            include_global = recording_group.get(
                "includeGlobalResourceTypes", False
            )

            resources.append({
                "account_id": account_id,
                "region": region,
                "service": "config",
                "resource_type": "Config",
                "resource_id": recorder_name,
                "name": recorder_name,
                "attributes": {
                    "Name": recorder_name,
                    "Regional Resources": "Yes" if all_supported else "No",
                    "Global Resources": "Yes" if include_global else "No",
                    "Retention period": retention_period,
                },
            })
    except Exception as e:
        logger.warning(f"Failed to scan Config recorders: {str(e)}")

    return resources
def parse_arguments() -> argparse.Namespace:
    """
    Build the CloudShell Scanner command-line parser and parse the arguments.

    Options are registered in a fixed order (regions, output, services,
    version, verbose, list-services) so the generated help text lists them
    in that order.

    Returns:
        Parsed arguments namespace
    """
    usage_examples = """
Examples:
    # Scan all regions and services
    python cloudshell_scanner.py
    # Scan specific regions
    python cloudshell_scanner.py --regions us-east-1,ap-northeast-1
    # Specify output file
    python cloudshell_scanner.py --output my_scan.json
    # Scan specific services
    python cloudshell_scanner.py --services ec2,vpc,rds
    # Combine options
    python cloudshell_scanner.py --regions us-east-1 --services ec2,vpc --output scan.json
"""

    cli = argparse.ArgumentParser(
        description="CloudShell Scanner - AWS Resource Scanner for CloudShell Environment",
        # Raw formatter keeps the line breaks of the examples block intact.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # (flags, keyword settings) for each option, in help-output order.
    option_specs = [
        (
            ("--regions",),
            {
                "type": str,
                "default": None,
                "help": "Comma-separated list of AWS regions to scan (default: all regions)",
            },
        ),
        (
            ("--output",),
            {
                "type": str,
                "default": "scan_result.json",
                "help": "Output JSON file path (default: scan_result.json)",
            },
        ),
        (
            ("--services",),
            {
                "type": str,
                "default": None,
                "help": "Comma-separated list of services to scan (default: all services)",
            },
        ),
        (
            ("--version",),
            {
                "action": "version",
                "version": f"CloudShell Scanner v{__version__}",
            },
        ),
        (
            ("--verbose", "-v"),
            {
                "action": "store_true",
                "help": "Enable verbose logging",
            },
        ),
        (
            ("--list-services",),
            {
                "action": "store_true",
                "help": "List all supported services and exit",
            },
        ),
    ]
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    return cli.parse_args()
def _split_csv_arg(raw_value: Optional[str]) -> Optional[List[str]]:
    """Split a comma-separated CLI value into trimmed, non-empty items.

    Returns None when the value is missing or holds no usable item (for
    example "," or " , "), so stray commas never yield empty-string entries.
    """
    if not raw_value:
        return None
    items = [item.strip() for item in raw_value.split(",")]
    return [item for item in items if item] or None


def main() -> int:
    """
    Main entry point for the CloudShell Scanner.

    Parses the command line, optionally lists the supported services, then
    runs the scan, exports the result as JSON and prints a summary.
    Comma-separated ``--regions`` / ``--services`` values are trimmed and
    blank entries (from stray or trailing commas) are discarded; when nothing
    usable remains, a warning is logged and the default (all) is used.

    Returns:
        Exit code (0 for success, 130 when interrupted by the user,
        1 for any other failure)
    """
    args = parse_arguments()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
        logger.debug("Verbose logging enabled")

    # List services and exit if requested
    if args.list_services:
        print("Supported services:")
        for service in CloudShellScanner.SUPPORTED_SERVICES:
            global_marker = " (global)" if service in CloudShellScanner.GLOBAL_SERVICES else ""
            print(f"  - {service}{global_marker}")
        return 0

    # Parse regions
    regions: Optional[List[str]] = _split_csv_arg(args.regions)
    if regions is not None:
        logger.info(f"Regions specified: {regions}")
    elif args.regions:
        logger.warning("No valid region in --regions; scanning all regions")

    # Parse services
    services: Optional[List[str]] = _split_csv_arg(args.services)
    if services is not None:
        logger.info(f"Services specified: {services}")
    elif args.services:
        logger.warning("No valid service in --services; scanning all services")

    try:
        # Initialize scanner
        print(f"CloudShell Scanner v{__version__}")
        print("=" * 50)

        scanner = CloudShellScanner()

        # Get account info
        account_id = scanner.get_account_id()
        print(f"AWS Account: {account_id}")
        print("=" * 50)

        # Run scan
        result = scanner.scan_resources(regions=regions, services=services)

        # Export results
        scanner.export_json(result, args.output)

        # Print summary
        print("\n" + "=" * 50)
        print("Scan Summary:")
        print(f"  Account ID: {result['metadata']['account_id']}")
        print(f"  Regions scanned: {len(result['metadata']['regions_scanned'])}")
        print(f"  Services scanned: {len(result['metadata']['services_scanned'])}")
        print(f"  Total resources: {result['metadata']['total_resources']}")
        print(f"  Total errors: {result['metadata']['total_errors']}")
        print(f"  Output file: {args.output}")
        print("=" * 50)

        return 0

    except KeyboardInterrupt:
        print("\n\nScan interrupted by user")
        # 130 is the exit code this tool reports for a user interrupt.
        return 130
    except Exception as e:
        logger.error(f"Scan failed: {e}")
        if args.verbose:
            # Full traceback only in verbose mode to keep normal output short.
            import traceback
            traceback.print_exc()
        return 1
# Script entry point: run the scanner and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|