| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- """
- JWT Authentication Service
- Provides JWT token generation, validation, and authentication decorators.
- """
- import jwt
- from datetime import datetime, timedelta, timezone
- from functools import wraps
- from typing import Optional, Tuple, Dict, Any
- from flask import request, current_app, g
- from app import db
- from app.models import User
- from app.errors import AuthenticationError, AuthorizationError
class AuthService:
    """Service for issuing and validating HS256-signed JWTs.

    Every method is static. Signing configuration is read from
    ``flask.current_app.config`` and the bearer token from
    ``flask.request``.
    """

    @staticmethod
    def _encode(payload: Dict[str, Any]) -> str:
        """Sign *payload* with ``JWT_SECRET_KEY`` using HS256."""
        return jwt.encode(
            payload,
            current_app.config['JWT_SECRET_KEY'],
            algorithm='HS256',
        )

    @staticmethod
    def generate_access_token(user: User) -> str:
        """Generate a JWT access token for a user.

        The payload carries the user's id, username and role, the marker
        ``type='access'``, and ``iat``/``exp`` timestamps in UTC.

        Args:
            user: The user the token is issued to.

        Returns:
            The encoded token.
        """
        now = datetime.now(timezone.utc)
        # NOTE(review): the config value is added to a datetime, so it is
        # presumably a timedelta -- confirm against the app configuration.
        expires = now + current_app.config['JWT_ACCESS_TOKEN_EXPIRES']
        return AuthService._encode({
            'user_id': user.id,
            'username': user.username,
            'role': user.role,
            'type': 'access',
            'exp': expires,
            'iat': now,
        })

    @staticmethod
    def generate_refresh_token(user: User) -> str:
        """Generate a JWT refresh token for a user.

        The payload carries only the user's id, the marker
        ``type='refresh'``, and ``iat``/``exp`` timestamps in UTC.

        Args:
            user: The user the token is issued to.

        Returns:
            The encoded token.
        """
        now = datetime.now(timezone.utc)
        # NOTE(review): presumably a timedelta, as for the access token.
        expires = now + current_app.config['JWT_REFRESH_TOKEN_EXPIRES']
        return AuthService._encode({
            'user_id': user.id,
            'type': 'refresh',
            'exp': expires,
            'iat': now,
        })

    @staticmethod
    def generate_tokens(user: User) -> Dict[str, str]:
        """Generate both an access and a refresh token for *user*.

        Returns:
            Dict with the keys ``access_token`` and ``refresh_token``.
        """
        return {
            'access_token': AuthService.generate_access_token(user),
            'refresh_token': AuthService.generate_refresh_token(user),
        }

    @staticmethod
    def decode_token(token: str) -> Dict[str, Any]:
        """Decode and validate a JWT token.

        Args:
            token: The encoded JWT.

        Returns:
            The decoded payload.

        Raises:
            AuthenticationError: If the token is invalid or expired. The
                underlying PyJWT exception is chained as ``__cause__``.
        """
        try:
            return jwt.decode(
                token,
                current_app.config['JWT_SECRET_KEY'],
                algorithms=['HS256'],
            )
        # ExpiredSignatureError subclasses InvalidTokenError, so it has to be
        # handled first to keep the more specific "token_expired" reason.
        except jwt.ExpiredSignatureError as exc:
            raise AuthenticationError(
                message="Token has expired",
                details={"reason": "token_expired"}
            ) from exc
        except jwt.InvalidTokenError as exc:
            raise AuthenticationError(
                message="Invalid token",
                details={"reason": "invalid_token"}
            ) from exc

    @staticmethod
    def get_token_from_header() -> Optional[str]:
        """Extract the bearer token from the ``Authorization`` header.

        The scheme name is matched case-insensitively and surrounding
        whitespace is stripped from the credentials.

        Returns:
            The token string, or ``None`` when the header is absent, uses a
            different scheme, or carries no credentials.
        """
        auth_header = request.headers.get('Authorization', '')
        scheme, _, credentials = auth_header.partition(' ')
        if scheme.lower() != 'bearer':
            return None
        return credentials.strip() or None

    @staticmethod
    def get_current_user() -> Optional[User]:
        """Get the current authenticated user from the request.

        Unlike the route decorators this never raises for a bad token; every
        failure mode simply yields ``None``.

        Returns:
            The active user named by a valid access token, otherwise ``None``.
        """
        token = AuthService.get_token_from_header()
        if not token:
            return None

        try:
            payload = AuthService.decode_token(token)
        except AuthenticationError:
            return None

        if payload.get('type') != 'access':
            return None

        # A validly signed token without a user_id claim is treated like any
        # other unusable token instead of surfacing a KeyError.
        user_id = payload.get('user_id')
        if user_id is None:
            return None

        user = db.session.get(User, user_id)
        if user and user.is_active:
            return user
        return None

    @staticmethod
    def authenticate(username: str, password: str) -> Tuple[User, Dict[str, str]]:
        """Authenticate a user with username and password.

        Args:
            username: The username to look up.
            password: The password to verify via ``User.check_password``.

        Returns:
            Tuple of (User, tokens dict).

        Raises:
            AuthenticationError: If the credentials are invalid or the
                account is disabled.
        """
        user = User.query.filter_by(username=username).first()

        # Unknown username and wrong password deliberately share one error so
        # the response does not reveal which of the two was wrong.
        if not user or not user.check_password(password):
            raise AuthenticationError(
                message="Invalid username or password",
                details={"reason": "invalid_credentials"}
            )

        if not user.is_active:
            raise AuthenticationError(
                message="User account is disabled",
                details={"reason": "account_disabled"}
            )

        return user, AuthService.generate_tokens(user)

    @staticmethod
    def refresh_access_token(refresh_token: str) -> Dict[str, str]:
        """Generate a new access token using a refresh token.

        Args:
            refresh_token: An encoded token whose ``type`` claim is
                ``'refresh'``.

        Returns:
            Dict with the single key ``access_token``.

        Raises:
            AuthenticationError: If the token is invalid, expired, not a
                refresh token, lacks a ``user_id`` claim, or names a user
                that does not exist or is inactive.
        """
        payload = AuthService.decode_token(refresh_token)

        if payload.get('type') != 'refresh':
            raise AuthenticationError(
                message="Invalid token type",
                details={"reason": "not_refresh_token"}
            )

        # Report a missing claim as an authentication failure rather than
        # letting a KeyError escape.
        user_id = payload.get('user_id')
        if user_id is None:
            raise AuthenticationError(
                message="Invalid token",
                details={"reason": "invalid_token"}
            )

        user = db.session.get(User, user_id)
        if not user or not user.is_active:
            raise AuthenticationError(
                message="User not found or inactive",
                details={"reason": "user_invalid"}
            )

        return {
            'access_token': AuthService.generate_access_token(user)
        }
def _authenticate_request() -> User:
    """Resolve the active user identified by the request's access token.

    Shared by ``login_required``, ``admin_required`` and
    ``power_user_required`` so the three decorators run exactly the same
    authentication steps.

    Returns:
        The active user named by the token's ``user_id`` claim.

    Raises:
        AuthenticationError: If the token is missing, cannot be decoded, is
            not an access token, has no ``user_id`` claim, or names a user
            that does not exist or is disabled.
    """
    token = AuthService.get_token_from_header()
    if not token:
        raise AuthenticationError(
            message="Authentication required",
            details={"reason": "missing_token"}
        )

    payload = AuthService.decode_token(token)
    if payload.get('type') != 'access':
        raise AuthenticationError(
            message="Invalid token type",
            details={"reason": "not_access_token"}
        )

    # A validly signed token without a user_id claim must fail as an
    # authentication error, not as an unhandled KeyError.
    user_id = payload.get('user_id')
    if user_id is None:
        raise AuthenticationError(
            message="Invalid token",
            details={"reason": "invalid_token"}
        )

    user = db.session.get(User, user_id)
    if not user:
        raise AuthenticationError(
            message="User not found",
            details={"reason": "user_not_found"}
        )

    if not user.is_active:
        raise AuthenticationError(
            message="User account is disabled",
            details={"reason": "account_disabled"}
        )

    return user


def login_required(f):
    """Decorator to require authentication for a route.

    On success the authenticated user is stored on ``flask.g.current_user``
    before the wrapped function is called.

    Raises (from the wrapper):
        AuthenticationError: If the request carries no usable access token
            or the user is missing or disabled.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Store user in flask g object for access in route
        g.current_user = _authenticate_request()
        return f(*args, **kwargs)

    return decorated_function


def get_current_user_from_context() -> Optional[User]:
    """Get the current user from Flask g context.

    Returns:
        The user stored by one of the auth decorators, or ``None`` when no
        decorator has set ``g.current_user`` for this context.
    """
    return getattr(g, 'current_user', None)


def admin_required(f):
    """Decorator to require the admin role for a route.

    Performs the full authentication itself, so it does not depend on
    ``@login_required`` also being applied. On success the user is stored on
    ``flask.g.current_user``.

    Raises (from the wrapper):
        AuthenticationError: If authentication fails.
        AuthorizationError: If the authenticated user's role is not
            ``'admin'``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        user = _authenticate_request()

        if user.role != 'admin':
            raise AuthorizationError(
                message="Admin access required",
                details={"reason": "insufficient_permissions", "required_role": "admin"}
            )

        g.current_user = user
        return f(*args, **kwargs)

    return decorated_function


def power_user_required(f):
    """Decorator to require the power_user or admin role for a route.

    Performs the full authentication itself, so it does not depend on
    ``@login_required`` also being applied. On success the user is stored on
    ``flask.g.current_user``.

    Raises (from the wrapper):
        AuthenticationError: If authentication fails.
        AuthorizationError: If the authenticated user's role is neither
            ``'admin'`` nor ``'power_user'``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        user = _authenticate_request()

        if user.role not in ('admin', 'power_user'):
            raise AuthorizationError(
                message="Power user or admin access required",
                details={"reason": "insufficient_permissions", "required_role": "power_user"}
            )

        g.current_user = user
        return f(*args, **kwargs)

    return decorated_function
def check_resource_access(user: User, resource_owner_id: int) -> bool:
    """
    Decide whether a user may access a resource, based on role and ownership.

    - Admin / Power User: every resource
    - Any other role: only resources they own

    Args:
        user: The current user
        resource_owner_id: The ID of the resource owner

    Returns:
        True if access is allowed, False otherwise
    """
    # Privileged roles short-circuit; everyone else must be the owner.
    return user.role in ('admin', 'power_user') or user.id == resource_owner_id
def check_credential_access(user: User, credential_id: int) -> bool:
    """
    Decide whether a user may use a specific credential.

    - Admin / Power User: every credential
    - Any other role: only credentials with a matching UserCredential row

    Args:
        user: The current user
        credential_id: The ID of the credential

    Returns:
        True if access is allowed, False otherwise
    """
    if user.role not in ('admin', 'power_user'):
        # Regular users need an explicit assignment row for this credential.
        from app.models import UserCredential
        matching = UserCredential.query.filter_by(
            user_id=user.id,
            credential_id=credential_id,
        )
        return matching.first() is not None

    return True
def get_accessible_credentials(user: User):
    """
    Get the credentials accessible to a user.

    Only credentials with ``is_active`` set are ever returned.

    - Admin/Power User: All active credentials
    - User: Only active credentials assigned to them via UserCredential

    Args:
        user: The current user

    Returns:
        Query for accessible credentials
    """
    from app.models import AWSCredential, UserCredential

    # One base query carries the active-only rule for both branches, which
    # also avoids the `== True` comparison the second branch used to need.
    active_credentials = AWSCredential.query.filter_by(is_active=True)

    if user.role in ('admin', 'power_user'):
        return active_credentials

    # Get only assigned credentials for regular users
    return active_credentials.join(UserCredential).filter(
        UserCredential.user_id == user.id
    )
def get_accessible_reports(user: User):
    """
    Build the query for the reports a user is allowed to see.

    - Admin/Power User: every report
    - User: only reports joined to tasks they created

    Args:
        user: The current user

    Returns:
        Query for accessible reports
    """
    from app.models import Report, Task

    reports = Report.query
    if user.role not in ('admin', 'power_user'):
        # Regular users are limited to reports from their own tasks.
        reports = reports.join(Task).filter(Task.created_by == user.id)
    return reports
|