|
@@ -0,0 +1,1941 @@
|
|
|
|
|
+"""
|
|
|
|
|
+Report Generator Service
|
|
|
|
|
+
|
|
|
|
|
+This module handles Word document generation from AWS scan results.
|
|
|
|
|
+It loads templates, replaces placeholders, generates tables, and produces
|
|
|
|
|
+the final report document.
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import os
|
|
|
|
|
+import re
|
|
|
|
|
+import copy
|
|
|
|
|
+from datetime import datetime
|
|
|
|
|
+from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
|
+from docx import Document
|
|
|
|
|
+from docx.shared import Inches, Pt, Cm
|
|
|
|
|
+from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
|
|
|
+from docx.enum.table import WD_TABLE_ALIGNMENT
|
|
|
|
|
+from docx.oxml.ns import qn
|
|
|
|
|
+from docx.oxml import OxmlElement
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TableLayout:
|
|
|
|
|
+ """Table layout types for different services"""
|
|
|
|
|
+ HORIZONTAL = 'horizontal' # Column headers at top, multiple rows
|
|
|
|
|
+ VERTICAL = 'vertical' # Attribute names in left column, values in right
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# Service configuration matching the design document
|
|
|
|
|
+SERVICE_CONFIG = {
|
|
|
|
|
+ # ===== VPC Related Resources =====
|
|
|
|
|
+ 'vpc': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'VPC',
|
|
|
|
|
+ 'columns': ['Region', 'Name', 'ID', 'CIDR'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'subnet': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Subnet',
|
|
|
|
|
+ 'columns': ['Name', 'ID', 'AZ', 'CIDR'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'route_table': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Route Table',
|
|
|
|
|
+ 'columns': ['Name', 'ID', 'Subnet Associations'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'internet_gateway': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Internet Gateway',
|
|
|
|
|
+ 'columns': ['Name', 'ID'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'nat_gateway': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'NAT Gateway',
|
|
|
|
|
+ 'columns': ['Name', 'ID', 'Public IP', 'Private IP'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'security_group': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Security Group',
|
|
|
|
|
+ 'columns': ['Name', 'ID', 'Protocol', 'Port range', 'Source'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'vpc_endpoint': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Endpoint',
|
|
|
|
|
+ 'columns': ['Name', 'ID', 'VPC', 'Service Name', 'Type'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'vpc_peering': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'VPC Peering',
|
|
|
|
|
+ 'columns': ['Name', 'Peering Connection ID', 'Requester VPC', 'Accepter VPC'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'customer_gateway': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Customer Gateway',
|
|
|
|
|
+ 'columns': ['Name', 'Customer Gateway ID', 'IP Address'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'virtual_private_gateway': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Virtual Private Gateway',
|
|
|
|
|
+ 'columns': ['Name', 'Virtual Private Gateway ID', 'VPC'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'vpn_connection': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'VPN Connection',
|
|
|
|
|
+ 'columns': ['Name', 'VPN ID', 'Routes'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== EC2 Related Resources =====
|
|
|
|
|
+ 'ec2': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'Instance',
|
|
|
|
|
+ 'columns': ['Name', 'Instance ID', 'Instance Type', 'AZ', 'AMI',
|
|
|
|
|
+ 'Public IP', 'Public DNS', 'Private IP', 'VPC ID', 'Subnet ID',
|
|
|
|
|
+ 'Key', 'Security Groups', 'EBS Type', 'EBS Size', 'Encryption',
|
|
|
|
|
+ 'Other Requirement'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'elastic_ip': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Elastic IP',
|
|
|
|
|
+ 'columns': ['Name', 'Elastic IP'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== Auto Scaling =====
|
|
|
|
|
+ 'autoscaling': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'Auto Scaling Group',
|
|
|
|
|
+ 'columns': ['Name', 'Launch Template', 'AMI', 'Instance type',
|
|
|
|
|
+ 'Key', 'Target Groups', 'Desired', 'Min', 'Max',
|
|
|
|
|
+ 'Scaling Policy'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== ELB Related Resources =====
|
|
|
|
|
+ 'elb': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'Load Balancer',
|
|
|
|
|
+ 'columns': ['Name', 'Type', 'DNS', 'Scheme', 'VPC',
|
|
|
|
|
+ 'Availability Zones', 'Subnet', 'Security Groups'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 'target_group': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'Target Group',
|
|
|
|
|
+ 'columns': ['Load Balancer', 'TG Name', 'Port', 'Protocol',
|
|
|
|
|
+ 'Registered Instances', 'Health Check Path'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== RDS =====
|
|
|
|
|
+ 'rds': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'DB Instance',
|
|
|
|
|
+ 'columns': ['Region', 'Endpoint', 'DB instance ID', 'DB name',
|
|
|
|
|
+ 'Master Username', 'Port', 'DB Engine', 'DB Version',
|
|
|
|
|
+ 'Instance Type', 'Storage type', 'Storage', 'Multi-AZ',
|
|
|
|
|
+ 'Security Group', 'Deletion Protection',
|
|
|
|
|
+ 'Performance Insights Enabled', 'CloudWatch Logs'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== ElastiCache =====
|
|
|
|
|
+ 'elasticache': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'Cache Cluster',
|
|
|
|
|
+ 'columns': ['Cluster ID', 'Engine', 'Engine Version', 'Node Type',
|
|
|
|
|
+ 'Num Nodes', 'Status'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== EKS =====
|
|
|
|
|
+ 'eks': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'Cluster',
|
|
|
|
|
+ 'columns': ['Cluster Name', 'Version', 'Status', 'Endpoint', 'VPC ID'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== Lambda =====
|
|
|
|
|
+ 'lambda': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Function',
|
|
|
|
|
+ 'columns': ['Function Name', 'Runtime', 'Memory (MB)', 'Timeout (s)', 'Last Modified'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== S3 =====
|
|
|
|
|
+ 's3': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Bucket',
|
|
|
|
|
+ 'columns': ['Region', 'Bucket Name'],
|
|
|
|
|
+ },
|
|
|
|
|
+ 's3_event_notification': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'S3 event notification',
|
|
|
|
|
+ 'columns': ['Bucket', 'Name', 'Event Type', 'Destination type', 'Destination'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== CloudFront (Global) =====
|
|
|
|
|
+ 'cloudfront': {
|
|
|
|
|
+ 'layout': TableLayout.VERTICAL,
|
|
|
|
|
+ 'title': 'Distribution',
|
|
|
|
|
+ 'columns': ['CloudFront ID', 'Domain Name', 'CNAME',
|
|
|
|
|
+ 'Origin Domain Name', 'Origin Protocol Policy',
|
|
|
|
|
+ 'Viewer Protocol Policy', 'Allowed HTTP Methods',
|
|
|
|
|
+ 'Cached HTTP Methods'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== Route 53 (Global) =====
|
|
|
|
|
+ 'route53': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Hosted Zone',
|
|
|
|
|
+ 'columns': ['Zone ID', 'Name', 'Type', 'Record Count'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== ACM (Global) =====
|
|
|
|
|
+ 'acm': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'ACM',
|
|
|
|
|
+ 'columns': ['Domain name', 'Additional names'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== WAF (Global) =====
|
|
|
|
|
+ 'waf': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Web ACL',
|
|
|
|
|
+ 'columns': ['WebACL Name', 'Scope', 'Rules Count', 'Associated Resources'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== SNS =====
|
|
|
|
|
+ 'sns': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Topic',
|
|
|
|
|
+ 'columns': ['Topic Name', 'Topic Display Name', 'Subscription Protocol',
|
|
|
|
|
+ 'Subscription Endpoint'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== CloudWatch =====
|
|
|
|
|
+ 'cloudwatch': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Log Group',
|
|
|
|
|
+ 'columns': ['Log Group Name', 'Retention Days', 'Stored Bytes', 'KMS Encryption'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== EventBridge =====
|
|
|
|
|
+ 'eventbridge': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Rule',
|
|
|
|
|
+ 'columns': ['Name', 'Description', 'Event Bus', 'State'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== CloudTrail =====
|
|
|
|
|
+ 'cloudtrail': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Trail',
|
|
|
|
|
+ 'columns': ['Name', 'Multi-Region Trail', 'Log File Validation', 'KMS Encryption'],
|
|
|
|
|
+ },
|
|
|
|
|
+
|
|
|
|
|
+ # ===== Config =====
|
|
|
|
|
+ 'config': {
|
|
|
|
|
+ 'layout': TableLayout.HORIZONTAL,
|
|
|
|
|
+ 'title': 'Config',
|
|
|
|
|
+ 'columns': ['Name', 'Regional Resources', 'Global Resources', 'Retention period'],
|
|
|
|
|
+ },
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# Service display order for the report
|
|
|
|
|
+SERVICE_ORDER = [
|
|
|
|
|
+ 'vpc', 'subnet', 'route_table', 'internet_gateway', 'nat_gateway',
|
|
|
|
|
+ 'security_group', 'vpc_endpoint', 'vpc_peering',
|
|
|
|
|
+ 'customer_gateway', 'virtual_private_gateway', 'vpn_connection',
|
|
|
|
|
+ 'ec2', 'elastic_ip', 'autoscaling',
|
|
|
|
|
+ 'elb', 'target_group',
|
|
|
|
|
+ 'rds', 'elasticache', 'eks',
|
|
|
|
|
+ 'lambda', 's3', 's3_event_notification',
|
|
|
|
|
+ 'cloudfront', 'route53', 'acm', 'waf',
|
|
|
|
|
+ 'sns', 'cloudwatch', 'eventbridge', 'cloudtrail', 'config'
|
|
|
|
|
+]
|
|
|
|
|
+
|
|
|
|
|
+# Global services (not region-specific, should not be duplicated per region)
|
|
|
|
|
+GLOBAL_SERVICES = ['cloudfront', 'route53', 'waf', 's3', 's3_event_notification', 'cloudtrail']
|
|
|
|
|
+
|
|
|
|
|
+# Service grouping for Heading 2 titles
|
|
|
|
|
+# Maps service keys to their parent service group for the heading
|
|
|
|
|
+SERVICE_GROUPS = {
|
|
|
|
|
+ # VPC group - all VPC related resources under "VPC" heading
|
|
|
|
|
+ 'vpc': 'VPC',
|
|
|
|
|
+ 'subnet': 'VPC',
|
|
|
|
|
+ 'route_table': 'VPC',
|
|
|
|
|
+ 'internet_gateway': 'VPC',
|
|
|
|
|
+ 'nat_gateway': 'VPC',
|
|
|
|
|
+ 'security_group': 'VPC',
|
|
|
|
|
+ 'vpc_endpoint': 'VPC',
|
|
|
|
|
+ 'vpc_peering': 'VPC',
|
|
|
|
|
+ 'customer_gateway': 'VPC',
|
|
|
|
|
+ 'virtual_private_gateway': 'VPC',
|
|
|
|
|
+ 'vpn_connection': 'VPC',
|
|
|
|
|
+
|
|
|
|
|
+ # EC2 group
|
|
|
|
|
+ 'ec2': 'EC2',
|
|
|
|
|
+ 'elastic_ip': 'EC2',
|
|
|
|
|
+
|
|
|
|
|
+ # Auto Scaling
|
|
|
|
|
+ 'autoscaling': 'AutoScaling',
|
|
|
|
|
+
|
|
|
|
|
+ # ELB group - Load Balancer and Target Group under "ELB" heading
|
|
|
|
|
+ 'elb': 'ELB',
|
|
|
|
|
+ 'target_group': 'ELB',
|
|
|
|
|
+
|
|
|
|
|
+ # Database services - use service name as heading
|
|
|
|
|
+ 'rds': 'RDS',
|
|
|
|
|
+ 'elasticache': 'Elasticache',
|
|
|
|
|
+ 'eks': 'EKS',
|
|
|
|
|
+
|
|
|
|
|
+ # Lambda
|
|
|
|
|
+ 'lambda': 'Lambda',
|
|
|
|
|
+
|
|
|
|
|
+ # S3 group - Bucket and event notification under "S3" heading
|
|
|
|
|
+ 's3': 'S3',
|
|
|
|
|
+ 's3_event_notification': 'S3',
|
|
|
|
|
+
|
|
|
|
|
+ # Global services
|
|
|
|
|
+ 'cloudfront': 'CloudFront',
|
|
|
|
|
+ 'route53': 'Route53',
|
|
|
|
|
+ 'acm': 'ACM',
|
|
|
|
|
+ 'waf': 'WAF',
|
|
|
|
|
+
|
|
|
|
|
+ # Monitoring services
|
|
|
|
|
+ 'sns': 'SNS',
|
|
|
|
|
+ 'cloudwatch': 'CloudWatch',
|
|
|
|
|
+ 'eventbridge': 'EventBridge',
|
|
|
|
|
+ 'cloudtrail': 'CloudTrail',
|
|
|
|
|
+ 'config': 'Config',
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+# Order of service groups for the report (determines heading order)
|
|
|
|
|
+SERVICE_GROUP_ORDER = [
|
|
|
|
|
+ 'VPC', 'EC2', 'AutoScaling', 'ELB',
|
|
|
|
|
+ 'RDS', 'Elasticache', 'EKS', 'Lambda', 'S3',
|
|
|
|
|
+ 'CloudFront', 'Route53', 'ACM', 'WAF',
|
|
|
|
|
+ 'SNS', 'CloudWatch', 'EventBridge', 'CloudTrail', 'Config'
|
|
|
|
|
+]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class ReportGenerator:
|
|
|
|
|
+ """
|
|
|
|
|
+ Generates Word reports from AWS scan results using templates.
|
|
|
|
|
+
|
|
|
|
|
+ This class handles:
|
|
|
|
|
+ - Loading Word templates from sample-reports folder
|
|
|
|
|
+ - Parsing and replacing placeholders
|
|
|
|
|
+ - Generating horizontal and vertical tables for different services
|
|
|
|
|
+ - Embedding network diagrams
|
|
|
|
|
+ - Updating table of contents
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, template_path: str = None):
|
|
|
|
|
+ """
|
|
|
|
|
+ Initialize the report generator.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ template_path: Path to the Word template file. If None, uses default template.
|
|
|
|
|
+ """
|
|
|
|
|
+ self.template_path = template_path
|
|
|
|
|
+ self.document = None
|
|
|
|
|
+ self._placeholder_pattern = re.compile(r'\[([^\]]+)\]')
|
|
|
|
|
+
|
|
|
|
|
+ def load_template(self, template_path: str = None) -> Document:
|
|
|
|
|
+ """
|
|
|
|
|
+ Load a Word template file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ template_path: Path to the template file
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Loaded Document object
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ FileNotFoundError: If template file doesn't exist
|
|
|
|
|
+ ValueError: If template file is invalid
|
|
|
|
|
+ """
|
|
|
|
|
+ path = template_path or self.template_path
|
|
|
|
|
+ if not path:
|
|
|
|
|
+ # Use default template
|
|
|
|
|
+ path = self._get_default_template_path()
|
|
|
|
|
+
|
|
|
|
|
+ if not os.path.exists(path):
|
|
|
|
|
+ raise FileNotFoundError(f"Template file not found: {path}")
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.document = Document(path)
|
|
|
|
|
+ return self.document
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ raise ValueError(f"Failed to load template: {str(e)}")
|
|
|
|
|
+
|
|
|
|
|
+ def _get_default_template_path(self) -> str:
|
|
|
|
|
+ """Get the default template path from sample-reports folder."""
|
|
|
|
|
+ # Look for the template with placeholders
|
|
|
|
|
+ base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
|
|
|
|
|
+ sample_reports_dir = os.path.join(base_dir, 'sample-reports')
|
|
|
|
|
+
|
|
|
|
|
+ # Prefer the template with [Client Name]-[Project Name] format
|
|
|
|
|
+ template_name = '[Client Name]-[Project Name]-Project-Report-v1.0.docx'
|
|
|
|
|
+ template_path = os.path.join(sample_reports_dir, template_name)
|
|
|
|
|
+
|
|
|
|
|
+ if os.path.exists(template_path):
|
|
|
|
|
+ return template_path
|
|
|
|
|
+
|
|
|
|
|
+ # Fall back to any .docx file in sample-reports
|
|
|
|
|
+ if os.path.exists(sample_reports_dir):
|
|
|
|
|
+ for file in os.listdir(sample_reports_dir):
|
|
|
|
|
+ if file.endswith('.docx'):
|
|
|
|
|
+ return os.path.join(sample_reports_dir, file)
|
|
|
|
|
+
|
|
|
|
|
+ raise FileNotFoundError("No template file found in sample-reports folder")
|
|
|
|
|
+
|
|
|
|
|
+ def find_placeholders(self) -> List[str]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Find all placeholders in the document.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ List of placeholder names (without brackets)
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ placeholders = set()
|
|
|
|
|
+
|
|
|
|
|
+ # Search in paragraphs
|
|
|
|
|
+ for paragraph in self.document.paragraphs:
|
|
|
|
|
+ matches = self._placeholder_pattern.findall(paragraph.text)
|
|
|
|
|
+ placeholders.update(matches)
|
|
|
|
|
+
|
|
|
|
|
+ # Search in tables
|
|
|
|
|
+ for table in self.document.tables:
|
|
|
|
|
+ for row in table.rows:
|
|
|
|
|
+ for cell in row.cells:
|
|
|
|
|
+ for paragraph in cell.paragraphs:
|
|
|
|
|
+ matches = self._placeholder_pattern.findall(paragraph.text)
|
|
|
|
|
+ placeholders.update(matches)
|
|
|
|
|
+
|
|
|
|
|
+ # Search in headers and footers
|
|
|
|
|
+ for section in self.document.sections:
|
|
|
|
|
+ for header in [section.header, section.first_page_header, section.even_page_header]:
|
|
|
|
|
+ if header:
|
|
|
|
|
+ for paragraph in header.paragraphs:
|
|
|
|
|
+ matches = self._placeholder_pattern.findall(paragraph.text)
|
|
|
|
|
+ placeholders.update(matches)
|
|
|
|
|
+
|
|
|
|
|
+ for footer in [section.footer, section.first_page_footer, section.even_page_footer]:
|
|
|
|
|
+ if footer:
|
|
|
|
|
+ for paragraph in footer.paragraphs:
|
|
|
|
|
+ matches = self._placeholder_pattern.findall(paragraph.text)
|
|
|
|
|
+ placeholders.update(matches)
|
|
|
|
|
+
|
|
|
|
|
+ return list(placeholders)
|
|
|
|
|
+
|
|
|
|
|
+ def get_template_structure(self) -> Dict[str, Any]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Analyze and return the template structure.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Dictionary containing template structure information
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ structure = {
|
|
|
|
|
+ 'sections': len(self.document.sections),
|
|
|
|
|
+ 'paragraphs': len(self.document.paragraphs),
|
|
|
|
|
+ 'tables': len(self.document.tables),
|
|
|
|
|
+ 'placeholders': self.find_placeholders(),
|
|
|
|
|
+ 'headings': [],
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # Extract headings
|
|
|
|
|
+ for paragraph in self.document.paragraphs:
|
|
|
|
|
+ if paragraph.style and paragraph.style.name.startswith('Heading'):
|
|
|
|
|
+ structure['headings'].append({
|
|
|
|
|
+ 'level': paragraph.style.name,
|
|
|
|
|
+ 'text': paragraph.text
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ return structure
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def replace_placeholders(self, replacements: Dict[str, str]) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Replace all placeholders in the document with actual values.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ replacements: Dictionary mapping placeholder names to values
|
|
|
|
|
+ e.g., {'Client Name': 'Acme Corp', 'Project Name': 'Cloud Migration'}
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ # Replace in paragraphs
|
|
|
|
|
+ for paragraph in self.document.paragraphs:
|
|
|
|
|
+ self._replace_in_paragraph(paragraph, replacements)
|
|
|
|
|
+
|
|
|
|
|
+ # Replace in tables
|
|
|
|
|
+ for table in self.document.tables:
|
|
|
|
|
+ for row in table.rows:
|
|
|
|
|
+ for cell in row.cells:
|
|
|
|
|
+ for paragraph in cell.paragraphs:
|
|
|
|
|
+ self._replace_in_paragraph(paragraph, replacements)
|
|
|
|
|
+
|
|
|
|
|
+ # Replace in headers and footers
|
|
|
|
|
+ for section in self.document.sections:
|
|
|
|
|
+ for header in [section.header, section.first_page_header, section.even_page_header]:
|
|
|
|
|
+ if header:
|
|
|
|
|
+ for paragraph in header.paragraphs:
|
|
|
|
|
+ self._replace_in_paragraph(paragraph, replacements)
|
|
|
|
|
+
|
|
|
|
|
+ for footer in [section.footer, section.first_page_footer, section.even_page_footer]:
|
|
|
|
|
+ if footer:
|
|
|
|
|
+ for paragraph in footer.paragraphs:
|
|
|
|
|
+ self._replace_in_paragraph(paragraph, replacements)
|
|
|
|
|
+
|
|
|
|
|
+ def _replace_in_paragraph(self, paragraph, replacements: Dict[str, str]) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Replace placeholders in a single paragraph while preserving formatting.
|
|
|
|
|
+
|
|
|
|
|
+ Supports both bracketed placeholders like [Client Name] and
|
|
|
|
|
+ unbracketed placeholders like YYYY. mm. DD.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ paragraph: The paragraph to process
|
|
|
|
|
+ replacements: Dictionary of placeholder replacements
|
|
|
|
|
+ """
|
|
|
|
|
+ if not paragraph.text:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # Check if paragraph contains any placeholders (bracketed or unbracketed)
|
|
|
|
|
+ text = paragraph.text
|
|
|
|
|
+ has_placeholder = False
|
|
|
|
|
+ for placeholder in replacements.keys():
|
|
|
|
|
+ # Check for bracketed placeholder [placeholder]
|
|
|
|
|
+ if f'[{placeholder}]' in text:
|
|
|
|
|
+ has_placeholder = True
|
|
|
|
|
+ break
|
|
|
|
|
+ # Check for unbracketed placeholder (for date formats like YYYY. mm. DD)
|
|
|
|
|
+ if placeholder in text:
|
|
|
|
|
+ has_placeholder = True
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ if not has_placeholder:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # Replace placeholders in the text
|
|
|
|
|
+ new_text = text
|
|
|
|
|
+ for placeholder, value in replacements.items():
|
|
|
|
|
+ # First try bracketed replacement
|
|
|
|
|
+ new_text = new_text.replace(f'[{placeholder}]', str(value) if value else '')
|
|
|
|
|
+ # Then try unbracketed replacement (for date formats like YYYY. mm. DD)
|
|
|
|
|
+ # Only replace patterns that start with YYYY to avoid replacing column names like "Date"
|
|
|
|
|
+ if placeholder.startswith('YYYY'):
|
|
|
|
|
+ new_text = new_text.replace(placeholder, str(value) if value else '')
|
|
|
|
|
+
|
|
|
|
|
+ # If text changed, update the paragraph
|
|
|
|
|
+ if new_text != text:
|
|
|
|
|
+ # Try to preserve formatting by updating runs
|
|
|
|
|
+ if len(paragraph.runs) == 1:
|
|
|
|
|
+ paragraph.runs[0].text = new_text
|
|
|
|
|
+ else:
|
|
|
|
|
+ # For complex formatting, rebuild the paragraph
|
|
|
|
|
+ # Store the first run's formatting
|
|
|
|
|
+ if paragraph.runs:
|
|
|
|
|
+ first_run = paragraph.runs[0]
|
|
|
|
|
+ font_name = first_run.font.name
|
|
|
|
|
+ font_size = first_run.font.size
|
|
|
|
|
+ bold = first_run.font.bold
|
|
|
|
|
+ italic = first_run.font.italic
|
|
|
|
|
+
|
|
|
|
|
+ # Clear all runs
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ run.text = ''
|
|
|
|
|
+
|
|
|
|
|
+ # Set new text on first run
|
|
|
|
|
+ paragraph.runs[0].text = new_text
|
|
|
|
|
+ else:
|
|
|
|
|
+ # No runs, add new one
|
|
|
|
|
+ paragraph.add_run(new_text)
|
|
|
|
|
+
|
|
|
|
|
+ def create_project_metadata_replacements(self, metadata: Dict[str, Any]) -> Dict[str, str]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Create placeholder replacements from project metadata.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ metadata: Project metadata dictionary containing:
|
|
|
|
|
+ - clientName/client_name, projectName/project_name
|
|
|
|
|
+ - bdManager/bd_manager, bdManagerEmail/bd_manager_email
|
|
|
|
|
+ - solutionsArchitect/solutions_architect, solutionsArchitectEmail/solutions_architect_email
|
|
|
|
|
+ - cloudEngineer/cloud_engineer, cloudEngineerEmail/cloud_engineer_email
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Dictionary of placeholder replacements
|
|
|
|
|
+ """
|
|
|
|
|
+ now = datetime.now()
|
|
|
|
|
+
|
|
|
|
|
+ # Helper to get value from either camelCase or snake_case key
|
|
|
|
|
+ def get_value(camel_key: str, snake_key: str) -> str:
|
|
|
|
|
+ return metadata.get(camel_key, '') or metadata.get(snake_key, '') or ''
|
|
|
|
|
+
|
|
|
|
|
+ # Extract values supporting both naming conventions
|
|
|
|
|
+ client_name = get_value('clientName', 'client_name')
|
|
|
|
|
+ project_name = get_value('projectName', 'project_name')
|
|
|
|
|
+ bd_manager = get_value('bdManager', 'bd_manager')
|
|
|
|
|
+ bd_manager_email = get_value('bdManagerEmail', 'bd_manager_email')
|
|
|
|
|
+ solutions_architect = get_value('solutionsArchitect', 'solutions_architect')
|
|
|
|
|
+ solutions_architect_email = get_value('solutionsArchitectEmail', 'solutions_architect_email')
|
|
|
|
|
+ cloud_engineer = get_value('cloudEngineer', 'cloud_engineer')
|
|
|
|
|
+ cloud_engineer_email = get_value('cloudEngineerEmail', 'cloud_engineer_email')
|
|
|
|
|
+
|
|
|
|
|
+ replacements = {
|
|
|
|
|
+ # Client and Project
|
|
|
|
|
+ 'Client Name': client_name,
|
|
|
|
|
+ 'Project Name': project_name,
|
|
|
|
|
+
|
|
|
|
|
+ # BD Manager
|
|
|
|
|
+ 'BD Manager': bd_manager,
|
|
|
|
|
+ 'BD Manager Name': bd_manager,
|
|
|
|
|
+ 'BD Manager Email': bd_manager_email,
|
|
|
|
|
+
|
|
|
|
|
+ # Solutions Architect
|
|
|
|
|
+ 'Solutions Architect': solutions_architect,
|
|
|
|
|
+ 'Solutions Architect Name': solutions_architect,
|
|
|
|
|
+ 'Solutions Architect Email': solutions_architect_email,
|
|
|
|
|
+
|
|
|
|
|
+ # Cloud Engineer
|
|
|
|
|
+ 'Cloud Engineer': cloud_engineer,
|
|
|
|
|
+ 'Cloud Engineer Name': cloud_engineer,
|
|
|
|
|
+ 'Cloud Engineer Email': cloud_engineer_email,
|
|
|
|
|
+
|
|
|
|
|
+ # Date placeholders - multiple formats
|
|
|
|
|
+ 'Date': now.strftime('%Y-%m-%d'),
|
|
|
|
|
+ 'YYYY. mm. DD': now.strftime('%Y. %m. %d'),
|
|
|
|
|
+ 'YYYY.mm.DD': now.strftime('%Y.%m.%d'),
|
|
|
|
|
+ 'YYYY-mm-DD': now.strftime('%Y-%m-%d'),
|
|
|
|
|
+ 'Month': now.strftime('%B'),
|
|
|
|
|
+ 'Year': str(now.year),
|
|
|
|
|
+ 'Report Date': now.strftime('%B %d, %Y'),
|
|
|
|
|
+
|
|
|
|
|
+ # Version info
|
|
|
|
|
+ 'Version': '1.0',
|
|
|
|
|
+ 'Document Version': '1.0',
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return replacements
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def add_horizontal_table(self, service_key: str, resources: List[Dict[str, Any]],
|
|
|
|
|
+ include_account_column: bool = False) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add a horizontal table for a service (column headers at top, multiple rows).
|
|
|
|
|
+
|
|
|
|
|
+ Format:
|
|
|
|
|
+ | Service Name (merged across all columns) |
|
|
|
|
|
+ | Column1 | Column2 | Column3 |
|
|
|
|
|
+ | Value1 | Value2 | Value3 |
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ service_key: The service key from SERVICE_CONFIG
|
|
|
|
|
+ resources: List of resource dictionaries
|
|
|
|
|
+ include_account_column: Whether to include AWS Account column (for multi-account)
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ if service_key not in SERVICE_CONFIG:
|
|
|
|
|
+ raise ValueError(f"Unknown service: {service_key}")
|
|
|
|
|
+
|
|
|
|
|
+ config = SERVICE_CONFIG[service_key]
|
|
|
|
|
+ if config['layout'] != TableLayout.HORIZONTAL:
|
|
|
|
|
+ raise ValueError(f"Service {service_key} uses vertical layout, not horizontal")
|
|
|
|
|
+
|
|
|
|
|
+ columns = list(config['columns'])
|
|
|
|
|
+ if include_account_column and 'AWS Account' not in columns:
|
|
|
|
|
+ columns.insert(0, 'AWS Account')
|
|
|
|
|
+
|
|
|
|
|
+ # Create table: 1 title row + 1 header row + data rows
|
|
|
|
|
+ num_rows = len(resources) + 2 # +1 for title, +1 for header
|
|
|
|
|
+ num_cols = len(columns)
|
|
|
|
|
+ table = self.document.add_table(rows=num_rows, cols=num_cols)
|
|
|
|
|
+
|
|
|
|
|
+ # Apply table styling
|
|
|
|
|
+ self._copy_table_style_from_template(table)
|
|
|
|
|
+
|
|
|
|
|
+ # Row 0: Service title (merged across all columns)
|
|
|
|
|
+ title_row = table.rows[0]
|
|
|
|
|
+ # Merge all cells in the title row
|
|
|
|
|
+ title_cell = title_row.cells[0]
|
|
|
|
|
+ for i in range(1, num_cols):
|
|
|
|
|
+ title_cell.merge(title_row.cells[i])
|
|
|
|
|
+ title_cell.text = config['title']
|
|
|
|
|
+ self._apply_header_cell_style(title_cell, is_title=True)
|
|
|
|
|
+ # Center the title
|
|
|
|
|
+ for paragraph in title_cell.paragraphs:
|
|
|
|
|
+ paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
|
|
|
+
|
|
|
|
|
+ # Row 1: Column headers
|
|
|
|
|
+ header_row = table.rows[1]
|
|
|
|
|
+ for i, col_name in enumerate(columns):
|
|
|
|
|
+ cell = header_row.cells[i]
|
|
|
|
|
+ cell.text = col_name
|
|
|
|
|
+ self._apply_header_cell_style(cell)
|
|
|
|
|
+
|
|
|
|
|
+ # Data rows
|
|
|
|
|
+ for row_idx, resource in enumerate(resources):
|
|
|
|
|
+ row = table.rows[row_idx + 2] # +2 to skip title and header rows
|
|
|
|
|
+ for col_idx, col_name in enumerate(columns):
|
|
|
|
|
+ cell = row.cells[col_idx]
|
|
|
|
|
+ value = self._get_resource_value(resource, col_name)
|
|
|
|
|
+ cell.text = value
|
|
|
|
|
+
|
|
|
|
|
+ # Add spacing after table
|
|
|
|
|
+ self.document.add_paragraph()
|
|
|
|
|
+
|
|
|
|
|
+ def add_vertical_table(self, service_key: str, resource: Dict[str, Any],
|
|
|
|
|
+ include_account_column: bool = False,
|
|
|
|
|
+ show_title: bool = True) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add a vertical table for a single resource (attribute names in left column).
|
|
|
|
|
+
|
|
|
|
|
+ Format:
|
|
|
|
|
+ | Service Name (merged across 2 columns) |
|
|
|
|
|
+ | Column1 | Value1 |
|
|
|
|
|
+ | Column2 | Value2 |
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ service_key: The service key from SERVICE_CONFIG
|
|
|
|
|
+ resource: Single resource dictionary
|
|
|
|
|
+ include_account_column: Whether to include AWS Account row (for multi-account)
|
|
|
|
|
+ show_title: Whether to show the service title row (first resource shows title)
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ if service_key not in SERVICE_CONFIG:
|
|
|
|
|
+ raise ValueError(f"Unknown service: {service_key}")
|
|
|
|
|
+
|
|
|
|
|
+ config = SERVICE_CONFIG[service_key]
|
|
|
|
|
+ if config['layout'] != TableLayout.VERTICAL:
|
|
|
|
|
+ raise ValueError(f"Service {service_key} uses horizontal layout, not vertical")
|
|
|
|
|
+
|
|
|
|
|
+ columns = list(config['columns'])
|
|
|
|
|
+ if include_account_column and 'AWS Account' not in columns:
|
|
|
|
|
+ columns.insert(0, 'AWS Account')
|
|
|
|
|
+
|
|
|
|
|
+ # Create table with 2 columns: 1 title row + attribute rows
|
|
|
|
|
+ num_rows = len(columns) + (1 if show_title else 0) # +1 for title row if showing
|
|
|
|
|
+ table = self.document.add_table(rows=num_rows, cols=2)
|
|
|
|
|
+
|
|
|
|
|
+ # Apply table styling
|
|
|
|
|
+ self._copy_table_style_from_template(table)
|
|
|
|
|
+
|
|
|
|
|
+ row_offset = 0
|
|
|
|
|
+
|
|
|
|
|
+ # Row 0: Service title (merged across 2 columns) - only for first resource
|
|
|
|
|
+ if show_title:
|
|
|
|
|
+ title_row = table.rows[0]
|
|
|
|
|
+ title_cell = title_row.cells[0]
|
|
|
|
|
+ title_cell.merge(title_row.cells[1])
|
|
|
|
|
+ title_cell.text = config['title']
|
|
|
|
|
+ self._apply_header_cell_style(title_cell, is_title=True)
|
|
|
|
|
+ # Center the title
|
|
|
|
|
+ for paragraph in title_cell.paragraphs:
|
|
|
|
|
+ paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
|
|
|
+ row_offset = 1
|
|
|
|
|
+
|
|
|
|
|
+ # Attribute rows
|
|
|
|
|
+ for row_idx, col_name in enumerate(columns):
|
|
|
|
|
+ row = table.rows[row_idx + row_offset]
|
|
|
|
|
+ # Attribute name cell (apply header styling)
|
|
|
|
|
+ name_cell = row.cells[0]
|
|
|
|
|
+ name_cell.text = col_name
|
|
|
|
|
+ self._apply_header_cell_style(name_cell)
|
|
|
|
|
+
|
|
|
|
|
+ # Value cell
|
|
|
|
|
+ value_cell = row.cells[1]
|
|
|
|
|
+ value = self._get_resource_value(resource, col_name)
|
|
|
|
|
+ value_cell.text = value
|
|
|
|
|
+
|
|
|
|
|
+ # Add spacing after table
|
|
|
|
|
+ self.document.add_paragraph()
|
|
|
|
|
+
|
|
|
|
|
+ def add_vertical_tables_for_service(self, service_key: str, resources: List[Dict[str, Any]],
|
|
|
|
|
+ include_account_column: bool = False) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add vertical tables for all resources of a service.
|
|
|
|
|
+ Each resource gets its own table with the service title in the first row.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ service_key: The service key from SERVICE_CONFIG
|
|
|
|
|
+ resources: List of resource dictionaries
|
|
|
|
|
+ include_account_column: Whether to include AWS Account row
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ if service_key not in SERVICE_CONFIG:
|
|
|
|
|
+ raise ValueError(f"Unknown service: {service_key}")
|
|
|
|
|
+
|
|
|
|
|
+ # Add a table for each resource, each with its own title row
|
|
|
|
|
+ for resource in resources:
|
|
|
|
|
+ self.add_vertical_table(service_key, resource, include_account_column, show_title=True)
|
|
|
|
|
+
|
|
|
|
|
+ def _insert_element_at_position(self, element) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Insert an element at the tracked position within Implementation List section.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ element: The XML element to insert
|
|
|
|
|
+ """
|
|
|
|
|
+ if self._insert_parent is not None and self._insert_index is not None:
|
|
|
|
|
+ self._insert_parent.insert(self._insert_index, element)
|
|
|
|
|
+ self._insert_index += 1
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Fallback: append to document body
|
|
|
|
|
+ self.document._body._body.append(element)
|
|
|
|
|
+
|
|
|
|
|
+ def _add_horizontal_table_at_position(self, service_key: str, resources: List[Dict[str, Any]],
|
|
|
|
|
+ include_account_column: bool = False) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add a horizontal table at the tracked position within Implementation List section.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ service_key: The service key from SERVICE_CONFIG
|
|
|
|
|
+ resources: List of resource dictionaries
|
|
|
|
|
+ include_account_column: Whether to include AWS Account column
|
|
|
|
|
+ """
|
|
|
|
|
+ if service_key not in SERVICE_CONFIG:
|
|
|
|
|
+ raise ValueError(f"Unknown service: {service_key}")
|
|
|
|
|
+
|
|
|
|
|
+ config = SERVICE_CONFIG[service_key]
|
|
|
|
|
+ columns = list(config['columns'])
|
|
|
|
|
+ if include_account_column and 'AWS Account' not in columns:
|
|
|
|
|
+ columns.insert(0, 'AWS Account')
|
|
|
|
|
+
|
|
|
|
|
+ # Create table: 1 title row + 1 header row + data rows
|
|
|
|
|
+ num_rows = len(resources) + 2
|
|
|
|
|
+ num_cols = len(columns)
|
|
|
|
|
+ table = self.document.add_table(rows=num_rows, cols=num_cols)
|
|
|
|
|
+
|
|
|
|
|
+ # Move table to correct position
|
|
|
|
|
+ tbl_element = table._tbl
|
|
|
|
|
+ tbl_element.getparent().remove(tbl_element)
|
|
|
|
|
+ self._insert_element_at_position(tbl_element)
|
|
|
|
|
+
|
|
|
|
|
+ # Apply table styling
|
|
|
|
|
+ self._copy_table_style_from_template(table)
|
|
|
|
|
+
|
|
|
|
|
+ # Row 0: Service title (merged across all columns)
|
|
|
|
|
+ title_row = table.rows[0]
|
|
|
|
|
+ title_cell = title_row.cells[0]
|
|
|
|
|
+ for i in range(1, num_cols):
|
|
|
|
|
+ title_cell.merge(title_row.cells[i])
|
|
|
|
|
+ title_cell.text = config['title']
|
|
|
|
|
+ self._apply_header_cell_style(title_cell, is_title=True)
|
|
|
|
|
+ for paragraph in title_cell.paragraphs:
|
|
|
|
|
+ paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
|
|
|
+
|
|
|
|
|
+ # Row 1: Column headers
|
|
|
|
|
+ header_row = table.rows[1]
|
|
|
|
|
+ for i, col_name in enumerate(columns):
|
|
|
|
|
+ cell = header_row.cells[i]
|
|
|
|
|
+ cell.text = col_name
|
|
|
|
|
+ self._apply_header_cell_style(cell)
|
|
|
|
|
+
|
|
|
|
|
+ # Data rows
|
|
|
|
|
+ for row_idx, resource in enumerate(resources):
|
|
|
|
|
+ row = table.rows[row_idx + 2]
|
|
|
|
|
+ for col_idx, col_name in enumerate(columns):
|
|
|
|
|
+ cell = row.cells[col_idx]
|
|
|
|
|
+ value = self._get_resource_value(resource, col_name)
|
|
|
|
|
+ cell.text = value
|
|
|
|
|
+ self._apply_data_cell_style(cell)
|
|
|
|
|
+
|
|
|
|
|
+ # Add spacing paragraph after table
|
|
|
|
|
+ self._add_spacing_paragraph_at_position()
|
|
|
|
|
+
|
|
|
|
|
+ def _add_vertical_tables_at_position(self, service_key: str, resources: List[Dict[str, Any]],
|
|
|
|
|
+ include_account_column: bool = False) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add vertical tables at the tracked position within Implementation List section.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ service_key: The service key from SERVICE_CONFIG
|
|
|
|
|
+ resources: List of resource dictionaries
|
|
|
|
|
+ include_account_column: Whether to include AWS Account row
|
|
|
|
|
+ """
|
|
|
|
|
+ if service_key not in SERVICE_CONFIG:
|
|
|
|
|
+ raise ValueError(f"Unknown service: {service_key}")
|
|
|
|
|
+
|
|
|
|
|
+ config = SERVICE_CONFIG[service_key]
|
|
|
|
|
+ columns = list(config['columns'])
|
|
|
|
|
+ if include_account_column and 'AWS Account' not in columns:
|
|
|
|
|
+ columns.insert(0, 'AWS Account')
|
|
|
|
|
+
|
|
|
|
|
+ for resource in resources:
|
|
|
|
|
+ # Create table: 1 title row + attribute rows
|
|
|
|
|
+ num_rows = len(columns) + 1
|
|
|
|
|
+ table = self.document.add_table(rows=num_rows, cols=2)
|
|
|
|
|
+
|
|
|
|
|
+ # Move table to correct position
|
|
|
|
|
+ tbl_element = table._tbl
|
|
|
|
|
+ tbl_element.getparent().remove(tbl_element)
|
|
|
|
|
+ self._insert_element_at_position(tbl_element)
|
|
|
|
|
+
|
|
|
|
|
+ # Apply table styling
|
|
|
|
|
+ self._copy_table_style_from_template(table)
|
|
|
|
|
+
|
|
|
|
|
+ # Row 0: Service title (merged across 2 columns)
|
|
|
|
|
+ title_row = table.rows[0]
|
|
|
|
|
+ title_cell = title_row.cells[0]
|
|
|
|
|
+ title_cell.merge(title_row.cells[1])
|
|
|
|
|
+ title_cell.text = config['title']
|
|
|
|
|
+ self._apply_header_cell_style(title_cell, is_title=True)
|
|
|
|
|
+ for paragraph in title_cell.paragraphs:
|
|
|
|
|
+ paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
|
|
|
+
|
|
|
|
|
+ # Attribute rows
|
|
|
|
|
+ for row_idx, col_name in enumerate(columns):
|
|
|
|
|
+ row = table.rows[row_idx + 1]
|
|
|
|
|
+ # Attribute name cell
|
|
|
|
|
+ name_cell = row.cells[0]
|
|
|
|
|
+ name_cell.text = col_name
|
|
|
|
|
+ self._apply_header_cell_style(name_cell)
|
|
|
|
|
+
|
|
|
|
|
+ # Value cell
|
|
|
|
|
+ value_cell = row.cells[1]
|
|
|
|
|
+ value = self._get_resource_value(resource, col_name)
|
|
|
|
|
+ value_cell.text = value
|
|
|
|
|
+ self._apply_data_cell_style(value_cell)
|
|
|
|
|
+
|
|
|
|
|
+ # Add spacing paragraph after table
|
|
|
|
|
+ self._add_spacing_paragraph_at_position()
|
|
|
|
|
+
|
|
|
|
|
+ def _add_spacing_paragraph_at_position(self) -> None:
|
|
|
|
|
+ """Add an empty paragraph for spacing at the tracked position."""
|
|
|
|
|
+ p = self.document.add_paragraph()
|
|
|
|
|
+ p_element = p._element
|
|
|
|
|
+ p_element.getparent().remove(p_element)
|
|
|
|
|
+ self._insert_element_at_position(p_element)
|
|
|
|
|
+
|
|
|
|
|
+ def _get_resource_value(self, resource: Dict[str, Any], column_name: str) -> str:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get value from resource for a given column name.
|
|
|
|
|
+
|
|
|
|
|
+ Handles both flat dictionaries and ResourceData.to_dict() format
|
|
|
|
|
+ where attributes are nested in 'attributes' key.
|
|
|
|
|
+ Empty values are replaced with '-'.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ resource: Resource dictionary
|
|
|
|
|
+ column_name: Column display name
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Value as string, or '-' if empty
|
|
|
|
|
+ """
|
|
|
|
|
+ value = None
|
|
|
|
|
+
|
|
|
|
|
+ # First try to get from attributes (ResourceData format)
|
|
|
|
|
+ attributes = resource.get('attributes', {})
|
|
|
|
|
+ if column_name in attributes:
|
|
|
|
|
+ value = attributes[column_name]
|
|
|
|
|
+
|
|
|
|
|
+ # Try mapped attribute key in attributes
|
|
|
|
|
+ if value is None:
|
|
|
|
|
+ attr_key = self._column_to_attribute(column_name)
|
|
|
|
|
+ if attr_key in attributes:
|
|
|
|
|
+ value = attributes[attr_key]
|
|
|
|
|
+
|
|
|
|
|
+ # Fallback: try direct access on resource (flat dict format)
|
|
|
|
|
+ if value is None and column_name in resource:
|
|
|
|
|
+ value = resource[column_name]
|
|
|
|
|
+
|
|
|
|
|
+ if value is None:
|
|
|
|
|
+ attr_key = self._column_to_attribute(column_name)
|
|
|
|
|
+ if attr_key in resource:
|
|
|
|
|
+ value = resource[attr_key]
|
|
|
|
|
+
|
|
|
|
|
+ # Convert to string and handle empty values
|
|
|
|
|
+ if value is None or value == '' or (isinstance(value, str) and value.strip() == ''):
|
|
|
|
|
+ return '-'
|
|
|
|
|
+
|
|
|
|
|
+ return str(value)
|
|
|
|
|
+
|
|
|
|
|
+ def _column_to_attribute(self, column_name: str) -> str:
|
|
|
|
|
+ """
|
|
|
|
|
+ Convert column display name to attribute key.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ column_name: Display name of the column
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Attribute key for the resource dictionary
|
|
|
|
|
+ """
|
|
|
|
|
+ # Common mappings
|
|
|
|
|
+ mappings = {
|
|
|
|
|
+ 'Name': 'name',
|
|
|
|
|
+ 'ID': 'id',
|
|
|
|
|
+ 'Region': 'region',
|
|
|
|
|
+ 'AZ': 'availability_zone',
|
|
|
|
|
+ 'CIDR': 'cidr_block',
|
|
|
|
|
+ 'VPC': 'vpc_id',
|
|
|
|
|
+ 'VPC ID': 'vpc_id',
|
|
|
|
|
+ 'Subnet ID': 'subnet_id',
|
|
|
|
|
+ 'Instance ID': 'instance_id',
|
|
|
|
|
+ 'Instance Type': 'instance_type',
|
|
|
|
|
+ 'AMI': 'ami_id',
|
|
|
|
|
+ 'Public IP': 'public_ip',
|
|
|
|
|
+ 'Public DNS': 'public_dns',
|
|
|
|
|
+ 'Private IP': 'private_ip',
|
|
|
|
|
+ 'Elastic IP': 'elastic_ip',
|
|
|
|
|
+ 'Key': 'key_name',
|
|
|
|
|
+ 'Security Groups': 'security_groups',
|
|
|
|
|
+ 'EBS Type': 'ebs_type',
|
|
|
|
|
+ 'EBS Size': 'ebs_size',
|
|
|
|
|
+ 'Encryption': 'encryption',
|
|
|
|
|
+ 'AWS Account': 'account_id',
|
|
|
|
|
+ 'Subnet Associations': 'subnet_associations',
|
|
|
|
|
+ 'Peering Connection ID': 'peering_connection_id',
|
|
|
|
|
+ 'Requester VPC': 'requester_vpc',
|
|
|
|
|
+ 'Accepter VPC': 'accepter_vpc',
|
|
|
|
|
+ 'Customer Gateway ID': 'customer_gateway_id',
|
|
|
|
|
+ 'IP Address': 'ip_address',
|
|
|
|
|
+ 'Virtual Private Gateway ID': 'virtual_private_gateway_id',
|
|
|
|
|
+ 'VPN ID': 'vpn_id',
|
|
|
|
|
+ 'Routes': 'routes',
|
|
|
|
|
+ 'Service Name': 'service_name',
|
|
|
|
|
+ 'Type': 'type',
|
|
|
|
|
+ 'Launch Template': 'launch_template',
|
|
|
|
|
+ 'Target Groups': 'target_groups',
|
|
|
|
|
+ 'Desired': 'desired_capacity',
|
|
|
|
|
+ 'Min': 'min_size',
|
|
|
|
|
+ 'Max': 'max_size',
|
|
|
|
|
+ 'Scaling Policy': 'scaling_policy',
|
|
|
|
|
+ 'DNS': 'dns_name',
|
|
|
|
|
+ 'Scheme': 'scheme',
|
|
|
|
|
+ 'Availability Zones': 'availability_zones',
|
|
|
|
|
+ 'Load Balancer': 'load_balancer',
|
|
|
|
|
+ 'TG Name': 'target_group_name',
|
|
|
|
|
+ 'Port': 'port',
|
|
|
|
|
+ 'Protocol': 'protocol',
|
|
|
|
|
+ 'Registered Instances': 'registered_instances',
|
|
|
|
|
+ 'Health Check Path': 'health_check_path',
|
|
|
|
|
+ 'Endpoint': 'endpoint',
|
|
|
|
|
+ 'DB instance ID': 'db_instance_id',
|
|
|
|
|
+ 'DB name': 'db_name',
|
|
|
|
|
+ 'Master Username': 'master_username',
|
|
|
|
|
+ 'DB Engine': 'engine',
|
|
|
|
|
+ 'DB Version': 'engine_version',
|
|
|
|
|
+ 'Storage type': 'storage_type',
|
|
|
|
|
+ 'Storage': 'storage',
|
|
|
|
|
+ 'Multi-AZ': 'multi_az',
|
|
|
|
|
+ 'Deletion Protection': 'deletion_protection',
|
|
|
|
|
+ 'Performance Insights Enabled': 'performance_insights',
|
|
|
|
|
+ 'CloudWatch Logs': 'cloudwatch_logs',
|
|
|
|
|
+ 'Cluster ID': 'cluster_id',
|
|
|
|
|
+ 'Engine': 'engine',
|
|
|
|
|
+ 'Engine Version': 'engine_version',
|
|
|
|
|
+ 'Node Type': 'node_type',
|
|
|
|
|
+ 'Num Nodes': 'num_nodes',
|
|
|
|
|
+ 'Status': 'status',
|
|
|
|
|
+ 'Cluster Name': 'cluster_name',
|
|
|
|
|
+ 'Version': 'version',
|
|
|
|
|
+ 'Function Name': 'function_name',
|
|
|
|
|
+ 'Runtime': 'runtime',
|
|
|
|
|
+ 'Memory (MB)': 'memory_size',
|
|
|
|
|
+ 'Timeout (s)': 'timeout',
|
|
|
|
|
+ 'Last Modified': 'last_modified',
|
|
|
|
|
+ 'Bucket Name': 'bucket_name',
|
|
|
|
|
+ 'Bucket': 'bucket',
|
|
|
|
|
+ 'Event Type': 'event_type',
|
|
|
|
|
+ 'Destination type': 'destination_type',
|
|
|
|
|
+ 'Destination': 'destination',
|
|
|
|
|
+ 'CloudFront ID': 'cloudfront_id',
|
|
|
|
|
+ 'Domain Name': 'domain_name',
|
|
|
|
|
+ 'CNAME': 'cname',
|
|
|
|
|
+ 'Origin Domain Name': 'origin_domain_name',
|
|
|
|
|
+ 'Origin Protocol Policy': 'origin_protocol_policy',
|
|
|
|
|
+ 'Viewer Protocol Policy': 'viewer_protocol_policy',
|
|
|
|
|
+ 'Allowed HTTP Methods': 'allowed_http_methods',
|
|
|
|
|
+ 'Cached HTTP Methods': 'cached_http_methods',
|
|
|
|
|
+ 'Zone ID': 'zone_id',
|
|
|
|
|
+ 'Record Count': 'record_count',
|
|
|
|
|
+ 'Domain name': 'domain_name',
|
|
|
|
|
+ 'Additional names': 'additional_names',
|
|
|
|
|
+ 'WebACL Name': 'webacl_name',
|
|
|
|
|
+ 'Scope': 'scope',
|
|
|
|
|
+ 'Rules Count': 'rules_count',
|
|
|
|
|
+ 'Associated Resources': 'associated_resources',
|
|
|
|
|
+ 'Topic Name': 'topic_name',
|
|
|
|
|
+ 'Topic Display Name': 'display_name',
|
|
|
|
|
+ 'Subscription Protocol': 'subscription_protocol',
|
|
|
|
|
+ 'Subscription Endpoint': 'subscription_endpoint',
|
|
|
|
|
+ 'Log Group Name': 'log_group_name',
|
|
|
|
|
+ 'Retention Days': 'retention_days',
|
|
|
|
|
+ 'Stored Bytes': 'stored_bytes',
|
|
|
|
|
+ 'KMS Encryption': 'kms_encryption',
|
|
|
|
|
+ 'Description': 'description',
|
|
|
|
|
+ 'Event Bus': 'event_bus',
|
|
|
|
|
+ 'State': 'state',
|
|
|
|
|
+ 'Multi-Region Trail': 'multi_region',
|
|
|
|
|
+ 'Log File Validation': 'log_file_validation',
|
|
|
|
|
+ 'Regional Resources': 'regional_resources',
|
|
|
|
|
+ 'Global Resources': 'global_resources',
|
|
|
|
|
+ 'Retention period': 'retention_period',
|
|
|
|
|
+ 'Port range': 'port_range',
|
|
|
|
|
+ 'Source': 'source',
|
|
|
|
|
+ 'Other Requirement': 'other_requirement',
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return mappings.get(column_name, column_name.lower().replace(' ', '_'))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def _find_implementation_list_section(self) -> Optional[int]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Find the index of the 'Implementation List' section in the document.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Index of the paragraph after the Implementation List heading, or None if not found
|
|
|
|
|
+ """
|
|
|
|
|
+ for i, paragraph in enumerate(self.document.paragraphs):
|
|
|
|
|
+ text = paragraph.text.strip().lower()
|
|
|
|
|
+ # Match variations like "4. Implementation List", "Implementation List", etc.
|
|
|
|
|
+ if 'implementation list' in text:
|
|
|
|
|
+ return i
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def _copy_table_style_from_template(self, table) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Apply consistent table styling matching the template format.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ table: The table to style
|
|
|
|
|
+ """
|
|
|
|
|
+ # Try to use a template table style if available
|
|
|
|
|
+ try:
|
|
|
|
|
+ # First try to use 'Table Grid' which is a standard Word style
|
|
|
|
|
+ table.style = 'Table Grid'
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # Apply additional formatting for consistency
|
|
|
|
|
+ tbl = table._tbl
|
|
|
|
|
+ tblPr = tbl.tblPr if tbl.tblPr is not None else OxmlElement('w:tblPr')
|
|
|
|
|
+
|
|
|
|
|
+ # Set table width to 100%
|
|
|
|
|
+ tblW = OxmlElement('w:tblW')
|
|
|
|
|
+ tblW.set(qn('w:w'), '5000')
|
|
|
|
|
+ tblW.set(qn('w:type'), 'pct')
|
|
|
|
|
+ tblPr.append(tblW)
|
|
|
|
|
+
|
|
|
|
|
+ # Set table borders
|
|
|
|
|
+ tblBorders = OxmlElement('w:tblBorders')
|
|
|
|
|
+ for border_name in ['top', 'left', 'bottom', 'right', 'insideH', 'insideV']:
|
|
|
|
|
+ border = OxmlElement(f'w:{border_name}')
|
|
|
|
|
+ border.set(qn('w:val'), 'single')
|
|
|
|
|
+ border.set(qn('w:sz'), '4')
|
|
|
|
|
+ border.set(qn('w:space'), '0')
|
|
|
|
|
+ border.set(qn('w:color'), '000000')
|
|
|
|
|
+ tblBorders.append(border)
|
|
|
|
|
+ tblPr.append(tblBorders)
|
|
|
|
|
+
|
|
|
|
|
+ if tbl.tblPr is None:
|
|
|
|
|
+ tbl.insert(0, tblPr)
|
|
|
|
|
+
|
|
|
|
|
+ def _apply_header_cell_style(self, cell, is_title: bool = False) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Apply header cell styling (bold, background color, font, spacing).
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ cell: The cell to style
|
|
|
|
|
+ is_title: If True, use title color (DAEEF3) and 12pt font, otherwise use header color (D9E2F3) and 11pt font
|
|
|
|
|
+ """
|
|
|
|
|
+ # Set background color for header cells
|
|
|
|
|
+ tc = cell._tc
|
|
|
|
|
+ tcPr = tc.get_or_add_tcPr()
|
|
|
|
|
+ shd = OxmlElement('w:shd')
|
|
|
|
|
+ shd.set(qn('w:val'), 'clear')
|
|
|
|
|
+ shd.set(qn('w:color'), 'auto')
|
|
|
|
|
+ # Service Name title uses DAEEF3 (light cyan), column headers use C6D9F1 (light blue)
|
|
|
|
|
+ shd.set(qn('w:fill'), 'DAEEF3' if is_title else 'C6D9F1')
|
|
|
|
|
+ tcPr.append(shd)
|
|
|
|
|
+
|
|
|
|
|
+ # Apply font and paragraph formatting
|
|
|
|
|
+ # Service Name (title) uses 12pt (小四), others use 11pt
|
|
|
|
|
+ font_size = 12 if is_title else 11
|
|
|
|
|
+ for paragraph in cell.paragraphs:
|
|
|
|
|
+ self._apply_cell_paragraph_format(paragraph, font_size=font_size)
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ run.font.bold = True
|
|
|
|
|
+
|
|
|
|
|
+ def _apply_cell_paragraph_format(self, paragraph, font_size: int = 11) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Apply standard cell paragraph formatting:
|
|
|
|
|
+ - Font: Calibri
|
|
|
|
|
+ - Spacing: 3pt before, 3pt after, single line spacing
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ paragraph: The paragraph to format
|
|
|
|
|
+ font_size: Font size in points (default 11pt, use 12pt for Service Name)
|
|
|
|
|
+ """
|
|
|
|
|
+ from docx.shared import Pt
|
|
|
|
|
+ from docx.enum.text import WD_LINE_SPACING
|
|
|
|
|
+
|
|
|
|
|
+ # Set paragraph spacing: 3pt before, 3pt after, single line spacing
|
|
|
|
|
+ paragraph.paragraph_format.space_before = Pt(3)
|
|
|
|
|
+ paragraph.paragraph_format.space_after = Pt(3)
|
|
|
|
|
+ paragraph.paragraph_format.line_spacing_rule = WD_LINE_SPACING.SINGLE
|
|
|
|
|
+
|
|
|
|
|
+ # Set font for all runs
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ run.font.name = 'Calibri'
|
|
|
|
|
+ run.font.size = Pt(font_size)
|
|
|
|
|
+ # Set East Asian font
|
|
|
|
|
+ run._element.rPr.rFonts.set(qn('w:eastAsia'), 'Calibri')
|
|
|
|
|
+
|
|
|
|
|
+ def _apply_data_cell_style(self, cell) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Apply data cell styling (font 11pt, spacing, no background).
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ cell: The cell to style
|
|
|
|
|
+ """
|
|
|
|
|
+ for paragraph in cell.paragraphs:
|
|
|
|
|
+ self._apply_cell_paragraph_format(paragraph, font_size=11)
|
|
|
|
|
+
|
|
|
|
|
+ def add_service_tables(self, scan_results: Dict[str, List[Dict[str, Any]]],
|
|
|
|
|
+ include_account_column: bool = False,
|
|
|
|
|
+ regions: List[str] = None) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add tables for all services with resources, filtering out empty services.
|
|
|
|
|
+ Content is inserted into the existing 'Implementation List' section in the template,
|
|
|
|
|
+ replacing any placeholder content.
|
|
|
|
|
+
|
|
|
|
|
+ Services are grouped under their parent service heading (e.g., VPC, ELB, S3).
|
|
|
|
|
+ When multiple regions are selected, regional services show region in heading.
|
|
|
|
|
+ Global services are shown once without region suffix.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ scan_results: Dictionary mapping service keys to lists of resources
|
|
|
|
|
+ include_account_column: Whether to include AWS Account column
|
|
|
|
|
+ regions: List of regions being scanned (for multi-region heading display)
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ # Find the existing Implementation List section and clear placeholder content
|
|
|
|
|
+ impl_list_idx = self._find_implementation_list_section()
|
|
|
|
|
+
|
|
|
|
|
+ if impl_list_idx is not None:
|
|
|
|
|
+ # Clear placeholder content after Implementation List until next Heading 1
|
|
|
|
|
+ self._clear_section_content(impl_list_idx)
|
|
|
|
|
+
|
|
|
|
|
+ # Get the Implementation List paragraph and find insert position
|
|
|
|
|
+ impl_paragraph = self.document.paragraphs[impl_list_idx]
|
|
|
|
|
+ parent = impl_paragraph._element.getparent()
|
|
|
|
|
+ insert_index = list(parent).index(impl_paragraph._element) + 1
|
|
|
|
|
+ self._insert_parent = parent
|
|
|
|
|
+ self._insert_index = insert_index
|
|
|
|
|
+ else:
|
|
|
|
|
+ # If not found, add a new section at the end
|
|
|
|
|
+ self.document.add_paragraph('Implementation List', style='Heading 1')
|
|
|
|
|
+ self._insert_parent = self.document._body._body
|
|
|
|
|
+ self._insert_index = len(list(self._insert_parent))
|
|
|
|
|
+
|
|
|
|
|
+ # Determine if we need to show region in headings (multiple regions selected)
|
|
|
|
|
+ multi_region = regions and len(regions) > 1
|
|
|
|
|
+
|
|
|
|
|
+ # Helper function to group resources by region
|
|
|
|
|
+ def group_by_region(resources: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
|
|
|
|
|
+ """Group resources by their region attribute."""
|
|
|
|
|
+ grouped = {}
|
|
|
|
|
+ for resource in resources:
|
|
|
|
|
+ # Get region from resource attributes or direct field
|
|
|
|
|
+ region = None
|
|
|
|
|
+ if isinstance(resource, dict):
|
|
|
|
|
+ region = resource.get('region') or resource.get('attributes', {}).get('region')
|
|
|
|
|
+ if not region:
|
|
|
|
|
+ region = 'global'
|
|
|
|
|
+ if region not in grouped:
|
|
|
|
|
+ grouped[region] = []
|
|
|
|
|
+ grouped[region].append(resource)
|
|
|
|
|
+ return grouped
|
|
|
|
|
+
|
|
|
|
|
+ # Helper function to deduplicate global service resources
|
|
|
|
|
+ def deduplicate_resources(resources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
|
|
+ """Deduplicate resources by ID or name."""
|
|
|
|
|
+ seen_ids = set()
|
|
|
|
|
+ unique_resources = []
|
|
|
|
|
+ for resource in resources:
|
|
|
|
|
+ res_id = None
|
|
|
|
|
+ if isinstance(resource, dict):
|
|
|
|
|
+ res_id = resource.get('id') or resource.get('attributes', {}).get('id')
|
|
|
|
|
+ if not res_id:
|
|
|
|
|
+ res_id = resource.get('name') or resource.get('attributes', {}).get('name')
|
|
|
|
|
+ if res_id and res_id in seen_ids:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if res_id:
|
|
|
|
|
+ seen_ids.add(res_id)
|
|
|
|
|
+ unique_resources.append(resource)
|
|
|
|
|
+ return unique_resources
|
|
|
|
|
+
|
|
|
|
|
+ # Helper function to sort resources by name
|
|
|
|
|
+ def sort_resources_by_name(resources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
|
|
+ """Sort resources by account (if multi-account) then by name. Resources without name come last."""
|
|
|
|
|
+ def get_sort_key(resource: Dict[str, Any]) -> tuple:
|
|
|
|
|
+ # Get account_id for multi-account sorting
|
|
|
|
|
+ account_id = ''
|
|
|
|
|
+ if isinstance(resource, dict):
|
|
|
|
|
+ account_id = resource.get('account_id') or resource.get('attributes', {}).get('account_id') or ''
|
|
|
|
|
+
|
|
|
|
|
+ # Get name
|
|
|
|
|
+ name = None
|
|
|
|
|
+ if isinstance(resource, dict):
|
|
|
|
|
+ name = resource.get('name') or resource.get('attributes', {}).get('name')
|
|
|
|
|
+
|
|
|
|
|
+ # Sort by: account_id, then has_name (0=has name, 1=no name), then name alphabetically
|
|
|
|
|
+ if name and str(name).strip():
|
|
|
|
|
+ return (str(account_id), 0, str(name).lower())
|
|
|
|
|
+ return (str(account_id), 1, '')
|
|
|
|
|
+ return sorted(resources, key=get_sort_key)
|
|
|
|
|
+
|
|
|
|
|
+ # Helper function to add table for a service
|
|
|
|
|
+ def add_service_table(service_key: str, resources: List[Dict[str, Any]]):
|
|
|
|
|
+ config = SERVICE_CONFIG.get(service_key)
|
|
|
|
|
+ if not config or not resources:
|
|
|
|
|
+ return
|
|
|
|
|
+ # Sort resources by name before adding to table
|
|
|
|
|
+ sorted_resources = sort_resources_by_name(resources)
|
|
|
|
|
+ if config['layout'] == TableLayout.HORIZONTAL:
|
|
|
|
|
+ self._add_horizontal_table_at_position(service_key, sorted_resources, include_account_column)
|
|
|
|
|
+ else:
|
|
|
|
|
+ self._add_vertical_tables_at_position(service_key, sorted_resources, include_account_column)
|
|
|
|
|
+
|
|
|
|
|
+ if multi_region:
|
|
|
|
|
+ # Multi-region mode: organize by region first, then by service group
|
|
|
|
|
+ # Step 1: Collect all regions from resources
|
|
|
|
|
+ all_regions = set()
|
|
|
|
|
+ for service_key in SERVICE_ORDER:
|
|
|
|
|
+ resources = scan_results.get(service_key, [])
|
|
|
|
|
+ if not resources:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if service_key in GLOBAL_SERVICES:
|
|
|
|
|
+ continue # Skip global services for region collection
|
|
|
|
|
+ for resource in resources:
|
|
|
|
|
+ region = None
|
|
|
|
|
+ if isinstance(resource, dict):
|
|
|
|
|
+ region = resource.get('region') or resource.get('attributes', {}).get('region')
|
|
|
|
|
+ if region:
|
|
|
|
|
+ all_regions.add(region)
|
|
|
|
|
+
|
|
|
|
|
+ # Sort regions for consistent output (use provided regions order if available)
|
|
|
|
|
+ if regions:
|
|
|
|
|
+ sorted_regions = [r for r in regions if r in all_regions]
|
|
|
|
|
+ # Add any regions found in resources but not in provided list
|
|
|
|
|
+ for r in sorted(all_regions):
|
|
|
|
|
+ if r not in sorted_regions:
|
|
|
|
|
+ sorted_regions.append(r)
|
|
|
|
|
+ else:
|
|
|
|
|
+ sorted_regions = sorted(all_regions)
|
|
|
|
|
+
|
|
|
|
|
+ # Step 2: Process regional services by region, then by service group
|
|
|
|
|
+ for region in sorted_regions:
|
|
|
|
|
+ added_groups_for_region = set()
|
|
|
|
|
+
|
|
|
|
|
+ for service_key in SERVICE_ORDER:
|
|
|
|
|
+ # Skip global services
|
|
|
|
|
+ if service_key in GLOBAL_SERVICES:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ resources = scan_results.get(service_key, [])
|
|
|
|
|
+ if not resources:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ config = SERVICE_CONFIG.get(service_key)
|
|
|
|
|
+ if not config:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Filter resources for this region
|
|
|
|
|
+ region_resources = []
|
|
|
|
|
+ for resource in resources:
|
|
|
|
|
+ res_region = None
|
|
|
|
|
+ if isinstance(resource, dict):
|
|
|
|
|
+ res_region = resource.get('region') or resource.get('attributes', {}).get('region')
|
|
|
|
|
+ if res_region == region:
|
|
|
|
|
+ region_resources.append(resource)
|
|
|
|
|
+
|
|
|
|
|
+ if not region_resources:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Get the service group for this service
|
|
|
|
|
+ service_group = SERVICE_GROUPS.get(service_key, config['title'])
|
|
|
|
|
+
|
|
|
|
|
+ # Add Heading 2 with region suffix if not already added for this region
|
|
|
|
|
+ if service_group not in added_groups_for_region:
|
|
|
|
|
+ self._add_heading2_at_position(f"{service_group} ({region})")
|
|
|
|
|
+ added_groups_for_region.add(service_group)
|
|
|
|
|
+
|
|
|
|
|
+ # Add the table(s) for this service
|
|
|
|
|
+ add_service_table(service_key, region_resources)
|
|
|
|
|
+
|
|
|
|
|
+ # Step 3: Process global services (without region suffix)
|
|
|
|
|
+ added_global_groups = set()
|
|
|
|
|
+ for service_key in SERVICE_ORDER:
|
|
|
|
|
+ if service_key not in GLOBAL_SERVICES:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ resources = scan_results.get(service_key, [])
|
|
|
|
|
+ if not resources:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ config = SERVICE_CONFIG.get(service_key)
|
|
|
|
|
+ if not config:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Deduplicate global service resources
|
|
|
|
|
+ unique_resources = deduplicate_resources(resources)
|
|
|
|
|
+ if not unique_resources:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Get the service group for this service
|
|
|
|
|
+ service_group = SERVICE_GROUPS.get(service_key, config['title'])
|
|
|
|
|
+
|
|
|
|
|
+ # Add Heading 2 without region suffix
|
|
|
|
|
+ if service_group not in added_global_groups:
|
|
|
|
|
+ self._add_heading2_at_position(service_group)
|
|
|
|
|
+ added_global_groups.add(service_group)
|
|
|
|
|
+
|
|
|
|
|
+ # Add the table(s) for this service
|
|
|
|
|
+ add_service_table(service_key, unique_resources)
|
|
|
|
|
+
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Single region or no region info: original behavior
|
|
|
|
|
+ added_groups = set()
|
|
|
|
|
+
|
|
|
|
|
+ for service_key in SERVICE_ORDER:
|
|
|
|
|
+ resources = scan_results.get(service_key, [])
|
|
|
|
|
+ if not resources:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ config = SERVICE_CONFIG.get(service_key)
|
|
|
|
|
+ if not config:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Deduplicate global services
|
|
|
|
|
+ if service_key in GLOBAL_SERVICES:
|
|
|
|
|
+ resources = deduplicate_resources(resources)
|
|
|
|
|
+ if not resources:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # Get the service group for this service
|
|
|
|
|
+ service_group = SERVICE_GROUPS.get(service_key, config['title'])
|
|
|
|
|
+
|
|
|
|
|
+ # Add Heading 2 for the service group if not already added
|
|
|
|
|
+ if service_group not in added_groups:
|
|
|
|
|
+ self._add_heading2_at_position(service_group)
|
|
|
|
|
+ added_groups.add(service_group)
|
|
|
|
|
+
|
|
|
|
|
+ # Add the table(s) for this service
|
|
|
|
|
+ add_service_table(service_key, resources)
|
|
|
|
|
+
|
|
|
|
|
+ # Add page break after Implementation List section
|
|
|
|
|
+ self._add_page_break_at_position()
|
|
|
|
|
+
|
|
|
|
|
+ def _add_page_break_at_position(self) -> None:
|
|
|
|
|
+ """Add a page break at the tracked position."""
|
|
|
|
|
+ from docx.oxml import OxmlElement
|
|
|
|
|
+ from docx.oxml.ns import qn
|
|
|
|
|
+
|
|
|
|
|
+ # Create a paragraph with page break
|
|
|
|
|
+ p = self.document.add_paragraph()
|
|
|
|
|
+ run = p.add_run()
|
|
|
|
|
+ br = OxmlElement('w:br')
|
|
|
|
|
+ br.set(qn('w:type'), 'page')
|
|
|
|
|
+ run._r.append(br)
|
|
|
|
|
+
|
|
|
|
|
+ # Move to correct position
|
|
|
|
|
+ p_element = p._element
|
|
|
|
|
+ p_element.getparent().remove(p_element)
|
|
|
|
|
+ self._insert_element_at_position(p_element)
|
|
|
|
|
+
|
|
|
|
|
+ def _add_heading2_at_position(self, title: str) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add a Heading 2 paragraph at the tracked position.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ title: The heading title (service group name)
|
|
|
|
|
+ """
|
|
|
|
|
+ heading = self.document.add_paragraph(f'▼ {title}', style='Heading 2')
|
|
|
|
|
+ heading_element = heading._element
|
|
|
|
|
+ heading_element.getparent().remove(heading_element)
|
|
|
|
|
+ self._insert_element_at_position(heading_element)
|
|
|
|
|
+
|
|
|
|
|
+ def _clear_section_content(self, section_start_idx: int) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Clear content between a section heading and the next Heading 1.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ section_start_idx: Index of the section heading paragraph
|
|
|
|
|
+ """
|
|
|
|
|
+ # Find elements to remove (between this Heading 1 and next Heading 1)
|
|
|
|
|
+ elements_to_remove = []
|
|
|
|
|
+ body = self.document._body._body
|
|
|
|
|
+
|
|
|
|
|
+ start_para = self.document.paragraphs[section_start_idx]
|
|
|
|
|
+ start_element = start_para._element
|
|
|
|
|
+
|
|
|
|
|
+ # Find the position of start element in body
|
|
|
|
|
+ body_children = list(body)
|
|
|
|
|
+ try:
|
|
|
|
|
+ start_pos = body_children.index(start_element)
|
|
|
|
|
+ except ValueError:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # Iterate through elements after the heading
|
|
|
|
|
+ for i in range(start_pos + 1, len(body_children)):
|
|
|
|
|
+ elem = body_children[i]
|
|
|
|
|
+
|
|
|
|
|
+ # Check if this is a Heading 1 paragraph (next section)
|
|
|
|
|
+ if elem.tag.endswith('}p'):
|
|
|
|
|
+ # Check if it's a Heading 1
|
|
|
|
|
+ pStyle = elem.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}pStyle')
|
|
|
|
|
+ if pStyle is not None:
|
|
|
|
|
+ style_val = pStyle.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val')
|
|
|
|
|
+ if style_val and ('Heading1' in style_val or style_val == '1'):
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ elements_to_remove.append(elem)
|
|
|
|
|
+
|
|
|
|
|
+ # Remove the elements
|
|
|
|
|
+ for elem in elements_to_remove:
|
|
|
|
|
+ body.remove(elem)
|
|
|
|
|
+
|
|
|
|
|
+ def filter_empty_services(self, scan_results: Dict[str, List[Dict[str, Any]]]) -> Dict[str, List[Dict[str, Any]]]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Filter out services with no resources.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ scan_results: Dictionary mapping service keys to lists of resources
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Filtered dictionary with only non-empty services
|
|
|
|
|
+ """
|
|
|
|
|
+ return {k: v for k, v in scan_results.items() if v}
|
|
|
|
|
+
|
|
|
|
|
+ def get_services_with_resources(self, scan_results: Dict[str, List[Dict[str, Any]]]) -> List[str]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get list of service keys that have resources.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ scan_results: Dictionary mapping service keys to lists of resources
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ List of service keys with resources
|
|
|
|
|
+ """
|
|
|
|
|
+ return [k for k in SERVICE_ORDER if scan_results.get(k)]
|
|
|
|
|
+
|
|
|
|
|
+ def replace_architecture_picture_placeholder(self, image_path: str, width_inches: float = 6.0) -> bool:
|
|
|
|
|
+ """
|
|
|
|
|
+ Replace [AWS Architecture Picture] placeholder with actual image.
|
|
|
|
|
+
|
|
|
|
|
+ This method searches for the placeholder text in paragraphs and replaces it
|
|
|
|
|
+ with the provided image.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ image_path: Path to the architecture diagram image file
|
|
|
|
|
+ width_inches: Width of the image in inches (default 6.0)
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ True if placeholder was found and replaced, False otherwise
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ FileNotFoundError: If image file doesn't exist
|
|
|
|
|
+ ValueError: If no document is loaded
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ if not os.path.exists(image_path):
|
|
|
|
|
+ raise FileNotFoundError(f"Image file not found: {image_path}")
|
|
|
|
|
+
|
|
|
|
|
+ placeholder_text = '[AWS Architecture Picture]'
|
|
|
|
|
+ placeholder_found = False
|
|
|
|
|
+
|
|
|
|
|
+ # Search in paragraphs
|
|
|
|
|
+ for paragraph in self.document.paragraphs:
|
|
|
|
|
+ if placeholder_text in paragraph.text:
|
|
|
|
|
+ # Found the placeholder, replace it with image
|
|
|
|
|
+ # Clear the paragraph text first
|
|
|
|
|
+ full_text = paragraph.text
|
|
|
|
|
+ new_text = full_text.replace(placeholder_text, '')
|
|
|
|
|
+
|
|
|
|
|
+ # Clear all runs
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ run.text = ''
|
|
|
|
|
+
|
|
|
|
|
+ # Add the image to this paragraph
|
|
|
|
|
+ run = paragraph.add_run()
|
|
|
|
|
+ run.add_picture(image_path, width=Inches(width_inches))
|
|
|
|
|
+
|
|
|
|
|
+ # If there was other text, add it back
|
|
|
|
|
+ if new_text.strip():
|
|
|
|
|
+ paragraph.add_run(new_text)
|
|
|
|
|
+
|
|
|
|
|
+ # Center the paragraph
|
|
|
|
|
+ paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
|
|
|
+ placeholder_found = True
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # Also search in tables (in case placeholder is in a table cell)
|
|
|
|
|
+ if not placeholder_found:
|
|
|
|
|
+ for table in self.document.tables:
|
|
|
|
|
+ for row in table.rows:
|
|
|
|
|
+ for cell in row.cells:
|
|
|
|
|
+ for paragraph in cell.paragraphs:
|
|
|
|
|
+ if placeholder_text in paragraph.text:
|
|
|
|
|
+ # Clear the paragraph text first
|
|
|
|
|
+ full_text = paragraph.text
|
|
|
|
|
+ new_text = full_text.replace(placeholder_text, '')
|
|
|
|
|
+
|
|
|
|
|
+ # Clear all runs
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ run.text = ''
|
|
|
|
|
+
|
|
|
|
|
+ # Add the image to this paragraph
|
|
|
|
|
+ run = paragraph.add_run()
|
|
|
|
|
+ run.add_picture(image_path, width=Inches(width_inches))
|
|
|
|
|
+
|
|
|
|
|
+ # If there was other text, add it back
|
|
|
|
|
+ if new_text.strip():
|
|
|
|
|
+ paragraph.add_run(new_text)
|
|
|
|
|
+
|
|
|
|
|
+ # Center the paragraph
|
|
|
|
|
+ paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
|
|
|
+ placeholder_found = True
|
|
|
|
|
+ break
|
|
|
|
|
+ if placeholder_found:
|
|
|
|
|
+ break
|
|
|
|
|
+ if placeholder_found:
|
|
|
|
|
+ break
|
|
|
|
|
+ if placeholder_found:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ return placeholder_found
|
|
|
|
|
+
|
|
|
|
|
+ def clear_architecture_picture_placeholder(self) -> bool:
|
|
|
|
|
+ """
|
|
|
|
|
+ Remove [AWS Architecture Picture] placeholder from the document.
|
|
|
|
|
+
|
|
|
|
|
+ This method is called when no architecture image is provided,
|
|
|
|
|
+ to clean up the placeholder text.
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ True if placeholder was found and removed, False otherwise
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ placeholder_text = '[AWS Architecture Picture]'
|
|
|
|
|
+ placeholder_found = False
|
|
|
|
|
+
|
|
|
|
|
+ # Search in paragraphs
|
|
|
|
|
+ for paragraph in self.document.paragraphs:
|
|
|
|
|
+ if placeholder_text in paragraph.text:
|
|
|
|
|
+ # Remove the placeholder text
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ if placeholder_text in run.text:
|
|
|
|
|
+ run.text = run.text.replace(placeholder_text, '')
|
|
|
|
|
+ placeholder_found = True
|
|
|
|
|
+
|
|
|
|
|
+ # Also search in tables
|
|
|
|
|
+ for table in self.document.tables:
|
|
|
|
|
+ for row in table.rows:
|
|
|
|
|
+ for cell in row.cells:
|
|
|
|
|
+ for paragraph in cell.paragraphs:
|
|
|
|
|
+ if placeholder_text in paragraph.text:
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ if placeholder_text in run.text:
|
|
|
|
|
+ run.text = run.text.replace(placeholder_text, '')
|
|
|
|
|
+ placeholder_found = True
|
|
|
|
|
+
|
|
|
|
|
+ return placeholder_found
|
|
|
|
|
+
|
|
|
|
|
+ def embed_network_diagram(self, image_path: str, width_inches: float = 6.0) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Embed a network diagram image into the document.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ image_path: Path to the image file
|
|
|
|
|
+ width_inches: Width of the image in inches
|
|
|
|
|
+
|
|
|
|
|
+ Raises:
|
|
|
|
|
+ FileNotFoundError: If image file doesn't exist
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ if not os.path.exists(image_path):
|
|
|
|
|
+ raise FileNotFoundError(f"Image file not found: {image_path}")
|
|
|
|
|
+
|
|
|
|
|
+ # Find the Network Diagram section or add one
|
|
|
|
|
+ network_section_found = False
|
|
|
|
|
+ for i, paragraph in enumerate(self.document.paragraphs):
|
|
|
|
|
+ if 'Network Diagram' in paragraph.text or 'Network Architecture' in paragraph.text:
|
|
|
|
|
+ network_section_found = True
|
|
|
|
|
+ # Add image after this paragraph
|
|
|
|
|
+ # We need to insert after this paragraph
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ if not network_section_found:
|
|
|
|
|
+ # Add a new section for network diagram
|
|
|
|
|
+ self.document.add_paragraph('Network Diagram', style='Heading 1')
|
|
|
|
|
+
|
|
|
|
|
+ # Add the image
|
|
|
|
|
+ self.document.add_picture(image_path, width=Inches(width_inches))
|
|
|
|
|
+
|
|
|
|
|
+ # Center the image
|
|
|
|
|
+ last_paragraph = self.document.paragraphs[-1]
|
|
|
|
|
+ last_paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
|
|
|
|
|
+
|
|
|
|
|
+ # Add spacing
|
|
|
|
|
+ self.document.add_paragraph()
|
|
|
|
|
+
|
|
|
|
|
+ def update_table_of_contents(self) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Update the table of contents in the document.
|
|
|
|
|
+
|
|
|
|
|
+ Note: Full TOC update requires Word application. This method adds
|
|
|
|
|
+ a field code that will update when the document is opened in Word.
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ # Find existing TOC or add instruction
|
|
|
|
|
+ # python-docx cannot fully update TOC without Word application
|
|
|
|
|
+ # We add a field that will prompt update when opened
|
|
|
|
|
+
|
|
|
|
|
+ # Set document to update fields when opened
|
|
|
|
|
+ # self._set_update_fields_on_open()
|
|
|
|
|
+
|
|
|
|
|
+ for paragraph in self.document.paragraphs:
|
|
|
|
|
+ # Look for TOC field
|
|
|
|
|
+ for run in paragraph.runs:
|
|
|
|
|
+ if 'TOC' in run.text or 'Table of Contents' in run.text:
|
|
|
|
|
+ # Mark TOC for update
|
|
|
|
|
+ self._mark_toc_for_update(paragraph)
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ def _set_update_fields_on_open(self) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Set the document to update all fields (including TOC) when opened in Word.
|
|
|
|
|
+
|
|
|
|
|
+ This adds the updateFields setting to the document settings, which causes
|
|
|
|
|
+ Word to prompt the user to update fields when the document is opened.
|
|
|
|
|
+ """
|
|
|
|
|
+ try:
|
|
|
|
|
+ # Access the document settings element
|
|
|
|
|
+ settings_element = self.document.settings.element
|
|
|
|
|
+
|
|
|
|
|
+ # Create or find the updateFields element
|
|
|
|
|
+ # Namespace for Word ML
|
|
|
|
|
+ w_ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
|
|
|
|
|
+
|
|
|
|
|
+ # Check if updateFields already exists
|
|
|
|
|
+ update_fields = settings_element.find(f'{w_ns}updateFields')
|
|
|
|
|
+
|
|
|
|
|
+ if update_fields is None:
|
|
|
|
|
+ # Create the updateFields element
|
|
|
|
|
+ update_fields = OxmlElement('w:updateFields')
|
|
|
|
|
+ update_fields.set(qn('w:val'), 'true')
|
|
|
|
|
+ settings_element.append(update_fields)
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Ensure it's set to true
|
|
|
|
|
+ update_fields.set(qn('w:val'), 'true')
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ # Log but don't fail - TOC update is not critical
|
|
|
|
|
+ print(f"Warning: Could not set updateFields on open: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ def _mark_toc_for_update(self, paragraph) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Mark a TOC paragraph for update when document is opened.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ paragraph: The TOC paragraph
|
|
|
|
|
+ """
|
|
|
|
|
+ # Add updateFields setting to document
|
|
|
|
|
+ # This will prompt Word to update fields when opened
|
|
|
|
|
+ try:
|
|
|
|
|
+ # The updateFields setting is already set in _set_update_fields_on_open
|
|
|
|
|
+ # This method can be used for additional TOC-specific handling if needed
|
|
|
|
|
+ pass
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ pass # Settings may not be accessible
|
|
|
|
|
+
|
|
|
|
|
+ def add_update_history(self, version: str = '1.0', modifier: str = '', details: str = '') -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add or update the Update History section.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ version: Document version
|
|
|
|
|
+ modifier: Name of the person who modified
|
|
|
|
|
+ details: Details of the changes
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ # Find Update History section
|
|
|
|
|
+ for i, paragraph in enumerate(self.document.paragraphs):
|
|
|
|
|
+ if 'Update History' in paragraph.text or 'Revision History' in paragraph.text:
|
|
|
|
|
+ # Found the section, look for the table
|
|
|
|
|
+ # Add entry to existing table or create new one
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # Create update history entry
|
|
|
|
|
+ now = datetime.now()
|
|
|
|
|
+ history_entry = {
|
|
|
|
|
+ 'version': version,
|
|
|
|
|
+ 'date': now.strftime('%Y-%m-%d'),
|
|
|
|
|
+ 'modifier': modifier,
|
|
|
|
|
+ 'details': details or 'Initial version'
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # This would typically update an existing table
|
|
|
|
|
+ # For now, we ensure the data is available for template replacement
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ def save(self, output_path: str) -> str:
|
|
|
|
|
+ """
|
|
|
|
|
+ Save the document to a file.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ output_path: Path where to save the document
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ The path where the document was saved
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.document:
|
|
|
|
|
+ raise ValueError("No document loaded. Call load_template() first.")
|
|
|
|
|
+
|
|
|
|
|
+ # Ensure directory exists
|
|
|
|
|
+ os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ self.document.save(output_path)
|
|
|
|
|
+ return output_path
|
|
|
|
|
+
|
|
|
|
|
+ def get_file_size(self, file_path: str) -> int:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get the size of a file in bytes.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ file_path: Path to the file
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ File size in bytes
|
|
|
|
|
+ """
|
|
|
|
|
+ return os.path.getsize(file_path)
|
|
|
|
|
+
|
|
|
|
|
+ def generate_report(self, scan_results: Dict[str, List[Dict[str, Any]]],
|
|
|
|
|
+ project_metadata: Dict[str, Any],
|
|
|
|
|
+ output_path: str,
|
|
|
|
|
+ network_diagram_path: str = None,
|
|
|
|
|
+ template_path: str = None,
|
|
|
|
|
+ regions: List[str] = None) -> Dict[str, Any]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Generate a complete report from scan results.
|
|
|
|
|
+
|
|
|
|
|
+ This is the main entry point for report generation.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ scan_results: Dictionary mapping service keys to lists of resources
|
|
|
|
|
+ project_metadata: Project metadata for placeholder replacement
|
|
|
|
|
+ output_path: Path where to save the generated report
|
|
|
|
|
+ network_diagram_path: Optional path to network diagram image
|
|
|
|
|
+ template_path: Optional path to template file
|
|
|
|
|
+ regions: Optional list of regions being scanned (for multi-region heading display)
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Dictionary with report metadata:
|
|
|
|
|
+ - file_path: Path to the generated report
|
|
|
|
|
+ - file_name: Name of the report file
|
|
|
|
|
+ - file_size: Size of the report in bytes
|
|
|
|
|
+ - services_included: List of services included in the report
|
|
|
|
|
+ """
|
|
|
|
|
+ # Load template
|
|
|
|
|
+ self.load_template(template_path)
|
|
|
|
|
+
|
|
|
|
|
+ # Create placeholder replacements
|
|
|
|
|
+ replacements = self.create_project_metadata_replacements(project_metadata)
|
|
|
|
|
+
|
|
|
|
|
+ # Replace placeholders
|
|
|
|
|
+ self.replace_placeholders(replacements)
|
|
|
|
|
+
|
|
|
|
|
+ # Filter empty services
|
|
|
|
|
+ filtered_results = self.filter_empty_services(scan_results)
|
|
|
|
|
+
|
|
|
|
|
+ # Determine if multi-account (need AWS Account column)
|
|
|
|
|
+ account_ids = set()
|
|
|
|
|
+ for resources in filtered_results.values():
|
|
|
|
|
+ for resource in resources:
|
|
|
|
|
+ # Handle both dict and ResourceData objects
|
|
|
|
|
+ if isinstance(resource, dict):
|
|
|
|
|
+ if 'account_id' in resource:
|
|
|
|
|
+ account_ids.add(resource['account_id'])
|
|
|
|
|
+ elif hasattr(resource, 'account_id'):
|
|
|
|
|
+ account_ids.add(resource.account_id)
|
|
|
|
|
+ include_account_column = len(account_ids) > 1
|
|
|
|
|
+
|
|
|
|
|
+ # Add service tables with region info
|
|
|
|
|
+ self.add_service_tables(filtered_results, include_account_column, regions)
|
|
|
|
|
+
|
|
|
|
|
+ # Handle architecture picture placeholder
|
|
|
|
|
+ if network_diagram_path and os.path.exists(network_diagram_path):
|
|
|
|
|
+ # Replace placeholder with actual image
|
|
|
|
|
+ self.replace_architecture_picture_placeholder(network_diagram_path)
|
|
|
|
|
+ else:
|
|
|
|
|
+ # No image provided, clear the placeholder
|
|
|
|
|
+ self.clear_architecture_picture_placeholder()
|
|
|
|
|
+
|
|
|
|
|
+ # Update table of contents
|
|
|
|
|
+ self.update_table_of_contents()
|
|
|
|
|
+
|
|
|
|
|
+ # Add update history
|
|
|
|
|
+ self.add_update_history(
|
|
|
|
|
+ version='1.0',
|
|
|
|
|
+ modifier=project_metadata.get('cloud_engineer', ''),
|
|
|
|
|
+ details='Initial AWS resource inventory report'
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # Save the document
|
|
|
|
|
+ self.save(output_path)
|
|
|
|
|
+
|
|
|
|
|
+ # Get file info
|
|
|
|
|
+ file_size = self.get_file_size(output_path)
|
|
|
|
|
+ file_name = os.path.basename(output_path)
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'file_path': output_path,
|
|
|
|
|
+ 'file_name': file_name,
|
|
|
|
|
+ 'file_size': file_size,
|
|
|
|
|
+ 'services_included': list(filtered_results.keys()),
|
|
|
|
|
+ 'accounts_count': len(account_ids),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def generate_report_filename(project_metadata: Dict[str, Any]) -> str:
|
|
|
|
|
+ """
|
|
|
|
|
+ Generate a report filename from project metadata.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ project_metadata: Project metadata dictionary
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ Generated filename
|
|
|
|
|
+ """
|
|
|
|
|
+ client_name = project_metadata.get('client_name', 'Client')
|
|
|
|
|
+ project_name = project_metadata.get('project_name', 'Project')
|
|
|
|
|
+ timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
|
+
|
|
|
|
|
+ # Sanitize names for filename
|
|
|
|
|
+ client_name = re.sub(r'[^\w\s-]', '', client_name).strip().replace(' ', '-')
|
|
|
|
|
+ project_name = re.sub(r'[^\w\s-]', '', project_name).strip().replace(' ', '-')
|
|
|
|
|
+
|
|
|
|
|
+ return f"{client_name}-{project_name}-Report-{timestamp}.docx"
|