celery_app.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import os
  2. from celery import Celery
  3. # Create Celery application
  4. celery_app = Celery(
  5. 'aws_scanner',
  6. broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
  7. backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
  8. include=['app.tasks.scan_tasks']
  9. )
  10. # Celery configuration
  11. celery_app.conf.update(
  12. # Serialization
  13. task_serializer='json',
  14. accept_content=['json'],
  15. result_serializer='json',
  16. # Timezone
  17. timezone='UTC',
  18. enable_utc=True,
  19. # Task tracking
  20. task_track_started=True,
  21. # Timeouts
  22. task_time_limit=3600, # 1 hour hard limit
  23. task_soft_time_limit=3300, # 55 minutes soft limit
  24. # Worker settings
  25. worker_prefetch_multiplier=1, # Each worker takes one task at a time
  26. task_acks_late=True, # Acknowledge task after completion
  27. # Result settings
  28. result_expires=86400, # Results expire after 24 hours
  29. # Retry settings
  30. task_default_retry_delay=60, # 60 seconds default retry delay
  31. task_max_retries=3, # Maximum 3 retries
  32. # Broker connection retry settings (Celery 6.0+ compatibility)
  33. broker_connection_retry_on_startup=True,
  34. # Beat schedule (for periodic tasks if needed)
  35. beat_schedule={
  36. 'cleanup-old-reports': {
  37. 'task': 'app.tasks.scan_tasks.cleanup_old_reports',
  38. 'schedule': 86400.0, # Run daily
  39. 'args': (30,) # Keep reports for 30 days
  40. },
  41. },
  42. )
  43. def init_celery(app):
  44. """Initialize Celery with Flask application context"""
  45. celery_app.conf.update(
  46. broker_url=app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
  47. result_backend=app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
  48. )
  49. class ContextTask(celery_app.Task):
  50. """Task that runs within Flask application context"""
  51. def __call__(self, *args, **kwargs):
  52. with app.app_context():
  53. return self.run(*args, **kwargs)
  54. celery_app.Task = ContextTask
  55. return celery_app