| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- #!/usr/bin/env python
- """
- 测试Redis连接
- 用法:
- python test_redis.py
- """
- import os
- import sys
- # 添加项目根目录到路径
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- def test_redis_connection():
- """测试Redis连接"""
- try:
- import redis
-
- # 从环境变量或配置获取Redis URL
- broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
- result_url = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
-
- print(f"测试Redis连接:")
- print(f" Broker URL: {broker_url}")
- print(f" Result URL: {result_url}")
-
- # 测试Broker连接
- print("\n测试Broker连接...")
- try:
- r_broker = redis.from_url(broker_url, decode_responses=True)
- response = r_broker.ping()
- if response:
- print("✅ Broker连接成功")
-
- # 测试基本操作
- r_broker.set('test_broker', 'test_value')
- value = r_broker.get('test_broker')
- if value == 'test_value':
- print("✅ Broker读写测试成功")
- r_broker.delete('test_broker')
- else:
- print("❌ Broker读写测试失败")
- return False
- else:
- print("❌ Broker ping失败")
- return False
- except Exception as e:
- print(f"❌ Broker连接失败: {e}")
- return False
-
- # 测试Result Backend连接
- print("\n测试Result Backend连接...")
- try:
- r_result = redis.from_url(result_url, decode_responses=True)
- response = r_result.ping()
- if response:
- print("✅ Result Backend连接成功")
-
- # 测试基本操作
- r_result.set('test_result', 'test_value')
- value = r_result.get('test_result')
- if value == 'test_value':
- print("✅ Result Backend读写测试成功")
- r_result.delete('test_result')
- else:
- print("❌ Result Backend读写测试失败")
- return False
- else:
- print("❌ Result Backend ping失败")
- return False
- except Exception as e:
- print(f"❌ Result Backend连接失败: {e}")
- return False
-
- print("\n🎉 Redis连接测试全部通过!")
- return True
-
- except ImportError:
- print("❌ Redis模块未安装,请运行: pip install redis")
- return False
- except Exception as e:
- print(f"❌ Redis测试异常: {e}")
- return False
- def test_celery_connection():
- """测试Celery连接"""
- print("\n" + "="*50)
- print("测试Celery连接")
- print("="*50)
-
- try:
- from app.celery_app import celery_app
-
- # 测试Celery连接
- print("测试Celery应用...")
-
- # 检查Celery配置
- print(f"Broker URL: {celery_app.conf.broker_url}")
- print(f"Result Backend: {celery_app.conf.result_backend}")
-
- # 尝试获取活跃任务
- try:
- inspect = celery_app.control.inspect()
- active_tasks = inspect.active()
- if active_tasks is not None:
- print("✅ Celery连接成功")
- print(f"活跃Worker数量: {len(active_tasks)}")
- return True
- else:
- print("⚠️ Celery连接成功,但没有活跃的Worker")
- return True
- except Exception as e:
- print(f"❌ Celery连接失败: {e}")
- return False
-
- except Exception as e:
- print(f"❌ Celery测试异常: {e}")
- return False
- def main():
- """运行所有测试"""
- print("="*50)
- print("Redis和Celery连接测试")
- print("="*50)
-
- # 测试Redis连接
- redis_ok = test_redis_connection()
-
- # 测试Celery连接
- celery_ok = test_celery_connection()
-
- print("\n" + "="*50)
- print("测试结果:")
- print("="*50)
-
- print(f"Redis连接: {'✅ 正常' if redis_ok else '❌ 失败'}")
- print(f"Celery连接: {'✅ 正常' if celery_ok else '❌ 失败'}")
-
- if redis_ok and celery_ok:
- print("\n🎉 所有连接测试通过! 可以正常使用Celery任务队列。")
- print("\n启动Celery Worker:")
- print(" celery -A app.celery_app worker --loglevel=info")
- elif redis_ok:
- print("\n⚠️ Redis连接正常,但Celery有问题。请检查Celery配置。")
- print("\n启动Celery Worker:")
- print(" celery -A app.celery_app worker --loglevel=info")
- else:
- print("\n❌ Redis连接失败。请参考 REDIS_SETUP.md 安装和配置Redis。")
- print("\n临时解决方案:")
- print(" 应用会自动切换到Mock模式,可以进行基本测试。")
-
- print("="*50)
- if __name__ == '__main__':
- main()
|