# admin_service.py (9.1 KB)
  1. """Admin service for business logic operations."""
  2. from flask_bcrypt import Bcrypt
  3. from app import db
  4. from app.models.admin import Admin
  # Module-level Flask-Bcrypt helper, constructed here without arguments.
  # AdminService uses it to hash passwords and to verify them against a hash.
  bcrypt = Bcrypt()
  class AdminService:
      """Service class for Admin CRUD operations.

      Every operation is a static method that works on the ``Admin`` model
      through the shared ``db.session``. Validation and lookup failures are
      reported as ``(result, error_message)`` tuples rather than exceptions;
      the user-facing error messages are in Chinese.
      """

      # Minimum number of characters accepted for a password.
      MIN_PASSWORD_LENGTH = 6

      @staticmethod
      def _hash_password(password):
          """Hash a password using bcrypt.

          Args:
              password: Plain text password

          Returns:
              Hashed password string
          """
          return bcrypt.generate_password_hash(password).decode('utf-8')

      @staticmethod
      def _check_password(password, password_hash):
          """Verify a password against its hash.

          Args:
              password: Plain text password
              password_hash: Bcrypt hashed password

          Returns:
              True if password matches, False otherwise
          """
          return bcrypt.check_password_hash(password_hash, password)

      @staticmethod
      def _validate_username(username):
          """Validate that the username is a non-blank string.

          Args:
              username: Username to validate

          Returns:
              Tuple of (is_valid, error_message)
          """
          # Non-string input (e.g. a number from a JSON body) has no .strip();
          # report it as invalid instead of raising AttributeError.
          if not isinstance(username, str) or not username.strip():
              return False, "用户名不能为空"
          return True, None

      @staticmethod
      def _validate_password(password):
          """Validate that the password meets the minimum length requirement.

          Args:
              password: Password to validate

          Returns:
              Tuple of (is_valid, error_message)
          """
          # Anything without a meaningful length (None, numbers, ...) is
          # rejected up front so len() cannot raise TypeError.
          if (
              not isinstance(password, (str, bytes))
              or len(password) < AdminService.MIN_PASSWORD_LENGTH
          ):
              return False, f"密码长度至少为 {AdminService.MIN_PASSWORD_LENGTH} 个字符"
          return True, None

      @staticmethod
      def _check_username_unique(username, exclude_id=None):
          """Check whether a username is not yet used by another admin.

          Args:
              username: Username to check (compared after stripping whitespace)
              exclude_id: Admin ID to exclude from check (for updates)

          Returns:
              Tuple of (is_unique, error_message)
          """
          query = Admin.query.filter(Admin.username == username.strip())
          # Compare against None explicitly so an ID of 0 is still excluded.
          if exclude_id is not None:
              query = query.filter(Admin.id != exclude_id)
          if query.first():
              return False, "用户名已存在"
          return True, None

      @staticmethod
      def _commit():
          """Commit the current session, rolling back if the commit fails.

          Raises:
              Exception: Whatever the commit raised, re-raised unchanged after
                  the rollback.
          """
          try:
              db.session.commit()
          except Exception:
              # Roll back so the shared session stays usable, then let the
              # caller see the original error.
              db.session.rollback()
              raise

      @staticmethod
      def create(username, password):
          """Create a new admin.

          Args:
              username: Admin's username
              password: Admin's password (plain text)

          Returns:
              Tuple of (admin_dict, error_message)
              On success: (admin_dict, None)
              On failure: (None, error_message)
          """
          # Cheap, database-free validation first.
          is_valid, error = AdminService._validate_username(username)
          if not is_valid:
              return None, error

          is_valid, error = AdminService._validate_password(password)
          if not is_valid:
              return None, error

          is_unique, error = AdminService._check_username_unique(username)
          if not is_unique:
              return None, error

          # Only the bcrypt hash is stored, never the plain text password.
          admin = Admin(
              username=username.strip(),
              password_hash=AdminService._hash_password(password),
          )
          db.session.add(admin)
          AdminService._commit()
          # Exclude the hash explicitly, matching every other method that
          # returns an admin dictionary.
          return admin.to_dict(include_password_hash=False), None

      @staticmethod
      def get_all():
          """Get all admins (without password_hash), ordered by ID.

          Returns:
              List of admin dictionaries
          """
          admins = Admin.query.order_by(Admin.id).all()
          return [a.to_dict(include_password_hash=False) for a in admins]

      @staticmethod
      def get_by_id(admin_id):
          """Get an admin by ID.

          Args:
              admin_id: Admin's ID

          Returns:
              Tuple of (admin_dict, error_message)
              On success: (admin_dict, None)
              On failure: (None, error_message)
          """
          admin = db.session.get(Admin, admin_id)
          if not admin:
              return None, f"未找到ID为 {admin_id} 的管理员"
          return admin.to_dict(include_password_hash=False), None

      @staticmethod
      def get_by_username(username):
          """Get an admin by username.

          Args:
              username: Admin's username (compared after stripping whitespace)

          Returns:
              Admin model instance or None
          """
          # Empty or non-string input cannot name an admin; skip the query.
          if not isinstance(username, str) or not username:
              return None
          return Admin.query.filter(Admin.username == username.strip()).first()

      @staticmethod
      def verify_credentials(username, password):
          """Verify admin credentials.

          Args:
              username: Admin's username
              password: Admin's password (plain text)

          Returns:
              Admin model instance if valid, None otherwise
          """
          # A missing password can never match; do not hand None to bcrypt.
          if password is None:
              return None

          admin = AdminService.get_by_username(username)
          if not admin:
              return None

          if AdminService._check_password(password, admin.password_hash):
              return admin
          return None

      @staticmethod
      def update(admin_id, username=None, password=None):
          """Update an admin's information.

          All supplied fields are validated before any of them is written to
          the model, so a validation failure leaves the admin untouched.

          Args:
              admin_id: Admin's ID
              username: New username (optional)
              password: New password (optional)

          Returns:
              Tuple of (admin_dict, error_message)
              On success: (admin_dict, None)
              On failure: (None, error_message)
          """
          admin = db.session.get(Admin, admin_id)
          if not admin:
              return None, f"未找到ID为 {admin_id} 的管理员"

          # Phase 1: validate everything without mutating the model.
          new_username = None
          if username is not None:
              is_valid, error = AdminService._validate_username(username)
              if not is_valid:
                  return None, error
              is_unique, error = AdminService._check_username_unique(
                  username, exclude_id=admin_id
              )
              if not is_unique:
                  return None, error
              new_username = username.strip()

          new_password_hash = None
          if password is not None:
              is_valid, error = AdminService._validate_password(password)
              if not is_valid:
                  return None, error
              new_password_hash = AdminService._hash_password(password)

          # Phase 2: apply the validated changes and persist them.
          if new_username is not None:
              admin.username = new_username
          if new_password_hash is not None:
              admin.password_hash = new_password_hash

          AdminService._commit()
          return admin.to_dict(include_password_hash=False), None

      @staticmethod
      def delete(admin_id, current_admin_id=None):
          """Delete an admin.

          Args:
              admin_id: Admin's ID
              current_admin_id: Current logged-in admin's ID (optional)

          Returns:
              Tuple of (success, error_message)
              On success: (True, None)
              On failure: (False, error_message)
          """
          # Protect the primary admin (ID=1)
          if admin_id == 1:
              return False, "无法删除主管理员账户"

          # Prevent self-deletion; compare against None so ID 0 is honored.
          if current_admin_id is not None and admin_id == current_admin_id:
              return False, "不能删除自己的账户"

          admin = db.session.get(Admin, admin_id)
          if not admin:
              return False, f"未找到ID为 {admin_id} 的管理员"

          # Never remove the last remaining admin account.
          if Admin.query.count() <= 1:
              return False, "无法删除最后一个管理员账户"

          db.session.delete(admin)
          AdminService._commit()
          return True, None

      @staticmethod
      def get_count():
          """Get the total number of admins.

          Returns:
              Integer count of admins
          """
          return Admin.query.count()

      @staticmethod
      def create_default_admin(username='admin', password='admin123'):
          """Create a default admin account if no admins exist.

          Intended to be called at application startup so that at least one
          admin account is available for login.

          NOTE(security): the built-in default password is weak and publicly
          visible in the source; callers should pass their own password or
          change it right after the first login.

          Args:
              username: Default admin username (default: 'admin')
              password: Default admin password (default: 'admin123')

          Returns:
              Tuple of (created, message)
              - (True, message) if admin was created
              - (False, message) if admin already exists or creation failed
          """
          if Admin.query.count() > 0:
              return False, "Admin accounts already exist, skipping default admin creation"

          # Only the error half of the result matters here.
          _, error = AdminService.create(username, password)
          if error:
              return False, f"Failed to create default admin: {error}"
          return True, f"Default admin '{username}' created successfully"