| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- """Authentication API routes."""
- from flask_restx import Namespace, Resource, fields
- from flask import request
- from flask_bcrypt import Bcrypt
- from app.models.admin import Admin
- from app.services.auth_service import AuthService
- from app.utils.auth_decorator import require_auth
# Namespace under which the authentication routes in this module are registered.
auth_ns = Namespace(name='auth', description='认证接口')

# Bcrypt helper used by the login route to verify password hashes.
# NOTE(review): this instance is created without an app, so any
# app-level bcrypt configuration is presumably not applied here -- confirm
# it matches whatever instance generates the stored hashes.
bcrypt = Bcrypt()

# --- Swagger models -------------------------------------------------------

# Request body expected by the login route.
login_input = auth_ns.model(
    name='LoginInput',
    model={
        'username': fields.String(required=True, description='用户名'),
        'password': fields.String(required=True, description='密码'),
    },
)

# Public view of an admin account: id and username only.
admin_info = auth_ns.model(
    name='AdminInfo',
    model={
        'id': fields.Integer(description='管理员ID'),
        'username': fields.String(description='用户名'),
    },
)

# Payload of a successful login: the token plus the admin it belongs to.
login_data = auth_ns.model(
    name='LoginData',
    model={
        'token': fields.String(description='JWT令牌'),
        'admin': fields.Nested(admin_info, description='管理员信息'),
    },
)

# Success envelope for the login route.
login_response = auth_ns.model(
    name='LoginResponse',
    model={
        'success': fields.Boolean(description='操作是否成功'),
        'data': fields.Nested(login_data, description='登录数据'),
    },
)

# Success envelope for the current-admin route.
me_response = auth_ns.model(
    name='MeResponse',
    model={
        'success': fields.Boolean(description='操作是否成功'),
        'data': fields.Nested(admin_info, description='当前管理员信息'),
    },
)

# Error envelope shared by the routes in this namespace.
error_response = auth_ns.model(
    name='AuthErrorResponse',
    model={
        'success': fields.Boolean(description='操作是否成功'),
        'error': fields.String(description='错误信息'),
        'code': fields.String(description='错误代码'),
    },
)
@auth_ns.route('/login')
class Login(Resource):
    """Resource for admin login.

    ``POST`` takes a JSON object with ``username`` and ``password``. On
    success it responds with a token from ``AuthService.generate_token`` plus
    the admin's id and username; otherwise it responds with an error envelope
    (``success`` / ``error`` / ``code``) and a 400 or 401 status.
    """

    @staticmethod
    def _failure(message, code, status):
        """Build the ``(body, status)`` pair for a failed login attempt."""
        return {'success': False, 'error': message, 'code': code}, status

    @auth_ns.doc('login')
    @auth_ns.expect(login_input)
    @auth_ns.response(200, 'Success', login_response)
    @auth_ns.response(400, 'Validation error', error_response)
    @auth_ns.response(401, 'Authentication failed', error_response)
    def post(self):
        """用户登录,返回JWT令牌"""
        # The docstring above ("User login; returns a JWT token") is kept
        # verbatim because Flask-RESTX publishes method docstrings as the
        # operation summary in the generated API documentation.

        # The body is untrusted: it may be missing, or valid JSON that is
        # not an object (list, number, string). Treat all of those as empty.
        payload = auth_ns.payload
        if not isinstance(payload, dict):
            payload = {}

        username = payload.get('username')
        password = payload.get('password')

        # Reject null / non-string credentials up front; calling .strip() or
        # the bcrypt check on them would raise and surface as a 500.
        if not isinstance(username, str) or not isinstance(password, str):
            return self._failure(
                'Username and password are required', 'VALIDATION_ERROR', 400
            )

        # Only the username is trimmed; the password is compared as sent.
        username = username.strip()
        if not username or not password:
            return self._failure(
                'Username and password are required', 'VALIDATION_ERROR', 400
            )

        admin = Admin.query.filter_by(username=username).first()

        # Unknown user and wrong password deliberately share one response so
        # the reply does not reveal which usernames exist.
        # NOTE(review): the unknown-user path skips the bcrypt check, so the
        # two cases may still be distinguishable by response time.
        if not admin or not bcrypt.check_password_hash(
            admin.password_hash, password
        ):
            return self._failure(
                'Invalid username or password', 'AUTH_ERROR', 401
            )

        token = AuthService.generate_token(admin)

        return {
            'success': True,
            'data': {
                'token': token,
                'admin': {
                    'id': admin.id,
                    'username': admin.username,
                },
            },
        }, 200
@auth_ns.route('/me')
class Me(Resource):
    """Resource that reports the admin attached to the current request."""

    @auth_ns.doc('get_current_admin')
    @auth_ns.response(200, 'Success', me_response)
    @auth_ns.response(401, 'Unauthorized', error_response)
    @require_auth
    def get(self):
        """获取当前登录管理员信息"""
        # The docstring above ("Get the currently logged-in admin's info") is
        # kept verbatim because Flask-RESTX publishes method docstrings as
        # the operation summary in the generated API documentation.

        # NOTE(review): `request.current_admin` is presumably populated by
        # `require_auth` with a dict-like token payload -- confirm against
        # the decorator.
        identity = request.current_admin

        admin_view = {
            'id': identity.get('admin_id'),
            'username': identity.get('username'),
        }
        return {'success': True, 'data': admin_view}, 200
|