| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576 |
- """
- Task Management API endpoints
- Provides endpoints for:
- - GET /api/tasks - Get paginated list of tasks with status filtering
- - POST /api/tasks/create - Create a new scan task
- - GET /api/tasks/detail - Get task details
- - POST /api/tasks/delete - Delete a task
- - GET /api/tasks/logs - Get task logs with pagination
- Requirements: 3.1, 3.4
- """
- import os
- from flask import jsonify, request, current_app
- from werkzeug.utils import secure_filename
- from app import db
- from app.api import api_bp
- from app.models import Task, TaskLog, AWSCredential, UserCredential
- from app.services import login_required, admin_required, get_current_user_from_context, check_credential_access
- from app.errors import ValidationError, NotFoundError, AuthorizationError
- ALLOWED_IMAGE_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'bmp'}
- def allowed_file(filename: str) -> bool:
- """Check if file extension is allowed for network diagram"""
- return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_IMAGE_EXTENSIONS
- @api_bp.route('/tasks', methods=['GET'])
- @login_required
- def get_tasks():
- """
- Get paginated list of tasks with optional status filtering.
-
- Query Parameters:
- page: Page number (default: 1)
- page_size: Items per page (default: 20, max: 100)
- status: Optional filter by status (pending, running, completed, failed)
-
- Returns:
- JSON with 'data' array and 'pagination' object
- """
- current_user = get_current_user_from_context()
-
- # Get pagination parameters
- page = request.args.get('page', 1, type=int)
- # Support both pageSize (frontend) and page_size (backend convention)
- page_size = request.args.get('pageSize', type=int) or request.args.get('page_size', type=int) or 20
- page_size = min(page_size, 100)
- status = request.args.get('status', type=str)
-
- # Validate pagination
- if page < 1:
- page = 1
- if page_size < 1:
- page_size = 20
-
- # Build query based on user role
- if current_user.role in ['admin', 'power_user']:
- query = Task.query
- else:
- # Regular users can only see their own tasks
- query = Task.query.filter_by(created_by=current_user.id)
-
- # Apply status filter if provided
- if status and status in ['pending', 'running', 'completed', 'failed']:
- query = query.filter_by(status=status)
-
- # Order by created_at descending
- query = query.order_by(Task.created_at.desc())
-
- # Get total count
- total = query.count()
- total_pages = (total + page_size - 1) // page_size if total > 0 else 1
-
- # Apply pagination
- tasks = query.offset((page - 1) * page_size).limit(page_size).all()
-
- return jsonify({
- 'data': [task.to_dict() for task in tasks],
- 'pagination': {
- 'page': page,
- 'page_size': page_size,
- 'total': total,
- 'total_pages': total_pages
- }
- }), 200
- @api_bp.route('/tasks/create', methods=['POST'])
- @login_required
- def create_task():
- """
- Create a new scan task.
-
- Request Body (JSON or multipart/form-data):
- name: Task name (required)
- credential_ids: List of credential IDs to use (required)
- regions: List of AWS regions to scan (required)
- project_metadata: Project metadata object (required)
- - clientName: Client name (required)
- - projectName: Project name (required)
- - bdManager: BD Manager name (optional)
- - bdManagerEmail: BD Manager email (optional)
- - solutionsArchitect: Solutions Architect name (optional)
- - solutionsArchitectEmail: Solutions Architect email (optional)
- - cloudEngineer: Cloud Engineer name (optional)
- - cloudEngineerEmail: Cloud Engineer email (optional)
- network_diagram: Network diagram image file (optional, multipart only)
-
- Returns:
- JSON with created task details and task_id
- """
- current_user = get_current_user_from_context()
-
- # Handle both JSON and multipart/form-data
- if request.content_type and 'multipart/form-data' in request.content_type:
- data = request.form.to_dict()
- # Parse JSON fields from form data
- import json
- if 'credential_ids' in data:
- data['credential_ids'] = json.loads(data['credential_ids'])
- if 'regions' in data:
- data['regions'] = json.loads(data['regions'])
- if 'project_metadata' in data:
- data['project_metadata'] = json.loads(data['project_metadata'])
- network_diagram = request.files.get('network_diagram')
- else:
- data = request.get_json() or {}
- network_diagram = None
-
- # Validate required fields
- if not data.get('name'):
- raise ValidationError(
- message="Task name is required",
- details={"missing_fields": ["name"]}
- )
-
- credential_ids = data.get('credential_ids', [])
- if not credential_ids or not isinstance(credential_ids, list) or len(credential_ids) == 0:
- raise ValidationError(
- message="At least one credential must be selected",
- details={"missing_fields": ["credential_ids"]}
- )
-
- regions = data.get('regions', [])
- if not regions or not isinstance(regions, list) or len(regions) == 0:
- raise ValidationError(
- message="At least one region must be selected",
- details={"missing_fields": ["regions"]}
- )
-
- project_metadata = data.get('project_metadata', {})
- if not isinstance(project_metadata, dict):
- raise ValidationError(
- message="Project metadata must be an object",
- details={"field": "project_metadata", "reason": "invalid_type"}
- )
-
- # Validate required project metadata fields
- required_metadata = ['clientName', 'projectName']
- missing_metadata = [field for field in required_metadata if not project_metadata.get(field)]
- if missing_metadata:
- raise ValidationError(
- message="Missing required project metadata fields",
- details={"missing_fields": missing_metadata}
- )
-
- # Validate credential access for regular users
- for cred_id in credential_ids:
- if not check_credential_access(current_user, cred_id):
- raise AuthorizationError(
- message=f"Access denied to credential {cred_id}",
- details={"credential_id": cred_id, "reason": "not_assigned"}
- )
-
- # Verify credential exists and is active
- credential = db.session.get(AWSCredential, cred_id)
- if not credential:
- raise NotFoundError(
- message=f"Credential {cred_id} not found",
- details={"credential_id": cred_id}
- )
- if not credential.is_active:
- raise ValidationError(
- message=f"Credential {cred_id} is not active",
- details={"credential_id": cred_id, "reason": "inactive"}
- )
-
- # Handle network diagram upload
- network_diagram_path = None
- if network_diagram and network_diagram.filename:
- if not allowed_file(network_diagram.filename):
- raise ValidationError(
- message="Invalid file type for network diagram. Allowed: png, jpg, jpeg, gif, bmp",
- details={"field": "network_diagram", "reason": "invalid_file_type"}
- )
-
- # Save the file
- uploads_folder = current_app.config.get('UPLOAD_FOLDER', 'uploads')
- os.makedirs(uploads_folder, exist_ok=True)
-
- filename = secure_filename(network_diagram.filename)
- # Add timestamp to avoid conflicts
- import time
- filename = f"{int(time.time())}_{filename}"
- network_diagram_path = os.path.join(uploads_folder, filename)
- network_diagram.save(network_diagram_path)
-
- # Store path in project metadata
- project_metadata['network_diagram_path'] = network_diagram_path
-
- # Create task
- task = Task(
- name=data['name'].strip(),
- status='pending',
- progress=0,
- created_by=current_user.id
- )
- task.credential_ids = credential_ids
- task.regions = regions
- task.project_metadata = project_metadata
-
- db.session.add(task)
- db.session.commit()
-
- # Dispatch to Celery
- celery_task = None
-
- try:
- # 先测试Redis连接
- import redis
- print(f"🔍 测试Redis连接...")
- r = redis.Redis(host='localhost', port=6379, db=0)
- r.ping()
- print(f"✅ Redis连接成功")
-
- # 导入并初始化Celery应用
- from app.celery_app import celery_app, init_celery
- init_celery(current_app._get_current_object())
- print(f"✅ Celery初始化完成, broker: {celery_app.conf.broker_url}")
-
- # 导入Celery任务
- from app.tasks.scan_tasks import scan_aws_resources
- print(f"✅ 任务模块导入成功")
-
- # 提交任务
- print(f"🔍 提交任务到Celery队列...")
- celery_task = scan_aws_resources.delay(
- task_id=task.id,
- credential_ids=credential_ids,
- regions=regions,
- project_metadata=project_metadata
- )
- print(f"✅ 任务已提交: {celery_task.id}")
-
- except redis.ConnectionError as e:
- # Redis连接失败,删除已创建的任务并返回错误
- db.session.delete(task)
- db.session.commit()
- raise ValidationError(
- message="Redis服务不可用,无法创建任务。请确保Redis服务已启动。",
- details={"error": str(e)}
- )
- except Exception as e:
- # 其他错误
- db.session.delete(task)
- db.session.commit()
- raise ValidationError(
- message="任务提交失败",
- details={"error": str(e), "error_type": type(e).__name__}
- )
-
- # Store Celery task ID
- task.celery_task_id = celery_task.id
- db.session.commit()
-
- return jsonify({
- 'message': 'Task created successfully',
- 'task': task.to_dict(),
- 'celery_task_id': celery_task.id
- }), 201
- @api_bp.route('/tasks/detail', methods=['GET'])
- @login_required
- def get_task_detail():
- """
- Get task details including current status and progress.
-
- Query Parameters:
- id: Task ID (required)
-
- Returns:
- JSON with task details
- """
- current_user = get_current_user_from_context()
- task_id = request.args.get('id', type=int)
-
- if not task_id:
- raise ValidationError(
- message="Task ID is required",
- details={"missing_fields": ["id"]}
- )
-
- task = db.session.get(Task, task_id)
- if not task:
- raise NotFoundError(
- message="Task not found",
- details={"task_id": task_id}
- )
-
- # Check access for regular users
- if current_user.role == 'user' and task.created_by != current_user.id:
- raise AuthorizationError(
- message="Access denied",
- details={"reason": "not_owner"}
- )
-
- # Get task details with additional info
- task_dict = task.to_dict()
-
- # Add report info if available
- if task.report:
- task_dict['report'] = task.report.to_dict()
-
- # Add error count
- error_count = TaskLog.query.filter_by(task_id=task_id, level='error').count()
- task_dict['error_count'] = error_count
-
- # Get Celery task status if running
- if task.status == 'running' and task.celery_task_id:
- from celery.result import AsyncResult
- from app.celery_app import celery_app
-
- result = AsyncResult(task.celery_task_id, app=celery_app)
- if result.state == 'PROGRESS':
- task_dict['celery_progress'] = result.info
-
- return jsonify(task_dict), 200
- @api_bp.route('/tasks/delete', methods=['POST'])
- @login_required
- def delete_task():
- """
- Delete a task and its associated logs and report.
-
- Request Body:
- id: Task ID (required)
-
- Returns:
- JSON with success message
- """
- current_user = get_current_user_from_context()
- data = request.get_json() or {}
- task_id = data.get('id')
-
- if not task_id:
- raise ValidationError(
- message="Task ID is required",
- details={"missing_fields": ["id"]}
- )
-
- task = db.session.get(Task, task_id)
- if not task:
- raise NotFoundError(
- message="Task not found",
- details={"task_id": task_id}
- )
-
- # Check access - only admin or task owner can delete
- if current_user.role != 'admin' and task.created_by != current_user.id:
- raise AuthorizationError(
- message="Access denied",
- details={"reason": "not_owner_or_admin"}
- )
-
- # Cannot delete running tasks
- if task.status == 'running':
- raise ValidationError(
- message="Cannot delete a running task",
- details={"task_id": task_id, "status": task.status}
- )
-
- # Delete associated report file if exists
- if task.report and task.report.file_path:
- try:
- if os.path.exists(task.report.file_path):
- os.remove(task.report.file_path)
- except OSError:
- pass # File may already be deleted
-
- # Delete task (cascade will handle logs and report)
- db.session.delete(task)
- db.session.commit()
-
- return jsonify({
- 'message': 'Task deleted successfully'
- }), 200
- @api_bp.route('/tasks/logs', methods=['GET'])
- @login_required
- def get_task_logs():
- """
- Get paginated task logs.
-
- Query Parameters:
- id: Task ID (required)
- page: Page number (default: 1)
- page_size: Items per page (default: 20, max: 100)
- level: Optional filter by log level (info, warning, error)
-
- Returns:
- JSON with 'data' array and 'pagination' object
-
- Requirements:
- - 8.3: Display error logs associated with task
- """
- current_user = get_current_user_from_context()
- task_id = request.args.get('id', type=int)
-
- if not task_id:
- raise ValidationError(
- message="Task ID is required",
- details={"missing_fields": ["id"]}
- )
-
- task = db.session.get(Task, task_id)
- if not task:
- raise NotFoundError(
- message="Task not found",
- details={"task_id": task_id}
- )
-
- # Check access for regular users
- if current_user.role == 'user' and task.created_by != current_user.id:
- raise AuthorizationError(
- message="Access denied",
- details={"reason": "not_owner"}
- )
-
- # Get pagination parameters
- page = request.args.get('page', 1, type=int)
- # Support both pageSize (frontend) and page_size (backend convention)
- page_size = request.args.get('pageSize', type=int) or request.args.get('page_size', type=int) or 20
- page_size = min(page_size, 100)
- level = request.args.get('level', type=str)
-
- # Validate pagination
- if page < 1:
- page = 1
- if page_size < 1:
- page_size = 20
-
- # Build query
- query = TaskLog.query.filter_by(task_id=task_id)
-
- # Apply level filter if provided
- if level and level in ['info', 'warning', 'error']:
- query = query.filter_by(level=level)
-
- # Order by created_at descending
- query = query.order_by(TaskLog.created_at.desc())
-
- # Get total count
- total = query.count()
- total_pages = (total + page_size - 1) // page_size if total > 0 else 1
-
- # Apply pagination
- logs = query.offset((page - 1) * page_size).limit(page_size).all()
-
- return jsonify({
- 'data': [log.to_dict() for log in logs],
- 'pagination': {
- 'page': page,
- 'page_size': page_size,
- 'total': total,
- 'total_pages': total_pages
- }
- }), 200
- @api_bp.route('/tasks/errors', methods=['GET'])
- @login_required
- def get_task_errors():
- """
- Get error logs for a specific task.
-
- This is a convenience endpoint that returns only error-level logs
- with full details including stack traces.
-
- Query Parameters:
- id: Task ID (required)
- page: Page number (default: 1)
- page_size: Items per page (default: 20, max: 100)
-
- Returns:
- JSON with 'data' array containing error logs and 'pagination' object
-
- Requirements:
- - 8.2: Record error details in task record
- - 8.3: Display error logs associated with task
- """
- current_user = get_current_user_from_context()
- task_id = request.args.get('id', type=int)
-
- if not task_id:
- raise ValidationError(
- message="Task ID is required",
- details={"missing_fields": ["id"]}
- )
-
- task = db.session.get(Task, task_id)
- if not task:
- raise NotFoundError(
- message="Task not found",
- details={"task_id": task_id}
- )
-
- # Check access for regular users
- if current_user.role == 'user' and task.created_by != current_user.id:
- raise AuthorizationError(
- message="Access denied",
- details={"reason": "not_owner"}
- )
-
- # Get pagination parameters
- page = request.args.get('page', 1, type=int)
- # Support both pageSize (frontend) and page_size (backend convention)
- page_size = request.args.get('pageSize', type=int) or request.args.get('page_size', type=int) or 20
- page_size = min(page_size, 100)
-
- # Validate pagination
- if page < 1:
- page = 1
- if page_size < 1:
- page_size = 20
-
- # Build query for error logs only
- query = TaskLog.query.filter_by(task_id=task_id, level='error')
-
- # Order by created_at descending
- query = query.order_by(TaskLog.created_at.desc())
-
- # Get total count
- total = query.count()
- total_pages = (total + page_size - 1) // page_size if total > 0 else 1
-
- # Apply pagination
- logs = query.offset((page - 1) * page_size).limit(page_size).all()
-
- # Build response with full error details
- error_data = []
- for log in logs:
- log_dict = log.to_dict()
- # Ensure details are included for error analysis
- error_data.append(log_dict)
-
- return jsonify({
- 'data': error_data,
- 'pagination': {
- 'page': page,
- 'page_size': page_size,
- 'total': total,
- 'total_pages': total_pages
- },
- 'summary': {
- 'total_errors': total,
- 'task_status': task.status
- }
- }), 200
|