# test_report_integration.py
"""
Integration Tests for Report Generation

This module tests the integration between uploaded scan data and report generation,
ensuring that reports generated from uploaded CloudShell scan data are consistent
with reports generated from credential-based scanning.

Requirements:
- 5.1: Generate reports in the same format as existing scan tasks
- 5.2: Use account_id from uploaded data as report identifier
- 5.4: Allow users to download generated reports
"""
import json
import os
import tempfile
from datetime import datetime
from unittest.mock import patch, MagicMock

import pytest

from app import create_app, db
from app.services.report_generator import ReportGenerator, SERVICE_CONFIG, SERVICE_ORDER
from app.services.scan_data_processor import ScanDataProcessor
  20. @pytest.fixture
  21. def app():
  22. """Create application for testing"""
  23. app = create_app('testing')
  24. with app.app_context():
  25. db.create_all()
  26. yield app
  27. db.drop_all()
  28. @pytest.fixture
  29. def processor():
  30. """Create a ScanDataProcessor instance"""
  31. return ScanDataProcessor()
  32. @pytest.fixture
  33. def sample_cloudshell_scan_data():
  34. """
  35. Create sample CloudShell scan data matching the format produced by cloudshell_scanner.py.
  36. This fixture provides realistic scan data that would be generated by the CloudShell scanner.
  37. """
  38. return {
  39. "metadata": {
  40. "account_id": "123456789012",
  41. "scan_timestamp": "2024-01-15T10:30:00Z",
  42. "regions_scanned": ["us-east-1", "ap-northeast-1"],
  43. "services_scanned": ["vpc", "ec2", "s3", "rds"],
  44. "scanner_version": "1.0.0",
  45. "total_resources": 5,
  46. "total_errors": 0
  47. },
  48. "resources": {
  49. "vpc": [
  50. {
  51. "account_id": "123456789012",
  52. "region": "us-east-1",
  53. "service": "vpc",
  54. "resource_type": "VPC",
  55. "resource_id": "vpc-12345678",
  56. "name": "main-vpc",
  57. "attributes": {
  58. "cidr_block": "10.0.0.0/16",
  59. "state": "available",
  60. "is_default": False
  61. }
  62. }
  63. ],
  64. "ec2": [
  65. {
  66. "account_id": "123456789012",
  67. "region": "us-east-1",
  68. "service": "ec2",
  69. "resource_type": "Instance",
  70. "resource_id": "i-0123456789abcdef0",
  71. "name": "web-server-1",
  72. "attributes": {
  73. "instance_type": "t3.medium",
  74. "availability_zone": "us-east-1a",
  75. "ami_id": "ami-12345678",
  76. "public_ip": "54.123.45.67",
  77. "private_ip": "10.0.1.100",
  78. "vpc_id": "vpc-12345678",
  79. "subnet_id": "subnet-12345678",
  80. "key_name": "my-key",
  81. "security_groups": ["sg-12345678"],
  82. "ebs_type": "gp3",
  83. "ebs_size": 100,
  84. "encryption": True
  85. }
  86. }
  87. ],
  88. "s3": [
  89. {
  90. "account_id": "123456789012",
  91. "region": "us-east-1",
  92. "service": "s3",
  93. "resource_type": "Bucket",
  94. "resource_id": "my-bucket-123",
  95. "name": "my-bucket-123",
  96. "attributes": {
  97. "creation_date": "2024-01-01T00:00:00Z"
  98. }
  99. }
  100. ],
  101. "rds": [
  102. {
  103. "account_id": "123456789012",
  104. "region": "us-east-1",
  105. "service": "rds",
  106. "resource_type": "DBInstance",
  107. "resource_id": "mydb",
  108. "name": "mydb",
  109. "attributes": {
  110. "endpoint": "mydb.abc123.us-east-1.rds.amazonaws.com",
  111. "db_name": "production",
  112. "master_username": "admin",
  113. "port": 3306,
  114. "engine": "mysql",
  115. "engine_version": "8.0.35",
  116. "instance_class": "db.t3.medium",
  117. "storage_type": "gp3",
  118. "allocated_storage": 100,
  119. "multi_az": True,
  120. "security_groups": ["sg-db123456"],
  121. "deletion_protection": True,
  122. "performance_insights_enabled": True,
  123. "cloudwatch_logs": ["error", "general", "slowquery"]
  124. }
  125. }
  126. ]
  127. },
  128. "errors": []
  129. }
  130. @pytest.fixture
  131. def sample_credential_scan_results():
  132. """
  133. Create sample scan results in the format produced by credential-based scanning.
  134. This fixture provides data in the same format that would be produced by
  135. the AWSScanner when using credentials.
  136. """
  137. return {
  138. "vpc": [
  139. {
  140. "account_id": "123456789012",
  141. "region": "us-east-1",
  142. "service": "vpc",
  143. "resource_type": "VPC",
  144. "resource_id": "vpc-12345678",
  145. "name": "main-vpc",
  146. "attributes": {
  147. "cidr_block": "10.0.0.0/16",
  148. "state": "available",
  149. "is_default": False
  150. }
  151. }
  152. ],
  153. "ec2": [
  154. {
  155. "account_id": "123456789012",
  156. "region": "us-east-1",
  157. "service": "ec2",
  158. "resource_type": "Instance",
  159. "resource_id": "i-0123456789abcdef0",
  160. "name": "web-server-1",
  161. "attributes": {
  162. "instance_type": "t3.medium",
  163. "availability_zone": "us-east-1a",
  164. "ami_id": "ami-12345678",
  165. "public_ip": "54.123.45.67",
  166. "private_ip": "10.0.1.100",
  167. "vpc_id": "vpc-12345678",
  168. "subnet_id": "subnet-12345678",
  169. "key_name": "my-key",
  170. "security_groups": ["sg-12345678"],
  171. "ebs_type": "gp3",
  172. "ebs_size": 100,
  173. "encryption": True
  174. }
  175. }
  176. ],
  177. "s3": [
  178. {
  179. "account_id": "123456789012",
  180. "region": "us-east-1",
  181. "service": "s3",
  182. "resource_type": "Bucket",
  183. "resource_id": "my-bucket-123",
  184. "name": "my-bucket-123",
  185. "attributes": {
  186. "creation_date": "2024-01-01T00:00:00Z"
  187. }
  188. }
  189. ],
  190. "rds": [
  191. {
  192. "account_id": "123456789012",
  193. "region": "us-east-1",
  194. "service": "rds",
  195. "resource_type": "DBInstance",
  196. "resource_id": "mydb",
  197. "name": "mydb",
  198. "attributes": {
  199. "endpoint": "mydb.abc123.us-east-1.rds.amazonaws.com",
  200. "db_name": "production",
  201. "master_username": "admin",
  202. "port": 3306,
  203. "engine": "mysql",
  204. "engine_version": "8.0.35",
  205. "instance_class": "db.t3.medium",
  206. "storage_type": "gp3",
  207. "allocated_storage": 100,
  208. "multi_az": True,
  209. "security_groups": ["sg-db123456"],
  210. "deletion_protection": True,
  211. "performance_insights_enabled": True,
  212. "cloudwatch_logs": ["error", "general", "slowquery"]
  213. }
  214. }
  215. ]
  216. }
  217. @pytest.fixture
  218. def project_metadata():
  219. """Create sample project metadata for report generation"""
  220. return {
  221. "clientName": "TestClient",
  222. "projectName": "TestProject",
  223. "bdManager": "BD Manager",
  224. "bdManagerEmail": "bd@example.com",
  225. "solutionsArchitect": "SA Name",
  226. "solutionsArchitectEmail": "sa@example.com",
  227. "cloudEngineer": "CE Name",
  228. "cloudEngineerEmail": "ce@example.com"
  229. }
  230. class TestScanDataConversion:
  231. """Tests for converting CloudShell scan data to report-compatible format"""
  232. def test_convert_cloudshell_data_to_scan_result(self, processor, sample_cloudshell_scan_data):
  233. """
  234. Test that CloudShell scan data is correctly converted to ScanResult format.
  235. Requirements:
  236. - 5.1: Generate reports in the same format as existing scan tasks
  237. """
  238. result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
  239. # Verify ScanResult structure
  240. assert result.success is True
  241. assert 'vpc' in result.resources
  242. assert 'ec2' in result.resources
  243. assert 's3' in result.resources
  244. assert 'rds' in result.resources
  245. # Verify resource count
  246. assert len(result.resources['vpc']) == 1
  247. assert len(result.resources['ec2']) == 1
  248. assert len(result.resources['s3']) == 1
  249. assert len(result.resources['rds']) == 1
  250. def test_converted_data_has_required_fields(self, processor, sample_cloudshell_scan_data):
  251. """
  252. Test that converted resources have all required fields for report generation.
  253. Requirements:
  254. - 5.1: Generate reports in the same format as existing scan tasks
  255. """
  256. result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
  257. # Check VPC resource
  258. vpc_resource = result.resources['vpc'][0]
  259. assert hasattr(vpc_resource, 'account_id')
  260. assert hasattr(vpc_resource, 'region')
  261. assert hasattr(vpc_resource, 'service')
  262. assert hasattr(vpc_resource, 'resource_type')
  263. assert hasattr(vpc_resource, 'resource_id')
  264. assert hasattr(vpc_resource, 'name')
  265. assert hasattr(vpc_resource, 'attributes')
  266. # Verify values
  267. assert vpc_resource.account_id == "123456789012"
  268. assert vpc_resource.region == "us-east-1"
  269. assert vpc_resource.name == "main-vpc"
  270. def test_metadata_preserved_in_conversion(self, processor, sample_cloudshell_scan_data):
  271. """
  272. Test that metadata from CloudShell scan is preserved in conversion.
  273. Requirements:
  274. - 5.2: Use account_id from uploaded data as report identifier
  275. """
  276. result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
  277. assert result.metadata['account_id'] == "123456789012"
  278. assert result.metadata['regions_scanned'] == ["us-east-1", "ap-northeast-1"]
  279. assert result.metadata['source'] == 'upload'
  280. class TestReportFormatConsistency:
  281. """Tests for verifying report format consistency between upload and credential scan"""
  282. def test_upload_data_produces_same_resource_structure(
  283. self,
  284. processor,
  285. sample_cloudshell_scan_data,
  286. sample_credential_scan_results
  287. ):
  288. """
  289. Test that uploaded data produces the same resource structure as credential scan.
  290. Requirements:
  291. - 5.1: Generate reports in the same format as existing scan tasks
  292. """
  293. # Convert CloudShell data
  294. result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
  295. # Convert to dict format (same as credential scan results)
  296. converted_results = {}
  297. for service_key, resources in result.resources.items():
  298. converted_results[service_key] = []
  299. for resource in resources:
  300. if hasattr(resource, 'to_dict'):
  301. converted_results[service_key].append(resource.to_dict())
  302. else:
  303. converted_results[service_key].append(resource)
  304. # Verify same services are present
  305. assert set(converted_results.keys()) == set(sample_credential_scan_results.keys())
  306. # Verify resource structure matches
  307. for service_key in converted_results:
  308. upload_resources = converted_results[service_key]
  309. cred_resources = sample_credential_scan_results[service_key]
  310. assert len(upload_resources) == len(cred_resources)
  311. for upload_res, cred_res in zip(upload_resources, cred_resources):
  312. # Verify all required fields are present
  313. assert 'account_id' in upload_res
  314. assert 'region' in upload_res
  315. assert 'service' in upload_res
  316. assert 'resource_type' in upload_res
  317. assert 'resource_id' in upload_res
  318. assert 'name' in upload_res
  319. assert 'attributes' in upload_res
  320. def test_all_supported_services_can_be_processed(self, processor):
  321. """
  322. Test that all services in SERVICE_CONFIG can be processed from uploaded data.
  323. Requirements:
  324. - 5.1: Generate reports in the same format as existing scan tasks
  325. """
  326. # Create scan data with all supported services
  327. scan_data = {
  328. "metadata": {
  329. "account_id": "123456789012",
  330. "scan_timestamp": "2024-01-15T10:30:00Z",
  331. "regions_scanned": ["us-east-1"],
  332. "services_scanned": list(SERVICE_CONFIG.keys()),
  333. },
  334. "resources": {},
  335. "errors": []
  336. }
  337. # Add a sample resource for each service
  338. for service_key in SERVICE_CONFIG.keys():
  339. scan_data["resources"][service_key] = [
  340. {
  341. "account_id": "123456789012",
  342. "region": "us-east-1",
  343. "service": service_key,
  344. "resource_type": "TestResource",
  345. "resource_id": f"{service_key}-123",
  346. "name": f"test-{service_key}",
  347. "attributes": {}
  348. }
  349. ]
  350. # Validate and convert
  351. is_valid, errors = processor.validate_scan_data(scan_data)
  352. assert is_valid, f"Validation failed: {errors}"
  353. result = processor.convert_to_scan_result(scan_data)
  354. # Verify all services were converted
  355. for service_key in SERVICE_CONFIG.keys():
  356. assert service_key in result.resources, f"Service {service_key} not in converted results"
  357. assert len(result.resources[service_key]) == 1
  358. class TestReportGeneration:
  359. """Tests for actual report generation from uploaded data"""
  360. def test_report_generator_accepts_converted_data(
  361. self,
  362. app,
  363. processor,
  364. sample_cloudshell_scan_data,
  365. project_metadata
  366. ):
  367. """
  368. Test that ReportGenerator can process converted CloudShell data.
  369. Requirements:
  370. - 5.1: Generate reports in the same format as existing scan tasks
  371. """
  372. with app.app_context():
  373. # Convert CloudShell data
  374. result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
  375. # Convert to dict format for report generator
  376. scan_results = {}
  377. for service_key, resources in result.resources.items():
  378. scan_results[service_key] = []
  379. for resource in resources:
  380. if hasattr(resource, 'to_dict'):
  381. scan_results[service_key].append(resource.to_dict())
  382. else:
  383. scan_results[service_key].append(resource)
  384. # Create report generator
  385. generator = ReportGenerator()
  386. # Test that filter_empty_services works
  387. filtered = generator.filter_empty_services(scan_results)
  388. assert len(filtered) > 0
  389. # Verify services with resources are included
  390. assert 'vpc' in filtered
  391. assert 'ec2' in filtered
  392. def test_report_generation_with_upload_data(
  393. self,
  394. app,
  395. processor,
  396. sample_cloudshell_scan_data,
  397. project_metadata
  398. ):
  399. """
  400. Test complete report generation from uploaded CloudShell data.
  401. Requirements:
  402. - 5.1: Generate reports in the same format as existing scan tasks
  403. - 5.4: Allow users to download generated reports
  404. """
  405. with app.app_context():
  406. # Convert CloudShell data
  407. result = processor.convert_to_scan_result(sample_cloudshell_scan_data)
  408. # Convert to dict format
  409. scan_results = {}
  410. for service_key, resources in result.resources.items():
  411. scan_results[service_key] = []
  412. for resource in resources:
  413. if hasattr(resource, 'to_dict'):
  414. scan_results[service_key].append(resource.to_dict())
  415. else:
  416. scan_results[service_key].append(resource)
  417. # Check if template exists
  418. generator = ReportGenerator()
  419. try:
  420. template_path = generator._get_default_template_path()
  421. template_exists = os.path.exists(template_path)
  422. except FileNotFoundError:
  423. template_exists = False
  424. if not template_exists:
  425. pytest.skip("Report template not found, skipping report generation test")
  426. # Generate report to temp file
  427. with tempfile.NamedTemporaryFile(suffix='.docx', delete=False) as tmp:
  428. output_path = tmp.name
  429. try:
  430. regions = sample_cloudshell_scan_data['metadata']['regions_scanned']
  431. report_result = generator.generate_report(
  432. scan_results=scan_results,
  433. project_metadata=project_metadata,
  434. output_path=output_path,
  435. regions=regions
  436. )
  437. # Verify report was generated
  438. assert os.path.exists(output_path)
  439. assert report_result['file_size'] > 0
  440. assert 'vpc' in report_result['services_included'] or len(report_result['services_included']) > 0
  441. finally:
  442. # Cleanup
  443. if os.path.exists(output_path):
  444. os.remove(output_path)
  445. class TestErrorHandling:
  446. """Tests for error handling in report generation integration"""
  447. def test_empty_resources_handled_gracefully(self, processor):
  448. """Test that empty resources are handled gracefully"""
  449. scan_data = {
  450. "metadata": {
  451. "account_id": "123456789012",
  452. "scan_timestamp": "2024-01-15T10:30:00Z",
  453. "regions_scanned": ["us-east-1"],
  454. "services_scanned": ["vpc"],
  455. },
  456. "resources": {},
  457. "errors": []
  458. }
  459. is_valid, errors = processor.validate_scan_data(scan_data)
  460. assert is_valid
  461. result = processor.convert_to_scan_result(scan_data)
  462. assert result.success is True
  463. assert len(result.resources) == 0
  464. def test_scan_errors_preserved_in_conversion(self, processor):
  465. """Test that scan errors from CloudShell are preserved"""
  466. scan_data = {
  467. "metadata": {
  468. "account_id": "123456789012",
  469. "scan_timestamp": "2024-01-15T10:30:00Z",
  470. "regions_scanned": ["us-east-1"],
  471. "services_scanned": ["vpc", "ec2"],
  472. },
  473. "resources": {
  474. "vpc": [
  475. {
  476. "account_id": "123456789012",
  477. "region": "us-east-1",
  478. "service": "vpc",
  479. "resource_type": "VPC",
  480. "resource_id": "vpc-123",
  481. "name": "test-vpc",
  482. "attributes": {}
  483. }
  484. ]
  485. },
  486. "errors": [
  487. {
  488. "service": "ec2",
  489. "region": "us-east-1",
  490. "error": "Access Denied",
  491. "error_type": "AccessDeniedException"
  492. }
  493. ]
  494. }
  495. result = processor.convert_to_scan_result(scan_data)
  496. assert len(result.errors) == 1
  497. assert result.errors[0]['service'] == 'ec2'
  498. assert result.errors[0]['error'] == 'Access Denied'