""" Integration Tests for Report Generation This module tests the integration between uploaded scan data and report generation, ensuring that reports generated from uploaded CloudShell scan data are consistent with reports generated from credential-based scanning. Requirements: - 5.1: Generate reports in the same format as existing scan tasks - 5.2: Use account_id from uploaded data as report identifier - 5.4: Allow users to download generated reports """ import os import json import tempfile import pytest from datetime import datetime from unittest.mock import patch, MagicMock from app import create_app, db from app.services.scan_data_processor import ScanDataProcessor from app.services.report_generator import ReportGenerator, SERVICE_CONFIG, SERVICE_ORDER @pytest.fixture def app(): """Create application for testing""" app = create_app('testing') with app.app_context(): db.create_all() yield app db.drop_all() @pytest.fixture def processor(): """Create a ScanDataProcessor instance""" return ScanDataProcessor() @pytest.fixture def sample_cloudshell_scan_data(): """ Create sample CloudShell scan data matching the format produced by cloudshell_scanner.py. This fixture provides realistic scan data that would be generated by the CloudShell scanner. """ return { "metadata": { "account_id": "123456789012", "scan_timestamp": "2024-01-15T10:30:00Z", "regions_scanned": ["us-east-1", "ap-northeast-1"], "services_scanned": ["vpc", "ec2", "s3", "rds"], "scanner_version": "1.0.0", "total_resources": 5, "total_errors": 0 }, "resources": { "vpc": [ { "account_id": "123456789012", "region": "us-east-1", "service": "vpc", "resource_type": "VPC", "resource_id": "vpc-12345678", "name": "main-vpc", "attributes": { "cidr_block": "10.0.0.0/16", "state": "available", "is_default": False } } ], "ec2": [ { "account_id": "123456789012", "region": "us-east-1", "service": "ec2", "resource_type": "Instance", "resource_id": "i-0123456789abcdef0", "name": "web-server-1", "attributes": { "instance_type": "t3.medium", "availability_zone": "us-east-1a", "ami_id": "ami-12345678", "public_ip": "54.123.45.67", "private_ip": "10.0.1.100", "vpc_id": "vpc-12345678", "subnet_id": "subnet-12345678", "key_name": "my-key", "security_groups": ["sg-12345678"], "ebs_type": "gp3", "ebs_size": 100, "encryption": True } } ], "s3": [ { "account_id": "123456789012", "region": "us-east-1", "service": "s3", "resource_type": "Bucket", "resource_id": "my-bucket-123", "name": "my-bucket-123", "attributes": { "creation_date": "2024-01-01T00:00:00Z" } } ], "rds": [ { "account_id": "123456789012", "region": "us-east-1", "service": "rds", "resource_type": "DBInstance", "resource_id": "mydb", "name": "mydb", "attributes": { "endpoint": "mydb.abc123.us-east-1.rds.amazonaws.com", "db_name": "production", "master_username": "admin", "port": 3306, "engine": "mysql", "engine_version": "8.0.35", "instance_class": "db.t3.medium", "storage_type": "gp3", "allocated_storage": 100, "multi_az": True, "security_groups": ["sg-db123456"], "deletion_protection": True, "performance_insights_enabled": True, "cloudwatch_logs": ["error", "general", "slowquery"] } } ] }, "errors": [] } @pytest.fixture def sample_credential_scan_results(): """ Create sample scan results in the format produced by credential-based scanning. This fixture provides data in the same format that would be produced by the AWSScanner when using credentials. """ return { "vpc": [ { "account_id": "123456789012", "region": "us-east-1", "service": "vpc", "resource_type": "VPC", "resource_id": "vpc-12345678", "name": "main-vpc", "attributes": { "cidr_block": "10.0.0.0/16", "state": "available", "is_default": False } } ], "ec2": [ { "account_id": "123456789012", "region": "us-east-1", "service": "ec2", "resource_type": "Instance", "resource_id": "i-0123456789abcdef0", "name": "web-server-1", "attributes": { "instance_type": "t3.medium", "availability_zone": "us-east-1a", "ami_id": "ami-12345678", "public_ip": "54.123.45.67", "private_ip": "10.0.1.100", "vpc_id": "vpc-12345678", "subnet_id": "subnet-12345678", "key_name": "my-key", "security_groups": ["sg-12345678"], "ebs_type": "gp3", "ebs_size": 100, "encryption": True } } ], "s3": [ { "account_id": "123456789012", "region": "us-east-1", "service": "s3", "resource_type": "Bucket", "resource_id": "my-bucket-123", "name": "my-bucket-123", "attributes": { "creation_date": "2024-01-01T00:00:00Z" } } ], "rds": [ { "account_id": "123456789012", "region": "us-east-1", "service": "rds", "resource_type": "DBInstance", "resource_id": "mydb", "name": "mydb", "attributes": { "endpoint": "mydb.abc123.us-east-1.rds.amazonaws.com", "db_name": "production", "master_username": "admin", "port": 3306, "engine": "mysql", "engine_version": "8.0.35", "instance_class": "db.t3.medium", "storage_type": "gp3", "allocated_storage": 100, "multi_az": True, "security_groups": ["sg-db123456"], "deletion_protection": True, "performance_insights_enabled": True, "cloudwatch_logs": ["error", "general", "slowquery"] } } ] } @pytest.fixture def project_metadata(): """Create sample project metadata for report generation""" return { "clientName": "TestClient", "projectName": "TestProject", "bdManager": "BD Manager", "bdManagerEmail": "bd@example.com", "solutionsArchitect": "SA Name", "solutionsArchitectEmail": "sa@example.com", "cloudEngineer": "CE Name", "cloudEngineerEmail": "ce@example.com" } class TestScanDataConversion: """Tests for converting CloudShell scan data to report-compatible format""" def test_convert_cloudshell_data_to_scan_result(self, processor, sample_cloudshell_scan_data): """ Test that CloudShell scan data is correctly converted to ScanResult format. Requirements: - 5.1: Generate reports in the same format as existing scan tasks """ result = processor.convert_to_scan_result(sample_cloudshell_scan_data) # Verify ScanResult structure assert result.success is True assert 'vpc' in result.resources assert 'ec2' in result.resources assert 's3' in result.resources assert 'rds' in result.resources # Verify resource count assert len(result.resources['vpc']) == 1 assert len(result.resources['ec2']) == 1 assert len(result.resources['s3']) == 1 assert len(result.resources['rds']) == 1 def test_converted_data_has_required_fields(self, processor, sample_cloudshell_scan_data): """ Test that converted resources have all required fields for report generation. Requirements: - 5.1: Generate reports in the same format as existing scan tasks """ result = processor.convert_to_scan_result(sample_cloudshell_scan_data) # Check VPC resource vpc_resource = result.resources['vpc'][0] assert hasattr(vpc_resource, 'account_id') assert hasattr(vpc_resource, 'region') assert hasattr(vpc_resource, 'service') assert hasattr(vpc_resource, 'resource_type') assert hasattr(vpc_resource, 'resource_id') assert hasattr(vpc_resource, 'name') assert hasattr(vpc_resource, 'attributes') # Verify values assert vpc_resource.account_id == "123456789012" assert vpc_resource.region == "us-east-1" assert vpc_resource.name == "main-vpc" def test_metadata_preserved_in_conversion(self, processor, sample_cloudshell_scan_data): """ Test that metadata from CloudShell scan is preserved in conversion. Requirements: - 5.2: Use account_id from uploaded data as report identifier """ result = processor.convert_to_scan_result(sample_cloudshell_scan_data) assert result.metadata['account_id'] == "123456789012" assert result.metadata['regions_scanned'] == ["us-east-1", "ap-northeast-1"] assert result.metadata['source'] == 'upload' class TestReportFormatConsistency: """Tests for verifying report format consistency between upload and credential scan""" def test_upload_data_produces_same_resource_structure( self, processor, sample_cloudshell_scan_data, sample_credential_scan_results ): """ Test that uploaded data produces the same resource structure as credential scan. Requirements: - 5.1: Generate reports in the same format as existing scan tasks """ # Convert CloudShell data result = processor.convert_to_scan_result(sample_cloudshell_scan_data) # Convert to dict format (same as credential scan results) converted_results = {} for service_key, resources in result.resources.items(): converted_results[service_key] = [] for resource in resources: if hasattr(resource, 'to_dict'): converted_results[service_key].append(resource.to_dict()) else: converted_results[service_key].append(resource) # Verify same services are present assert set(converted_results.keys()) == set(sample_credential_scan_results.keys()) # Verify resource structure matches for service_key in converted_results: upload_resources = converted_results[service_key] cred_resources = sample_credential_scan_results[service_key] assert len(upload_resources) == len(cred_resources) for upload_res, cred_res in zip(upload_resources, cred_resources): # Verify all required fields are present assert 'account_id' in upload_res assert 'region' in upload_res assert 'service' in upload_res assert 'resource_type' in upload_res assert 'resource_id' in upload_res assert 'name' in upload_res assert 'attributes' in upload_res def test_all_supported_services_can_be_processed(self, processor): """ Test that all services in SERVICE_CONFIG can be processed from uploaded data. Requirements: - 5.1: Generate reports in the same format as existing scan tasks """ # Create scan data with all supported services scan_data = { "metadata": { "account_id": "123456789012", "scan_timestamp": "2024-01-15T10:30:00Z", "regions_scanned": ["us-east-1"], "services_scanned": list(SERVICE_CONFIG.keys()), }, "resources": {}, "errors": [] } # Add a sample resource for each service for service_key in SERVICE_CONFIG.keys(): scan_data["resources"][service_key] = [ { "account_id": "123456789012", "region": "us-east-1", "service": service_key, "resource_type": "TestResource", "resource_id": f"{service_key}-123", "name": f"test-{service_key}", "attributes": {} } ] # Validate and convert is_valid, errors = processor.validate_scan_data(scan_data) assert is_valid, f"Validation failed: {errors}" result = processor.convert_to_scan_result(scan_data) # Verify all services were converted for service_key in SERVICE_CONFIG.keys(): assert service_key in result.resources, f"Service {service_key} not in converted results" assert len(result.resources[service_key]) == 1 class TestReportGeneration: """Tests for actual report generation from uploaded data""" def test_report_generator_accepts_converted_data( self, app, processor, sample_cloudshell_scan_data, project_metadata ): """ Test that ReportGenerator can process converted CloudShell data. Requirements: - 5.1: Generate reports in the same format as existing scan tasks """ with app.app_context(): # Convert CloudShell data result = processor.convert_to_scan_result(sample_cloudshell_scan_data) # Convert to dict format for report generator scan_results = {} for service_key, resources in result.resources.items(): scan_results[service_key] = [] for resource in resources: if hasattr(resource, 'to_dict'): scan_results[service_key].append(resource.to_dict()) else: scan_results[service_key].append(resource) # Create report generator generator = ReportGenerator() # Test that filter_empty_services works filtered = generator.filter_empty_services(scan_results) assert len(filtered) > 0 # Verify services with resources are included assert 'vpc' in filtered assert 'ec2' in filtered def test_report_generation_with_upload_data( self, app, processor, sample_cloudshell_scan_data, project_metadata ): """ Test complete report generation from uploaded CloudShell data. Requirements: - 5.1: Generate reports in the same format as existing scan tasks - 5.4: Allow users to download generated reports """ with app.app_context(): # Convert CloudShell data result = processor.convert_to_scan_result(sample_cloudshell_scan_data) # Convert to dict format scan_results = {} for service_key, resources in result.resources.items(): scan_results[service_key] = [] for resource in resources: if hasattr(resource, 'to_dict'): scan_results[service_key].append(resource.to_dict()) else: scan_results[service_key].append(resource) # Check if template exists generator = ReportGenerator() try: template_path = generator._get_default_template_path() template_exists = os.path.exists(template_path) except FileNotFoundError: template_exists = False if not template_exists: pytest.skip("Report template not found, skipping report generation test") # Generate report to temp file with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp: output_path = tmp.name try: regions = sample_cloudshell_scan_data['metadata']['regions_scanned'] report_result = generator.generate_report( scan_results=scan_results, project_metadata=project_metadata, output_path=output_path, regions=regions ) # Verify report was generated assert os.path.exists(output_path) assert report_result['file_size'] > 0 assert 'vpc' in report_result['services_included'] or len(report_result['services_included']) > 0 finally: # Cleanup if os.path.exists(output_path): os.remove(output_path) class TestErrorHandling: """Tests for error handling in report generation integration""" def test_empty_resources_handled_gracefully(self, processor): """Test that empty resources are handled gracefully""" scan_data = { "metadata": { "account_id": "123456789012", "scan_timestamp": "2024-01-15T10:30:00Z", "regions_scanned": ["us-east-1"], "services_scanned": ["vpc"], }, "resources": {}, "errors": [] } is_valid, errors = processor.validate_scan_data(scan_data) assert is_valid result = processor.convert_to_scan_result(scan_data) assert result.success is True assert len(result.resources) == 0 def test_scan_errors_preserved_in_conversion(self, processor): """Test that scan errors from CloudShell are preserved""" scan_data = { "metadata": { "account_id": "123456789012", "scan_timestamp": "2024-01-15T10:30:00Z", "regions_scanned": ["us-east-1"], "services_scanned": ["vpc", "ec2"], }, "resources": { "vpc": [ { "account_id": "123456789012", "region": "us-east-1", "service": "vpc", "resource_type": "VPC", "resource_id": "vpc-123", "name": "test-vpc", "attributes": {} } ] }, "errors": [ { "service": "ec2", "region": "us-east-1", "error": "Access Denied", "error_type": "AccessDeniedException" } ] } result = processor.convert_to_scan_result(scan_data) assert len(result.errors) == 1 assert result.errors[0]['service'] == 'ec2' assert result.errors[0]['error'] == 'Access Denied'