test_task_api.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. #!/usr/bin/env python
  2. """
  3. 测试任务API调用
  4. 模拟实际的任务创建过程
  5. """
  6. import os
  7. import sys
  8. # 添加项目根目录到路径
  9. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
  10. def test_task_creation():
  11. """测试任务创建过程"""
  12. from app import create_app, db
  13. from app.models import Task, User, AWSCredential
  14. app = create_app('testing')
  15. with app.app_context():
  16. # 创建测试数据
  17. db.create_all()
  18. # 创建测试用户
  19. user = User(
  20. username='testuser',
  21. email='test@example.com',
  22. role='admin'
  23. )
  24. user.set_password('testpass')
  25. db.session.add(user)
  26. # 创建测试凭证
  27. credential = AWSCredential(
  28. name='Test Credential',
  29. credential_type='access_key',
  30. account_id='123456789012',
  31. access_key_id='AKIATEST123456789',
  32. is_active=True
  33. )
  34. credential.set_secret_access_key('test-secret-key')
  35. db.session.add(credential)
  36. db.session.commit()
  37. print(f"✅ 测试数据创建成功")
  38. print(f" 用户ID: {user.id}")
  39. print(f" 凭证ID: {credential.id}")
  40. # 模拟任务创建
  41. task = Task(
  42. name='Test Task',
  43. status='pending',
  44. progress=0,
  45. created_by=user.id
  46. )
  47. task.credential_ids = [credential.id]
  48. task.regions = ['us-east-1']
  49. task.project_metadata = {
  50. 'clientName': 'Test Client',
  51. 'projectName': 'Test Project'
  52. }
  53. db.session.add(task)
  54. db.session.commit()
  55. print(f"✅ 测试任务创建成功,任务ID: {task.id}")
  56. # 测试Celery任务提交
  57. try:
  58. print("🔍 尝试提交Celery任务...")
  59. from app.tasks.scan_tasks import scan_aws_resources
  60. celery_task = scan_aws_resources.delay(
  61. task_id=task.id,
  62. credential_ids=[credential.id],
  63. regions=['us-east-1'],
  64. project_metadata={
  65. 'clientName': 'Test Client',
  66. 'projectName': 'Test Project'
  67. }
  68. )
  69. print(f"✅ Celery任务提交成功: {celery_task.id}")
  70. return True
  71. except Exception as e:
  72. print(f"❌ Celery任务提交失败: {e}")
  73. print(f" 错误类型: {type(e).__name__}")
  74. # 尝试Mock模式
  75. try:
  76. print("🔄 尝试Mock模式...")
  77. from app.tasks.mock_tasks import scan_aws_resources
  78. celery_task = scan_aws_resources.delay(
  79. task_id=task.id,
  80. credential_ids=[credential.id],
  81. regions=['us-east-1'],
  82. project_metadata={
  83. 'clientName': 'Test Client',
  84. 'projectName': 'Test Project'
  85. }
  86. )
  87. print(f"✅ Mock任务提交成功: {celery_task.id}")
  88. return True
  89. except Exception as e2:
  90. print(f"❌ Mock任务也失败: {e2}")
  91. return False
  92. def main():
  93. """运行测试"""
  94. print("="*50)
  95. print("任务API测试")
  96. print("="*50)
  97. try:
  98. result = test_task_creation()
  99. if result:
  100. print("\n🎉 任务创建测试成功!")
  101. else:
  102. print("\n❌ 任务创建测试失败!")
  103. except Exception as e:
  104. print(f"\n❌ 测试异常: {e}")
  105. import traceback
  106. traceback.print_exc()
  107. print("="*50)
  108. if __name__ == '__main__':
  109. main()