"""
Worker Management API endpoints

Provides endpoints for:
- GET /api/workers - Get Celery worker status
- GET /api/workers/stats - Get worker statistics
- POST /api/workers/purge - Purge queue tasks
- POST /api/workers/revoke - Revoke (cancel) a Celery task

Requirements: 4.8
"""
import logging

from flask import jsonify, request

from app.api import api_bp
from app.services import admin_required
from app.celery_app import celery_app

logger = logging.getLogger(__name__)

# Seconds to wait for worker replies to each inspect broadcast. Kept short so
# the dashboard stays responsive; busy workers may therefore not answer.
_INSPECT_TIMEOUT = 0.5

# Process-local cache of worker info that rarely changes (registered tasks,
# prefetch count) plus every worker name seen so far. Used as a fallback when
# inspect broadcasts time out.
_worker_cache = {
    'registered_tasks': {},
    'prefetch_count': {},
    'last_known_workers': set()
}

# String values of a boolean request flag that are interpreted as true.
_TRUTHY_STRINGS = frozenset({'1', 'true', 'yes', 'on'})


def _json_body():
    """Return the request's JSON body if it is an object, else an empty dict.

    ``silent=True`` keeps a missing, malformed or non-JSON body from raising,
    so the endpoint's own validation and error format apply instead of
    Flask's automatic 400/415 response.
    """
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _as_bool(value):
    """Interpret a request flag as a bool; strings like "false" become False."""
    if isinstance(value, str):
        return value.strip().lower() in _TRUTHY_STRINGS
    return bool(value)


def _count_running_db_tasks():
    """Return the number of Task rows with status 'running', or 0 on failure."""
    try:
        # Imported lazily, presumably to avoid a circular import at load time.
        from app.models.task import Task
        return Task.query.filter_by(status='running').count()
    except Exception:
        logger.warning('Could not count running tasks in the database', exc_info=True)
        return 0


def _redis_queue_length(queue_name):
    """Return the length of *queue_name* on a Redis broker, or 0 if unavailable.

    Assumes Celery's Redis transport keeps the queue as a Redis list named
    after the queue, so ``LLEN`` is the number of waiting messages. Any
    failure (redis-py not installed, non-Redis broker URL, broker unreachable)
    yields 0.
    """
    client = None
    try:
        import redis

        broker_url = celery_app.conf.broker_url or 'redis://localhost:6379/0'
        # Bounded timeouts so an unreachable broker cannot stall the request.
        client = redis.from_url(broker_url, socket_connect_timeout=1, socket_timeout=1)
        return client.llen(queue_name)
    except Exception:
        logger.debug('Could not read length of queue %r from the broker', queue_name, exc_info=True)
        return 0
    finally:
        if client is not None:
            # from_url builds a fresh connection pool per call; release it now.
            client.connection_pool.disconnect()


@api_bp.route('/workers', methods=['GET'])
@admin_required
def get_workers():
    """
    Get Celery worker status list.

    Workers are discovered via Celery inspect broadcasts. If nobody replies
    but the database still shows running tasks, previously seen workers (and
    their cached static info) are reported as online.

    Returns:
        JSON ``{'data': [...], 'total': n}`` with status 200. On an unexpected
        failure, also status 200 with an empty list and an ``error`` message.
    """
    try:
        worker_names = set()
        active_tasks = {}
        registered_tasks = {}
        stats = {}

        # Each broadcast waits at most _INSPECT_TIMEOUT; replies may be partial.
        try:
            inspector = celery_app.control.inspect(timeout=_INSPECT_TIMEOUT)
            worker_names.update(inspector.ping() or {})
            active_tasks = inspector.active() or {}
            worker_names.update(active_tasks)
            registered_tasks = inspector.registered() or {}
            stats = inspector.stats() or {}
        except Exception:
            logger.debug('Celery inspect failed while listing workers', exc_info=True)

        # Refresh the cache with whatever the broadcasts returned.
        _worker_cache['registered_tasks'].update(registered_tasks)
        for name, worker_stats in stats.items():
            _worker_cache['prefetch_count'][name] = worker_stats.get('prefetch_count', 0)
        _worker_cache['last_known_workers'].update(worker_names)

        running_task_count = _count_running_db_tasks()

        # Nobody answered, yet tasks are running: assume the workers seen
        # earlier are still alive but too busy to reply.
        if not worker_names and running_task_count > 0:
            worker_names = set(_worker_cache['last_known_workers'])

        workers = []
        for worker_name in sorted(worker_names):
            if worker_name in active_tasks:
                # The worker replied to active(): trust its count, even if 0.
                task_count = len(active_tasks[worker_name])
            else:
                # No active() reply from this worker; fall back to the
                # cluster-wide DB count (not attributable to one worker).
                task_count = running_task_count

            worker_stats = stats.get(worker_name) or {}
            worker_info = {
                'name': worker_name,
                'status': 'online',
                'active_tasks': task_count,
                # Prefer fresh inspect data, else the cached value.
                'registered_tasks': (
                    registered_tasks.get(worker_name)
                    or _worker_cache['registered_tasks'].get(worker_name, [])
                ),
                'prefetch_count': (
                    worker_stats.get('prefetch_count')
                    or _worker_cache['prefetch_count'].get(worker_name, 0)
                ),
            }

            if worker_name in stats:
                worker_info['pool'] = worker_stats.get('pool', {})
                worker_info['broker'] = worker_stats.get('broker', {})
                worker_info['total_tasks'] = worker_stats.get('total', {})

            workers.append(worker_info)

        # No worker known at all, but tasks are running: show a placeholder.
        if not workers and running_task_count > 0:
            workers.append({
                'name': 'celery@worker (busy)',
                'status': 'online',
                'active_tasks': running_task_count,
                'registered_tasks': next(iter(_worker_cache['registered_tasks'].values()), []),
                'prefetch_count': next(iter(_worker_cache['prefetch_count'].values()), 0),
            })

        return jsonify({
            'data': workers,
            'total': len(workers)
        }), 200

    except Exception as e:
        logger.exception('Failed to get worker status')
        return jsonify({
            'data': [],
            'total': 0,
            'error': f'Failed to get worker status: {str(e)}'
        }), 200


@api_bp.route('/workers/stats', methods=['GET'])
@admin_required
def get_worker_stats():
    """
    Get worker statistics including queue information.

    Queue length is read directly from the (Redis) broker. Worker and task
    counts come from Celery inspect, with the database's count of running
    tasks as a fallback when inspect reports no active tasks.

    Returns:
        JSON with worker statistics (status 200, also on failure, in which
        case an ``error`` message is included).
    """
    try:
        queue_name = celery_app.conf.task_default_queue or 'celery'
        queue_stats = {queue_name: _redis_queue_length(queue_name)}

        online_count = 0
        active = {}
        reserved = {}
        try:
            inspector = celery_app.control.inspect(timeout=_INSPECT_TIMEOUT)
            online_count = len(inspector.ping() or {})
            active = inspector.active() or {}
            reserved = inspector.reserved() or {}
        except Exception:
            logger.debug('Celery inspect failed while collecting worker stats', exc_info=True)

        total_active = sum(len(tasks) for tasks in active.values())
        total_reserved = sum(len(tasks) for tasks in reserved.values())

        # Inspect saw no active tasks but the DB says some are running:
        # report the DB count and at least one online worker.
        running_task_count = _count_running_db_tasks()
        if total_active == 0 and running_task_count > 0:
            total_active = running_task_count
            online_count = max(online_count, 1)

        return jsonify({
            'workers': {
                'online': online_count,
                'total': max(online_count, 1) if total_active > 0 else online_count
            },
            'tasks': {
                'active': total_active,
                'reserved': total_reserved,
                'scheduled': 0,
                'processed': 0
            },
            'queues': queue_stats,
            'details': {
                'active_by_worker': {k: len(v) for k, v in active.items()},
                'reserved_by_worker': {k: len(v) for k, v in reserved.items()}
            }
        }), 200

    except Exception as e:
        logger.exception('Failed to get worker stats')
        return jsonify({
            'workers': {'online': 0, 'total': 0},
            'tasks': {'active': 0, 'reserved': 0, 'scheduled': 0, 'processed': 0},
            'queues': {},
            'error': f'Failed to get worker stats: {str(e)}'
        }), 200


@api_bp.route('/workers/purge', methods=['POST'])
@admin_required
def purge_queue():
    """
    Purge waiting tasks from the broker.

    Request Body (optional):
        queue: Name of a single queue to purge. When omitted, every queue the
            Celery app consumes from is purged (``control.purge()``), and the
            response reports the app's default queue name.

    Returns:
        JSON with purge result (200), 400 if ``queue`` is not a string,
        or 500 if the broker operation fails.
    """
    data = _json_body()
    requested_queue = data.get('queue')
    if requested_queue is not None and not isinstance(requested_queue, str):
        return jsonify({
            'error': {
                'code': 'INVALID_PARAMETER',
                'message': 'queue must be a string'
            }
        }), 400

    queue_name = requested_queue or celery_app.conf.task_default_queue or 'celery'

    try:
        if requested_queue:
            # Purge only the requested queue; control.purge() would empty
            # every queue the app consumes, regardless of the name given.
            with celery_app.connection_for_write() as conn:
                purged_count = conn.default_channel.queue_purge(requested_queue)
        else:
            purged_count = celery_app.control.purge()

        return jsonify({
            'message': 'Queue purged successfully',
            'purged_count': purged_count,
            'queue': queue_name
        }), 200

    except Exception as e:
        logger.exception('Failed to purge queue %r', queue_name)
        return jsonify({
            'error': {
                'code': 'PURGE_FAILED',
                'message': f'Failed to purge queue: {str(e)}'
            }
        }), 500


@api_bp.route('/workers/revoke', methods=['POST'])
@admin_required
def revoke_task():
    """
    Revoke (cancel) a specific Celery task.

    Request Body:
        task_id: Celery task ID to revoke (required)
        terminate: Whether to terminate the task if running (default: False).
            Strings such as "false"/"0" are treated as false.

    Returns:
        JSON with revoke result (200), 400 if ``task_id`` is missing,
        or 500 if the revoke broadcast fails.
    """
    data = _json_body()
    celery_task_id = data.get('task_id')
    terminate = _as_bool(data.get('terminate', False))

    if not celery_task_id:
        return jsonify({
            'error': {
                'code': 'MISSING_PARAMETER',
                'message': 'task_id is required'
            }
        }), 400

    try:
        celery_app.control.revoke(celery_task_id, terminate=terminate)

        return jsonify({
            'message': 'Task revoked successfully',
            'task_id': celery_task_id,
            'terminated': terminate
        }), 200

    except Exception as e:
        logger.exception('Failed to revoke task %r', celery_task_id)
        return jsonify({
            'error': {
                'code': 'REVOKE_FAILED',
                'message': f'Failed to revoke task: {str(e)}'
            }
        }), 500