auth.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. """Authentication API routes."""
  2. from flask_restx import Namespace, Resource, fields
  3. from flask import request
  4. from flask_bcrypt import Bcrypt
  5. from app.models.admin import Admin
  6. from app.services.auth_service import AuthService
  7. from app.utils.auth_decorator import require_auth
  8. auth_ns = Namespace('auth', description='认证接口')
  9. # Initialize bcrypt
  10. bcrypt = Bcrypt()
  11. # API models for Swagger documentation
  12. login_input = auth_ns.model('LoginInput', {
  13. 'username': fields.String(required=True, description='用户名'),
  14. 'password': fields.String(required=True, description='密码')
  15. })
  16. admin_info = auth_ns.model('AdminInfo', {
  17. 'id': fields.Integer(description='管理员ID'),
  18. 'username': fields.String(description='用户名')
  19. })
  20. login_data = auth_ns.model('LoginData', {
  21. 'token': fields.String(description='JWT令牌'),
  22. 'admin': fields.Nested(admin_info, description='管理员信息')
  23. })
  24. login_response = auth_ns.model('LoginResponse', {
  25. 'success': fields.Boolean(description='操作是否成功'),
  26. 'data': fields.Nested(login_data, description='登录数据')
  27. })
  28. me_response = auth_ns.model('MeResponse', {
  29. 'success': fields.Boolean(description='操作是否成功'),
  30. 'data': fields.Nested(admin_info, description='当前管理员信息')
  31. })
  32. error_response = auth_ns.model('AuthErrorResponse', {
  33. 'success': fields.Boolean(description='操作是否成功'),
  34. 'error': fields.String(description='错误信息'),
  35. 'code': fields.String(description='错误代码')
  36. })
  37. @auth_ns.route('/login')
  38. class Login(Resource):
  39. """Resource for user login."""
  40. @auth_ns.doc('login')
  41. @auth_ns.expect(login_input)
  42. @auth_ns.response(200, 'Success', login_response)
  43. @auth_ns.response(401, 'Authentication failed', error_response)
  44. def post(self):
  45. """用户登录,返回JWT令牌"""
  46. data = auth_ns.payload or {}
  47. username = data.get('username', '').strip()
  48. password = data.get('password', '')
  49. # Validate input
  50. if not username or not password:
  51. return {
  52. 'success': False,
  53. 'error': 'Username and password are required',
  54. 'code': 'VALIDATION_ERROR'
  55. }, 400
  56. # Find admin by username
  57. admin = Admin.query.filter_by(username=username).first()
  58. if not admin:
  59. return {
  60. 'success': False,
  61. 'error': 'Invalid username or password',
  62. 'code': 'AUTH_ERROR'
  63. }, 401
  64. # Verify password
  65. if not bcrypt.check_password_hash(admin.password_hash, password):
  66. return {
  67. 'success': False,
  68. 'error': 'Invalid username or password',
  69. 'code': 'AUTH_ERROR'
  70. }, 401
  71. # Generate JWT token
  72. token = AuthService.generate_token(admin)
  73. return {
  74. 'success': True,
  75. 'data': {
  76. 'token': token,
  77. 'admin': {
  78. 'id': admin.id,
  79. 'username': admin.username
  80. }
  81. }
  82. }, 200
  83. @auth_ns.route('/me')
  84. class Me(Resource):
  85. """Resource for getting current admin info."""
  86. @auth_ns.doc('get_current_admin')
  87. @auth_ns.response(200, 'Success', me_response)
  88. @auth_ns.response(401, 'Unauthorized', error_response)
  89. @require_auth
  90. def get(self):
  91. """获取当前登录管理员信息"""
  92. current_admin = request.current_admin
  93. return {
  94. 'success': True,
  95. 'data': {
  96. 'id': current_admin.get('admin_id'),
  97. 'username': current_admin.get('username')
  98. }
  99. }, 200