# workers.py
  1. """
  2. Worker Management API endpoints
  3. Provides endpoints for:
  4. - GET /api/workers - Get Celery worker status
  5. - GET /api/workers/stats - Get worker statistics
  6. - POST /api/workers/purge - Purge queue tasks
  7. Requirements: 4.8
  8. """
  9. from flask import jsonify, request
  10. from app.api import api_bp
  11. from app.services import admin_required
  12. from app.celery_app import celery_app
  13. # Cache for static worker info (registered tasks, prefetch count don't change)
  14. _worker_cache = {
  15. 'registered_tasks': {},
  16. 'prefetch_count': {},
  17. 'last_known_workers': set()
  18. }
  19. @api_bp.route('/workers', methods=['GET'])
  20. @admin_required
  21. def get_workers():
  22. """
  23. Get Celery worker status list.
  24. Returns:
  25. JSON with list of workers and their status
  26. """
  27. global _worker_cache
  28. try:
  29. workers = []
  30. worker_names = set()
  31. active_tasks = {}
  32. registered_tasks = {}
  33. stats = {}
  34. # Try inspect with short timeout
  35. try:
  36. inspect = celery_app.control.inspect(timeout=0.5)
  37. ping_response = inspect.ping() or {}
  38. for name in ping_response.keys():
  39. worker_names.add(name)
  40. active_tasks = inspect.active() or {}
  41. for name in active_tasks.keys():
  42. worker_names.add(name)
  43. registered_tasks = inspect.registered() or {}
  44. stats = inspect.stats() or {}
  45. # Update cache with new data if we got any
  46. if registered_tasks:
  47. _worker_cache['registered_tasks'].update(registered_tasks)
  48. if stats:
  49. for name, s in stats.items():
  50. _worker_cache['prefetch_count'][name] = s.get('prefetch_count', 0)
  51. if worker_names:
  52. _worker_cache['last_known_workers'].update(worker_names)
  53. except Exception:
  54. pass
  55. # Check running tasks in database to infer worker activity
  56. running_task_count = 0
  57. try:
  58. from app.models.task import Task
  59. running_task_count = Task.query.filter_by(status='running').count()
  60. except Exception:
  61. pass
  62. # If no workers found from inspect but we have running tasks, use cached workers
  63. if not worker_names and running_task_count > 0 and _worker_cache['last_known_workers']:
  64. worker_names = _worker_cache['last_known_workers']
  65. # Build worker list
  66. for worker_name in worker_names:
  67. task_count = len(active_tasks.get(worker_name, []))
  68. # If inspect shows 0 but DB shows running tasks, use DB count
  69. if task_count == 0 and running_task_count > 0:
  70. task_count = running_task_count
  71. # Use cached values if current inspect returned empty
  72. cached_registered = _worker_cache['registered_tasks'].get(worker_name, [])
  73. cached_prefetch = _worker_cache['prefetch_count'].get(worker_name, 0)
  74. worker_info = {
  75. 'name': worker_name,
  76. 'status': 'online',
  77. 'active_tasks': task_count,
  78. 'registered_tasks': registered_tasks.get(worker_name) or cached_registered,
  79. 'prefetch_count': stats.get(worker_name, {}).get('prefetch_count') or cached_prefetch,
  80. }
  81. # Add stats if available
  82. if worker_name in stats:
  83. worker_stats = stats[worker_name]
  84. worker_info['pool'] = worker_stats.get('pool', {})
  85. worker_info['broker'] = worker_stats.get('broker', {})
  86. worker_info['total_tasks'] = worker_stats.get('total', {})
  87. workers.append(worker_info)
  88. # If no workers found and no cache, but we have running tasks, show placeholder
  89. if not workers and running_task_count > 0:
  90. workers.append({
  91. 'name': 'celery@worker (busy)',
  92. 'status': 'online',
  93. 'active_tasks': running_task_count,
  94. 'registered_tasks': list(_worker_cache['registered_tasks'].values())[0] if _worker_cache['registered_tasks'] else [],
  95. 'prefetch_count': list(_worker_cache['prefetch_count'].values())[0] if _worker_cache['prefetch_count'] else 0,
  96. })
  97. return jsonify({
  98. 'data': workers,
  99. 'total': len(workers)
  100. }), 200
  101. except Exception as e:
  102. return jsonify({
  103. 'data': [],
  104. 'total': 0,
  105. 'error': f'Failed to get worker status: {str(e)}'
  106. }), 200
  107. @api_bp.route('/workers/stats', methods=['GET'])
  108. @admin_required
  109. def get_worker_stats():
  110. """
  111. Get worker statistics including queue information.
  112. Returns:
  113. JSON with worker statistics
  114. """
  115. try:
  116. import redis
  117. # Get queue length directly from Redis (always fast)
  118. queue_stats = {}
  119. queue_name = celery_app.conf.task_default_queue or 'celery'
  120. online_count = 0
  121. try:
  122. broker_url = celery_app.conf.broker_url or 'redis://localhost:6379/0'
  123. r = redis.from_url(broker_url)
  124. queue_stats[queue_name] = r.llen(queue_name)
  125. except Exception:
  126. queue_stats[queue_name] = 0
  127. # Try to get worker info with short timeout
  128. active = {}
  129. reserved = {}
  130. try:
  131. inspect = celery_app.control.inspect(timeout=0.5)
  132. ping_response = inspect.ping() or {}
  133. online_count = len(ping_response)
  134. active = inspect.active() or {}
  135. reserved = inspect.reserved() or {}
  136. except Exception:
  137. pass
  138. # Count tasks from inspect
  139. total_active = sum(len(tasks) for tasks in active.values())
  140. total_reserved = sum(len(tasks) for tasks in reserved.values())
  141. # If inspect failed or shows 0, check database for running tasks
  142. running_task_count = 0
  143. try:
  144. from app.models.task import Task
  145. running_task_count = Task.query.filter_by(status='running').count()
  146. except Exception:
  147. pass
  148. # Use DB count if inspect shows 0 but DB has running tasks
  149. if total_active == 0 and running_task_count > 0:
  150. total_active = running_task_count
  151. online_count = max(online_count, 1)
  152. return jsonify({
  153. 'workers': {
  154. 'online': online_count,
  155. 'total': max(online_count, 1) if total_active > 0 else online_count
  156. },
  157. 'tasks': {
  158. 'active': total_active,
  159. 'reserved': total_reserved,
  160. 'scheduled': 0,
  161. 'processed': 0
  162. },
  163. 'queues': queue_stats,
  164. 'details': {
  165. 'active_by_worker': {k: len(v) for k, v in active.items()},
  166. 'reserved_by_worker': {k: len(v) for k, v in reserved.items()}
  167. }
  168. }), 200
  169. except Exception as e:
  170. return jsonify({
  171. 'workers': {'online': 0, 'total': 0},
  172. 'tasks': {'active': 0, 'reserved': 0, 'scheduled': 0, 'processed': 0},
  173. 'queues': {},
  174. 'error': f'Failed to get worker stats: {str(e)}'
  175. }), 200
  176. @api_bp.route('/workers/purge', methods=['POST'])
  177. @admin_required
  178. def purge_queue():
  179. """
  180. Purge all tasks from the queue.
  181. Request Body (optional):
  182. queue: Queue name to purge (default: 'celery')
  183. Returns:
  184. JSON with purge result
  185. """
  186. data = request.get_json() or {}
  187. queue_name = data.get('queue', 'celery')
  188. try:
  189. # Purge the queue
  190. purged_count = celery_app.control.purge()
  191. return jsonify({
  192. 'message': 'Queue purged successfully',
  193. 'purged_count': purged_count,
  194. 'queue': queue_name
  195. }), 200
  196. except Exception as e:
  197. return jsonify({
  198. 'error': {
  199. 'code': 'PURGE_FAILED',
  200. 'message': f'Failed to purge queue: {str(e)}'
  201. }
  202. }), 500
  203. @api_bp.route('/workers/revoke', methods=['POST'])
  204. @admin_required
  205. def revoke_task():
  206. """
  207. Revoke (cancel) a specific Celery task.
  208. Request Body:
  209. task_id: Celery task ID to revoke (required)
  210. terminate: Whether to terminate the task if running (default: False)
  211. Returns:
  212. JSON with revoke result
  213. """
  214. data = request.get_json() or {}
  215. celery_task_id = data.get('task_id')
  216. terminate = data.get('terminate', False)
  217. if not celery_task_id:
  218. return jsonify({
  219. 'error': {
  220. 'code': 'MISSING_PARAMETER',
  221. 'message': 'task_id is required'
  222. }
  223. }), 400
  224. try:
  225. # Revoke the task
  226. celery_app.control.revoke(celery_task_id, terminate=terminate)
  227. return jsonify({
  228. 'message': 'Task revoked successfully',
  229. 'task_id': celery_task_id,
  230. 'terminated': terminate
  231. }), 200
  232. except Exception as e:
  233. return jsonify({
  234. 'error': {
  235. 'code': 'REVOKE_FAILED',
  236. 'message': f'Failed to revoke task: {str(e)}'
  237. }
  238. }), 500