#!/usr/bin/env python """ 测试Celery任务提交 用于诊断任务提交时的具体问题 """ import os import sys # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) def test_celery_import(): """测试Celery模块导入""" print("🔍 测试Celery模块导入...") try: from app import create_app app = create_app() with app.app_context(): print("✅ Flask应用上下文创建成功") # 测试导入scan_tasks try: from app.tasks.scan_tasks import scan_aws_resources print("✅ scan_tasks模块导入成功") # 测试任务提交 try: # 创建一个测试任务 celery_task = scan_aws_resources.delay( task_id=999, # 测试ID credential_ids=[1], regions=['us-east-1'], project_metadata={'clientName': 'Test', 'projectName': 'Test'} ) print(f"✅ Celery任务提交成功: {celery_task.id}") return True except Exception as e: print(f"❌ Celery任务提交失败: {e}") print(f"错误类型: {type(e).__name__}") return False except Exception as e: print(f"❌ scan_tasks模块导入失败: {e}") print(f"错误类型: {type(e).__name__}") return False except Exception as e: print(f"❌ Flask应用创建失败: {e}") return False def test_mock_import(): """测试Mock模块导入""" print("\n🔍 测试Mock模块导入...") try: from app import create_app app = create_app() with app.app_context(): from app.tasks.mock_tasks import scan_aws_resources print("✅ mock_tasks模块导入成功") # 测试Mock任务提交 try: celery_task = scan_aws_resources.delay( task_id=999, # 测试ID credential_ids=[1], regions=['us-east-1'], project_metadata={'clientName': 'Test', 'projectName': 'Test'} ) print(f"✅ Mock任务提交成功: {celery_task.id}") return True except Exception as e: print(f"❌ Mock任务提交失败: {e}") return False except Exception as e: print(f"❌ Mock模块测试失败: {e}") return False def test_celery_config(): """测试Celery配置""" print("\n🔍 测试Celery配置...") try: from app.celery_app import celery_app print(f"Broker URL: {celery_app.conf.broker_url}") print(f"Result Backend: {celery_app.conf.result_backend}") # 测试连接 try: inspect = celery_app.control.inspect() stats = inspect.stats() if stats: print(f"✅ Celery连接成功,Worker数量: {len(stats)}") return True else: print("⚠️ Celery连接成功,但没有Worker运行") return True except Exception as e: print(f"❌ Celery连接失败: {e}") return False except Exception as e: print(f"❌ Celery配置测试失败: {e}") return False def main(): """运行所有测试""" print("="*60) print("Celery任务提交诊断") print("="*60) tests = [ ("Celery配置", test_celery_config), ("Celery任务导入和提交", test_celery_import), ("Mock任务导入和提交", test_mock_import), ] results = [] for test_name, test_func in tests: try: result = test_func() results.append((test_name, result)) except Exception as e: print(f"❌ {test_name}测试异常: {e}") results.append((test_name, False)) print("\n" + "="*60) print("诊断结果:") print("="*60) for test_name, result in results: status = "✅ 通过" if result else "❌ 失败" print(f" {test_name}: {status}") # 给出建议 celery_ok = results[1][1] if len(results) > 1 else False mock_ok = results[2][1] if len(results) > 2 else False print("\n📋 建议:") if celery_ok: print(" ✅ Celery工作正常,建议启动Worker获得最佳性能") print(" celery -A app.celery_app worker --loglevel=info") elif mock_ok: print(" 🔄 Celery有问题,但Mock模式可用") print(" 💡 可以使用Mock模式进行开发测试") else: print(" ❌ 两种模式都有问题,请检查配置") print("="*60) if __name__ == '__main__': main()