users.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. """
  2. User Management API endpoints (Admin only)
  3. Provides user CRUD operations and credential assignment.
  4. """
  5. from flask import jsonify, request
  6. from app import db
  7. from app.api import api_bp
  8. from app.models import User, UserCredential, AWSCredential
  9. from app.services import admin_required, get_current_user_from_context
  10. from app.errors import ValidationError, NotFoundError
  def validate_email(email: str) -> bool:
      """Return True when *email* has a basic ``local@domain.tld`` shape.

      This is a syntactic sanity check only; it does not verify that the
      address exists or can receive mail.

      Args:
          email: Candidate address. Non-string values are rejected.

      Returns:
          True if the whole string matches the pattern, False otherwise.
      """
      import re

      # Non-strings (None, numbers, lists from a JSON body) are simply invalid
      # rather than an error the caller has to guard against.
      if not isinstance(email, str):
          return False

      # fullmatch is used instead of match + '$' because '$' also matches just
      # before a trailing newline, which would let "user@example.com\n" pass.
      pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
      return re.fullmatch(pattern, email) is not None
  def validate_role(role: str) -> bool:
      """Return True when *role* is one of the recognised user roles."""
      allowed_roles = ('admin', 'power_user', 'user')
      return role in allowed_roles
  @api_bp.route('/users', methods=['GET'])
  @admin_required
  def get_users():
      """
      Get users list with pagination and search

      Query params:
          page: Page number (default: 1; values below 1 are treated as 1)
          page_size / pageSize: Items per page (default: 20, max: 100)
          search: Term matched as a literal, case-insensitive substring of
              username or email

      Returns:
          {
              "data": [user objects],
              "pagination": {
                  "page": 1,
                  "page_size": 20,
                  "total": 100,
                  "total_pages": 5
              }
          }
      """
      page = request.args.get('page', 1, type=int)
      # Support both pageSize (frontend) and page_size (backend convention)
      page_size = (
          request.args.get('pageSize', type=int)
          or request.args.get('page_size', type=int)
          or 20
      )
      search = request.args.get('search', '', type=str)

      # Clamp pagination parameters to sane bounds
      if page < 1:
          page = 1
      if page_size < 1:
          page_size = 20
      if page_size > 100:
          page_size = 100

      query = User.query

      if search:
          # Escape LIKE wildcards so that '%' and '_' typed by the caller are
          # matched literally instead of matching arbitrary text. '/' is the
          # escape character, so it has to be doubled first.
          escaped = (
              search.replace('/', '//')
              .replace('%', '/%')
              .replace('_', '/_')
          )
          search_term = f'%{escaped}%'
          query = query.filter(
              db.or_(
                  User.username.ilike(search_term, escape='/'),
                  User.email.ilike(search_term, escape='/'),
              )
          )

      # Count before ordering: the sort is irrelevant to the total
      total = query.count()
      total_pages = (total + page_size - 1) // page_size if total > 0 else 1

      # Newest first; id is a tie-breaker so rows sharing a created_at value
      # keep a stable position across pages.
      users = (
          query.order_by(User.created_at.desc(), User.id.desc())
          .offset((page - 1) * page_size)
          .limit(page_size)
          .all()
      )

      return jsonify({
          'data': [user.to_dict() for user in users],
          'pagination': {
              'page': page,
              'page_size': page_size,
              'total': total,
              'total_pages': total_pages
          }
      }), 200
  @api_bp.route('/users/create', methods=['POST'])
  @admin_required
  def create_user():
      """
      Create a new user

      Request body:
          {
              "username": "string" (required),
              "password": "string" (required),
              "email": "string" (required),
              "role": "admin" | "power_user" | "user" (required)
          }

      Returns:
          { user object } with HTTP 201

      Raises:
          ValidationError: If the body is missing or not a JSON object, a
              required field is missing or not a string, a value fails
              validation, or the username/email is already taken.
      """
      data = request.get_json()
      if not data:
          raise ValidationError(
              message="Request body is required",
              details={"reason": "missing_body"}
          )
      # A JSON array or scalar would otherwise crash on data.get() below
      if not isinstance(data, dict):
          raise ValidationError(
              message="Request body must be a JSON object",
              details={"reason": "invalid_body"}
          )

      # Validate required fields
      required_fields = ['username', 'password', 'email', 'role']
      missing_fields = [field for field in required_fields if not data.get(field)]
      if missing_fields:
          raise ValidationError(
              message="Missing required fields",
              details={"missing_fields": missing_fields}
          )

      # Reject non-string values up front so the .strip()/len() calls below
      # cannot fail with an unhandled AttributeError/TypeError.
      for field in required_fields:
          if not isinstance(data[field], str):
              raise ValidationError(
                  message=f"Field '{field}' must be a string",
                  details={"field": field, "reason": "invalid_type"}
              )

      username = data['username'].strip()
      password = data['password']
      # Emails are stored lower-cased so the uniqueness check is case-insensitive
      email = data['email'].strip().lower()
      role = data['role']

      # Validate username length
      if not 3 <= len(username) <= 50:
          raise ValidationError(
              message="Username must be between 3 and 50 characters",
              details={"field": "username", "reason": "invalid_length"}
          )

      # Validate password length
      if len(password) < 6:
          raise ValidationError(
              message="Password must be at least 6 characters",
              details={"field": "password", "reason": "too_short"}
          )

      # Validate email format
      if not validate_email(email):
          raise ValidationError(
              message="Invalid email format",
              details={"field": "email", "reason": "invalid_format"}
          )

      # Validate role
      if not validate_role(role):
          raise ValidationError(
              message="Invalid role. Must be one of: admin, power_user, user",
              details={"field": "role", "reason": "invalid_value"}
          )

      # Check if username already exists
      if User.query.filter_by(username=username).first():
          raise ValidationError(
              message="Username already exists",
              details={"field": "username", "reason": "already_exists"}
          )

      # Check if email already exists
      if User.query.filter_by(email=email).first():
          raise ValidationError(
              message="Email already exists",
              details={"field": "email", "reason": "already_exists"}
          )

      # Create new user
      user = User(
          username=username,
          email=email,
          role=role,
          is_active=True
      )
      user.set_password(password)

      db.session.add(user)
      db.session.commit()

      return jsonify(user.to_dict()), 201
  @api_bp.route('/users/update', methods=['POST'])
  @admin_required
  def update_user():
      """
      Update an existing user

      Request body:
          {
              "id": number (required),
              "username": "string" (optional),
              "email": "string" (optional),
              "password": "string" (optional),
              "role": "admin" | "power_user" | "user" (optional),
              "is_active": boolean (optional)
          }

      Empty or absent optional fields are left unchanged. All validation
      runs before the user object is modified, so a rejected request leaves
      no partially applied changes in the session.

      Returns:
          { updated user object }

      Raises:
          ValidationError: If the body is missing or not a JSON object, the
              id is missing, a value fails validation, or the current admin
              tries to deactivate themselves or change their own role.
          NotFoundError: If no user exists with the given id.
      """
      data = request.get_json()
      if not data:
          raise ValidationError(
              message="Request body is required",
              details={"reason": "missing_body"}
          )
      # A JSON array or scalar would otherwise crash on data.get() below
      if not isinstance(data, dict):
          raise ValidationError(
              message="Request body must be a JSON object",
              details={"reason": "invalid_body"}
          )

      # Validate user ID
      user_id = data.get('id')
      if not user_id:
          raise ValidationError(
              message="User ID is required",
              details={"missing_fields": ["id"]}
          )

      # Find user
      user = db.session.get(User, user_id)
      if not user:
          raise NotFoundError(
              message="User not found",
              details={"user_id": user_id}
          )

      # Get current admin user
      current_user = get_current_user_from_context()
      is_self = user.id == current_user.id

      # Prevent admin from deactivating themselves. The guard uses the same
      # bool() coercion as the assignment below, so falsy values such as 0 or
      # null cannot slip past it.
      has_is_active = 'is_active' in data
      new_is_active = bool(data['is_active']) if has_is_active else None
      if is_self and has_is_active and not new_is_active:
          raise ValidationError(
              message="Cannot deactivate your own account",
              details={"reason": "self_deactivation"}
          )

      # Prevent admin from changing their own role
      if is_self and data.get('role') and data.get('role') != user.role:
          raise ValidationError(
              message="Cannot change your own role",
              details={"reason": "self_role_change"}
          )

      # Reject non-string values up front so the .strip()/len() calls below
      # cannot fail with an unhandled AttributeError/TypeError.
      for field in ('username', 'email', 'password'):
          value = data.get(field)
          if value and not isinstance(value, str):
              raise ValidationError(
                  message=f"Field '{field}' must be a string",
                  details={"field": field, "reason": "invalid_type"}
              )

      # Validated attribute changes, applied only after every check passes
      pending = {}

      # Validate username if provided
      if data.get('username'):
          new_username = data['username'].strip()
          if not 3 <= len(new_username) <= 50:
              raise ValidationError(
                  message="Username must be between 3 and 50 characters",
                  details={"field": "username", "reason": "invalid_length"}
              )
          # Check if username is taken by another user
          existing = User.query.filter_by(username=new_username).first()
          if existing and existing.id != user.id:
              raise ValidationError(
                  message="Username already exists",
                  details={"field": "username", "reason": "already_exists"}
              )
          pending['username'] = new_username

      # Validate email if provided
      if data.get('email'):
          new_email = data['email'].strip().lower()
          if not validate_email(new_email):
              raise ValidationError(
                  message="Invalid email format",
                  details={"field": "email", "reason": "invalid_format"}
              )
          # Check if email is taken by another user
          existing = User.query.filter_by(email=new_email).first()
          if existing and existing.id != user.id:
              raise ValidationError(
                  message="Email already exists",
                  details={"field": "email", "reason": "already_exists"}
              )
          pending['email'] = new_email

      # Validate password if provided
      new_password = data.get('password') or None
      if new_password is not None and len(new_password) < 6:
          raise ValidationError(
              message="Password must be at least 6 characters",
              details={"field": "password", "reason": "too_short"}
          )

      # Validate role if provided
      if data.get('role'):
          if not validate_role(data['role']):
              raise ValidationError(
                  message="Invalid role. Must be one of: admin, power_user, user",
                  details={"field": "role", "reason": "invalid_value"}
              )
          pending['role'] = data['role']

      if has_is_active:
          pending['is_active'] = new_is_active

      # Everything validated: apply the changes and persist
      for attribute, value in pending.items():
          setattr(user, attribute, value)
      if new_password is not None:
          user.set_password(new_password)

      db.session.commit()

      return jsonify(user.to_dict()), 200
  @api_bp.route('/users/delete', methods=['POST'])
  @admin_required
  def delete_user():
      """
      Delete a user

      Request body:
          {
              "id": number (required)
          }

      Returns:
          { "message": "User deleted successfully" }

      Raises:
          ValidationError: If the body is missing or not a JSON object, the
              id is missing, or the current admin targets their own account.
          NotFoundError: If no user exists with the given id.
      """
      data = request.get_json()
      if not data:
          raise ValidationError(
              message="Request body is required",
              details={"reason": "missing_body"}
          )
      # A JSON array or scalar would otherwise crash on data.get() below
      if not isinstance(data, dict):
          raise ValidationError(
              message="Request body must be a JSON object",
              details={"reason": "invalid_body"}
          )

      # Validate user ID
      user_id = data.get('id')
      if not user_id:
          raise ValidationError(
              message="User ID is required",
              details={"missing_fields": ["id"]}
          )

      # Find user
      user = db.session.get(User, user_id)
      if not user:
          raise NotFoundError(
              message="User not found",
              details={"user_id": user_id}
          )

      # Prevent admin from deleting themselves
      current_user = get_current_user_from_context()
      if user.id == current_user.id:
          raise ValidationError(
              message="Cannot delete your own account",
              details={"reason": "self_deletion"}
          )

      # NOTE(review): related records are expected to be removed by cascade
      # rules on the User model, which is not visible here - confirm.
      db.session.delete(user)
      db.session.commit()

      return jsonify({
          'message': 'User deleted successfully'
      }), 200
  @api_bp.route('/users/assign-credentials', methods=['POST'])
  @admin_required
  def assign_credentials():
      """
      Assign credentials to a user

      Replaces the user's current credential assignments with the given
      list. IDs that do not resolve to an active credential are skipped, and
      repeated IDs are assigned only once.

      Request body:
          {
              "user_id": number (required),
              "credential_ids": [number] (required, may be empty)
          }

      Returns:
          { "message": "Credentials assigned successfully", "assigned_count": number }

      Raises:
          ValidationError: If the body is missing or not a JSON object, a
              required field is missing, or credential_ids is not a list.
          NotFoundError: If no user exists with the given user_id.
      """
      data = request.get_json()
      if not data:
          raise ValidationError(
              message="Request body is required",
              details={"reason": "missing_body"}
          )
      # A JSON array or scalar would otherwise crash on data.get() below
      if not isinstance(data, dict):
          raise ValidationError(
              message="Request body must be a JSON object",
              details={"reason": "invalid_body"}
          )

      # Validate required fields
      user_id = data.get('user_id')
      credential_ids = data.get('credential_ids')

      if not user_id:
          raise ValidationError(
              message="User ID is required",
              details={"missing_fields": ["user_id"]}
          )

      # An empty list is valid (it clears all assignments); only None is missing
      if credential_ids is None:
          raise ValidationError(
              message="Credential IDs are required",
              details={"missing_fields": ["credential_ids"]}
          )

      if not isinstance(credential_ids, list):
          raise ValidationError(
              message="Credential IDs must be a list",
              details={"field": "credential_ids", "reason": "invalid_type"}
          )

      # Find user
      user = db.session.get(User, user_id)
      if not user:
          raise NotFoundError(
              message="User not found",
              details={"user_id": user_id}
          )

      # Drop repeated IDs (keeping first-seen order) so one credential is never
      # assigned twice. A list scan is used instead of a set so that
      # unhashable junk values in the payload do not raise here.
      unique_credential_ids = []
      for cred_id in credential_ids:
          if cred_id not in unique_credential_ids:
              unique_credential_ids.append(cred_id)

      # Remove existing credential assignments, keyed on the loaded user's id
      # rather than the raw request value.
      UserCredential.query.filter_by(user_id=user.id).delete()

      # Assign new credentials
      assigned_count = 0
      for cred_id in unique_credential_ids:
          # Only existing, active credentials are assigned; others are skipped
          credential = db.session.get(AWSCredential, cred_id)
          if credential and credential.is_active:
              db.session.add(
                  UserCredential(
                      user_id=user.id,
                      credential_id=cred_id
                  )
              )
              assigned_count += 1

      db.session.commit()

      return jsonify({
          'message': 'Credentials assigned successfully',
          'assigned_count': assigned_count
      }), 200