"""Admin service for business logic operations."""
from flask_bcrypt import Bcrypt

from app import db
from app.models.admin import Admin

bcrypt = Bcrypt()


class AdminService:
    """Service class for Admin CRUD operations.

    Every method is a ``@staticmethod``; the class is used as a namespace.
    Most fallible operations report failure through an
    ``(result, error_message)`` tuple instead of raising.
    """

    # Minimum number of characters accepted by _validate_password().
    MIN_PASSWORD_LENGTH = 6

    @staticmethod
    def _hash_password(password):
        """Hash a password using bcrypt.

        Args:
            password: Plain text password

        Returns:
            Hashed password string (the bcrypt output decoded as UTF-8)
        """
        return bcrypt.generate_password_hash(password).decode('utf-8')

    @staticmethod
    def _check_password(password, password_hash):
        """Verify a password against its hash.

        Args:
            password: Plain text password
            password_hash: Bcrypt hashed password

        Returns:
            True if password matches, False otherwise
        """
        return bcrypt.check_password_hash(password_hash, password)

    @staticmethod
    def _validate_username(username):
        """Validate username is not empty or whitespace only.

        Args:
            username: Username to validate

        Returns:
            Tuple of (is_valid, error_message)
        """
        if not username or not username.strip():
            return False, "用户名不能为空"
        return True, None

    @staticmethod
    def _validate_password(password):
        """Validate password meets minimum length requirement.

        Args:
            password: Password to validate

        Returns:
            Tuple of (is_valid, error_message)
        """
        if not password or len(password) < AdminService.MIN_PASSWORD_LENGTH:
            return False, f"密码长度至少为 {AdminService.MIN_PASSWORD_LENGTH} 个字符"
        return True, None

    @staticmethod
    def _check_username_unique(username, exclude_id=None):
        """Check if username is unique.

        The username is stripped before comparison, matching how it is stored
        by create() and update().

        Args:
            username: Username to check
            exclude_id: Admin ID to exclude from check (for updates)

        Returns:
            Tuple of (is_unique, error_message)
        """
        query = Admin.query.filter(Admin.username == username.strip())
        # Compare against None explicitly so that a falsy-but-valid ID is
        # still excluded from the lookup.
        if exclude_id is not None:
            query = query.filter(Admin.id != exclude_id)
        if query.first():
            return False, "用户名已存在"
        return True, None

    @staticmethod
    def create(username, password):
        """Create a new admin.

        Args:
            username: Admin's username
            password: Admin's password (plain text)

        Returns:
            Tuple of (admin_dict, error_message)
            On success: (admin_dict, None)
            On failure: (None, error_message)
        """
        # Validate username
        is_valid, error = AdminService._validate_username(username)
        if not is_valid:
            return None, error

        # Validate password
        is_valid, error = AdminService._validate_password(password)
        if not is_valid:
            return None, error

        # Check username uniqueness
        is_unique, error = AdminService._check_username_unique(username)
        if not is_unique:
            return None, error

        # Create admin with hashed password
        admin = Admin(
            username=username.strip(),
            password_hash=AdminService._hash_password(password),
        )
        db.session.add(admin)
        db.session.commit()

        # NOTE(review): unlike get_all/get_by_id/update, this relies on
        # to_dict()'s default for include_password_hash. Confirm that the
        # default omits the hash, or pass include_password_hash=False here.
        return admin.to_dict(), None

    @staticmethod
    def get_all():
        """Get all admins (without password_hash), ordered by ID.

        Returns:
            List of admin dictionaries
        """
        admins = Admin.query.order_by(Admin.id).all()
        return [a.to_dict(include_password_hash=False) for a in admins]

    @staticmethod
    def get_by_id(admin_id):
        """Get an admin by ID.

        Args:
            admin_id: Admin's ID

        Returns:
            Tuple of (admin_dict, error_message)
            On success: (admin_dict, None)
            On failure: (None, error_message)
        """
        admin = db.session.get(Admin, admin_id)
        if not admin:
            return None, f"未找到ID为 {admin_id} 的管理员"
        return admin.to_dict(include_password_hash=False), None

    @staticmethod
    def get_by_username(username):
        """Get an admin by username.

        Args:
            username: Admin's username (stripped before lookup)

        Returns:
            Admin model instance or None
        """
        if not username:
            return None
        return Admin.query.filter(Admin.username == username.strip()).first()

    @staticmethod
    def verify_credentials(username, password):
        """Verify admin credentials.

        Args:
            username: Admin's username
            password: Admin's password (plain text)

        Returns:
            Admin model instance if valid, None otherwise
        """
        # A missing or empty password can never authenticate; bail out before
        # handing a non-string value to the bcrypt check.
        if not password:
            return None

        admin = AdminService.get_by_username(username)
        if not admin:
            return None

        if AdminService._check_password(password, admin.password_hash):
            return admin
        return None

    @staticmethod
    def update(admin_id, username=None, password=None):
        """Update an admin's information.

        All supplied fields are validated before any of them is applied, so a
        rejected update leaves the loaded model (and the session) untouched.

        Args:
            admin_id: Admin's ID
            username: New username (optional)
            password: New password (optional)

        Returns:
            Tuple of (admin_dict, error_message)
            On success: (admin_dict, None)
            On failure: (None, error_message)
        """
        admin = db.session.get(Admin, admin_id)
        if not admin:
            return None, f"未找到ID为 {admin_id} 的管理员"

        # Phase 1: validate everything that was provided. Nothing is assigned
        # yet, otherwise a later validation failure would return an error
        # while leaving a pending, unvalidated-as-a-whole change in the
        # session.
        if username is not None:
            is_valid, error = AdminService._validate_username(username)
            if not is_valid:
                return None, error

            is_unique, error = AdminService._check_username_unique(
                username, exclude_id=admin_id
            )
            if not is_unique:
                return None, error

        if password is not None:
            is_valid, error = AdminService._validate_password(password)
            if not is_valid:
                return None, error

        # Phase 2: apply the validated changes.
        if username is not None:
            admin.username = username.strip()
        if password is not None:
            admin.password_hash = AdminService._hash_password(password)

        db.session.commit()
        return admin.to_dict(include_password_hash=False), None

    @staticmethod
    def delete(admin_id, current_admin_id=None):
        """Delete an admin.

        Args:
            admin_id: Admin's ID
            current_admin_id: Current logged-in admin's ID (optional)

        Returns:
            Tuple of (success, error_message)
            On success: (True, None)
            On failure: (False, error_message)
        """
        # Protect the primary admin (ID=1)
        if admin_id == 1:
            return False, "无法删除主管理员账户"

        # Prevent self-deletion
        if current_admin_id is not None and admin_id == current_admin_id:
            return False, "不能删除自己的账户"

        admin = db.session.get(Admin, admin_id)
        if not admin:
            return False, f"未找到ID为 {admin_id} 的管理员"

        # Check if this is the last admin
        if Admin.query.count() <= 1:
            return False, "无法删除最后一个管理员账户"

        db.session.delete(admin)
        db.session.commit()
        return True, None

    @staticmethod
    def get_count():
        """Get the total number of admins.

        Returns:
            Integer count of admins
        """
        return Admin.query.count()

    @staticmethod
    def create_default_admin(username='admin', password='admin123'):
        """Create a default admin account if no admins exist.

        Intended to be called at application startup to ensure there is at
        least one admin account available for login.

        Args:
            username: Default admin username (default: 'admin')
            password: Default admin password (default: 'admin123')

        Returns:
            Tuple of (created, message)
            - (True, message) if admin was created
            - (False, message) if admin already exists or creation failed
        """
        # NOTE(security): the built-in default password is a fixed, guessable
        # value. Callers should pass their own password or rotate it right
        # after the first login.

        # Check if any admins exist
        if Admin.query.count() > 0:
            return False, "Admin accounts already exist, skipping default admin creation"

        # Create the default admin; only the error half of the result is needed.
        _, error = AdminService.create(username, password)
        if error:
            return False, f"Failed to create default admin: {error}"

        return True, f"Default admin '{username}' created successfully"