| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- """Admin service for business logic operations."""
- from flask_bcrypt import Bcrypt
- from app import db
- from app.models.admin import Admin
- bcrypt = Bcrypt()
- class AdminService:
- """Service class for Admin CRUD operations."""
-
- MIN_PASSWORD_LENGTH = 6
-
- @staticmethod
- def _hash_password(password):
- """Hash a password using bcrypt.
-
- Args:
- password: Plain text password
-
- Returns:
- Hashed password string
- """
- return bcrypt.generate_password_hash(password).decode('utf-8')
-
- @staticmethod
- def _check_password(password, password_hash):
- """Verify a password against its hash.
-
- Args:
- password: Plain text password
- password_hash: Bcrypt hashed password
-
- Returns:
- True if password matches, False otherwise
- """
- return bcrypt.check_password_hash(password_hash, password)
-
- @staticmethod
- def _validate_username(username):
- """Validate username is not empty or whitespace only.
-
- Args:
- username: Username to validate
-
- Returns:
- Tuple of (is_valid, error_message)
- """
- if not username or not username.strip():
- return False, "用户名不能为空"
- return True, None
-
- @staticmethod
- def _validate_password(password):
- """Validate password meets minimum length requirement.
-
- Args:
- password: Password to validate
-
- Returns:
- Tuple of (is_valid, error_message)
- """
- if not password or len(password) < AdminService.MIN_PASSWORD_LENGTH:
- return False, f"密码长度至少为 {AdminService.MIN_PASSWORD_LENGTH} 个字符"
- return True, None
-
- @staticmethod
- def _check_username_unique(username, exclude_id=None):
- """Check if username is unique.
-
- Args:
- username: Username to check
- exclude_id: Admin ID to exclude from check (for updates)
-
- Returns:
- Tuple of (is_unique, error_message)
- """
- query = Admin.query.filter(Admin.username == username.strip())
- if exclude_id:
- query = query.filter(Admin.id != exclude_id)
-
- existing = query.first()
- if existing:
- return False, "用户名已存在"
- return True, None
-
- @staticmethod
- def create(username, password):
- """Create a new admin.
-
- Args:
- username: Admin's username
- password: Admin's password (plain text)
-
- Returns:
- Tuple of (admin_dict, error_message)
- On success: (admin_dict, None)
- On failure: (None, error_message)
- """
- # Validate username
- is_valid, error = AdminService._validate_username(username)
- if not is_valid:
- return None, error
-
- # Validate password
- is_valid, error = AdminService._validate_password(password)
- if not is_valid:
- return None, error
-
- # Check username uniqueness
- is_unique, error = AdminService._check_username_unique(username)
- if not is_unique:
- return None, error
-
- # Create admin with hashed password
- password_hash = AdminService._hash_password(password)
- admin = Admin(
- username=username.strip(),
- password_hash=password_hash
- )
- db.session.add(admin)
- db.session.commit()
-
- return admin.to_dict(), None
-
- @staticmethod
- def get_all():
- """Get all admins (without password_hash).
-
- Returns:
- List of admin dictionaries
- """
- admins = Admin.query.order_by(Admin.id).all()
- return [a.to_dict(include_password_hash=False) for a in admins]
-
- @staticmethod
- def get_by_id(admin_id):
- """Get an admin by ID.
-
- Args:
- admin_id: Admin's ID
-
- Returns:
- Tuple of (admin_dict, error_message)
- On success: (admin_dict, None)
- On failure: (None, error_message)
- """
- admin = db.session.get(Admin, admin_id)
- if not admin:
- return None, f"未找到ID为 {admin_id} 的管理员"
-
- return admin.to_dict(include_password_hash=False), None
-
- @staticmethod
- def get_by_username(username):
- """Get an admin by username.
-
- Args:
- username: Admin's username
-
- Returns:
- Admin model instance or None
- """
- if not username:
- return None
- return Admin.query.filter(Admin.username == username.strip()).first()
-
- @staticmethod
- def verify_credentials(username, password):
- """Verify admin credentials.
-
- Args:
- username: Admin's username
- password: Admin's password (plain text)
-
- Returns:
- Admin model instance if valid, None otherwise
- """
- admin = AdminService.get_by_username(username)
- if not admin:
- return None
-
- if AdminService._check_password(password, admin.password_hash):
- return admin
- return None
-
- @staticmethod
- def update(admin_id, username=None, password=None):
- """Update an admin's information.
-
- Args:
- admin_id: Admin's ID
- username: New username (optional)
- password: New password (optional)
-
- Returns:
- Tuple of (admin_dict, error_message)
- On success: (admin_dict, None)
- On failure: (None, error_message)
- """
- admin = db.session.get(Admin, admin_id)
- if not admin:
- return None, f"未找到ID为 {admin_id} 的管理员"
-
- # Update username if provided
- if username is not None:
- is_valid, error = AdminService._validate_username(username)
- if not is_valid:
- return None, error
-
- is_unique, error = AdminService._check_username_unique(username, exclude_id=admin_id)
- if not is_unique:
- return None, error
-
- admin.username = username.strip()
-
- # Update password if provided
- if password is not None:
- is_valid, error = AdminService._validate_password(password)
- if not is_valid:
- return None, error
-
- admin.password_hash = AdminService._hash_password(password)
-
- db.session.commit()
-
- return admin.to_dict(include_password_hash=False), None
-
- @staticmethod
- def delete(admin_id, current_admin_id=None):
- """Delete an admin.
-
- Args:
- admin_id: Admin's ID
- current_admin_id: Current logged-in admin's ID (optional)
-
- Returns:
- Tuple of (success, error_message)
- On success: (True, None)
- On failure: (False, error_message)
- """
- # Protect the primary admin (ID=1)
- if admin_id == 1:
- return False, "无法删除主管理员账户"
-
- # Prevent self-deletion
- if current_admin_id and admin_id == current_admin_id:
- return False, "不能删除自己的账户"
-
- admin = db.session.get(Admin, admin_id)
- if not admin:
- return False, f"未找到ID为 {admin_id} 的管理员"
-
- # Check if this is the last admin
- admin_count = Admin.query.count()
- if admin_count <= 1:
- return False, "无法删除最后一个管理员账户"
-
- db.session.delete(admin)
- db.session.commit()
-
- return True, None
-
- @staticmethod
- def get_count():
- """Get the total number of admins.
-
- Returns:
- Integer count of admins
- """
- return Admin.query.count()
-
- @staticmethod
- def create_default_admin(username='admin', password='admin123'):
- """Create a default admin account if no admins exist.
-
- This method is called at application startup to ensure there is
- at least one admin account available for login.
-
- Args:
- username: Default admin username (default: 'admin')
- password: Default admin password (default: 'admin123')
-
- Returns:
- Tuple of (created, message)
- - (True, message) if admin was created
- - (False, message) if admin already exists or creation failed
- """
- # Check if any admins exist
- admin_count = Admin.query.count()
- if admin_count > 0:
- return False, "Admin accounts already exist, skipping default admin creation"
-
- # Create the default admin
- admin_dict, error = AdminService.create(username, password)
- if error:
- return False, f"Failed to create default admin: {error}"
-
- return True, f"Default admin '{username}' created successfully"
|