#!/usr/bin/env python """ 启动应用前检查Redis连接 如果Redis不可用,会给出相应的提示和解决方案 """ import os import sys import subprocess # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) def check_redis(): """检查Redis是否可用""" try: import redis # 测试连接 broker_url = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0') r = redis.from_url(broker_url) r.ping() return True except: return False def show_redis_help(): """显示Redis安装和启动帮助""" print("\n" + "="*60) print("🔧 Redis 设置指南") print("="*60) print("\n📋 快速解决方案:") print("\n1️⃣ Windows用户 - 使用Chocolatey安装Redis:") print(" choco install redis-64") print(" redis-server --service-install") print(" redis-server --service-start") print("\n2️⃣ 使用Docker运行Redis:") print(" docker run -d --name redis -p 6379:6379 redis:alpine") print("\n3️⃣ Linux/macOS用户:") print(" # Ubuntu/Debian") print(" sudo apt-get install redis-server") print(" sudo systemctl start redis-server") print(" ") print(" # macOS (Homebrew)") print(" brew install redis") print(" brew services start redis") print("\n4️⃣ 手动下载Redis (Windows):") print(" 下载: https://github.com/microsoftarchive/redis/releases") print(" 解压后运行: redis-server.exe") print("\n✅ 验证Redis运行:") print(" redis-cli ping") print(" (应该返回: PONG)") print("\n🔄 重新测试连接:") print(" python test_redis.py") print("\n📖 详细说明:") print(" 查看 REDIS_SETUP.md 文件") print("="*60) def start_app(): """启动Flask应用""" print("\n🚀 启动Flask应用...") try: # 启动Flask应用 from app import create_app app = create_app() print("✅ Flask应用启动成功!") print("📍 访问地址: http://localhost:5000") print("🔄 任务队列: Mock模式 (Redis不可用时自动启用)") print("\n按 Ctrl+C 停止应用") app.run(host='0.0.0.0', port=5000, debug=True) except KeyboardInterrupt: print("\n👋 应用已停止") except Exception as e: print(f"\n❌ 应用启动失败: {e}") def main(): """主函数""" print("🔍 检查Redis连接...") if check_redis(): print("✅ Redis连接正常") print("🎯 建议启动Celery Worker以获得最佳性能:") print(" celery -A app.celery_app worker --loglevel=info") print(" (在新的终端窗口中运行)") start_app() else: print("⚠️ Redis连接失败") print("🔄 应用将使用Mock模式运行 (功能受限)") response = input("\n是否查看Redis安装指南? (y/n): ").lower().strip() if response in ['y', 'yes', '是']: show_redis_help() response = input("\n是否继续启动应用? (y/n): ").lower().strip() if response in ['y', 'yes', '是']: start_app() else: print("👋 已取消启动") else: start_app() if __name__ == '__main__': main()