| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
"""
Worker Management API endpoints

Provides endpoints for:
- GET /api/workers - Get Celery worker status
- GET /api/workers/stats - Get worker statistics
- POST /api/workers/purge - Purge queue tasks
- POST /api/workers/revoke - Revoke (cancel) a Celery task

Requirements: 4.8
"""
import logging

from flask import jsonify, request

from app.api import api_bp
from app.services import admin_required
from app.celery_app import celery_app
- # Cache for static worker info (registered tasks, prefetch count don't change)
- _worker_cache = {
- 'registered_tasks': {},
- 'prefetch_count': {},
- 'last_known_workers': set()
- }
@api_bp.route('/workers', methods=['GET'])
@admin_required
def get_workers():
    """
    Get Celery worker status list.

    Worker data comes from Celery's inspect API (0.5 s timeout per call).
    Successful replies refresh the module-level ``_worker_cache``; when
    inspect returns nothing but the database still reports running tasks,
    the cached worker names (or a single placeholder entry) are returned
    instead so the list is not empty while work is in progress.

    Returns:
        JSON ``{'data': [...], 'total': n}`` with HTTP 200. On an unexpected
        failure the same shape is returned with an empty list and an
        ``error`` message, still with HTTP 200.
    """
    logger = logging.getLogger(__name__)

    try:
        worker_names = set()
        active_tasks = {}
        registered_tasks = {}
        stats = {}

        # Best effort: a failing broker must not break the endpoint, but the
        # failure is logged instead of being silently discarded.
        try:
            inspect = celery_app.control.inspect(timeout=0.5)
            worker_names.update(inspect.ping() or {})
            active_tasks = inspect.active() or {}
            worker_names.update(active_tasks)
            registered_tasks = inspect.registered() or {}
            stats = inspect.stats() or {}

            # Refresh the cache with whatever this round returned.
            _worker_cache['registered_tasks'].update(registered_tasks)
            for name, worker_stats in stats.items():
                _worker_cache['prefetch_count'][name] = (
                    (worker_stats or {}).get('prefetch_count', 0)
                )
            _worker_cache['last_known_workers'].update(worker_names)
        except Exception:
            logger.warning(
                'Celery inspect failed; falling back to cached worker data',
                exc_info=True,
            )

        # Running tasks recorded in the database are used to infer activity
        # when inspect has nothing to say.
        running_task_count = 0
        try:
            from app.models.task import Task
            running_task_count = Task.query.filter_by(status='running').count()
        except Exception:
            logger.warning('Could not count running tasks', exc_info=True)

        # No worker answered but tasks are running: reuse the cached names.
        # Copy the set so the response never aliases the live cache.
        if not worker_names and running_task_count > 0:
            worker_names = set(_worker_cache['last_known_workers'])

        workers = []
        # Sorted so the response order is stable between polls.
        for worker_name in sorted(worker_names):
            task_count = len(active_tasks.get(worker_name) or [])
            # NOTE: the database count is global, not per worker; it is only
            # a hint that the worker is busy when inspect reports nothing.
            if task_count == 0 and running_task_count > 0:
                task_count = running_task_count

            worker_stats = stats.get(worker_name) or {}
            worker_info = {
                'name': worker_name,
                'status': 'online',
                'active_tasks': task_count,
                # Fall back to cached values when this round returned none.
                'registered_tasks': (
                    registered_tasks.get(worker_name)
                    or _worker_cache['registered_tasks'].get(worker_name, [])
                ),
                'prefetch_count': (
                    worker_stats.get('prefetch_count')
                    or _worker_cache['prefetch_count'].get(worker_name, 0)
                ),
            }

            # Extra detail is only present when stats() answered for this worker.
            if worker_name in stats:
                worker_info['pool'] = worker_stats.get('pool', {})
                worker_info['broker'] = worker_stats.get('broker', {})
                worker_info['total_tasks'] = worker_stats.get('total', {})

            workers.append(worker_info)

        # Nothing known about any worker, yet tasks are running: show a
        # placeholder populated from the first cached entry, if any.
        if not workers and running_task_count > 0:
            workers.append({
                'name': 'celery@worker (busy)',
                'status': 'online',
                'active_tasks': running_task_count,
                'registered_tasks': next(
                    iter(_worker_cache['registered_tasks'].values()), []
                ),
                'prefetch_count': next(
                    iter(_worker_cache['prefetch_count'].values()), 0
                ),
            })

        return jsonify({
            'data': workers,
            'total': len(workers)
        }), 200

    except Exception as e:
        logger.exception('Failed to get worker status')
        return jsonify({
            'data': [],
            'total': 0,
            'error': f'Failed to get worker status: {str(e)}'
        }), 200
@api_bp.route('/workers/stats', methods=['GET'])
@admin_required
def get_worker_stats():
    """
    Get worker statistics including queue information.

    The default queue length is read directly from Redis, worker and task
    counts come from Celery's inspect API (0.5 s timeout per call), and the
    database count of running tasks is used when inspect reports no active
    tasks. Each of these sources is best effort: a failure is logged and the
    corresponding value falls back to zero/empty.

    Returns:
        JSON with ``workers``, ``tasks``, ``queues`` and ``details`` and
        HTTP 200. On an unexpected failure a zeroed payload with an
        ``error`` message is returned, still with HTTP 200.
    """
    logger = logging.getLogger(__name__)

    try:
        import redis

        queue_name = celery_app.conf.task_default_queue or 'celery'
        queue_stats = {queue_name: 0}

        # Queue length straight from the broker.
        try:
            broker_url = celery_app.conf.broker_url or 'redis://localhost:6379/0'
            client = redis.from_url(broker_url)
            try:
                queue_stats[queue_name] = client.llen(queue_name)
            finally:
                # A client is created per request; release its connections
                # instead of leaving them to garbage collection.
                client.connection_pool.disconnect()
        except Exception:
            logger.warning('Could not read queue length from broker', exc_info=True)

        # Worker info from inspect.
        online_count = 0
        active = {}
        reserved = {}
        try:
            inspect = celery_app.control.inspect(timeout=0.5)
            online_count = len(inspect.ping() or {})
            active = inspect.active() or {}
            reserved = inspect.reserved() or {}
        except Exception:
            logger.warning('Celery inspect failed', exc_info=True)

        total_active = sum(len(tasks) for tasks in active.values())
        total_reserved = sum(len(tasks) for tasks in reserved.values())

        # Database view of running tasks, used when inspect shows none.
        running_task_count = 0
        try:
            from app.models.task import Task
            running_task_count = Task.query.filter_by(status='running').count()
        except Exception:
            logger.warning('Could not count running tasks', exc_info=True)

        if total_active == 0 and running_task_count > 0:
            total_active = running_task_count
            # Something is executing those tasks, so report at least one worker.
            online_count = max(online_count, 1)

        return jsonify({
            'workers': {
                'online': online_count,
                'total': max(online_count, 1) if total_active > 0 else online_count
            },
            'tasks': {
                'active': total_active,
                'reserved': total_reserved,
                'scheduled': 0,
                'processed': 0
            },
            'queues': queue_stats,
            'details': {
                'active_by_worker': {k: len(v) for k, v in active.items()},
                'reserved_by_worker': {k: len(v) for k, v in reserved.items()}
            }
        }), 200

    except Exception as e:
        logger.exception('Failed to get worker stats')
        return jsonify({
            'workers': {'online': 0, 'total': 0},
            'tasks': {'active': 0, 'reserved': 0, 'scheduled': 0, 'processed': 0},
            'queues': {},
            'error': f'Failed to get worker stats: {str(e)}'
        }), 200
@api_bp.route('/workers/purge', methods=['POST'])
@admin_required
def purge_queue():
    """
    Purge waiting tasks from the queue.

    Request Body (optional JSON object):
        queue: Name of the single queue to purge. When omitted, every task
            queue configured for the Celery app is purged (the previous
            behaviour) and the response reports the queue as 'celery'.

    Returns:
        JSON with the purge result and HTTP 200; HTTP 400 when the body is
        present but is not a JSON object or ``queue`` is not a non-empty
        string; HTTP 500 when the purge itself fails.
    """
    logger = logging.getLogger(__name__)

    # The body is optional, so a missing body is fine. A body that is present
    # but unreadable is rejected rather than treated as "purge everything".
    payload = request.get_json(silent=True)
    if payload is None and not request.get_data():
        payload = {}
    if not isinstance(payload, dict):
        return jsonify({
            'error': {
                'code': 'INVALID_REQUEST',
                'message': 'Request body must be a JSON object'
            }
        }), 400

    requested_queue = payload.get('queue')
    if requested_queue is not None and not (
        isinstance(requested_queue, str) and requested_queue.strip()
    ):
        return jsonify({
            'error': {
                'code': 'INVALID_PARAMETER',
                'message': 'queue must be a non-empty string'
            }
        }), 400

    try:
        if requested_queue is None:
            # No queue named: purge all configured task queues, as before.
            queue_name = 'celery'
            purged_count = celery_app.control.purge()
        else:
            # Purge only the queue the caller asked for, so the response
            # reflects what was actually removed.
            queue_name = requested_queue
            with celery_app.connection_for_write() as connection:
                purged_count = connection.default_channel.queue_purge(queue_name) or 0

        return jsonify({
            'message': 'Queue purged successfully',
            'purged_count': purged_count,
            'queue': queue_name
        }), 200

    except Exception as e:
        logger.exception('Failed to purge queue')
        return jsonify({
            'error': {
                'code': 'PURGE_FAILED',
                'message': f'Failed to purge queue: {str(e)}'
            }
        }), 500
@api_bp.route('/workers/revoke', methods=['POST'])
@admin_required
def revoke_task():
    """
    Revoke (cancel) a specific Celery task.

    Request Body (JSON object):
        task_id: Celery task ID to revoke (required)
        terminate: Whether to terminate the task if running (default: False).
            String values are interpreted textually ('true', '1', 'yes',
            'on' mean True; anything else means False).

    Returns:
        JSON with the revoke result and HTTP 200; HTTP 400 when the body is
        not a JSON object or ``task_id`` is missing; HTTP 500 when the
        revoke call fails.
    """
    logger = logging.getLogger(__name__)

    # Parse leniently so every client error is answered in the API's JSON
    # error shape rather than by the framework's default error response.
    payload = request.get_json(silent=True)
    if payload is None and not request.get_data():
        payload = {}
    if not isinstance(payload, dict):
        return jsonify({
            'error': {
                'code': 'INVALID_REQUEST',
                'message': 'Request body must be a JSON object'
            }
        }), 400

    celery_task_id = payload.get('task_id')
    if not celery_task_id:
        return jsonify({
            'error': {
                'code': 'MISSING_PARAMETER',
                'message': 'task_id is required'
            }
        }), 400

    # Coerce to a real boolean: a non-empty string such as "false" is truthy
    # and would otherwise terminate a running task unintentionally.
    raw_terminate = payload.get('terminate', False)
    if isinstance(raw_terminate, str):
        terminate = raw_terminate.strip().lower() in ('1', 'true', 'yes', 'on')
    else:
        terminate = bool(raw_terminate)

    try:
        celery_app.control.revoke(celery_task_id, terminate=terminate)

        return jsonify({
            'message': 'Task revoked successfully',
            'task_id': celery_task_id,
            'terminated': terminate
        }), 200

    except Exception as e:
        logger.exception('Failed to revoke task')
        return jsonify({
            'error': {
                'code': 'REVOKE_FAILED',
                'message': f'Failed to revoke task: {str(e)}'
            }
        }), 500
|