| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568 |
- """
- Integration Tests for Report Generation
- This module tests the integration between uploaded scan data and report generation,
- ensuring that reports generated from uploaded CloudShell scan data are consistent
- with reports generated from credential-based scanning.
- Requirements:
- - 5.1: Generate reports in the same format as existing scan tasks
- - 5.2: Use account_id from uploaded data as report identifier
- - 5.4: Allow users to download generated reports
- """
- import os
- import json
- import tempfile
- import pytest
- from datetime import datetime
- from unittest.mock import patch, MagicMock
- from app import create_app, db
- from app.services.scan_data_processor import ScanDataProcessor
- from app.services.report_generator import ReportGenerator, SERVICE_CONFIG, SERVICE_ORDER
- @pytest.fixture
- def app():
- """Create application for testing"""
- app = create_app('testing')
-
- with app.app_context():
- db.create_all()
- yield app
- db.drop_all()
- @pytest.fixture
- def processor():
- """Create a ScanDataProcessor instance"""
- return ScanDataProcessor()
- @pytest.fixture
- def sample_cloudshell_scan_data():
- """
- Create sample CloudShell scan data matching the format produced by cloudshell_scanner.py.
-
- This fixture provides realistic scan data that would be generated by the CloudShell scanner.
- """
- return {
- "metadata": {
- "account_id": "123456789012",
- "scan_timestamp": "2024-01-15T10:30:00Z",
- "regions_scanned": ["us-east-1", "ap-northeast-1"],
- "services_scanned": ["vpc", "ec2", "s3", "rds"],
- "scanner_version": "1.0.0",
- "total_resources": 5,
- "total_errors": 0
- },
- "resources": {
- "vpc": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "vpc",
- "resource_type": "VPC",
- "resource_id": "vpc-12345678",
- "name": "main-vpc",
- "attributes": {
- "cidr_block": "10.0.0.0/16",
- "state": "available",
- "is_default": False
- }
- }
- ],
- "ec2": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "ec2",
- "resource_type": "Instance",
- "resource_id": "i-0123456789abcdef0",
- "name": "web-server-1",
- "attributes": {
- "instance_type": "t3.medium",
- "availability_zone": "us-east-1a",
- "ami_id": "ami-12345678",
- "public_ip": "54.123.45.67",
- "private_ip": "10.0.1.100",
- "vpc_id": "vpc-12345678",
- "subnet_id": "subnet-12345678",
- "key_name": "my-key",
- "security_groups": ["sg-12345678"],
- "ebs_type": "gp3",
- "ebs_size": 100,
- "encryption": True
- }
- }
- ],
- "s3": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "s3",
- "resource_type": "Bucket",
- "resource_id": "my-bucket-123",
- "name": "my-bucket-123",
- "attributes": {
- "creation_date": "2024-01-01T00:00:00Z"
- }
- }
- ],
- "rds": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "rds",
- "resource_type": "DBInstance",
- "resource_id": "mydb",
- "name": "mydb",
- "attributes": {
- "endpoint": "mydb.abc123.us-east-1.rds.amazonaws.com",
- "db_name": "production",
- "master_username": "admin",
- "port": 3306,
- "engine": "mysql",
- "engine_version": "8.0.35",
- "instance_class": "db.t3.medium",
- "storage_type": "gp3",
- "allocated_storage": 100,
- "multi_az": True,
- "security_groups": ["sg-db123456"],
- "deletion_protection": True,
- "performance_insights_enabled": True,
- "cloudwatch_logs": ["error", "general", "slowquery"]
- }
- }
- ]
- },
- "errors": []
- }
- @pytest.fixture
- def sample_credential_scan_results():
- """
- Create sample scan results in the format produced by credential-based scanning.
-
- This fixture provides data in the same format that would be produced by
- the AWSScanner when using credentials.
- """
- return {
- "vpc": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "vpc",
- "resource_type": "VPC",
- "resource_id": "vpc-12345678",
- "name": "main-vpc",
- "attributes": {
- "cidr_block": "10.0.0.0/16",
- "state": "available",
- "is_default": False
- }
- }
- ],
- "ec2": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "ec2",
- "resource_type": "Instance",
- "resource_id": "i-0123456789abcdef0",
- "name": "web-server-1",
- "attributes": {
- "instance_type": "t3.medium",
- "availability_zone": "us-east-1a",
- "ami_id": "ami-12345678",
- "public_ip": "54.123.45.67",
- "private_ip": "10.0.1.100",
- "vpc_id": "vpc-12345678",
- "subnet_id": "subnet-12345678",
- "key_name": "my-key",
- "security_groups": ["sg-12345678"],
- "ebs_type": "gp3",
- "ebs_size": 100,
- "encryption": True
- }
- }
- ],
- "s3": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "s3",
- "resource_type": "Bucket",
- "resource_id": "my-bucket-123",
- "name": "my-bucket-123",
- "attributes": {
- "creation_date": "2024-01-01T00:00:00Z"
- }
- }
- ],
- "rds": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "rds",
- "resource_type": "DBInstance",
- "resource_id": "mydb",
- "name": "mydb",
- "attributes": {
- "endpoint": "mydb.abc123.us-east-1.rds.amazonaws.com",
- "db_name": "production",
- "master_username": "admin",
- "port": 3306,
- "engine": "mysql",
- "engine_version": "8.0.35",
- "instance_class": "db.t3.medium",
- "storage_type": "gp3",
- "allocated_storage": 100,
- "multi_az": True,
- "security_groups": ["sg-db123456"],
- "deletion_protection": True,
- "performance_insights_enabled": True,
- "cloudwatch_logs": ["error", "general", "slowquery"]
- }
- }
- ]
- }
- @pytest.fixture
- def project_metadata():
- """Create sample project metadata for report generation"""
- return {
- "clientName": "TestClient",
- "projectName": "TestProject",
- "bdManager": "BD Manager",
- "bdManagerEmail": "bd@example.com",
- "solutionsArchitect": "SA Name",
- "solutionsArchitectEmail": "sa@example.com",
- "cloudEngineer": "CE Name",
- "cloudEngineerEmail": "ce@example.com"
- }
- class TestScanDataConversion:
- """Tests for converting CloudShell scan data to report-compatible format"""
-
- def test_convert_cloudshell_data_to_scan_result(self, processor, sample_cloudshell_scan_data):
- """
- Test that CloudShell scan data is correctly converted to ScanResult format.
-
- Requirements:
- - 5.1: Generate reports in the same format as existing scan tasks
- """
- result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
-
- # Verify ScanResult structure
- assert result.success is True
- assert 'vpc' in result.resources
- assert 'ec2' in result.resources
- assert 's3' in result.resources
- assert 'rds' in result.resources
-
- # Verify resource count
- assert len(result.resources['vpc']) == 1
- assert len(result.resources['ec2']) == 1
- assert len(result.resources['s3']) == 1
- assert len(result.resources['rds']) == 1
-
- def test_converted_data_has_required_fields(self, processor, sample_cloudshell_scan_data):
- """
- Test that converted resources have all required fields for report generation.
-
- Requirements:
- - 5.1: Generate reports in the same format as existing scan tasks
- """
- result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
-
- # Check VPC resource
- vpc_resource = result.resources['vpc'][0]
- assert hasattr(vpc_resource, 'account_id')
- assert hasattr(vpc_resource, 'region')
- assert hasattr(vpc_resource, 'service')
- assert hasattr(vpc_resource, 'resource_type')
- assert hasattr(vpc_resource, 'resource_id')
- assert hasattr(vpc_resource, 'name')
- assert hasattr(vpc_resource, 'attributes')
-
- # Verify values
- assert vpc_resource.account_id == "123456789012"
- assert vpc_resource.region == "us-east-1"
- assert vpc_resource.name == "main-vpc"
-
- def test_metadata_preserved_in_conversion(self, processor, sample_cloudshell_scan_data):
- """
- Test that metadata from CloudShell scan is preserved in conversion.
-
- Requirements:
- - 5.2: Use account_id from uploaded data as report identifier
- """
- result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
-
- assert result.metadata['account_id'] == "123456789012"
- assert result.metadata['regions_scanned'] == ["us-east-1", "ap-northeast-1"]
- assert result.metadata['source'] == 'upload'
- class TestReportFormatConsistency:
- """Tests for verifying report format consistency between upload and credential scan"""
-
- def test_upload_data_produces_same_resource_structure(
- self,
- processor,
- sample_cloudshell_scan_data,
- sample_credential_scan_results
- ):
- """
- Test that uploaded data produces the same resource structure as credential scan.
-
- Requirements:
- - 5.1: Generate reports in the same format as existing scan tasks
- """
- # Convert CloudShell data
- result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
-
- # Convert to dict format (same as credential scan results)
- converted_results = {}
- for service_key, resources in result.resources.items():
- converted_results[service_key] = []
- for resource in resources:
- if hasattr(resource, 'to_dict'):
- converted_results[service_key].append(resource.to_dict())
- else:
- converted_results[service_key].append(resource)
-
- # Verify same services are present
- assert set(converted_results.keys()) == set(sample_credential_scan_results.keys())
-
- # Verify resource structure matches
- for service_key in converted_results:
- upload_resources = converted_results[service_key]
- cred_resources = sample_credential_scan_results[service_key]
-
- assert len(upload_resources) == len(cred_resources)
-
- for upload_res, cred_res in zip(upload_resources, cred_resources):
- # Verify all required fields are present
- assert 'account_id' in upload_res
- assert 'region' in upload_res
- assert 'service' in upload_res
- assert 'resource_type' in upload_res
- assert 'resource_id' in upload_res
- assert 'name' in upload_res
- assert 'attributes' in upload_res
-
- def test_all_supported_services_can_be_processed(self, processor):
- """
- Test that all services in SERVICE_CONFIG can be processed from uploaded data.
-
- Requirements:
- - 5.1: Generate reports in the same format as existing scan tasks
- """
- # Create scan data with all supported services
- scan_data = {
- "metadata": {
- "account_id": "123456789012",
- "scan_timestamp": "2024-01-15T10:30:00Z",
- "regions_scanned": ["us-east-1"],
- "services_scanned": list(SERVICE_CONFIG.keys()),
- },
- "resources": {},
- "errors": []
- }
-
- # Add a sample resource for each service
- for service_key in SERVICE_CONFIG.keys():
- scan_data["resources"][service_key] = [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": service_key,
- "resource_type": "TestResource",
- "resource_id": f"{service_key}-123",
- "name": f"test-{service_key}",
- "attributes": {}
- }
- ]
-
- # Validate and convert
- is_valid, errors = processor.validate_scan_data(scan_data)
- assert is_valid, f"Validation failed: {errors}"
-
- result = processor.convert_to_scan_result(scan_data)
-
- # Verify all services were converted
- for service_key in SERVICE_CONFIG.keys():
- assert service_key in result.resources, f"Service {service_key} not in converted results"
- assert len(result.resources[service_key]) == 1
- class TestReportGeneration:
- """Tests for actual report generation from uploaded data"""
-
- def test_report_generator_accepts_converted_data(
- self,
- app,
- processor,
- sample_cloudshell_scan_data,
- project_metadata
- ):
- """
- Test that ReportGenerator can process converted CloudShell data.
-
- Requirements:
- - 5.1: Generate reports in the same format as existing scan tasks
- """
- with app.app_context():
- # Convert CloudShell data
- result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
-
- # Convert to dict format for report generator
- scan_results = {}
- for service_key, resources in result.resources.items():
- scan_results[service_key] = []
- for resource in resources:
- if hasattr(resource, 'to_dict'):
- scan_results[service_key].append(resource.to_dict())
- else:
- scan_results[service_key].append(resource)
-
- # Create report generator
- generator = ReportGenerator()
-
- # Test that filter_empty_services works
- filtered = generator.filter_empty_services(scan_results)
- assert len(filtered) > 0
-
- # Verify services with resources are included
- assert 'vpc' in filtered
- assert 'ec2' in filtered
-
- def test_report_generation_with_upload_data(
- self,
- app,
- processor,
- sample_cloudshell_scan_data,
- project_metadata
- ):
- """
- Test complete report generation from uploaded CloudShell data.
-
- Requirements:
- - 5.1: Generate reports in the same format as existing scan tasks
- - 5.4: Allow users to download generated reports
- """
- with app.app_context():
- # Convert CloudShell data
- result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
-
- # Convert to dict format
- scan_results = {}
- for service_key, resources in result.resources.items():
- scan_results[service_key] = []
- for resource in resources:
- if hasattr(resource, 'to_dict'):
- scan_results[service_key].append(resource.to_dict())
- else:
- scan_results[service_key].append(resource)
-
- # Check if template exists
- generator = ReportGenerator()
- try:
- template_path = generator._get_default_template_path()
- template_exists = os.path.exists(template_path)
- except FileNotFoundError:
- template_exists = False
-
- if not template_exists:
- pytest.skip("Report template not found, skipping report generation test")
-
- # Generate report to temp file
- with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp:
- output_path = tmp.name
-
- try:
- regions = sample_cloudshell_scan_data['metadata']['regions_scanned']
- report_result = generator.generate_report(
- scan_results=scan_results,
- project_metadata=project_metadata,
- output_path=output_path,
- regions=regions
- )
-
- # Verify report was generated
- assert os.path.exists(output_path)
- assert report_result['file_size'] > 0
- assert 'vpc' in report_result['services_included'] or len(report_result['services_included']) > 0
-
- finally:
- # Cleanup
- if os.path.exists(output_path):
- os.remove(output_path)
- class TestErrorHandling:
- """Tests for error handling in report generation integration"""
-
- def test_empty_resources_handled_gracefully(self, processor):
- """Test that empty resources are handled gracefully"""
- scan_data = {
- "metadata": {
- "account_id": "123456789012",
- "scan_timestamp": "2024-01-15T10:30:00Z",
- "regions_scanned": ["us-east-1"],
- "services_scanned": ["vpc"],
- },
- "resources": {},
- "errors": []
- }
-
- is_valid, errors = processor.validate_scan_data(scan_data)
- assert is_valid
-
- result = processor.convert_to_scan_result(scan_data)
- assert result.success is True
- assert len(result.resources) == 0
-
- def test_scan_errors_preserved_in_conversion(self, processor):
- """Test that scan errors from CloudShell are preserved"""
- scan_data = {
- "metadata": {
- "account_id": "123456789012",
- "scan_timestamp": "2024-01-15T10:30:00Z",
- "regions_scanned": ["us-east-1"],
- "services_scanned": ["vpc", "ec2"],
- },
- "resources": {
- "vpc": [
- {
- "account_id": "123456789012",
- "region": "us-east-1",
- "service": "vpc",
- "resource_type": "VPC",
- "resource_id": "vpc-123",
- "name": "test-vpc",
- "attributes": {}
- }
- ]
- },
- "errors": [
- {
- "service": "ec2",
- "region": "us-east-1",
- "error": "Access Denied",
- "error_type": "AccessDeniedException"
- }
- ]
- }
-
- result = processor.convert_to_scan_result(scan_data)
-
- assert len(result.errors) == 1
- assert result.errors[0]['service'] == 'ec2'
- assert result.errors[0]['error'] == 'Access Denied'
|