#!/usr/bin/env python """ 测试Redis连接 用法: python test_redis.py """ import os import sys # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) def test_redis_connection(): """测试Redis连接""" try: import redis # 从环境变量或配置获取Redis URL broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0') result_url = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1') print(f"测试Redis连接:") print(f" Broker URL: {broker_url}") print(f" Result URL: {result_url}") # 测试Broker连接 print("\n测试Broker连接...") try: r_broker = redis.from_url(broker_url, decode_responses=True) response = r_broker.ping() if response: print("✅ Broker连接成功") # 测试基本操作 r_broker.set('test_broker', 'test_value') value = r_broker.get('test_broker') if value == 'test_value': print("✅ Broker读写测试成功") r_broker.delete('test_broker') else: print("❌ Broker读写测试失败") return False else: print("❌ Broker ping失败") return False except Exception as e: print(f"❌ Broker连接失败: {e}") return False # 测试Result Backend连接 print("\n测试Result Backend连接...") try: r_result = redis.from_url(result_url, decode_responses=True) response = r_result.ping() if response: print("✅ Result Backend连接成功") # 测试基本操作 r_result.set('test_result', 'test_value') value = r_result.get('test_result') if value == 'test_value': print("✅ Result Backend读写测试成功") r_result.delete('test_result') else: print("❌ Result Backend读写测试失败") return False else: print("❌ Result Backend ping失败") return False except Exception as e: print(f"❌ Result Backend连接失败: {e}") return False print("\n🎉 Redis连接测试全部通过!") return True except ImportError: print("❌ Redis模块未安装,请运行: pip install redis") return False except Exception as e: print(f"❌ Redis测试异常: {e}") return False def test_celery_connection(): """测试Celery连接""" print("\n" + "="*50) print("测试Celery连接") print("="*50) try: from app.celery_app import celery_app # 测试Celery连接 print("测试Celery应用...") # 检查Celery配置 print(f"Broker URL: {celery_app.conf.broker_url}") print(f"Result Backend: {celery_app.conf.result_backend}") # 尝试获取活跃任务 try: inspect = celery_app.control.inspect() active_tasks = inspect.active() if active_tasks is not None: print("✅ Celery连接成功") print(f"活跃Worker数量: {len(active_tasks)}") return True else: print("⚠️ Celery连接成功,但没有活跃的Worker") return True except Exception as e: print(f"❌ Celery连接失败: {e}") return False except Exception as e: print(f"❌ Celery测试异常: {e}") return False def main(): """运行所有测试""" print("="*50) print("Redis和Celery连接测试") print("="*50) # 测试Redis连接 redis_ok = test_redis_connection() # 测试Celery连接 celery_ok = test_celery_connection() print("\n" + "="*50) print("测试结果:") print("="*50) print(f"Redis连接: {'✅ 正常' if redis_ok else '❌ 失败'}") print(f"Celery连接: {'✅ 正常' if celery_ok else '❌ 失败'}") if redis_ok and celery_ok: print("\n🎉 所有连接测试通过! 可以正常使用Celery任务队列。") print("\n启动Celery Worker:") print(" celery -A app.celery_app worker --loglevel=info") elif redis_ok: print("\n⚠️ Redis连接正常,但Celery有问题。请检查Celery配置。") print("\n启动Celery Worker:") print(" celery -A app.celery_app worker --loglevel=info") else: print("\n❌ Redis连接失败。请参考 REDIS_SETUP.md 安装和配置Redis。") print("\n临时解决方案:") print(" 应用会自动切换到Mock模式,可以进行基本测试。") print("="*50) if __name__ == '__main__': main()