| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- #!/usr/bin/env python
- """
- 测试Celery任务提交
- 用于诊断任务提交时的具体问题
- """
- import os
- import sys
- # 添加项目根目录到路径
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- def test_celery_import():
- """测试Celery模块导入"""
- print("🔍 测试Celery模块导入...")
-
- try:
- from app import create_app
- app = create_app()
-
- with app.app_context():
- print("✅ Flask应用上下文创建成功")
-
- # 测试导入scan_tasks
- try:
- from app.tasks.scan_tasks import scan_aws_resources
- print("✅ scan_tasks模块导入成功")
-
- # 测试任务提交
- try:
- # 创建一个测试任务
- celery_task = scan_aws_resources.delay(
- task_id=999, # 测试ID
- credential_ids=[1],
- regions=['us-east-1'],
- project_metadata={'clientName': 'Test', 'projectName': 'Test'}
- )
- print(f"✅ Celery任务提交成功: {celery_task.id}")
- return True
-
- except Exception as e:
- print(f"❌ Celery任务提交失败: {e}")
- print(f"错误类型: {type(e).__name__}")
- return False
-
- except Exception as e:
- print(f"❌ scan_tasks模块导入失败: {e}")
- print(f"错误类型: {type(e).__name__}")
- return False
-
- except Exception as e:
- print(f"❌ Flask应用创建失败: {e}")
- return False
- def test_mock_import():
- """测试Mock模块导入"""
- print("\n🔍 测试Mock模块导入...")
-
- try:
- from app import create_app
- app = create_app()
-
- with app.app_context():
- from app.tasks.mock_tasks import scan_aws_resources
- print("✅ mock_tasks模块导入成功")
-
- # 测试Mock任务提交
- try:
- celery_task = scan_aws_resources.delay(
- task_id=999, # 测试ID
- credential_ids=[1],
- regions=['us-east-1'],
- project_metadata={'clientName': 'Test', 'projectName': 'Test'}
- )
- print(f"✅ Mock任务提交成功: {celery_task.id}")
- return True
-
- except Exception as e:
- print(f"❌ Mock任务提交失败: {e}")
- return False
-
- except Exception as e:
- print(f"❌ Mock模块测试失败: {e}")
- return False
- def test_celery_config():
- """测试Celery配置"""
- print("\n🔍 测试Celery配置...")
-
- try:
- from app.celery_app import celery_app
-
- print(f"Broker URL: {celery_app.conf.broker_url}")
- print(f"Result Backend: {celery_app.conf.result_backend}")
-
- # 测试连接
- try:
- inspect = celery_app.control.inspect()
- stats = inspect.stats()
- if stats:
- print(f"✅ Celery连接成功,Worker数量: {len(stats)}")
- return True
- else:
- print("⚠️ Celery连接成功,但没有Worker运行")
- return True
- except Exception as e:
- print(f"❌ Celery连接失败: {e}")
- return False
-
- except Exception as e:
- print(f"❌ Celery配置测试失败: {e}")
- return False
- def main():
- """运行所有测试"""
- print("="*60)
- print("Celery任务提交诊断")
- print("="*60)
-
- tests = [
- ("Celery配置", test_celery_config),
- ("Celery任务导入和提交", test_celery_import),
- ("Mock任务导入和提交", test_mock_import),
- ]
-
- results = []
- for test_name, test_func in tests:
- try:
- result = test_func()
- results.append((test_name, result))
- except Exception as e:
- print(f"❌ {test_name}测试异常: {e}")
- results.append((test_name, False))
-
- print("\n" + "="*60)
- print("诊断结果:")
- print("="*60)
-
- for test_name, result in results:
- status = "✅ 通过" if result else "❌ 失败"
- print(f" {test_name}: {status}")
-
- # 给出建议
- celery_ok = results[1][1] if len(results) > 1 else False
- mock_ok = results[2][1] if len(results) > 2 else False
-
- print("\n📋 建议:")
- if celery_ok:
- print(" ✅ Celery工作正常,建议启动Worker获得最佳性能")
- print(" celery -A app.celery_app worker --loglevel=info")
- elif mock_ok:
- print(" 🔄 Celery有问题,但Mock模式可用")
- print(" 💡 可以使用Mock模式进行开发测试")
- else:
- print(" ❌ 两种模式都有问题,请检查配置")
-
- print("="*60)
- if __name__ == '__main__':
- main()
|