""" JWT Authentication Service Provides JWT token generation, validation, and authentication decorators. """ import jwt from datetime import datetime, timedelta, timezone from functools import wraps from typing import Optional, Tuple, Dict, Any from flask import request, current_app, g from app import db from app.models import User from app.errors import AuthenticationError, AuthorizationError class AuthService: """Service for handling JWT authentication""" @staticmethod def generate_access_token(user: User) -> str: """Generate a JWT access token for a user""" now = datetime.now(timezone.utc) expires = now + current_app.config['JWT_ACCESS_TOKEN_EXPIRES'] payload = { 'user_id': user.id, 'username': user.username, 'role': user.role, 'type': 'access', 'exp': expires, 'iat': now } return jwt.encode( payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256' ) @staticmethod def generate_refresh_token(user: User) -> str: """Generate a JWT refresh token for a user""" now = datetime.now(timezone.utc) expires = now + current_app.config['JWT_REFRESH_TOKEN_EXPIRES'] payload = { 'user_id': user.id, 'type': 'refresh', 'exp': expires, 'iat': now } return jwt.encode( payload, current_app.config['JWT_SECRET_KEY'], algorithm='HS256' ) @staticmethod def generate_tokens(user: User) -> Dict[str, str]: """Generate both access and refresh tokens""" return { 'access_token': AuthService.generate_access_token(user), 'refresh_token': AuthService.generate_refresh_token(user) } @staticmethod def decode_token(token: str) -> Dict[str, Any]: """ Decode and validate a JWT token Raises: AuthenticationError: If token is invalid or expired """ try: payload = jwt.decode( token, current_app.config['JWT_SECRET_KEY'], algorithms=['HS256'] ) return payload except jwt.ExpiredSignatureError: raise AuthenticationError( message="Token has expired", details={"reason": "token_expired"} ) except jwt.InvalidTokenError as e: raise AuthenticationError( message="Invalid token", details={"reason": "invalid_token"} ) @staticmethod def get_token_from_header() -> Optional[str]: """Extract JWT token from Authorization header""" auth_header = request.headers.get('Authorization', '') if auth_header.startswith('Bearer '): return auth_header[7:] return None @staticmethod def get_current_user() -> Optional[User]: """Get the current authenticated user from the request""" token = AuthService.get_token_from_header() if not token: return None try: payload = AuthService.decode_token(token) if payload.get('type') != 'access': return None user = db.session.get(User, payload['user_id']) if user and user.is_active: return user return None except AuthenticationError: return None @staticmethod def authenticate(username: str, password: str) -> Tuple[User, Dict[str, str]]: """ Authenticate a user with username and password Returns: Tuple of (User, tokens dict) Raises: AuthenticationError: If credentials are invalid """ user = User.query.filter_by(username=username).first() if not user or not user.check_password(password): raise AuthenticationError( message="Invalid username or password", details={"reason": "invalid_credentials"} ) if not user.is_active: raise AuthenticationError( message="User account is disabled", details={"reason": "account_disabled"} ) tokens = AuthService.generate_tokens(user) return user, tokens @staticmethod def refresh_access_token(refresh_token: str) -> Dict[str, str]: """ Generate a new access token using a refresh token Returns: Dict with new access_token Raises: AuthenticationError: If refresh token is invalid """ payload = AuthService.decode_token(refresh_token) if payload.get('type') != 'refresh': raise AuthenticationError( message="Invalid token type", details={"reason": "not_refresh_token"} ) user = db.session.get(User, payload['user_id']) if not user or not user.is_active: raise AuthenticationError( message="User not found or inactive", details={"reason": "user_invalid"} ) return { 'access_token': AuthService.generate_access_token(user) } def login_required(f): """Decorator to require authentication for a route""" @wraps(f) def decorated_function(*args, **kwargs): token = AuthService.get_token_from_header() if not token: raise AuthenticationError( message="Authentication required", details={"reason": "missing_token"} ) payload = AuthService.decode_token(token) if payload.get('type') != 'access': raise AuthenticationError( message="Invalid token type", details={"reason": "not_access_token"} ) user = db.session.get(User, payload['user_id']) if not user: raise AuthenticationError( message="User not found", details={"reason": "user_not_found"} ) if not user.is_active: raise AuthenticationError( message="User account is disabled", details={"reason": "account_disabled"} ) # Store user in flask g object for access in route g.current_user = user return f(*args, **kwargs) return decorated_function def get_current_user_from_context() -> User: """Get the current user from Flask g context (use after login_required)""" return getattr(g, 'current_user', None) def admin_required(f): """ Decorator to require admin role for a route. Must be used after @login_required. """ @wraps(f) def decorated_function(*args, **kwargs): token = AuthService.get_token_from_header() if not token: raise AuthenticationError( message="Authentication required", details={"reason": "missing_token"} ) payload = AuthService.decode_token(token) if payload.get('type') != 'access': raise AuthenticationError( message="Invalid token type", details={"reason": "not_access_token"} ) user = db.session.get(User, payload['user_id']) if not user: raise AuthenticationError( message="User not found", details={"reason": "user_not_found"} ) if not user.is_active: raise AuthenticationError( message="User account is disabled", details={"reason": "account_disabled"} ) if user.role != 'admin': raise AuthorizationError( message="Admin access required", details={"reason": "insufficient_permissions", "required_role": "admin"} ) g.current_user = user return f(*args, **kwargs) return decorated_function def power_user_required(f): """ Decorator to require power_user or admin role for a route. Must be used after @login_required. """ @wraps(f) def decorated_function(*args, **kwargs): token = AuthService.get_token_from_header() if not token: raise AuthenticationError( message="Authentication required", details={"reason": "missing_token"} ) payload = AuthService.decode_token(token) if payload.get('type') != 'access': raise AuthenticationError( message="Invalid token type", details={"reason": "not_access_token"} ) user = db.session.get(User, payload['user_id']) if not user: raise AuthenticationError( message="User not found", details={"reason": "user_not_found"} ) if not user.is_active: raise AuthenticationError( message="User account is disabled", details={"reason": "account_disabled"} ) if user.role not in ['admin', 'power_user']: raise AuthorizationError( message="Power user or admin access required", details={"reason": "insufficient_permissions", "required_role": "power_user"} ) g.current_user = user return f(*args, **kwargs) return decorated_function def check_resource_access(user: User, resource_owner_id: int) -> bool: """ Check if a user has access to a resource based on ownership. - Admin: Can access all resources - Power User: Can access all resources - User: Can only access their own resources Args: user: The current user resource_owner_id: The ID of the resource owner Returns: True if access is allowed, False otherwise """ if user.role in ['admin', 'power_user']: return True return user.id == resource_owner_id def check_credential_access(user: User, credential_id: int) -> bool: """ Check if a user has access to a specific credential. - Admin: Can access all credentials - Power User: Can access all credentials - User: Can only access assigned credentials Args: user: The current user credential_id: The ID of the credential Returns: True if access is allowed, False otherwise """ if user.role in ['admin', 'power_user']: return True # Check if credential is assigned to user from app.models import UserCredential assignment = UserCredential.query.filter_by( user_id=user.id, credential_id=credential_id ).first() return assignment is not None def get_accessible_credentials(user: User): """ Get list of credentials accessible to a user. - Admin/Power User: All credentials - User: Only assigned credentials Args: user: The current user Returns: Query for accessible credentials """ from app.models import AWSCredential, UserCredential if user.role in ['admin', 'power_user']: return AWSCredential.query.filter_by(is_active=True) # Get only assigned credentials for regular users return AWSCredential.query.join(UserCredential).filter( UserCredential.user_id == user.id, AWSCredential.is_active == True ) def get_accessible_reports(user: User): """ Get list of reports accessible to a user. - Admin/Power User: All reports - User: Only their own reports Args: user: The current user Returns: Query for accessible reports """ from app.models import Report, Task if user.role in ['admin', 'power_user']: return Report.query # Get only reports from user's own tasks return Report.query.join(Task).filter(Task.created_by == user.id)