test_redis.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. #!/usr/bin/env python
  2. """
  3. 测试Redis连接
  4. 用法:
  5. python test_redis.py
  6. """
  7. import os
  8. import sys
  9. # 添加项目根目录到路径
  10. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
  11. def test_redis_connection():
  12. """测试Redis连接"""
  13. try:
  14. import redis
  15. # 从环境变量或配置获取Redis URL
  16. broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
  17. result_url = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
  18. print(f"测试Redis连接:")
  19. print(f" Broker URL: {broker_url}")
  20. print(f" Result URL: {result_url}")
  21. # 测试Broker连接
  22. print("\n测试Broker连接...")
  23. try:
  24. r_broker = redis.from_url(broker_url, decode_responses=True)
  25. response = r_broker.ping()
  26. if response:
  27. print("✅ Broker连接成功")
  28. # 测试基本操作
  29. r_broker.set('test_broker', 'test_value')
  30. value = r_broker.get('test_broker')
  31. if value == 'test_value':
  32. print("✅ Broker读写测试成功")
  33. r_broker.delete('test_broker')
  34. else:
  35. print("❌ Broker读写测试失败")
  36. return False
  37. else:
  38. print("❌ Broker ping失败")
  39. return False
  40. except Exception as e:
  41. print(f"❌ Broker连接失败: {e}")
  42. return False
  43. # 测试Result Backend连接
  44. print("\n测试Result Backend连接...")
  45. try:
  46. r_result = redis.from_url(result_url, decode_responses=True)
  47. response = r_result.ping()
  48. if response:
  49. print("✅ Result Backend连接成功")
  50. # 测试基本操作
  51. r_result.set('test_result', 'test_value')
  52. value = r_result.get('test_result')
  53. if value == 'test_value':
  54. print("✅ Result Backend读写测试成功")
  55. r_result.delete('test_result')
  56. else:
  57. print("❌ Result Backend读写测试失败")
  58. return False
  59. else:
  60. print("❌ Result Backend ping失败")
  61. return False
  62. except Exception as e:
  63. print(f"❌ Result Backend连接失败: {e}")
  64. return False
  65. print("\n🎉 Redis连接测试全部通过!")
  66. return True
  67. except ImportError:
  68. print("❌ Redis模块未安装,请运行: pip install redis")
  69. return False
  70. except Exception as e:
  71. print(f"❌ Redis测试异常: {e}")
  72. return False
  73. def test_celery_connection():
  74. """测试Celery连接"""
  75. print("\n" + "="*50)
  76. print("测试Celery连接")
  77. print("="*50)
  78. try:
  79. from app.celery_app import celery_app
  80. # 测试Celery连接
  81. print("测试Celery应用...")
  82. # 检查Celery配置
  83. print(f"Broker URL: {celery_app.conf.broker_url}")
  84. print(f"Result Backend: {celery_app.conf.result_backend}")
  85. # 尝试获取活跃任务
  86. try:
  87. inspect = celery_app.control.inspect()
  88. active_tasks = inspect.active()
  89. if active_tasks is not None:
  90. print("✅ Celery连接成功")
  91. print(f"活跃Worker数量: {len(active_tasks)}")
  92. return True
  93. else:
  94. print("⚠️ Celery连接成功,但没有活跃的Worker")
  95. return True
  96. except Exception as e:
  97. print(f"❌ Celery连接失败: {e}")
  98. return False
  99. except Exception as e:
  100. print(f"❌ Celery测试异常: {e}")
  101. return False
  102. def main():
  103. """运行所有测试"""
  104. print("="*50)
  105. print("Redis和Celery连接测试")
  106. print("="*50)
  107. # 测试Redis连接
  108. redis_ok = test_redis_connection()
  109. # 测试Celery连接
  110. celery_ok = test_celery_connection()
  111. print("\n" + "="*50)
  112. print("测试结果:")
  113. print("="*50)
  114. print(f"Redis连接: {'✅ 正常' if redis_ok else '❌ 失败'}")
  115. print(f"Celery连接: {'✅ 正常' if celery_ok else '❌ 失败'}")
  116. if redis_ok and celery_ok:
  117. print("\n🎉 所有连接测试通过! 可以正常使用Celery任务队列。")
  118. print("\n启动Celery Worker:")
  119. print(" celery -A app.celery_app worker --loglevel=info")
  120. elif redis_ok:
  121. print("\n⚠️ Redis连接正常,但Celery有问题。请检查Celery配置。")
  122. print("\n启动Celery Worker:")
  123. print(" celery -A app.celery_app worker --loglevel=info")
  124. else:
  125. print("\n❌ Redis连接失败。请参考 REDIS_SETUP.md 安装和配置Redis。")
  126. print("\n临时解决方案:")
  127. print(" 应用会自动切换到Mock模式,可以进行基本测试。")
  128. print("="*50)
  129. if __name__ == '__main__':
  130. main()